Permalink
Browse files

fix(pm): fix some process and pm related stuff

  • Loading branch information...
jkuri committed Sep 4, 2017
1 parent 078a22f commit 7cf2c0b1eced012743a592295480afcf7b802844
@@ -1,4 +1,6 @@
import { sendRequest, getBitBucketAccessToken, getConfig } from './utils';
import * as logger from './logger';
import { yellow, red, blue, bold } from 'chalk';

export function sendSuccessStatus(build: any, buildId: number): Promise<void> {
const config: any = getConfig();
@@ -33,6 +35,14 @@ export function sendSuccessStatus(build: any, buildId: number): Promise<void> {
return Promise.resolve();
}
} else {
let msg = [
yellow('['),
red('error'),
yellow(']'),
' --- ',
`access_token is not set for repository ${bold(build.repository.full_name)}!`
].join('');
logger.error(msg);
return Promise.resolve();
}
}
@@ -67,6 +77,14 @@ export function sendPendingStatus(buildData: any, buildId: number): Promise<void
return Promise.resolve();
}
} else {
let msg = [
yellow('['),
red('error'),
yellow(']'),
' --- ',
`access_token is not set for repository ${bold(buildData.repository.full_name)}!`
].join('');
logger.error(msg);
return Promise.resolve();
}
}
@@ -101,6 +119,14 @@ export function sendFailureStatus(buildData: any, buildId: number): Promise<void
return Promise.resolve();
}
} else {
let msg = [
yellow('['),
red('error'),
yellow(']'),
' --- ',
`access_token is not set for repository ${bold(buildData.repository.full_name)}!`
].join('');
logger.error(msg);
return Promise.resolve();
}
}
@@ -32,6 +32,7 @@ export interface JobProcess {
image_name?: string;
log?: string[];
commands?: { command: string, type: CommandType }[];
env?: string[];
sshAndVnc?: boolean;
job?: Observable<any>;
}
@@ -167,25 +168,25 @@ export function startBuild(data: any): Promise<any> {
}).catch(err => logger.error(err));
}

export function startJob(p: JobProcess): Promise<void> {
export function startJob(proc: JobProcess): Promise<void> {
return Promise.resolve()
.then(() => {
startBuildProcess(p.build_id, p.job_id, p.commands, 'abstruse', p.sshAndVnc)
startBuildProcess(proc, 'abstruse')
.subscribe(event => {
const msg: JobProcessEvent = {
build_id: p.build_id,
job_id: p.job_id,
build_id: proc.build_id,
job_id: proc.job_id,
type: event.type,
data: event.data
};

terminalEvents.next(msg);
if (event.data && event.type === 'data') {
p.log.push(event.data);
proc.log.push(event.data);
} else if (event.type === 'container') {
let msg = [
yellow('['),
blue('abstruse_' + p.build_id + '_' + p.job_id),
blue('abstruse_' + proc.build_id + '_' + proc.job_id),
yellow(']'),
' --- ',
yellow(event.data)
@@ -195,81 +196,86 @@ export function startJob(p: JobProcess): Promise<void> {
}, err => {
logger.error(err);

dbJob.getLastRunId(p.job_id)
dbJob.getLastRunId(proc.job_id)
.then(runId => {
const data = {
id: runId,
end_time: new Date(),
status: 'failed',
log: p.log.join('')
log: proc.log.join('')
};

return dbJobRuns.updateJobRun(data);
})
.then(() => getJobProcesses())
.then(processes => {
processes = processes.filter(proc => proc.job_id !== p.job_id);
processes = processes.filter(p => p.job_id !== proc.job_id);
jobProcesses.next(processes);
jobEvents.next({
type: 'process',
build_id: p.build_id,
job_id: p.job_id,
build_id: proc.build_id,
job_id: proc.job_id,
data: 'jobFailed'
});

if (processes.findIndex(proc => proc.build_id === p.build_id) === -1) {
getBuild(p.build_id).then(build => sendFailureStatus(build, build.id));
if (processes.findIndex(proc => proc.build_id === proc.build_id) === -1) {
getBuild(proc.build_id).then(build => sendFailureStatus(build, build.id));
}
})
.catch(err => logger.error(err));
}, () => {
dbJob.getLastRunId(p.job_id)
dbJob.getLastRunId(proc.job_id)
.then(runId => {
const data = {
id: runId,
end_time: new Date(),
status: 'success',
log: p.log.join('')
log: proc.log.join('')
};

return dbJobRuns.updateJobRun(data);
})
.then(() => getBuildStatus(p.build_id))
.then(() => getBuildStatus(proc.build_id))
.then(status => {
if (status === 'success') {
return updateBuild({ id: p.build_id, end_time: new Date() })
.then(() => getLastRunId(p.build_id))
return updateBuild({ id: proc.build_id, end_time: new Date() })
.then(() => getLastRunId(proc.build_id))
.then(id => updateBuildRun({ id: id, end_time: new Date()} ))
.then(() => getBuild(p.build_id))
.then(() => getBuild(proc.build_id))
.then(build => sendSuccessStatus(build, build.id))
.catch(err => logger.error(err));
} else if (status === 'failed') {
getBuild(p.build_id)
getBuild(proc.build_id)
.then(build => sendFailureStatus(build, build.id));
} else {
return Promise.resolve();
}
})
.then(() => getJobProcesses())
.then(processes => {
processes = processes.filter(proc => proc.job_id !== p.job_id);
processes = processes.filter(p => p.job_id !== proc.job_id);
jobProcesses.next(processes);
jobEvents.next({
type: 'process',
build_id: p.build_id,
job_id: p.job_id,
build_id: proc.build_id,
job_id: proc.job_id,
data: 'jobSucceded'
});
}).catch(err => logger.error(err));
});
})
.then(() => dbJob.getLastRunId(p.job_id))
.then(() => dbJob.getLastRunId(proc.job_id))
.then(runId => {
const data = { id: runId, start_time: new Date(), status: 'running', log: '' };
return dbJobRuns.updateJobRun(data);
})
.then(() => {
const data = { type: 'process', build_id: p.build_id, job_id: p.job_id, data: 'jobStarted' };
const data = {
type: 'process',
build_id: proc.build_id,
job_id: proc.job_id,
data: 'jobStarted'
};
jobEvents.next(data);
})
.catch(err => logger.error(err));
@@ -366,6 +372,7 @@ function queueJob(buildId: number, jobId: number, sshAndVnc = false): Promise<vo
job_id: jobId,
status: 'queued',
commands: jobData.commands,
env: jobData.env,
sshAndVnc: sshAndVnc,
log: []
};
@@ -6,6 +6,7 @@ import { getRepositoryByBuildId } from './db/repository';
import { Observable } from 'rxjs';
import { green, red, bold, yellow, blue } from 'chalk';
import { CommandType } from './config';
import { JobProcess } from './process-manager';
const nodePty = require('node-pty');

export interface Job {
@@ -28,23 +29,21 @@ export interface ProcessOutput {
}

export function startBuildProcess(
buildId: number,
jobId: number,
commands: { command: string, type: CommandType }[],
image: string,
sshAndVnc = false
proc: JobProcess,
image: string
): Observable<ProcessOutput> {
return new Observable(observer => {
const name = 'abstruse_' + buildId + '_' + jobId;
const vars = commands.filter(cmd => cmd.command.startsWith('export'))
const name = 'abstruse_' + proc.build_id + '_' + proc.job_id;
const vars = proc.commands.filter(cmd => cmd.command.startsWith('export'))
.map(cmd => cmd.command.replace('export', '-e'))
.reduce((acc, curr) => {
return acc.concat(curr.split(' '));
}, []);
commands = commands.filter(cmd => !cmd.command.startsWith('export'));
}, [])
.concat(proc.env.reduce((acc, curr) => acc.concat(['-e', curr]), []));
proc.commands = proc.commands.filter(cmd => !cmd.command.startsWith('export'));

let debug: Observable<any> = Observable.empty();
if (sshAndVnc) {
if (proc.sshAndVnc) {
debug = Observable.concat(...[
executeInContainer(name, 'sudo /etc/init.d/ssh start'),
getContainerExposedPort(name, 22),
@@ -59,7 +58,7 @@ export function startBuildProcess(

const sub = startContainer(name, image, vars)
.concat(debug)
.concat(...commands.map(cmd => executeInContainer(name, cmd.command)))
.concat(...proc.commands.map(cmd => executeInContainer(name, cmd.command)))
.subscribe((event: ProcessOutput) => {
observer.next(event);
}, err => {
@@ -7,6 +7,8 @@ import { Observable } from 'rxjs';
import * as uuid from 'uuid';
import * as request from 'request';
import * as temp from 'temp';
import * as logger from './logger';
import { blue, yellow, magenta, cyan, bold, red } from 'chalk';

const defaultConfig = {
url: null,
@@ -219,13 +221,48 @@ export function sendRequest(url: string, data: any, headers: any): Promise<any>
json: data
};

let msg = [
yellow('['),
blue('http'),
yellow(']'),
' --- ',
yellow(`sending ${options.method} request to ${bold(url)}...`)
].join('');
logger.info(msg);

request(options, (err, response, body) => {
if (err) {
let msg = [
yellow('['),
blue('http'),
yellow(']'),
' --- ',
red(`sending request to ${bold(url)} failed (${err})`)
].join('');
logger.error(msg);

reject(err);
} else {
if (response.statusCode < 300 && response.statusCode >= 200) {
let msg = [
yellow('['),
blue('http'),
yellow(']'),
' --- ',
yellow(`sending request to ${bold(url)} successful (${response.statusCode})`)
].join('');
logger.info(msg);
resolve(body);
} else {
let msg = [
yellow('['),
blue('http'),
yellow(']'),
' --- ',
red(`sending request to ${bold(url)} failed (${response.statusCode})`)
].join('');
logger.error(msg);

reject({
statusCode: response.statusCode,
response: body
Oops, something went wrong.

0 comments on commit 7cf2c0b

Please sign in to comment.