Skip to content

Commit 7ae869e

Browse files
Add retries with exponential backoff for Commands and Cluster (#35)
* Add retries with exponential backoff for Commands and Cluster * remove debug statements * remove debug statements * refactor * lint * add eslint fix * lint * typo fixes * partial feedback address * addressed more feedback * test fix * test fix * fix retries * don't call start on non terminal clusters * restart cluster * skip cluster state tests * lint * unit tests * lockfile fix * remove bricks * rename compile to build * yarn lock
1 parent b1ef0b5 commit 7ae869e

File tree

11 files changed

+505
-52
lines changed

11 files changed

+505
-52
lines changed

packages/databricks-sdk-js/package.json

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"build": "tsc -p tsconfig.json && cp src/fetch.* dist/",
2121
"watch": "tsc -w -p tsconfig.json",
2222
"clean": "rm -rf dist node_modules",
23-
"fix": "prettier . --write",
23+
"fix": "eslint src --ext ts --fix && prettier . --write",
2424
"test:lint": "eslint src --ext ts && prettier . -c",
2525
"test:unit": "ts-mocha --type-check 'src/**/*.test.ts'",
2626
"test": "yarn run test:lint && yarn run test:unit",
@@ -34,12 +34,18 @@
3434
},
3535
"devDependencies": {
3636
"@istanbuljs/nyc-config-typescript": "^1.0.2",
37+
"@types/chai": "^4.3.3",
38+
"@types/chai-as-promised": "^7.1.5",
39+
"@types/chai-spies": "^1.0.3",
3740
"@types/ini": "^1.3.31",
3841
"@types/mocha": "^9.1.1",
3942
"@types/node": "^18.7.1",
4043
"@types/tmp": "^0.2.3",
4144
"@types/uuid": "^8.3.4",
42-
"eslint": "^8.21.0",
45+
"chai": "^4.3.6",
46+
"chai-as-promised": "^7.1.1",
47+
"chai-spies": "^1.0.0",
48+
"eslint": "^8.20.0",
4349
"mocha": "^10.0.0",
4450
"nyc": "^15.1.0",
4551
"prettier": "^2.7.1",
@@ -69,4 +75,4 @@
6975
],
7076
"report-dir": "coverage"
7177
}
72-
}
78+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
export enum TimeUnits {
2+
milliseconds,
3+
seconds,
4+
minutes,
5+
hours,
6+
}
7+
8+
class InvalidTimeValueError extends Error {}
9+
10+
export default class Time {
11+
value: number;
12+
units: TimeUnits;
13+
14+
constructor(value: number, units: TimeUnits) {
15+
this.units = units;
16+
this.value = value;
17+
}
18+
19+
public toMillSeconds(): Time {
20+
let secondsValue = 0;
21+
switch (this.units) {
22+
case TimeUnits.hours:
23+
secondsValue = this.value * 60 * 60 * 1000;
24+
break;
25+
case TimeUnits.minutes:
26+
secondsValue = this.value * 60 * 1000;
27+
break;
28+
case TimeUnits.seconds:
29+
secondsValue = this.value * 1000;
30+
break;
31+
case TimeUnits.milliseconds:
32+
secondsValue = this.value;
33+
break;
34+
}
35+
return new Time(secondsValue, TimeUnits.seconds);
36+
}
37+
38+
public add(other: Time): Time {
39+
return new Time(
40+
this.toMillSeconds().value + other.toMillSeconds().value,
41+
TimeUnits.milliseconds
42+
);
43+
}
44+
45+
public sub(other: Time): Time {
46+
return new Time(
47+
this.toMillSeconds().value - other.toMillSeconds().value,
48+
TimeUnits.milliseconds
49+
);
50+
}
51+
52+
public multiply(other: number): Time {
53+
return new Time(this.value * other, this.units);
54+
}
55+
56+
public gt(other: Time): boolean {
57+
return this.toMillSeconds().value > other.toMillSeconds().value;
58+
}
59+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import retry, {RetriableError} from "./retries";
2+
import Time, {TimeUnits} from "./Time";
3+
import chai, {assert, expect} from "chai";
4+
import spies from "chai-spies";
5+
import chaiAsPromised from "chai-as-promised";
6+
7+
chai.use(chaiAsPromised);
8+
chai.use(spies);
9+
class NonRetriableError extends Error {}
10+
11+
describe(__filename, function () {
12+
this.timeout(new Time(10, TimeUnits.minutes).toMillSeconds().value);
13+
14+
it("should return result if timeout doesn't expire", async function () {
15+
const startTime = Date.now();
16+
17+
const retryResult = await retry({
18+
timeout: new Time(5, TimeUnits.seconds),
19+
fn: () => {
20+
if (Date.now() - startTime < 1000) {
21+
throw new RetriableError();
22+
}
23+
return new Promise<string>((resolve) =>
24+
resolve("returned_string")
25+
);
26+
},
27+
});
28+
29+
assert.equal(retryResult, "returned_string");
30+
});
31+
32+
it("should return retriable error if timeout expires", async function () {
33+
const startTime = Date.now();
34+
await expect(
35+
retry({
36+
timeout: new Time(5, TimeUnits.seconds),
37+
fn: () => {
38+
if (Date.now() - startTime < 10000) {
39+
throw new RetriableError();
40+
}
41+
return new Promise<string>((resolve) =>
42+
resolve("returned_string")
43+
);
44+
},
45+
})
46+
).to.be.rejectedWith(RetriableError);
47+
});
48+
49+
it("should throw non retriable error immediately", async function () {
50+
const mockFunction = chai.spy();
51+
52+
await expect(
53+
retry({
54+
timeout: new Time(5, TimeUnits.seconds),
55+
fn: () => {
56+
mockFunction();
57+
throw new NonRetriableError();
58+
},
59+
})
60+
).to.be.rejectedWith(NonRetriableError);
61+
62+
expect(mockFunction).to.be.called.once;
63+
});
64+
});
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import Time, {TimeUnits} from "./Time";
2+
3+
export class RetriableError extends Error {
4+
name: string = "RetriableError";
5+
}
6+
7+
export interface RetriableResult<T> {
8+
result?: T;
9+
error?: unknown;
10+
}
11+
12+
const maxJitter = new Time(750, TimeUnits.milliseconds);
13+
const minJitter = new Time(50, TimeUnits.milliseconds);
14+
const maxWaitTime = new Time(10, TimeUnits.seconds);
15+
const defaultTimeout = new Time(10, TimeUnits.minutes);
16+
17+
function waitTime(attempt: number) {
18+
const jitter = maxJitter
19+
.sub(minJitter)
20+
.multiply(Math.random())
21+
.add(minJitter);
22+
const timeout = new Time(attempt, TimeUnits.seconds).add(jitter);
23+
24+
return timeout.gt(maxWaitTime) ? maxWaitTime : timeout;
25+
}
26+
27+
interface RetryArgs<T> {
28+
timeout?: Time;
29+
fn: () => Promise<T>;
30+
}
31+
32+
export default async function retry<T>({
33+
timeout = defaultTimeout,
34+
fn,
35+
}: RetryArgs<T>): Promise<T> {
36+
let attempt = 1;
37+
let retriableErr: RetriableError = new RetriableError("timeout");
38+
let nonRetriableErr: Error | undefined = undefined;
39+
let result: T | undefined = undefined;
40+
41+
let timedOut = false;
42+
let timer: NodeJS.Timeout = setTimeout(() => {
43+
timedOut = true;
44+
}, timeout.toMillSeconds().value);
45+
46+
let success = false;
47+
48+
while (!timedOut) {
49+
try {
50+
result = await fn();
51+
success = true;
52+
break;
53+
} catch (err: unknown) {
54+
if (err instanceof RetriableError) {
55+
retriableErr = err;
56+
} else {
57+
nonRetriableErr = err as Error;
58+
break;
59+
}
60+
}
61+
62+
await new Promise((resolve) =>
63+
setTimeout(resolve, waitTime(attempt).toMillSeconds().value)
64+
);
65+
66+
attempt += 1;
67+
}
68+
69+
clearTimeout(timer);
70+
71+
if (nonRetriableErr) {
72+
throw nonRetriableErr;
73+
}
74+
if (!success) {
75+
throw retriableErr;
76+
}
77+
return result!;
78+
}

packages/databricks-sdk-js/src/services/Cluster.integ.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* eslint-disable @typescript-eslint/naming-convention */
22

3-
import {Cluster} from "..";
3+
import {Cluster, ClusterService} from "..";
44
import assert from "assert";
55

66
import {IntegrationTestSetup} from "../test/IntegrationTestSetup";
@@ -36,4 +36,44 @@ describe(__filename, function () {
3636
assert(clusterA.id);
3737
assert.equal(clusterA.id, clusterB?.id);
3838
});
39+
40+
// TODO: run tests changing the state of cluster in a seperate job, on a single node
41+
it.skip("calling start on a non terminated state should not throw an error", async () => {
42+
await integSetup.cluster.stop();
43+
await new ClusterService(integSetup.client).start({
44+
cluster_id: integSetup.cluster.id,
45+
});
46+
47+
await integSetup.cluster.refresh();
48+
assert(integSetup.cluster.state !== "RUNNING");
49+
50+
await integSetup.cluster.start();
51+
});
52+
53+
it.skip("should terminate cluster", async () => {
54+
integSetup.cluster.start();
55+
assert.equal(integSetup.cluster.state, "RUNNING");
56+
57+
await integSetup.cluster.stop();
58+
assert.equal(integSetup.cluster.state, "TERMINATED");
59+
60+
integSetup.cluster.start();
61+
});
62+
63+
it.skip("should terminate non running clusters", async () => {
64+
await integSetup.cluster.stop();
65+
assert.equal(integSetup.cluster.state, "TERMINATED");
66+
67+
await new ClusterService(integSetup.client).start({
68+
cluster_id: integSetup.cluster.id,
69+
});
70+
await integSetup.cluster.refresh();
71+
assert.notEqual(integSetup.cluster.state, "RUNNING");
72+
73+
await integSetup.cluster.stop();
74+
assert.equal(integSetup.cluster.state, "TERMINATED");
75+
76+
await integSetup.cluster.start();
77+
assert.equal(integSetup.cluster.state, "RUNNING");
78+
});
3979
});

packages/databricks-sdk-js/src/services/Cluster.ts

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
ClusterState,
88
ClusterSource,
99
} from "../apis/cluster";
10+
import retry, {RetriableError} from "../retries/retries";
11+
import Time, {TimeUnits} from "../retries/Time";
1012
import {
1113
GetRunOutputResponse,
1214
JobsService,
@@ -17,6 +19,8 @@ import {CancellationToken} from "../types";
1719
import {ExecutionContext} from "./ExecutionContext";
1820
import {WorkflowRun} from "./WorkflowRun";
1921

22+
export class ClusterRetriableError extends RetriableError {}
23+
export class ClusterError extends Error {}
2024
export class Cluster {
2125
private clusterApi: ClusterService;
2226

@@ -74,27 +78,72 @@ export class Cluster {
7478
}
7579

7680
async start() {
77-
await this.clusterApi.start({
78-
cluster_id: this.clusterDetails.cluster_id!,
81+
await this.refresh();
82+
if (this.state === "RUNNING") {
83+
return;
84+
}
85+
86+
if (
87+
this.state === "TERMINATED" ||
88+
this.state === "ERROR" ||
89+
this.state === "UNKNOWN"
90+
) {
91+
await this.clusterApi.start({
92+
cluster_id: this.id,
93+
});
94+
}
95+
96+
await retry({
97+
fn: async () => {
98+
await this.refresh();
99+
100+
switch (this.state) {
101+
case "RUNNING":
102+
return;
103+
case "TERMINATED":
104+
throw new ClusterError(
105+
`Cluster[${
106+
this.name
107+
}]: CurrentState - Terminated; Reason - ${JSON.stringify(
108+
this.clusterDetails.termination_reason
109+
)}`
110+
);
111+
case "ERROR":
112+
throw new ClusterError(
113+
`Cluster[${this.name}]: Error in starting the cluster (${this.clusterDetails.state_message})`
114+
);
115+
default:
116+
throw new ClusterRetriableError(
117+
`Cluster[${this.name}]: CurrentState - ${this.state}; Reason - ${this.clusterDetails.state_message}`
118+
);
119+
}
120+
},
79121
});
80122
}
81123

82124
async stop() {
83125
await this.clusterApi.delete({
84126
cluster_id: this.clusterDetails.cluster_id!,
85127
});
86-
}
87128

88-
async waitForState(state: ClusterState) {
89-
while (true) {
90-
await this.refresh();
91-
92-
if (this.clusterDetails.state === state) {
93-
return;
94-
}
95-
96-
await new Promise((resolve) => setTimeout(resolve, 3000));
97-
}
129+
await retry({
130+
fn: async () => {
131+
await this.refresh();
132+
133+
switch (this.state) {
134+
case "TERMINATED":
135+
return;
136+
case "ERROR":
137+
throw new ClusterError(
138+
`Cluster[${this.name}]: Error while terminating - ${this.clusterDetails.state_message}`
139+
);
140+
default:
141+
throw new ClusterRetriableError(
142+
`Cluster[${this.name}]: CurrentState - ${this.state}; Reason - ${this.clusterDetails.state_message}`
143+
);
144+
}
145+
},
146+
});
98147
}
99148

100149
async createExecutionContext(): Promise<ExecutionContext> {

0 commit comments

Comments
 (0)