Skip to content

Commit c9cec26

Browse files
authored
[DECO-167] Fix cluster stop/start (#253)
1 parent 5497c7b commit c9cec26

File tree

10 files changed

+150
-155
lines changed

10 files changed

+150
-155
lines changed

packages/databricks-sdk-js/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@
3636
},
3737
"devDependencies": {
3838
"@istanbuljs/nyc-config-typescript": "^1.0.2",
39+
"@sinonjs/fake-timers": "^10.0.0",
3940
"@types/ini": "^1.3.31",
4041
"@types/mocha": "^10.0.0",
4142
"@types/node": "^18.11.9",
42-
"@types/sinon": "^10.0.13",
43+
"@types/sinonjs__fake-timers": "^8.1.2",
4344
"@types/tmp": "^0.2.3",
4445
"@types/uuid": "^8.3.4",
4546
"eslint": "^8.26.0",
4647
"mocha": "^10.1.0",
4748
"nyc": "^15.1.0",
4849
"prettier": "^2.7.1",
49-
"sinon": "^14.0.2",
5050
"tmp-promise": "^3.0.3",
5151
"ts-loader": "^9.4.1",
5252
"ts-mocha": "^10.0.0",

packages/databricks-sdk-js/src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ export * from "./types";
4545

4646
export {ClusterFixture, TokenFixture} from "./test/fixtures";
4747

48-
export {RetryConfigs, default as retry} from "./retries/retries";
48+
export {default as retry} from "./retries/retries";
49+
export * as retries from "./retries/retries";
50+
export type {RetryPolicy} from "./retries/retries";
4951
export {TimeUnits, default as Time} from "./retries/Time";
5052

5153
export * as logging from "./logging";

packages/databricks-sdk-js/src/retries/retries.test.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
import retry, {RetriableError, TimeoutError} from "./retries";
22
import Time, {TimeUnits} from "./Time";
3-
import * as sinon from "sinon";
3+
import FakeTimers from "@sinonjs/fake-timers";
44
import * as assert from "node:assert";
55

66
class NonRetriableError extends Error {}
77

8-
describe(__filename, function () {
9-
let fakeTimer: sinon.SinonFakeTimers;
8+
describe(__filename, () => {
9+
let fakeTimer: FakeTimers.InstalledClock;
1010

1111
beforeEach(() => {
12-
fakeTimer = sinon.useFakeTimers();
12+
fakeTimer = FakeTimers.install();
1313
});
1414

1515
afterEach(() => {
16-
fakeTimer.restore();
16+
fakeTimer.uninstall();
1717
});
1818

19-
this.timeout(1000 * 3);
2019
it("should return result if timeout doesn't expire", async function () {
2120
const startTime = Date.now();
2221

packages/databricks-sdk-js/src/retries/retries.ts

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,63 @@ export class TimeoutError extends Error {
1010
}
1111
}
1212

13-
export class RetryConfigs {
14-
static maxJitter = new Time(750, TimeUnits.milliseconds);
15-
static minJitter = new Time(50, TimeUnits.milliseconds);
16-
static maxWaitTime = new Time(10, TimeUnits.seconds);
17-
static defaultTimeout = new Time(10, TimeUnits.minutes);
18-
19-
static waitTime(attempt: number) {
20-
const jitter = RetryConfigs.maxJitter
21-
.sub(RetryConfigs.minJitter)
13+
export interface RetryPolicy {
14+
waitTime(attempt: number): Time;
15+
}
16+
17+
export class LinearRetryPolicy {
18+
constructor(readonly _waitTime: Time) {}
19+
20+
waitTime(): Time {
21+
return this._waitTime;
22+
}
23+
}
24+
25+
export class ExponetionalBackoffWithJitterRetryPolicy implements RetryPolicy {
26+
maxJitter: Time;
27+
minJitter: Time;
28+
maxWaitTime: Time;
29+
30+
constructor(
31+
options: {
32+
maxJitter?: Time;
33+
minJitter?: Time;
34+
maxWaitTime?: Time;
35+
} = {}
36+
) {
37+
this.maxJitter =
38+
options.maxJitter || new Time(750, TimeUnits.milliseconds);
39+
this.minJitter =
40+
options.minJitter || new Time(50, TimeUnits.milliseconds);
41+
this.maxWaitTime =
42+
options.maxWaitTime || new Time(10, TimeUnits.seconds);
43+
}
44+
45+
waitTime(attempt: number): Time {
46+
const jitter = this.maxJitter
47+
.sub(this.minJitter)
2248
.multiply(Math.random())
23-
.add(RetryConfigs.minJitter);
49+
.add(this.minJitter);
2450
const timeout = new Time(attempt, TimeUnits.seconds).add(jitter);
2551

26-
return timeout.gt(RetryConfigs.maxWaitTime)
27-
? RetryConfigs.maxWaitTime
28-
: timeout;
52+
return timeout.gt(this.maxWaitTime) ? this.maxWaitTime : timeout;
2953
}
3054
}
3155

56+
export const DEFAULT_RETRY_CONFIG =
57+
new ExponetionalBackoffWithJitterRetryPolicy();
58+
59+
export const DEFAULT_MAX_TIMEOUT = new Time(10, TimeUnits.minutes);
60+
3261
interface RetryArgs<T> {
3362
timeout?: Time;
63+
retryPolicy?: RetryPolicy;
3464
fn: () => Promise<T>;
3565
}
3666

3767
export default async function retry<T>({
38-
timeout = RetryConfigs.defaultTimeout,
68+
timeout = DEFAULT_MAX_TIMEOUT,
69+
retryPolicy: retryConfig = DEFAULT_RETRY_CONFIG,
3970
fn,
4071
}: RetryArgs<T>): Promise<T> {
4172
let attempt = 1;
@@ -67,7 +98,7 @@ export default async function retry<T>({
6798
await new Promise((resolve) =>
6899
setTimeout(
69100
resolve,
70-
RetryConfigs.waitTime(attempt).toMillSeconds().value
101+
retryConfig.waitTime(attempt).toMillSeconds().value
71102
)
72103
);
73104

packages/databricks-sdk-js/src/services/Cluster.integ.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,33 @@ describe(__filename, function () {
3535
assert(clusterA.id);
3636
assert.equal(clusterA.id, clusterB?.id);
3737
});
38+
39+
// skipping because running the test takes too long
40+
it.skip("should start a stopping cluster", async () => {
41+
const token = {
42+
isCancellationRequested: false,
43+
};
44+
45+
const cluster = integSetup.cluster;
46+
// stop cluster
47+
await Promise.race([
48+
cluster.stop(token, async (info) =>
49+
// eslint-disable-next-line no-console
50+
console.log(`Stopping - ${info.state}`)
51+
),
52+
new Promise<void>((resolve) => {
53+
// cancel stop
54+
setTimeout(() => {
55+
token.isCancellationRequested = true;
56+
resolve();
57+
}, 500);
58+
}),
59+
]);
60+
61+
// start cluster
62+
await cluster.start(undefined, (state) =>
63+
// eslint-disable-next-line no-console
64+
console.log(`Starting ${state}`)
65+
);
66+
});
3867
});

packages/databricks-sdk-js/src/services/Cluster.test.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,25 @@ import Time, {TimeUnits} from "../retries/Time";
77
import {getMockTestCluster} from "../test/fixtures/ClusterFixtures";
88
import {ClusterInfo} from "../apis/clusters";
99
import {TokenFixture} from "../test/fixtures/TokenFixtures";
10-
import {RetryConfigs} from "../retries/retries";
10+
import FakeTimers from "@sinonjs/fake-timers";
1111

1212
describe(__filename, function () {
1313
this.timeout(new Time(10, TimeUnits.minutes).toMillSeconds().value);
1414

1515
let mockedClient: ApiClient;
1616
let mockedCluster: Cluster;
1717
let testClusterDetails: ClusterInfo;
18+
let fakeTimer: FakeTimers.InstalledClock;
1819

1920
beforeEach(async () => {
2021
({mockedCluster, mockedClient, testClusterDetails} =
2122
await getMockTestCluster());
2223

23-
RetryConfigs.waitTime = () => {
24-
return new Time(0, TimeUnits.milliseconds);
25-
};
24+
fakeTimer = FakeTimers.install();
25+
});
26+
27+
afterEach(() => {
28+
fakeTimer.uninstall();
2629
});
2730

2831
it("calling start on a non terminated state should not throw an error", async () => {
@@ -65,7 +68,9 @@ describe(__filename, function () {
6568
await mockedCluster.refresh();
6669
assert.notEqual(mockedCluster.state, "RUNNING");
6770

68-
await mockedCluster.start();
71+
const startPromise = mockedCluster.start();
72+
await fakeTimer.runToLastAsync();
73+
await startPromise;
6974
assert.equal(mockedCluster.state, "RUNNING");
7075

7176
verify(
@@ -125,7 +130,10 @@ describe(__filename, function () {
125130

126131
assert.equal(mockedCluster.state, "RUNNING");
127132

128-
await mockedCluster.stop();
133+
const stopPromise = mockedCluster.stop();
134+
await fakeTimer.runToLastAsync();
135+
await stopPromise;
136+
129137
assert.equal(mockedCluster.state, "TERMINATED");
130138

131139
verify(
@@ -175,7 +183,10 @@ describe(__filename, function () {
175183
await mockedCluster.refresh();
176184
assert.notEqual(mockedCluster.state, "RUNNING");
177185

178-
await mockedCluster.stop();
186+
const stopPromise = mockedCluster.stop();
187+
await fakeTimer.runToLastAsync();
188+
await stopPromise;
189+
179190
assert.equal(mockedCluster.state, "TERMINATED");
180191

181192
verify(
@@ -243,7 +254,9 @@ describe(__filename, function () {
243254
await mockedCluster.refresh();
244255

245256
assert.equal(mockedCluster.state, "PENDING");
246-
await mockedCluster.start(instance(token));
257+
const startPromise = mockedCluster.start(instance(token));
258+
await fakeTimer.runToLastAsync();
259+
await startPromise;
247260

248261
verify(token.isCancellationRequested).thrice();
249262
});

packages/databricks-sdk-js/src/services/Cluster.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable @typescript-eslint/naming-convention */
22

33
import {ApiClient} from "../api-client";
4-
import retry, {RetriableError} from "../retries/retries";
4+
import retry, {LinearRetryPolicy, RetriableError} from "../retries/retries";
55
import {
66
JobsService,
77
RunLifeCycleState,
@@ -11,7 +11,7 @@ import {
1111
import {CancellationToken} from "../types";
1212
import {ExecutionContext} from "./ExecutionContext";
1313
import {WorkflowRun} from "./WorkflowRun";
14-
import {commands, PermissionsService} from "..";
14+
import {commands, PermissionsService, Time, TimeUnits} from "..";
1515
import {
1616
ClusterInfo,
1717
ClustersService,
@@ -189,6 +189,8 @@ export class Cluster {
189189
onProgress: (state: ClusterInfoState) => void = () => {}
190190
) {
191191
await this.refresh();
192+
onProgress(this.state);
193+
192194
if (this.state === "RUNNING") {
193195
return;
194196
}
@@ -203,6 +205,30 @@ export class Cluster {
203205
});
204206
}
205207

208+
// wait for cluster to be stopped before re-starting
209+
if (this.state === "TERMINATING") {
210+
await retry<void>({
211+
timeout: new Time(1, TimeUnits.minutes),
212+
retryPolicy: new LinearRetryPolicy(
213+
new Time(1, TimeUnits.seconds)
214+
),
215+
fn: async () => {
216+
if (token?.isCancellationRequested) {
217+
return;
218+
}
219+
await this.refresh();
220+
onProgress(this.state);
221+
222+
if (this.state === "TERMINATING") {
223+
throw new RetriableError();
224+
}
225+
},
226+
});
227+
await this.clusterApi.start({
228+
cluster_id: this.id,
229+
});
230+
}
231+
206232
this._canExecute = undefined;
207233
await retry({
208234
fn: async () => {

packages/databricks-vscode/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,11 +564,13 @@
564564
},
565565
"devDependencies": {
566566
"@istanbuljs/nyc-config-typescript": "^1.0.2",
567+
"@sinonjs/fake-timers": "^10.0.0",
567568
"@types/fs-extra": "^9.0.13",
568569
"@types/glob": "^8.0.0",
569570
"@types/mocha": "^10.0.0",
570571
"@types/mock-require": "^2.0.1",
571572
"@types/node": "^18.11.9",
573+
"@types/sinonjs__fake-timers": "^8.1.2",
572574
"@types/tmp": "^0.2.3",
573575
"@types/vscode": "^1.69.1",
574576
"@types/yargs": "^17.0.14",
@@ -609,4 +611,4 @@
609611
],
610612
"report-dir": "coverage"
611613
}
612-
}
614+
}

packages/databricks-vscode/src/cluster/ClusterManager.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import {
55
cluster,
66
Cluster,
77
ClusterFixture,
8-
RetryConfigs,
98
Time,
109
TimeUnits,
10+
retries,
1111
} from "@databricks/databricks-sdk";
1212
import {
1313
anything,
@@ -49,7 +49,7 @@ describe(__filename, async () => {
4949

5050
resetCalls(mockedClient);
5151

52-
RetryConfigs.waitTime = () => {
52+
retries.DEFAULT_RETRY_CONFIG.waitTime = () => {
5353
return new Time(0, TimeUnits.milliseconds);
5454
};
5555
});

0 commit comments

Comments
 (0)