/
ctx.ts
executable file
·440 lines (376 loc) · 10.4 KB
/
ctx.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
import * as events from "./events";
import { StorageProvider, Source, Destination } from "../storage";
import { Callable, logger } from "../utils";
export class CommandContainer {
private _commands;
constructor() {
this._commands = [];
return new Proxy(this, this.getattr());
}
commands() {
return this._commands;
}
getattr() {
const self = this;
return {
get(target, name) {
if (target[name] !== undefined) {
return target[name];
}
const newFunction = function () {
let _arguments = {};
let args = arguments[0] || {};
for (const [key, value] of Object.entries(args)) {
_arguments = {
..._arguments,
[key.startsWith("_") ? key.slice(1) : key]: value,
};
}
let idx = self._commands.length;
self._commands.push({ [name]: _arguments });
return idx;
};
return new Proxy(newFunction, {
apply: function (target, thisArg, argumentsList) {
return target.apply(thisArg, argumentsList);
},
});
},
};
}
}
export class Work {
public output: object[] = [];
public attestation?: object;
// Executes before commands are send to provider.
async prepare() {}
// A hook which adds the required command to the exescript.
register(commands: CommandContainer) {}
// A hook to be executed on requestor's end after the script has finished.
async post() {}
timeout() {
return null
}
}
class _InitStep extends Work {
register(commands: any) {
//CommandContainer
commands.deploy();
commands.start();
}
}
class _SendWork extends Work {
private _storage: StorageProvider;
private _dst_path: string;
private _src?: Source | null;
private _idx: Number | null;
constructor(storage: StorageProvider, dst_path: string) {
super();
this._storage = storage;
this._dst_path = dst_path;
this._src = null;
this._idx = null;
}
async do_upload(storage: StorageProvider): Promise<Source> {
return new Promise((resolve) => resolve(new Source())); //TODO check this
}
async prepare(): Promise<void> {
this._src = await this.do_upload(this._storage);
}
register(commands: any) {
//CommandContainer
if (!this._src) throw "cmd prepared";
this._idx = commands.transfer({
_from: this._src.download_url(),
_to: `container:${this._dst_path}`,
_args: {},
});
}
}
class _SendJson extends _SendWork {
private _cnt: number;
private _data: Buffer | null;
constructor(storage: StorageProvider, data: {}, dst_path: string) {
super(storage, dst_path);
this._cnt = 0;
this._data = Buffer.from(JSON.stringify(data), "utf-8"); //Optional[bytes]
}
async do_upload(storage: StorageProvider): Promise<Source> {
this._cnt += 1;
if (!this._data) throw `json buffer unintialized ${this._cnt}`;
let src = await storage.upload_bytes(this._data);
this._data = null;
return src;
}
}
class _SendFile extends _SendWork {
private _src_path: string;
constructor(storage: StorageProvider, src_path: string, dst_path: string) {
super(storage, dst_path);
this._src_path = src_path;
}
async do_upload(storage: StorageProvider): Promise<Source> {
return await storage.upload_file(this._src_path);
}
}
class _Run extends Work {
private cmd;
private args;
private env;
private _idx;
private _stdout?: CaptureContext;
private _stderr?: CaptureContext;
constructor(
cmd: string,
args: Iterable<string> = [],
env: {} | null = null,
stdout?: CaptureContext,
stderr?: CaptureContext,) {
super();
this.cmd = cmd;
this.args = args;
this.env = env;
this._idx = null;
this._stdout = stdout;
this._stderr = stderr;
}
register(commands: any) {
let capture = {}
if (this._stdout)
capture['stdout'] = this._stdout.to_dict()
if (this._stderr)
capture['stderr'] = this._stderr.to_dict()
//CommandContainer
this._idx = commands.run({
entry_point: this.cmd,
args: this.args || [],
capture,
});
}
}
class _Sign extends Work {
private _idx;
constructor() {
super();
this._idx = null;
}
register(commands: any) {
//CommandContainer
this._idx = commands.sign({});
}
}
const StorageEvent = events.DownloadStarted || events.DownloadFinished;
class _RecvFile extends Work {
private _storage;
private _dst_path;
private _src_path!: string;
private _dst_slot: Destination | null;
private _idx: number | null;
private _emitter?: Callable<[StorageEvent], void> | null = null;
constructor(
storage: StorageProvider,
src_path: string,
dst_path: string,
emitter: Callable<[StorageEvent], void> | null = null
) {
super();
this._storage = storage;
this._dst_path = dst_path;
this._src_path = src_path;
this._dst_slot = null;
this._idx = null;
this._emitter = emitter;
}
async prepare() {
this._dst_slot = await this._storage.new_destination(this._dst_path);
}
register(commands: any) {
//CommandContainer
if (!this._dst_slot) throw "_RecvFile command creation without prepare";
this._idx = commands.transfer({
_from: `container:${this._src_path}`,
_to: this._dst_slot!.upload_url(),
});
}
async post(): Promise<void> {
if (!this._dst_slot) throw "_RecvFile post without prepare";
if (this._emitter)
this._emitter(new events.DownloadStarted({ path: this._src_path }));
await this._dst_slot.download_file(this._dst_path);
if (this._emitter)
this._emitter(new events.DownloadFinished({ path: this._dst_path }));
}
}
class _Steps extends Work {
private _steps: Work[] = [];
private _timeout?: number;
constructor(steps: Work | Work[], timeout?: number) {
super();
if (steps instanceof Work) this._steps.push(steps);
else steps.forEach((step) => this._steps.push(step));
this._timeout = timeout;
}
timeout(): any {
return this._timeout;
}
// Execute the `prepare` hook for all the defined steps.
async prepare() {
for (let step of this._steps) {
await step.prepare();
}
}
// Execute the `register` hook for all the defined steps.
register(commands: CommandContainer) {
for (let step of this._steps) {
step.register(commands);
}
}
// Execute the `post` step for all the defined steps.
async post() {
for (let step of this._steps) {
await step.post();
}
}
}
export class ExecOptions {
wait_for_results: boolean = true;
batch_timeout?: number | null;
}
/**
* An object used to schedule commands to be sent to provider.
*/
export class WorkContext {
private _id;
private _storage: StorageProvider;
private _pending_steps: Work[];
private _started: boolean;
private _emitter: Callable<[StorageEvent], void> | null;
constructor(
ctx_id: string,
storage: StorageProvider,
emitter: Callable<[StorageEvent], void> | null = null
) {
this._id = ctx_id;
this._storage = storage;
this._pending_steps = [];
this._started = false;
this._emitter = emitter;
}
_prepare() {
if (!this._started) {
this._pending_steps.push(new _InitStep());
this._started = true;
}
}
begin() {}
/**
* Schedule sending JSON data to the provider.
*
* @param json_path remote (provider) path
* @param data object representing JSON data
*/
send_json(json_path: string, data: {}) {
this._prepare();
this._pending_steps.push(new _SendJson(this._storage, data, json_path));
}
/**
* Schedule sending file to the provider.
*
* @param src_path local (requestor) path
* @param dst_path remote (provider) path
*/
send_file(src_path: string, dst_path: string) {
this._prepare();
this._pending_steps.push(new _SendFile(this._storage, src_path, dst_path));
}
/**
* Schedule running a command.
*
* @param cmd command to run on the provider, e.g. /my/dir/run.sh
* @param args command arguments, e.g. "input1.txt", "output1.txt"
* @param env optional object with environmental variables
*/
run(cmd: string, args?: Iterable<string>, env: object | null = null) {
const stdout = CaptureContext.build("all");
const stderr = CaptureContext.build("all");
this._prepare();
this._pending_steps.push(new _Run(cmd, args, env, stdout, stderr));
}
/**
* Schedule downloading remote file from the provider.
*
* @param src_path remote (provider) path
* @param dst_path local (requestor) path
*/
download_file(src_path: string, dst_path: string) {
this._prepare();
this._pending_steps.push(
new _RecvFile(this._storage, src_path, dst_path, this._emitter)
);
}
sign() {
this._prepare();
this._pending_steps.push(new _Sign());
}
log(args) {
logger.info(`${this._id}: ${args}`);
}
/**
* Creates sequence of commands to be sent to provider.
*
* @returns Work object (the latter contains sequence commands added before calling this method)
*/
commit({ timeout }: { timeout?: number } = {}): Work {
let steps = this._pending_steps;
this._pending_steps = [];
return new _Steps(steps, timeout);
}
}
enum CaptureMode {
HEAD = "head",
TAIL = "tail",
HEAD_TAIL = "headTail",
STREAM = "stream"
}
enum CaptureFormat {
BIN = "bin",
STR = "str"
}
class CaptureContext {
private mode!: string;
private limit?: number;
private fmt?: string;
constructor(mode, limit?, fmt?) {
this.mode = mode;
this.limit = limit;
this.fmt = fmt
}
static build(mode?: string, limit?: number, fmt?: string): CaptureContext {
if(!mode || mode === "all") {
mode = CaptureMode.HEAD;
limit = undefined;
}
fmt = fmt ? fmt.toLowerCase() : "str";
const get_key = (e: object, s: string) => Object.keys(e).find(k => e[k] == s);
if (!get_key(CaptureMode, mode)) {
throw new Error(`Invalid output capture mode: ${mode}`)
}
if (!get_key(CaptureFormat, fmt)) {
throw new Error(`Invalid output capture format: ${fmt}`)
}
return new CaptureContext(mode, limit, fmt);
}
to_dict() {
let inner = {}
if(this.limit) {
inner[this.mode] = this.limit;
}
if(this.fmt) {
inner["format"] = this.fmt;
}
return {[this.mode === CaptureMode.STREAM ? "stream" : "atEnd"]: inner}
}
is_streaming() {
return this.mode === CaptureMode.STREAM;
}
}