Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3 from Netflix/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
huangyiminghappy committed Apr 25, 2018
2 parents c472876 + 5ab9419 commit 3489f77
Show file tree
Hide file tree
Showing 10 changed files with 655 additions and 533 deletions.
116 changes: 73 additions & 43 deletions src/api/wfe.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { join } from 'path';
import { Router } from 'express';
import {Router} from 'express';
import http from '../core/HttpClient';
import moment from 'moment';
import filter from "lodash/fp/filter";
import forEach from "lodash/fp/forEach";
import map from "lodash/fp/map";
import transform from "lodash/transform";
import identity from "lodash/identity";

const router = new Router();
const baseURL = process.env.WF_SERVER;
Expand Down Expand Up @@ -42,49 +46,75 @@ router.get('/', async (req, res, next) => {
}
});

const LOG_DATE_FORMAT = 'MM/DD/YY, HH:mm:ss:SSS';

router.get('/id/:workflowId', async (req, res, next) => {
try {
let s = new Date().getTime();
const result = await http.get(baseURL2 + req.params.workflowId + '?includeTasks=true');
const meta = await http.get(baseURLMeta + 'workflow/' + result.workflowType + '?version=' + result.version);
const subs = [];
const subworkflows = {};
result.tasks.forEach(task => {
if(task.taskType == 'SUB_WORKFLOW'){
let subWorkflowId = task.outputData && task.outputData.subWorkflowId;
if(subWorkflowId == null) {
subWorkflowId = task.inputData.subWorkflowId;
}
if(subWorkflowId != null) {
subs.push({name: task.inputData.subWorkflowName, version: task.inputData.subWorkflowVersion, referenceTaskName: task.referenceTaskName, subWorkflowId: subWorkflowId});
}
}
});
for(let t = 0; t < result.tasks.length; t++) {
let task = result.tasks[t];
let logs = await http.get(baseURLTask + task.taskId + '/log');
logs = logs || [];
let logs2 = [];
logs.forEach(log => {
const dtstr = moment(log.createdTime).format('MM/DD/YY, HH:mm:ss:SSS');
logs2.push(dtstr + ' : ' + log.log);
});
task.logs = logs2;
}
let submeta = {};
for(let i = 0; i < subs.length; i++){
let submeta = await http.get(baseURLMeta + 'workflow/' + subs[i].name + '?version=' + subs[i].version);
let subes = await http.get(baseURL2 + subs[i].subWorkflowId + '?includeTasks=true');
let prefix = subs[i].referenceTaskName;
subworkflows[prefix] = {meta: submeta, wfe: subes};
}
let e = new Date().getTime();
let time = e-s;
try {
const result = await http.get(baseURL2 + req.params.workflowId + '?includeTasks=true');
const meta = await http.get(baseURLMeta + 'workflow/' + result.workflowType + '?version=' + result.version);

res.status(200).send({result, meta, subworkflows:subworkflows});
} catch (err) {
next(err);
}
const subs = filter(identity)(map(task => {
if (task.taskType === 'SUB_WORKFLOW') {
let subWorkflowId = task.outputData && task.outputData.subWorkflowId;

if (subWorkflowId == null) {
subWorkflowId = task.inputData.subWorkflowId;
}

if (subWorkflowId != null) {
return {
name: task.inputData.subWorkflowName,
version: task.inputData.subWorkflowVersion,
referenceTaskName: task.referenceTaskName,
subWorkflowId: subWorkflowId
};
}
}
})(result.tasks));

result.tasks.forEach(task => {
if (task.taskType === 'SUB_WORKFLOW') {
let subWorkflowId = task.outputData && task.outputData.subWorkflowId;
if (subWorkflowId == null) {
subWorkflowId = task.inputData.subWorkflowId;
}
if (subWorkflowId != null) {
subs.push({
name: task.inputData.subWorkflowName,
version: task.inputData.subWorkflowVersion,
referenceTaskName: task.referenceTaskName,
subWorkflowId: subWorkflowId
});
}
}
});

const logs = map(task => Promise.all([task, http.get(baseURLTask + task.taskId + '/log')]))(result.tasks);

await Promise.all(logs).then(result => {
forEach(([task, logs]) => {
if (logs) {
task.logs = map(({createdTime, log}) => `${moment(createdTime).format(LOG_DATE_FORMAT)} : ${log}`)(logs)
}
})(result);
});

const promises = map(({name, version, subWorkflowId, referenceTaskName}) => Promise.all([
referenceTaskName,
http.get(baseURLMeta + 'workflow/' + name + '?version=' + version),
http.get(baseURL2 + subWorkflowId + '?includeTasks=true')
]))(subs);

const subworkflows = await Promise.all(promises).then(result => {
return transform(result, (result, [key, meta, wfe]) => {
result[key] = {meta, wfe};
}, {});
});

res.status(200).send({result, meta, subworkflows: subworkflows});
} catch (err) {
next(err);
}
});
router.delete('/terminate/:workflowId', async (req, res, next) => {
try {
Expand Down
18 changes: 10 additions & 8 deletions src/api/wfegraph.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import clone from "lodash/fp/clone";

class Workflow2Graph {

constructor() {
Expand All @@ -7,16 +9,16 @@ class Workflow2Graph {
this._convert(wfe, meta);
return {edges: this.edges, vertices: this.vertices, id: wfe.workflowId};
}
_convert(wfe, meta) {

let subworkflows = {};
let metaTasks = meta.tasks;
_convert(wfe = {}, meta = {}) {
const subworkflows = {};
const metaTasks = meta.tasks && clone(meta.tasks) || [];
metaTasks.push({type:'final', name:'final', label: '', taskReferenceName: 'final', system: true});
metaTasks.unshift({type:'start', name:'start', label: '', taskReferenceName: 'start', system: true});

let forks = [];
wfe.tasks.forEach(tt=>{
if(tt.taskType == 'FORK'){
const forks = [];
const tasks = wfe.tasks || [];
tasks.forEach(tt=>{
if(tt.taskType === 'FORK'){
let wfts = [];
let forkedTasks = tt.inputData && tt.inputData.forkedTasks || [];
forkedTasks.forEach(ft =>{
Expand All @@ -33,7 +35,7 @@ class Workflow2Graph {
let joins = {};
wfe.tasks.forEach(t => {
this.executedTasks[t.referenceTaskName] = {status: t.status, input: t.inputData, output: t.outputData, taskType: t.taskType, reasonForIncompletion: t.reasonForIncompletion, task: t};
if(t.taskType == 'JOIN' ){
if(t.taskType === 'JOIN' ){
joins[t.referenceTaskName] = t.inputData.joinOn;
}
});
Expand Down
Loading

0 comments on commit 3489f77

Please sign in to comment.