/
runner.js
105 lines (96 loc) · 3.31 KB
/
runner.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import r from 'rethinkdb';
import uuid from 'node-uuid';
import logger from '../../logger';
import setupDb, {testExchange, PipelineLog, Pipeline} from '../../db';
import {runPipeline} from './runPipeline';
import service from '../../runner/service';
// command line: argv[2] carries the pipeline id, argv[3] the pipeline definition as a JSON string
const id = process.argv[2];
const pipelineJSON = process.argv[3];
const pipeline = JSON.parse(pipelineJSON);
logger.debug('running child with id', id);
// one fresh session id per run; attached to every PipelineLog entry created below
const sessionId = uuid.v4();
// async work collected during the run that is waited on before the process exits
const promises = [];
// setup db, then do work
// Flow: wait for setupDb(), mark the pipeline as 'running', start it, forward
// every emitted item to the reply topic and to PipelineLog, and shut the
// process down once the stream completes, fails, or a 'kill' command arrives.
setupDb().then(async () => {
  // topic on which results for this run are published
  const replyTopic = testExchange.topic(id + '.out');
  // becomes true once shutdown has been scheduled; later triggers are ignored
  // so cleanup and the exit timer run at most once
  let exitScheduled = false;

  // Builds a non-empty status message from whatever value was thrown/emitted
  // as an error (also copes with null/undefined and values without `message`).
  const describeError = e => (e && e.message) || JSON.stringify(e) || String(e);

  /**
   * Schedules process shutdown after a 500ms delay.
   *
   * When the delay elapses it publishes a final `done: true` message on the
   * reply topic, updates the Pipeline entry for `id` with the given status and
   * message, stops the runner service, waits for everything collected in
   * `promises` to settle, and then calls process.exit().
   * Calls made after the first one are ignored.
   *
   * @param {Object} [options]
   * @param {string} [options.status='done'] - final status to store
   * @param {string} [options.message='success'] - final message to store
   */
  const delayedExit = ({
    status = 'done',
    message = 'success',
  } = {}) => {
    if (exitScheduled) {
      return;
    }
    exitScheduled = true;
    setTimeout(() => {
      // notify listeners that no more data will follow
      promises.push(replyTopic.publish({
        data: [],
        done: true
      }));
      // store the final status and message for this pipeline
      promises.push(Pipeline.update(id, {status, message}));
      // stop the runner service
      promises.push(service.stop());
      // Wait for every pending operation to settle. Rejections are logged
      // instead of propagated: a rejected Promise.all would skip the
      // process.exit() call and leave this child process hanging.
      const settled = promises.map(p => Promise.resolve(p).catch(e => {
        logger.error('[EXIT] pending operation failed:', e);
      }));
      Promise.all(settled).then(() => process.exit());
    }, 500);
  };

  // mark the pipeline as running before starting it
  promises.push(Pipeline.update(id, {
    status: 'running',
    message: ''
  }));

  // start pipeline
  logger.debug('executing pipeline..');
  let started;
  try {
    // NOTE(review): meaning of the second argument (`true`) is defined by runPipeline — confirm there
    started = await runPipeline(pipeline, true);
  } catch (e) {
    // the pipeline never started: record the failure and exit instead of
    // leaving the status at 'running' with an unhandled rejection
    logger.error('[OUT] failed to start pipeline:', e);
    delayedExit({
      status: 'error',
      message: describeError(e)
    });
    return;
  }
  const {stream, clean} = started;

  // Runs pipeline cleanup and schedules the exit, at most once.
  // clean() returns a list whose entries are waited on before exiting
  // (presumably promises — verify against runPipeline).
  const shutdown = exitOptions => {
    if (exitScheduled) {
      return;
    }
    clean().forEach(p => promises.push(p));
    delayedExit(exitOptions);
  };

  // listen for commands
  testExchange
    .queue(topic => topic.eq(id + '.in'))
    .subscribe((topic, payload) => {
      logger.debug('[IN] for topic:', topic, 'got payload:', payload);
      if (payload.command === 'kill') {
        shutdown();
      }
    });

  // subscribe to results: per-item, error and completion callbacks
  stream.subscribe(
    data => {
      logger.debug('[OUT] seding pipeline response:', data, 'to topic:', id);
      promises.push(replyTopic.publish({
        data, done: false
      }));
      // also append the item to the persistent log
      promises.push(PipelineLog.create({
        pipeline: id,
        sessionId,
        data,
        added_on: r.now(), // eslint-disable-line
      }));
    },
    e => {
      logger.error('[OUT] error in pipline:', e);
      // append the error to the persistent log
      promises.push(PipelineLog.create({
        pipeline: id,
        sessionId,
        data: {
          type: 'error',
          error: e,
        },
        added_on: r.now(), // eslint-disable-line
      }));
      // schedule exit
      logger.debug('[OUT] scheduling exit...');
      shutdown({
        status: 'error',
        message: describeError(e)
      });
    },
    () => {
      logger.debug('[OUT] pipeline done, scheduling exit');
      shutdown();
    }
  );
}).catch(e => {
  // setupDb() rejected or the setup code above threw synchronously; without
  // this handler the rejection would go unhandled and the child could linger
  logger.error('failed to start pipeline runner:', e);
  process.exit(1);
});