Skip to content

Commit 0e09ee5

Browse files
Block exec until sync is complete (#134)
This PR adds code to parse logs from bricks sync to compute the state of sync completeness. If sync is in progress (ie upload/delete requests are inflight to the workspace), we block execution in the databricks runner Video for exec: https://user-images.githubusercontent.com/88374338/197779331-4b560bcf-63b2-4239-a3fb-4ee7a0814b62.mov Video for exec as workflow https://user-images.githubusercontent.com/88374338/198030331-737e3502-3276-4d68-acb5-4b1229431fca.mov Co-authored-by: Fabian Jakobs <fabian.jakobs@databricks.com>
1 parent 849e69c commit 0e09ee5

File tree

9 files changed

+448
-58
lines changed

9 files changed

+448
-58
lines changed
Lines changed: 104 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import {ApiClient, Repo} from "@databricks/databricks-sdk";
22
import * as assert from "assert";
33
import {anything, instance, mock, when, verify} from "ts-mockito";
4-
import {ProcessExecution, Uri} from "vscode";
4+
import {ProcessExecution, Uri, EventEmitter} from "vscode";
55
import {ConnectionManager} from "../configuration/ConnectionManager";
6-
import {SyncDestination} from "../configuration/SyncDestination";
7-
import {BricksTaskProvider, SyncTask} from "./BricksTasks";
6+
import {SyncState} from "../sync/CodeSynchronizer";
7+
import {BricksTaskProvider, SyncTask, BricksSyncParser} from "./BricksTasks";
88
import {CliWrapper} from "./CliWrapper";
99

1010
describe(__filename, () => {
@@ -26,49 +26,120 @@ describe(__filename, () => {
2626
});
2727

2828
it("should create a sync task", () => {
29-
let task = new SyncTask(connection, cli, "incremental");
29+
let task = new SyncTask(
30+
connection,
31+
cli,
32+
"incremental",
33+
(state: SyncState) => {}
34+
);
3035

3136
assert.equal(task.definition.type, "databricks");
3237
assert.equal(task.definition.task, "sync");
3338
assert.equal(task.isBackground, true);
3439
assert.deepEqual(task.problemMatchers, ["$bricks-sync"]);
3540
});
41+
});
3642

37-
it("should lazily create a process execution", async () => {
38-
let connectionMock = mock(ConnectionManager);
39-
const testSyncDestination = new SyncDestination(
40-
instance(mock(Repo)),
41-
Uri.from({
42-
scheme: "dbws",
43-
path: "/Workspace/notebook-best-practices",
44-
}),
45-
Uri.file("/Desktop/notebook-best-practices")
43+
describe("tests for BricksSycnParser", () => {
44+
let syncState: SyncState = "STOPPED";
45+
let bricksSycnParser: BricksSyncParser;
46+
47+
const syncStateCallback = (state: SyncState) => {
48+
syncState = state;
49+
};
50+
51+
beforeEach(() => {
52+
syncState = "STOPPED";
53+
bricksSycnParser = new BricksSyncParser(
54+
syncStateCallback,
55+
mock(EventEmitter<string>)
4656
);
47-
when(connectionMock.profile).thenReturn("DEFAULT");
48-
when(connectionMock.me).thenReturn("fabian.jakobs@databricks.com");
49-
when(connectionMock.syncDestination).thenReturn(testSyncDestination);
57+
});
5058

51-
let cliMock = mock(CliWrapper);
52-
when(cliMock.getSyncCommand(anything())).thenReturn({
53-
command: "bricks",
54-
args: ["sync"],
55-
});
59+
it("processing empty logs transitions sync status from INACTIVE -> WATCHING_FOR_CHANGES", () => {
60+
assert.equal(syncState, "STOPPED");
61+
bricksSycnParser.process("");
62+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
63+
});
5664

57-
let task = new SyncTask(
58-
instance(connectionMock),
59-
instance(cliMock),
60-
"incremental"
65+
it("processing action log transitions sync status from INACTIVE -> INPROGRESS", () => {
66+
assert.equal(syncState, "STOPPED");
67+
bricksSycnParser.process("Action: PUT: hello.txt");
68+
assert.equal(syncState, "IN_PROGRESS");
69+
});
70+
71+
it("test bricksSycnParser.process correctly keeps track of state of inflight requests", () => {
72+
// recieving some random logs from bricks sync
73+
assert.equal(syncState, "STOPPED");
74+
bricksSycnParser.process("some random logs");
75+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
76+
77+
// upload hello.txt
78+
bricksSycnParser.process("Action: PUT: hello.txt");
79+
assert.equal(syncState, "IN_PROGRESS");
80+
bricksSycnParser.process("Uploaded hello.txt");
81+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
82+
83+
// delete bye.txt
84+
bricksSycnParser.process("Action: DELETE: bye.txt");
85+
assert.equal(syncState, "IN_PROGRESS");
86+
bricksSycnParser.process("Deleted bye.txt");
87+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
88+
89+
// both upload and delete some random prefix string that should be ignored
90+
bricksSycnParser.process(
91+
"[INFO] foo bar Action: PUT: a.txt DELETE: b.txt"
92+
);
93+
bricksSycnParser.process("Uploaded a.txt");
94+
assert.equal(syncState, "IN_PROGRESS");
95+
bricksSycnParser.process("Deleted b.txt");
96+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
97+
98+
// upload and delete multiple files
99+
bricksSycnParser.process(
100+
"Action: PUT: a.txt, c.txt DELETE: b.txt, d.txt"
61101
);
62-
assert.ok(task.execution);
102+
bricksSycnParser.process("Uploaded a.txt");
103+
assert.equal(syncState, "IN_PROGRESS");
104+
bricksSycnParser.process("Deleted b.txt");
105+
assert.equal(syncState, "IN_PROGRESS");
106+
bricksSycnParser.process("Deleted d.txt");
107+
assert.equal(syncState, "IN_PROGRESS");
108+
bricksSycnParser.process("Uploaded c.txt");
109+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
63110

64-
let execution = task.execution as ProcessExecution;
111+
// multi line logs
112+
bricksSycnParser.process(
113+
"Action: PUT: a.txt, c.txt DELETE: b.txt, d.txt\n" +
114+
"Uploaded a.txt\n" +
115+
"some random text\n" +
116+
"Uploaded c.txt"
117+
);
118+
bricksSycnParser.process("Deleted b.txt");
119+
assert.equal(syncState, "IN_PROGRESS");
120+
bricksSycnParser.process("Deleted d.txt");
121+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
122+
});
65123

66-
const syncCommandMock = cliMock.getSyncCommand(anything());
124+
it("uploaded logs for untracked files throw errors", () => {
125+
assert.throws(
126+
() => {
127+
bricksSycnParser.process("Uploaded a.txt");
128+
},
129+
{
130+
message: /untracked file uploaded/,
131+
}
132+
);
133+
});
67134

68-
verify(syncCommandMock).never();
69-
assert.equal(execution.process, "bricks");
70-
verify(syncCommandMock).once();
71-
assert.deepEqual(execution.args, ["sync"]);
72-
verify(syncCommandMock).once();
135+
it("delete logs for untracked files throw errors", () => {
136+
assert.throws(
137+
() => {
138+
bricksSycnParser.process("Deleted a.txt");
139+
},
140+
{
141+
message: /untracked file deleted/,
142+
}
143+
);
73144
});
74145
});

0 commit comments

Comments
 (0)