Skip to content

Commit

Permalink
add sse and eventsource for serving updates
Browse files Browse the repository at this point in the history
  • Loading branch information
yorkie committed May 25, 2020
1 parent 4101f66 commit 0a98617
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cli.yml
Expand Up @@ -61,7 +61,7 @@ jobs:
./packages/cli/dist/bin/pipcook init
- name: test client basic usage run
run: |
./packages/cli/dist/bin/pipcook run ./test/pipelines/text-bayes-classification.json
./packages/cli/dist/bin/pipcook run ./test/pipelines/text-bayes-classification.json --verbose
- name: test client basic usage plugin-dev
run: |
./packages/cli/dist/bin/pipcook plugin-dev -t dataCollect
40 changes: 40 additions & 0 deletions packages/cli/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/cli/package.json
Expand Up @@ -25,11 +25,13 @@
"dependencies": {
"@pipcook/pipcook-core": "^0.5.16",
"@types/command-exists": "^1.2.0",
"@types/eventsource": "^1.1.2",
"@types/inquirer": "^6.5.0",
"@types/semver": "^7.1.0",
"chalk": "^3.0.0",
"command-exists": "^1.2.8",
"commander": "^4.0.1",
"eventsource": "^1.0.7",
"fastify": "^2.13.1",
"find-process": "^1.4.3",
"fs-extra": "^8.1.0",
Expand Down
33 changes: 28 additions & 5 deletions packages/cli/src/actions/start.ts
Expand Up @@ -5,6 +5,8 @@ import { spawn, SpawnOptions, ChildProcess } from 'child_process';
import { startJob } from '../service/job';
import { StartHandler } from '../types';
import { Constants } from '../utils';
import { listen, get } from '../request';
import { route } from '../router';

const spinner = ora();

Expand Down Expand Up @@ -32,12 +34,33 @@ const start: StartHandler = async (filename: string, verbose: boolean) => {
return process.exit(1);
}

const job = await startJob(filename, process.cwd());
spinner.succeed(`create job ${job.id} succeeded`);
if (verbose === true) {
tail(job.id, 'stdout');
tail(job.id, 'stderr');
const opts = { cwd: process.cwd(), config: filename };
if (!verbose) {
const job = await get(`${route.job}/start`, opts);
spinner.succeed(`create job(${job.id}) succeeded.`);
} else {
const es = await listen(`${route.job}/start`, opts);
let stdout: ChildProcess, stderr: ChildProcess;
es.addEventListener('job created', (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`create job(${job.id}) succeeded.`);
stdout = tail(job.id, 'stdout');
stderr = tail(job.id, 'stderr');
});
es.addEventListener('job finished', (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`job(${job.id}) is finished with ${e.data}`);
stdout.kill();
stderr.kill();
});
}

// const job = await startJob(filename, process.cwd());
// spinner.succeed(`create job ${job.id} succeeded`);
// if (verbose === true) {
// tail(job.id, 'stdout');
// tail(job.id, 'stderr');
// }
};

export default start;
3 changes: 2 additions & 1 deletion packages/cli/src/bin/pipcook-job.ts
Expand Up @@ -2,6 +2,7 @@

import program from 'commander';
import ora from 'ora';
import EventSource from 'eventsource';

import { runJob, getJobs, getLogById, removeJobs } from '../service/job';
import { fetchLog } from '../utils';
Expand All @@ -13,7 +14,7 @@ async function list(): Promise<void> {
const jobs = await getJobs();
const outputs = jobs.rows.map((row: Record<'id' | 'status' | 'createdAt' | 'endTime', any>) => ({
id: row.id,
status: row.status,
status: PipelineStatus[row.status],
createdAt: row.createdAt,
endTime: row.endTime
}));
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/config.ts
Expand Up @@ -10,4 +10,4 @@ export const daemonPackage = LOCAL ? path.join(__dirname, '../../daemon') : '@pi
export const boardPackage = LOCAL ? path.join(__dirname, '../../pipboard') : '@pipcook/pipboard';

export const pipcookLogName = 'pipcook-output';
export const optionalNpmClients: string[] = [ 'npm', 'cnpm', 'tnpm' ];
export const optionalNpmClients: string[] = [ 'npm', 'cnpm' ];
60 changes: 43 additions & 17 deletions packages/cli/src/request.ts
@@ -1,28 +1,54 @@
import * as qs from 'querystring';
import axios from 'axios';
import ora from 'ora';

const spinner = ora();
import EventSource from 'eventsource';

export type RequestParams = Record<string, any>;

export type ResponseParams = Record<string, any>;
const spinner = ora();

const createGeneralRequest = (agent: Function) => async (...args: any[]) => {
try {
let response = await agent(...args);
if (response.data.status === true) {
return response.data.data;
}
} catch (err) {
if (err?.response?.data?.message) {
spinner.fail(err.response.data.message);
process.exit();
} else {
throw err;
}
class EventSourceError extends TypeError {
event: Event;
constructor(e: Event) {
super('event source error event');
this.event = e;
}
};
}

function createGeneralRequest(agent: Function): Function {
return async (...args: any[]) => {
try {
let response = await agent(...args);
if (response.data.status === true) {
return response.data.data;
}
} catch (err) {
if (err?.response?.data?.message) {
spinner.fail(err.response.data.message);
process.exit();
} else {
throw err;
}
}
};
}

export const get = async (host: string, params?: RequestParams) => createGeneralRequest(axios.get)(host, params);
export const post = async (host: string, body?: RequestParams, params?: RequestParams) => createGeneralRequest(axios.post)(host, body, params);
export const put = async (host: string, body?: RequestParams, params?: RequestParams) => createGeneralRequest(axios.put)(host, body, params);
export const remove = async (host: string) => createGeneralRequest(axios.delete)(host);
export const listen = async (host: string, params?: RequestParams): Promise<EventSource> => {
const uri = `${host}?${qs.stringify({ verbose: 1, ...params })}`;
const es = new EventSource(uri);
return new Promise((resolve, reject) => {
es.addEventListener('error', (e: Event) => {
reject(new EventSourceError(e));
});
es.addEventListener('session', (e: MessageEvent) => {
if (e.data === 'close') {
es.close();
}
});
resolve(es);
});
};
4 changes: 2 additions & 2 deletions packages/cli/src/service/job.ts
@@ -1,4 +1,4 @@
import { get, post, remove } from '../request';
import { get, post, remove, listen } from '../request';
import { route } from '../router';

export const runJob = (pipelineId: string) => post(route.job, { pipelineId });
Expand All @@ -11,7 +11,7 @@ export const getJobs = () => get(route.job);

export const getLogById = (id: string) => get(`${route.job}/${id}/log`);

export const startJob = (path: string, cwd: string) => post(`${route.job}/start`, {
export const startJob = (path: string, cwd: string) => listen(`${route.job}/start`, {
config: path,
cwd
});
Expand Down
23 changes: 12 additions & 11 deletions packages/daemon/package.json
Expand Up @@ -3,24 +3,25 @@
"version": "0.5.0",
"description": "pipcook daemon",
"dependencies": {
"@types/cls-hooked": "^4.3.0",
"@pipcook/costa": "^0.5.0",
"@pipcook/pipcook-core": "^0.5.16",
"@tensorflow/tfjs-node-gpu": "1.7.0",
"@types/cls-hooked": "^4.3.0",
"axios": "^0.18.1",
"chalk": "^3.0.0",
"cls-hooked": "^4.2.2",
"egg-scripts": "yorkie/egg-scripts#master",
"fs-extra": "^8.1.0",
"glob": "^7.1.6",
"glob-promise": "^3.4.0",
"jimp": "^0.10.0",
"midway": "^1.19.0",
"rxjs": "^6.5.3",
"sequelize": "^5.21.7",
"sqlite3": "^4.1.1",
"ssestream": "^1.1.0",
"uuid": "^8.0.0",
"uuid-validate": "^0.0.3",
"@pipcook/pipcook-core": "^0.5.16",
"glob": "^7.1.6",
"glob-promise": "^3.4.0",
"fs-extra": "^8.1.0",
"@tensorflow/tfjs-node-gpu": "1.7.0",
"jimp": "^0.10.0",
"axios": "^0.18.1",
"chalk": "^3.0.0",
"rxjs": "^6.5.3"
"uuid-validate": "^0.0.3"
},
"devDependencies": {
"@types/mocha": "^5.2.7",
Expand Down
39 changes: 26 additions & 13 deletions packages/daemon/src/app/controller/job.ts
@@ -1,4 +1,5 @@
import { Context, controller, inject, provide, post, get, del } from 'midway';
import SseStream from 'ssestream';

import { successRes, failRes } from '../../utils/response';
import { PipelineService } from '../../service/pipeline';
Expand Down Expand Up @@ -40,19 +41,31 @@ export class JobController {
}
}

@post('/start')
@get('/start')
public async startPipeline() {
const { ctx } = this;
try {
const { config, cwd } = ctx.request.body;
const { config, cwd, verbose } = ctx.request.query;
const parsedConfig = await parseConfig(config);
const pipeline = await this.pipelineService.initPipeline(parsedConfig);
const job = await this.pipelineService.createJob(pipeline.id);
this.pipelineService.startJob(job, cwd);
successRes(ctx, {
message: 'create pipeline and jobs successfully',
data: job
}, 201);

if (verbose === '1') {
const sse = new SseStream(this.ctx.req);
const res = this.ctx.res as NodeJS.WritableStream;
sse.pipe(res);
sse.write({ event: 'job created', data: job });
await this.pipelineService.startJob(job, cwd);
sse.write({ event: 'job finished', data: job });
sse.write({ event: 'session', data: 'close' });
sse.unpipe(res);
} else {
this.pipelineService.startJob(job, cwd);
successRes(ctx, {
message: 'create pipeline and jobs successfully',
data: job
}, 201);
}
} catch (err) {
if (err.errors && err.errors[0] && err.errors[0].message) {
err.message = err.errors[0].message;
Expand All @@ -63,6 +76,12 @@ export class JobController {
}
}

@del('')
public async deleteAll() {
await this.pipelineService.deleteAllJobs();
successRes(this.ctx, {});
}

@get('/:jobId/log')
public async getLog() {
const { ctx } = this;
Expand Down Expand Up @@ -126,10 +145,4 @@ export class JobController {
});
}
}

@del('')
public async deleteAll() {
await this.pipelineService.deleteAllJobs();
successRes(this.ctx, {});
}
}
2 changes: 1 addition & 1 deletion run_pipeline.sh
Expand Up @@ -4,4 +4,4 @@ PIPCOOK=./packages/cli/dist/bin/pipcook

$PIPCOOK init
$PIPCOOK daemon start
$PIPCOOK run ./test/pipelines/$1.json
$PIPCOOK run ./test/pipelines/$1.json --verbose

0 comments on commit 0a98617

Please sign in to comment.