From d80f13ef0f22937d91aa684bba34363feb231996 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 21 Apr 2026 11:56:17 +0530 Subject: [PATCH 1/2] fix: route telemetry HTTP through shared connection stack; fix close() flush race - DatabricksTelemetryExporter.sendRequest and FeatureFlagCache.fetchWithRetry now use connectionProvider.getRetryPolicy().invokeWithRetry(), matching the CloudFetchResultHandler pattern instead of bespoke fetch/retry logic - MetricsAggregator: add closing flag so batch-triggered fire-and-forget flushes are suppressed during close(), ensuring a single awaited flushForClose() drains all remaining metrics without racing past process.exit() Co-authored-by: samikshya-chand_data --- lib/telemetry/DatabricksTelemetryExporter.ts | 10 ++++++-- lib/telemetry/FeatureFlagCache.ts | 24 +++++++------------- lib/telemetry/MetricsAggregator.ts | 7 +++++- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/lib/telemetry/DatabricksTelemetryExporter.ts b/lib/telemetry/DatabricksTelemetryExporter.ts index 37cf1c70..eeeb5eea 100644 --- a/lib/telemetry/DatabricksTelemetryExporter.ts +++ b/lib/telemetry/DatabricksTelemetryExporter.ts @@ -15,7 +15,7 @@ */ import { v4 as uuidv4 } from 'uuid'; -import fetch, { RequestInit, Response } from 'node-fetch'; +import fetch, { RequestInit, Response, Request } from 'node-fetch'; import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import IAuthentication from '../connection/contracts/IAuthentication'; @@ -304,7 +304,13 @@ export default class DatabricksTelemetryExporter { private async sendRequest(url: string, init: RequestInit): Promise { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - return fetch(url, { ...init, agent }); + const retryPolicy = await connectionProvider.getRetryPolicy(); + const requestConfig: RequestInit = { agent, ...init }; + const result = await retryPolicy.invokeWithRetry(() => { + const request = new Request(url, requestConfig); + return fetch(request).then((response) => ({ request, response })); + }); + return result.response; } private toTelemetryLog( diff --git a/lib/telemetry/FeatureFlagCache.ts b/lib/telemetry/FeatureFlagCache.ts index 58c758ff..06bc59a9 100644 --- a/lib/telemetry/FeatureFlagCache.ts +++ b/lib/telemetry/FeatureFlagCache.ts @@ -14,12 +14,11 @@ * limitations under the License. */ -import fetch, { RequestInit, Response } from 'node-fetch'; +import fetch, { RequestInit, Response, Request } from 'node-fetch'; import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import IAuthentication from '../connection/contracts/IAuthentication'; import { buildTelemetryUrl, normalizeHeaders } from './telemetryUtils'; -import ExceptionClassifier from './ExceptionClassifier'; import buildUserAgentString from '../utils/buildUserAgentString'; import driverVersion from '../version'; @@ -193,20 +192,13 @@ export default class FeatureFlagCache { private async fetchWithRetry(url: string, init: RequestInit): Promise { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - const logger = this.context.getLogger(); - - try { - return await fetch(url, { ...init, agent }); - } catch (err: any) { - if (!ExceptionClassifier.isRetryable(err)) { - throw err; - } - logger.log(LogLevel.debug, `Feature flag fetch retry after transient: ${err?.code ?? err?.message ?? err}`); - await new Promise((resolve) => { - setTimeout(resolve, 100 + Math.random() * 100); - }); - return fetch(url, { ...init, agent }); - } + const retryPolicy = await connectionProvider.getRetryPolicy(); + const requestConfig: RequestInit = { agent, ...init }; + const result = await retryPolicy.invokeWithRetry(() => { + const request = new Request(url, requestConfig); + return fetch(request).then((response) => ({ request, response })); + }); + return result.response; } private async getAuthHeaders(): Promise> { diff --git a/lib/telemetry/MetricsAggregator.ts b/lib/telemetry/MetricsAggregator.ts index f5b0d171..2702f741 100644 --- a/lib/telemetry/MetricsAggregator.ts +++ b/lib/telemetry/MetricsAggregator.ts @@ -51,6 +51,8 @@ export default class MetricsAggregator { private closed = false; + private closing = false; + private batchSize: number; private flushIntervalMs: number; @@ -317,9 +319,11 @@ export default class MetricsAggregator { ); } - if (this.pendingMetrics.length >= this.batchSize) { + if (this.pendingMetrics.length >= this.batchSize && !this.closing) { // resetTimer=false so the periodic tail-drain keeps its cadence even // under sustained batch-size bursts. + // Suppressed during close() so fire-and-forget promises don't race past + // the single awaited flushForClose(). const logger = this.context.getLogger(); Promise.resolve(this.flush(false)).catch((err: any) => { logger.log(LogLevel.debug, `Batch-trigger flush failed: ${err?.message ?? err}`); @@ -406,6 +410,7 @@ export default class MetricsAggregator { async close(): Promise { const logger = this.context.getLogger(); + this.closing = true; this.closed = true; try { From ef770d21281604e14bebcabf1f33b9640e05d7a3 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 21 Apr 2026 12:28:48 +0530 Subject: [PATCH 2/2] fix: simplify MetricsAggregator.close() flush race fix Replace completeStatementForClose/flushForClose toggle pattern with a closing flag that suppresses batch-triggered fire-and-forget flushes. Set closing=true first so completeStatement works normally via addPendingMetric (closed is still false), then seal with closed=true and drain with a single awaited flush(false). Co-authored-by: samikshya-chand_data --- lib/telemetry/MetricsAggregator.ts | 44 ++++++------------------------ 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/lib/telemetry/MetricsAggregator.ts b/lib/telemetry/MetricsAggregator.ts index 2702f741..d160db10 100644 --- a/lib/telemetry/MetricsAggregator.ts +++ b/lib/telemetry/MetricsAggregator.ts @@ -322,8 +322,6 @@ export default class MetricsAggregator { if (this.pendingMetrics.length >= this.batchSize && !this.closing) { // resetTimer=false so the periodic tail-drain keeps its cadence even // under sustained batch-size bursts. - // Suppressed during close() so fire-and-forget promises don't race past - // the single awaited flushForClose(). const logger = this.context.getLogger(); Promise.resolve(this.flush(false)).catch((err: any) => { logger.log(LogLevel.debug, `Batch-trigger flush failed: ${err?.message ?? err}`); @@ -410,53 +408,27 @@ export default class MetricsAggregator { async close(): Promise { const logger = this.context.getLogger(); - this.closing = true; - this.closed = true; try { + // Suppress batch-triggered fire-and-forget flushes from addPendingMetric + // so no promises escape past the single awaited flush below. + this.closing = true; + if (this.flushTimer) { clearInterval(this.flushTimer); this.flushTimer = null; } - // Snapshot keys — completeStatement mutates statementMetrics. + // closed is still false here so completeStatement → addPendingMetric works normally. const remainingStatements = [...this.statementMetrics.keys()]; for (const statementId of remainingStatements) { - this.completeStatementForClose(statementId); + this.completeStatement(statementId); } - await this.flushForClose(); - - // Belt-and-braces: something the above awaited could in principle - // have resurrected a timer. Clear once more. - if (this.flushTimer) { - clearInterval(this.flushTimer); - this.flushTimer = null; - } + this.closed = true; + await this.flush(false); } catch (error: any) { logger.log(LogLevel.debug, `MetricsAggregator.close error: ${error.message}`); } } - - /** completeStatement variant that bypasses the `closed` guard. */ - private completeStatementForClose(statementId: string): void { - const prev = this.closed; - this.closed = false; - try { - this.completeStatement(statementId); - } finally { - this.closed = prev; - } - } - - /** flush variant that bypasses the `closed` guard on addPendingMetric. */ - private async flushForClose(): Promise { - const prev = this.closed; - this.closed = false; - try { - await this.flush(false); - } finally { - this.closed = prev; - } - } }