Skip to content

Commit 9dcbd87

Browse files
Deploy and run resource (#1012)
## Changes * show deploy and run button in the UI * Open a new terminal that shows the live run of the deployment. Terminals are keyed by `resource-key` and `target`. This allows us to reuse existing terminals for a new run. * Running state is preserved across target changes. * The run button changes to cancel button when running. Also show a spinner for the relevant resources. **Note**: The cancel button ONLY kills the cli process. It does not cancel the online run. ## Tests <!-- How is this tested? -->
1 parent f6dcb48 commit 9dcbd87

File tree

13 files changed

+488
-49
lines changed

13 files changed

+488
-49
lines changed

packages/databricks-vscode/package.json

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,26 @@
245245
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet",
246246
"category": "Databricks"
247247
},
248+
{
249+
"command": "databricks.bundle.deployAndRun",
250+
"icon": "$(debug-start)",
251+
"title": "Deploy the bundle and run the resource",
252+
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet",
253+
"category": "Databricks"
254+
},
255+
{
256+
"command": "databricks.bundle.cancelAllRuns",
257+
"title": "Cancel all runs",
258+
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet",
259+
"category": "Databricks"
260+
},
261+
{
262+
"command": "databricks.bundle.cancelRun",
263+
"title": "Cancel run",
264+
"icon": "$(debug-stop)",
265+
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet",
266+
"category": "Databricks"
267+
},
248268
{
249269
"command": "databricks.bundle.initNewProject",
250270
"title": "Initialize new project",
@@ -404,6 +424,16 @@
404424
"command": "databricks.cluster.start",
405425
"when": "view == configurationView && viewItem == databricks.configuration.cluster.terminated",
406426
"group": "inline@0"
427+
},
428+
{
429+
"command": "databricks.bundle.deployAndRun",
430+
"when": "view == dabsResourceExplorerView && viewItem =~ /^databricks.bundle.*.runnable.*$/",
431+
"group": "inline@0"
432+
},
433+
{
434+
"command": "databricks.bundle.cancelRun",
435+
"when": "view == dabsResourceExplorerView && viewItem =~ /^databricks.bundle.*.running.*$/",
436+
"group": "inline@0"
407437
}
408438
],
409439
"editor/title": [

packages/databricks-vscode/src/bundle/BundleCommands.ts

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,42 @@
11
import {Disposable, window} from "vscode";
22
import {BundleRemoteStateModel} from "./models/BundleRemoteStateModel";
33
import {onError} from "../utils/onErrorDecorator";
4+
import {
5+
TreeNode as BundleResourceExplorerTreeNode,
6+
ResourceTreeNode as BundleResourceExplorerResourceTreeNode,
7+
} from "../ui/bundle-resource-explorer/types";
8+
import {BundleRunManager} from "./BundleRunManager";
9+
10+
const RUNNABLE_RESOURCES = [
11+
"pipelines",
12+
"jobs",
13+
] satisfies BundleResourceExplorerTreeNode["type"][];
14+
15+
function isRunnable(
16+
treeNode: BundleResourceExplorerTreeNode
17+
): treeNode is BundleResourceExplorerResourceTreeNode {
18+
return (RUNNABLE_RESOURCES as string[]).includes(treeNode.type);
19+
}
420

521
export class BundleCommands implements Disposable {
622
private disposables: Disposable[] = [];
723
private outputChannel = window.createOutputChannel(
824
"Databricks Asset Bundles"
925
);
1026

11-
constructor(private bundleRemoteStateModel: BundleRemoteStateModel) {
27+
private writeToChannel = (data: string) => {
28+
this.outputChannel.append(data);
29+
};
30+
31+
private prepareOutputChannel() {
32+
this.outputChannel.show(true);
33+
this.outputChannel.appendLine("");
34+
}
35+
36+
constructor(
37+
private readonly bundleRemoteStateModel: BundleRemoteStateModel,
38+
private readonly bundleRunManager: BundleRunManager
39+
) {
1240
this.disposables.push(this.outputChannel);
1341
}
1442

@@ -19,21 +47,44 @@ export class BundleCommands implements Disposable {
1947

2048
@onError({popup: {prefix: "Error deploying the bundle."}})
2149
async deploy() {
22-
this.outputChannel.show(true);
23-
this.outputChannel.appendLine("");
50+
await window.withProgress(
51+
{location: {viewId: "dabsResourceExplorerView"}},
52+
async () => {
53+
await this.bundleRemoteStateModel.deploy(
54+
this.writeToChannel,
55+
this.writeToChannel
56+
);
57+
}
58+
);
59+
}
2460

25-
const writeToChannel = (data: string) => {
26-
this.outputChannel.append(data);
27-
};
61+
@onError({popup: {prefix: "Error running resource."}})
62+
async deployAndRun(treeNode: BundleResourceExplorerTreeNode) {
63+
if (!isRunnable(treeNode)) {
64+
throw new Error(`Cannot run resource of type ${treeNode.type}`);
65+
}
66+
//TODO: Don't deploy if there is no diff between local and remote state
67+
this.prepareOutputChannel();
2868
await window.withProgress(
2969
{location: {viewId: "dabsResourceExplorerView"}},
3070
async () => {
3171
await this.bundleRemoteStateModel.deploy(
32-
writeToChannel,
33-
writeToChannel
72+
this.writeToChannel,
73+
this.writeToChannel
3474
);
3575
}
3676
);
77+
78+
await this.bundleRunManager.run(treeNode.resourceKey);
79+
}
80+
81+
@onError({popup: {prefix: "Error cancelling run."}})
82+
async cancelRun(treeNode: BundleResourceExplorerTreeNode) {
83+
if (!isRunnable(treeNode)) {
84+
throw new Error(`Resource of ${treeNode.type} is not runnable`);
85+
}
86+
87+
this.bundleRunManager.cancel(treeNode.resourceKey);
3788
}
3889

3990
dispose() {
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
import {
2+
CancellationTokenSource,
3+
Disposable,
4+
EventEmitter,
5+
Terminal,
6+
window,
7+
} from "vscode";
8+
import {BundleRemoteStateModel} from "./models/BundleRemoteStateModel";
9+
import {CustomOutputTerminal} from "./CustomOutputTerminal";
10+
11+
export class BundleRunManager implements Disposable {
12+
private onDidChangeEmitter = new EventEmitter<void>();
13+
readonly onDidChange = this.onDidChangeEmitter.event;
14+
15+
private disposables: Disposable[] = [];
16+
private terminalDetails: Map<
17+
string,
18+
{
19+
terminal: Terminal;
20+
pty: CustomOutputTerminal;
21+
}
22+
> = new Map();
23+
24+
private cancellationTokenSources: Map<string, CancellationTokenSource> =
25+
new Map();
26+
27+
private _isRunning: Map<string, boolean> = new Map();
28+
29+
isRunning(resourceKey: string) {
30+
const target = this.bundleRemoteStateModel.target;
31+
if (target === undefined) {
32+
return false;
33+
}
34+
const terminalName = this.getTerminalName(target, resourceKey);
35+
return this._isRunning.get(terminalName) ?? false;
36+
}
37+
constructor(
38+
private readonly bundleRemoteStateModel: BundleRemoteStateModel
39+
) {}
40+
41+
getTerminalName(target: string, resourceKey: string) {
42+
return `Run ${resourceKey} (${target})`;
43+
}
44+
45+
async run(resourceKey: string) {
46+
const target = this.bundleRemoteStateModel.target;
47+
if (target === undefined) {
48+
throw new Error(`Cannot run ${resourceKey}, Target is undefined`);
49+
}
50+
const terminalName = this.getTerminalName(target, resourceKey);
51+
52+
if (!this.terminalDetails.has(terminalName)) {
53+
this.terminalDetails.set(
54+
terminalName,
55+
this.createTerminal(terminalName)
56+
);
57+
}
58+
let terminal = this.terminalDetails.get(terminalName)!;
59+
60+
const disposables: Disposable[] = [];
61+
try {
62+
terminal.terminal.show();
63+
if (
64+
window.terminals.find(
65+
(i) => i.name === terminal?.terminal.name
66+
) === undefined
67+
) {
68+
// The terminal has been closed. Recreate everything.
69+
terminal = this.createTerminal(terminalName);
70+
this.terminalDetails.set(terminalName, terminal);
71+
}
72+
if (terminal.pty.process !== undefined) {
73+
// There is already a process running. Raise error
74+
throw new Error(
75+
`Process already running. Pid: ${terminal.pty.process.pid}`
76+
);
77+
}
78+
79+
const cancellationTokenSource = new CancellationTokenSource();
80+
this.cancellationTokenSources.set(
81+
terminalName,
82+
cancellationTokenSource
83+
);
84+
const onCancellationEvent =
85+
cancellationTokenSource.token.onCancellationRequested(() => {
86+
terminal?.pty.close();
87+
//Dispose self on cancellation
88+
onCancellationEvent.dispose();
89+
}, this.disposables);
90+
91+
const cmd = this.bundleRemoteStateModel.getRunCommand(resourceKey);
92+
93+
this._isRunning.set(terminalName, true);
94+
this.onDidChangeEmitter.fire();
95+
96+
// spawn a new process with the latest command, in the same terminal.
97+
terminal.pty.spawn(cmd);
98+
terminal.terminal.show();
99+
100+
// Wait for the process to exit
101+
await new Promise<void>((resolve, reject) => {
102+
if (terminal === undefined) {
103+
resolve();
104+
return;
105+
}
106+
disposables.push(
107+
terminal.pty.onDidCloseProcess((exitCode) => {
108+
if (exitCode === 0) {
109+
resolve();
110+
} else {
111+
reject(
112+
new Error(
113+
`Process exited with code ${exitCode}`
114+
)
115+
);
116+
}
117+
}),
118+
window.onDidCloseTerminal((e) => {
119+
e.name === terminal.terminal.name &&
120+
reject(new Error("Terminal closed"));
121+
})
122+
);
123+
});
124+
} finally {
125+
this._isRunning.delete(terminalName);
126+
this.onDidChangeEmitter.fire();
127+
128+
disposables.forEach((i) => i.dispose());
129+
130+
this.cancellationTokenSources.get(terminalName)?.cancel();
131+
this.cancellationTokenSources.get(terminalName)?.dispose();
132+
this.cancellationTokenSources.delete(terminalName);
133+
}
134+
}
135+
136+
createTerminal(terminalName: string) {
137+
const pty = new CustomOutputTerminal();
138+
const terminal = {
139+
pty,
140+
terminal: window.createTerminal({
141+
name: terminalName,
142+
pty,
143+
isTransient: true,
144+
}),
145+
};
146+
147+
this.disposables.push(terminal.terminal);
148+
return terminal;
149+
}
150+
151+
cancel(resourceKey: string) {
152+
const target = this.bundleRemoteStateModel.target;
153+
if (target === undefined) {
154+
throw new Error(
155+
`Cannot delete ${resourceKey}, Target is undefined`
156+
);
157+
}
158+
159+
const terminalName = this.getTerminalName(target, resourceKey);
160+
this.cancellationTokenSources.get(terminalName)?.cancel();
161+
this.cancellationTokenSources.get(terminalName)?.dispose();
162+
this.cancellationTokenSources.delete(terminalName);
163+
}
164+
165+
cancelAll() {
166+
this.cancellationTokenSources.forEach((cs) => {
167+
cs.cancel();
168+
});
169+
170+
this.cancellationTokenSources.clear();
171+
}
172+
173+
dispose() {
174+
this.disposables.forEach((i) => i.dispose());
175+
}
176+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import {ChildProcess, SpawnOptions, spawn} from "child_process";
2+
import {Pseudoterminal, Event, EventEmitter} from "vscode";
3+
4+
export class CustomOutputTerminal implements Pseudoterminal {
5+
private writeEmitter = new EventEmitter<string>();
6+
onDidWrite: Event<string> = this.writeEmitter.event;
7+
8+
private closeEmitter = new EventEmitter<number | void>();
9+
onDidClose: Event<number | void> = this.closeEmitter.event;
10+
11+
private onDidCloseProcessEmitter = new EventEmitter<number | null>();
12+
onDidCloseProcess: Event<number | null> =
13+
this.onDidCloseProcessEmitter.event;
14+
15+
private _process: ChildProcess | undefined;
16+
public get process(): ChildProcess | undefined {
17+
return this._process;
18+
}
19+
20+
constructor() {}
21+
22+
open(): void {}
23+
24+
spawn({
25+
cmd,
26+
args,
27+
options,
28+
}: {
29+
cmd: string;
30+
args: string[];
31+
options: SpawnOptions;
32+
}): void {
33+
// Clear and reset terminal
34+
this.writeEmitter.fire("\x1b[2J\x1b[H");
35+
this._process = spawn(cmd, args, options);
36+
if (!this.process) {
37+
throw new Error("Can't start process: process is undefined");
38+
}
39+
40+
if (!this.process.stderr) {
41+
throw new Error("Can't start process: can't pipe stderr process");
42+
}
43+
44+
if (!this.process.stdout) {
45+
throw new Error("Can't start process: can't pipe stdout process");
46+
}
47+
48+
const handleOutput = (data: Buffer) => {
49+
let dataStr = data.toString();
50+
dataStr = dataStr.replaceAll("\n", "\r\n");
51+
if (!dataStr.endsWith("\r\n")) {
52+
dataStr = dataStr + "\r\n";
53+
}
54+
this.writeEmitter.fire(dataStr);
55+
};
56+
this.process.stdout.on("data", handleOutput);
57+
this.process.stderr.on("data", handleOutput);
58+
59+
this.process.on("close", (exitCode) => {
60+
this.onDidCloseProcessEmitter.fire(exitCode);
61+
this._process = undefined;
62+
});
63+
64+
this.process.on("error", (err) => {
65+
this.writeEmitter.fire("\x1b[31m" + err.message + "\x1b[0m\r\n");
66+
this.writeEmitter.fire(
67+
"\x1b[31m" + (err.stack ?? "") + "\x1b[0m\r\n"
68+
);
69+
});
70+
}
71+
72+
close(): void {
73+
if (this.process !== undefined) {
74+
this.writeEmitter.fire(
75+
"\x1b[31mProcess killed by user input\x1b[0m\r\n"
76+
);
77+
}
78+
this.process?.kill();
79+
}
80+
}

0 commit comments

Comments
 (0)