Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graph file #60

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/orchestrators/join.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { mergeCtx } = require('../ctx')
const hash = require('../utils/hash.js')
const createTask = require('../task.js')

const join = (dispatch) => {
const join = (dispatch, logger) => {
return function (...tasks) {
let uids = []
let breakOut = false
Expand Down
72 changes: 57 additions & 15 deletions lib/reducers/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const _ = require('lodash')
const applicator = require('../utils/applicator.js')
const Graph = require('graph.js/dist/graph.full.js')
const fs = require('fs')

// Actions
const ADD_OUTPUT = 'collection/add-output'
Expand All @@ -24,14 +25,43 @@ const reducer = (state = defaultState, action) => {
}

const jsonifyGraph = (graph, obj = {}) => {
const graphsonObj = {
graph: {
mode: "NORMAL",
vertices: [],
edges: []
}
}

// iterate through verties
// in ECMAScript 6, you can use a for..of loop
for (let [key, value] of graph.vertices()) {
// iterates over all vertices of the graph
graphsonObj.graph.vertices.push({
_id: key,
_type: "vertex",
values: value
})
}
// iterate through edges
// in ECMAScript 6, you can use a for..of loop
for (let [from, to, value] of graph.edges()) {
// iterates over all vertices of the graph
graphsonObj.graph.edges.push({
source: from,
target: to,
_type: "edge"
// value: value
})
}

const assignVal = (obj, key, val) => {
if (val === undefined) {
obj[key] = { _value: '' }
} else {
obj[key] = { _value: val }
}
}

for (const [key, val] of graph.sources()) {
assignVal(obj, key, val)
}
Expand All @@ -42,20 +72,24 @@ const jsonifyGraph = (graph, obj = {}) => {
assignVal(obj, key, val)
Object.assign(obj[key], travelNode(key))
}

return obj
}

// console.log(JSON.stringify(graphsonObj, null, 2)) // used to log to stdout
// and then to copy to file (for now...)

// writes each graphson to file
fs.writeFile('graphson.json', JSON.stringify(graphsonObj, null, 2))

Object.keys(obj).forEach(key => Object.assign(obj[key], travelNode(key)))

return obj
}

function addOutputHandler (state, action) {
// TODO remove duplicate code b/w this, and creating the next trajection in join
const { uid, output, params, context } = action
const { uid, output, params, context, name } = action
const { trajectory } = context

// console.log(`trajectory for ${uid}: `, trajectory)
// console.log(`output for ${uid}: `, output)
// console.log(`params for ${uid}: `, params)
Expand All @@ -69,29 +103,36 @@ function addOutputHandler (state, action) {
const graph = state.clone()

// Add a node for current task
graph.addNewVertex(uid, output)
// only adds to graph if uid has in fact a name (from tasks)
if (name) {
// here needed to had taskName to better comprehend what is being
// created by junction, joins forks and tasks.
graph.addNewVertex(uid, {taskName: name, output: output}) //addNewVertex(key, [value])
}

// If params is not {}, add a node for it
const paramsString = JSON.stringify(params)
if (paramsString !== '{}') {
//const paramsString = JSON.stringify(params)
//if (paramsString !== '{}') {
// console.log(trajectory)
// TODO check if params already exists
graph.addNewVertex(paramsString)
//graph.addNewVertex(paramsString)

// Add an edge from params to current task as well
graph.addNewEdge(paramsString, uid)
}
// graph.addNewEdge(paramsString, uid)
//}

// Add edge from last trajectory node to current output if no params,
// otherwise to params (which points to current output)
if (trajectory.length > 0) {
if (paramsString === '{}') {
// if (paramsString === '{}') {
graph.addNewEdge(trajectory[trajectory.length - 1], uid)
} else {
graph.addNewEdge(trajectory[trajectory.length - 1], paramsString)
}
// } //else {
//graph.addNewEdge(trajectory[trajectory.length - 1], paramsString)
//}
}

// console.log('collection as json: ', JSON.stringify(jsonifyGraph(graph), null, 2))
// console.log('collection as json: ', JSON.stringify(jsonifyGraph(graph),
// null, 2))

return graph
}
Expand Down Expand Up @@ -134,6 +175,7 @@ reducer.ADD_OUTPUT = ADD_OUTPUT
reducer.addOutput = (uid, taskState) => ({
type: ADD_OUTPUT,
uid,
name: taskState.name, // added name to task.. to understand what is what
output: taskState.resolvedOutput,
context: taskState.context,
params: taskState.params
Expand Down
21 changes: 21 additions & 0 deletions lib/sagas/lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const Promise = require('bluebird')

const { tab } = require('../utils')

// new require
const fs = require('fs')

// Mostly promises
// Ran as async side effects through call() in the middle of start/end/fail sagas
const {
Expand Down Expand Up @@ -56,6 +59,9 @@ const {

const { addOutput } = require('../reducers/collection.js')

// add header to manifest file
fs.writeFile('summary_log.txt', 'Summary of the run...\n\n')

/**
* The task lifecycle.
*
Expand Down Expand Up @@ -108,6 +114,21 @@ function* lifecycle (action) {
logEmitter.emit('log', `=== ${originalTask.name} ===`)
logEmitter.emit('log', `Running task ${miniUid}`)

// Generate the object to be written to the log file
let manifestLog = { folderName: originalTask.dir,
taskName: originalTask.name,
input: originalTask.input,
output: originalTask.output,
params: originalTask.params
}
// Writes this object to the summary file
fs.appendFile('summary_log.txt', JSON.stringify(manifestLog, null, 2) + '\n' , function (err) {
if (err) {
return console.log(err)
}
console.log("Manifest file was saved!")
})

function* orderedLifecycle () {
let next
next = yield* resolveInputSaga()
Expand Down