diff --git a/lib/telemetry/DatabricksTelemetryExporter.ts b/lib/telemetry/DatabricksTelemetryExporter.ts index 37cf1c70..eeeb5eea 100644 --- a/lib/telemetry/DatabricksTelemetryExporter.ts +++ b/lib/telemetry/DatabricksTelemetryExporter.ts @@ -15,7 +15,7 @@ */ import { v4 as uuidv4 } from 'uuid'; -import fetch, { RequestInit, Response } from 'node-fetch'; +import fetch, { RequestInit, Response, Request } from 'node-fetch'; import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import IAuthentication from '../connection/contracts/IAuthentication'; @@ -304,7 +304,13 @@ export default class DatabricksTelemetryExporter { private async sendRequest(url: string, init: RequestInit): Promise { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - return fetch(url, { ...init, agent }); + const retryPolicy = await connectionProvider.getRetryPolicy(); + const requestConfig: RequestInit = { agent, ...init }; + const result = await retryPolicy.invokeWithRetry(() => { + const request = new Request(url, requestConfig); + return fetch(request).then((response) => ({ request, response })); + }); + return result.response; } private toTelemetryLog( diff --git a/lib/telemetry/FeatureFlagCache.ts b/lib/telemetry/FeatureFlagCache.ts index 58c758ff..06bc59a9 100644 --- a/lib/telemetry/FeatureFlagCache.ts +++ b/lib/telemetry/FeatureFlagCache.ts @@ -14,12 +14,11 @@ * limitations under the License. */ -import fetch, { RequestInit, Response } from 'node-fetch'; +import fetch, { RequestInit, Response, Request } from 'node-fetch'; import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import IAuthentication from '../connection/contracts/IAuthentication'; import { buildTelemetryUrl, normalizeHeaders } from './telemetryUtils'; -import ExceptionClassifier from './ExceptionClassifier'; import buildUserAgentString from '../utils/buildUserAgentString'; import driverVersion from '../version'; @@ -193,20 +192,13 @@ export default class FeatureFlagCache { private async fetchWithRetry(url: string, init: RequestInit): Promise { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - const logger = this.context.getLogger(); - - try { - return await fetch(url, { ...init, agent }); - } catch (err: any) { - if (!ExceptionClassifier.isRetryable(err)) { - throw err; - } - logger.log(LogLevel.debug, `Feature flag fetch retry after transient: ${err?.code ?? err?.message ?? err}`); - await new Promise((resolve) => { - setTimeout(resolve, 100 + Math.random() * 100); - }); - return fetch(url, { ...init, agent }); - } + const retryPolicy = await connectionProvider.getRetryPolicy(); + const requestConfig: RequestInit = { agent, ...init }; + const result = await retryPolicy.invokeWithRetry(() => { + const request = new Request(url, requestConfig); + return fetch(request).then((response) => ({ request, response })); + }); + return result.response; } private async getAuthHeaders(): Promise> { diff --git a/lib/telemetry/MetricsAggregator.ts b/lib/telemetry/MetricsAggregator.ts index f5b0d171..d160db10 100644 --- a/lib/telemetry/MetricsAggregator.ts +++ b/lib/telemetry/MetricsAggregator.ts @@ -51,6 +51,8 @@ export default class MetricsAggregator { private closed = false; + private closing = false; + private batchSize: number; private flushIntervalMs: number; @@ -317,7 +319,7 @@ export default class MetricsAggregator { ); } - if (this.pendingMetrics.length >= this.batchSize) { + if (this.pendingMetrics.length >= this.batchSize && !this.closing) { // resetTimer=false so the periodic tail-drain keeps its cadence even // under sustained batch-size bursts. const logger = this.context.getLogger(); @@ -406,52 +408,27 @@ export default class MetricsAggregator { async close(): Promise { const logger = this.context.getLogger(); - this.closed = true; try { + // Suppress batch-triggered fire-and-forget flushes from addPendingMetric + // so no promises escape past the single awaited flush below. + this.closing = true; + if (this.flushTimer) { clearInterval(this.flushTimer); this.flushTimer = null; } - // Snapshot keys — completeStatement mutates statementMetrics. + // closed is still false here so completeStatement → addPendingMetric works normally. const remainingStatements = [...this.statementMetrics.keys()]; for (const statementId of remainingStatements) { - this.completeStatementForClose(statementId); + this.completeStatement(statementId); } - await this.flushForClose(); - - // Belt-and-braces: something the above awaited could in principle - // have resurrected a timer. Clear once more. - if (this.flushTimer) { - clearInterval(this.flushTimer); - this.flushTimer = null; - } + this.closed = true; + await this.flush(false); } catch (error: any) { logger.log(LogLevel.debug, `MetricsAggregator.close error: ${error.message}`); } } - - /** completeStatement variant that bypasses the `closed` guard. */ - private completeStatementForClose(statementId: string): void { - const prev = this.closed; - this.closed = false; - try { - this.completeStatement(statementId); - } finally { - this.closed = prev; - } - } - - /** flush variant that bypasses the `closed` guard on addPendingMetric. */ - private async flushForClose(): Promise { - const prev = this.closed; - this.closed = false; - try { - await this.flush(false); - } finally { - this.closed = prev; - } - } }