Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions lib/telemetry/DatabricksTelemetryExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { v4 as uuidv4 } from 'uuid';
import fetch, { RequestInit, Response } from 'node-fetch';
import fetch, { RequestInit, Response, Request } from 'node-fetch';
import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';
import IAuthentication from '../connection/contracts/IAuthentication';
Expand Down Expand Up @@ -304,7 +304,13 @@ export default class DatabricksTelemetryExporter {
private async sendRequest(url: string, init: RequestInit): Promise<Response> {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();
return fetch(url, { ...init, agent });
const retryPolicy = await connectionProvider.getRetryPolicy();
const requestConfig: RequestInit = { agent, ...init };
const result = await retryPolicy.invokeWithRetry(() => {
const request = new Request(url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
});
return result.response;
}

private toTelemetryLog(
Expand Down
24 changes: 8 additions & 16 deletions lib/telemetry/FeatureFlagCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* limitations under the License.
*/

import fetch, { RequestInit, Response } from 'node-fetch';
import fetch, { RequestInit, Response, Request } from 'node-fetch';
import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';
import IAuthentication from '../connection/contracts/IAuthentication';
import { buildTelemetryUrl, normalizeHeaders } from './telemetryUtils';
import ExceptionClassifier from './ExceptionClassifier';
import buildUserAgentString from '../utils/buildUserAgentString';
import driverVersion from '../version';

Expand Down Expand Up @@ -193,20 +192,13 @@ export default class FeatureFlagCache {
private async fetchWithRetry(url: string, init: RequestInit): Promise<Response> {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();
const logger = this.context.getLogger();

try {
return await fetch(url, { ...init, agent });
} catch (err: any) {
if (!ExceptionClassifier.isRetryable(err)) {
throw err;
}
logger.log(LogLevel.debug, `Feature flag fetch retry after transient: ${err?.code ?? err?.message ?? err}`);
await new Promise((resolve) => {
setTimeout(resolve, 100 + Math.random() * 100);
});
return fetch(url, { ...init, agent });
}
const retryPolicy = await connectionProvider.getRetryPolicy();
const requestConfig: RequestInit = { agent, ...init };
const result = await retryPolicy.invokeWithRetry(() => {
const request = new Request(url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
});
return result.response;
}

private async getAuthHeaders(): Promise<Record<string, string>> {
Expand Down
45 changes: 11 additions & 34 deletions lib/telemetry/MetricsAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ export default class MetricsAggregator {

private closed = false;

private closing = false;

private batchSize: number;

private flushIntervalMs: number;
Expand Down Expand Up @@ -317,7 +319,7 @@ export default class MetricsAggregator {
);
}

if (this.pendingMetrics.length >= this.batchSize) {
if (this.pendingMetrics.length >= this.batchSize && !this.closing) {
// resetTimer=false so the periodic tail-drain keeps its cadence even
// under sustained batch-size bursts.
const logger = this.context.getLogger();
Expand Down Expand Up @@ -406,52 +408,27 @@ export default class MetricsAggregator {

async close(): Promise<void> {
const logger = this.context.getLogger();
this.closed = true;

try {
// Suppress batch-triggered fire-and-forget flushes from addPendingMetric
// so no promises escape past the single awaited flush below.
this.closing = true;

if (this.flushTimer) {
clearInterval(this.flushTimer);
this.flushTimer = null;
}

// Snapshot keys — completeStatement mutates statementMetrics.
// closed is still false here so completeStatement → addPendingMetric works normally.
const remainingStatements = [...this.statementMetrics.keys()];
for (const statementId of remainingStatements) {
this.completeStatementForClose(statementId);
this.completeStatement(statementId);
}

await this.flushForClose();

// Belt-and-braces: something the above awaited could in principle
// have resurrected a timer. Clear once more.
if (this.flushTimer) {
clearInterval(this.flushTimer);
this.flushTimer = null;
}
this.closed = true;
await this.flush(false);
} catch (error: any) {
logger.log(LogLevel.debug, `MetricsAggregator.close error: ${error.message}`);
}
}

/** completeStatement variant that bypasses the `closed` guard. */
private completeStatementForClose(statementId: string): void {
const prev = this.closed;
this.closed = false;
try {
this.completeStatement(statementId);
} finally {
this.closed = prev;
}
}

/** flush variant that bypasses the `closed` guard on addPendingMetric. */
private async flushForClose(): Promise<void> {
const prev = this.closed;
this.closed = false;
try {
await this.flush(false);
} finally {
this.closed = prev;
}
}
}
Loading