Skip to content

Commit

Permalink
Simple array store between tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
thejmazz committed Jul 27, 2016
1 parent d266d5d commit 8e43d1b
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 43 deletions.
66 changes: 45 additions & 21 deletions examples/vc-simple/variant-calling-simple.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
'use strict'

// === WATERWHEEL ===
const task = require('../../lib/Task.js')
const join = require('../../lib/Join.js')
const parallel = require('../../lib/parallel.js')
const { shell } = require('../../lib/wrappers.js')
const {
task,
join,
shell
} = require('../..')

// === MODULES ===
const fs = require('fs')
Expand All @@ -29,10 +30,22 @@ const config = {
* @action http request | writable
*/
const getReference = task({
input: { value: config.referenceURL },
output: { file: config.referenceURL.split('/').pop() },
params: { url: config.referenceURL },
input: null,
output: '*_genomic.fna.gz',
name: `Download reference genome for ${config.name}`
}, ({ input }) => request(input).pipe(fs.createWriteStream(input.split('/').pop())) )
}, ({ params }) => {
const { url } = params
const outfile = url.split('/').pop()

// essentially curl -O
return request(url).pipe(fs.createWriteStream(outfile))
})

// getReference()
// .on('close', function() {
// console.log('output: ', this._output)
// })


/**
Expand All @@ -42,11 +55,22 @@ const getReference = task({
* @action {shell}
*/
const bwaIndex = task({
input: { file: '*_genomic.fna.gz' },
output: ['amb', 'ann', 'bwt', 'pac', 'sa'].map(suffix => ({ file: `*_genomic.fna.gz.${suffix}` })),
input: '*_genomic.fna.gz',
output: ['amb', 'ann', 'bwt', 'pac', 'sa'].map(suffix => `*_genomic.fna.gz.${suffix}`),
name: 'bwa index *_genomic.fna.gz',
}, ({ input }) => shell(`bwa index ${input}`) )

// bwaIndex()
// .on('destroy', function() {
// console.log('output: ', this._output)
// })


join(getReference, bwaIndex)()
.on('destroy', function() {
console.log('output: ', this._output)
})


/**
* Downloads the reads samples in SRA format.
Expand All @@ -57,7 +81,7 @@ const bwaIndex = task({
*/
const getSamples = task({
input: {
db: { value: 'sra' },
db: 'sra',
accession: { value: config.sraAccession }
},
output: { file: '**/*.sra' },
Expand Down Expand Up @@ -156,14 +180,14 @@ bcftools call -c - > variants.vcf

// === task orchestration ===

const reads = join(getSamples, fastqDump)
const reference = join(getReference, parallel(bwaIndex, decompressReference))
const call = join(alignAndSort, samtoolsIndex, mpileupandcall)

const pipeline = join(parallel(reads, reference), call)

call()
.on('close', function() {
this.output()
console.log('Pipeline has finished')
})
// const reads = join(getSamples, fastqDump)
// const reference = join(getReference, parallel(bwaIndex, decompressReference))
// const call = join(alignAndSort, samtoolsIndex, mpileupandcall)
//
// const pipeline = join(parallel(reads, reference), call)
//
// call()
// .on('close', function() {
// this.output()
// console.log('Pipeline has finished')
// })
31 changes: 19 additions & 12 deletions lib/Join.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { tab } = require('./utils.js')
* @returns {undefined}
*/
function join(...tasks) {
let store = []
let store = []

return function(resolvedOutput) {
const dup = duplexify(null, null)
Expand All @@ -21,7 +21,9 @@ function join(...tasks) {
const task = tasks[i]

const finish = once((task) => {
const output = task.output()
const output = task._output

// TODO this better
if (Array.isArray(output)) {
// will fail if it is an array of arrays... just a hotfix
output.forEach(item => store.push(item))
Expand All @@ -38,8 +40,10 @@ function join(...tasks) {
dup.output = function() {
return store
}
dup._output = store

dup.destroy()
dup.emit('destroy')
}
})

Expand All @@ -49,16 +53,19 @@ function join(...tasks) {
// Assumes no chunk on close or finish
// TODO handle a final chunk if it arrives
// .on('data', (data) => console.log('Join received data: ', data))
.on('end', function() {
// console.log('Join received: end')
finish(this)
})
.on('close', function() {
// console.log('Join received: close')
finish(this)
})
.on('finish', function() {
// console.log('Join received: finish')
// .on('end', function() {
// // console.log('Join received: end')
// finish(this)
// })
// .on('close', function() {
// // console.log('Join received: close')
// finish(this)
// })
// .on('finish', function() {
// // console.log('Join received: finish')
// finish(this)
// })
.on('destroy', function() {
finish(this)
})
.on('error', (err) => {
Expand Down
13 changes: 9 additions & 4 deletions lib/Task.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ const task = (store, actions) => (props, actionCreator) => (passedOutput) => {
.then((uid) => getTask(uid).resolvedOutput ? actions.runValidators(getTask(uid), 'before') : Promise.resolve(uid))
.then((uid) => {
if (getTask(uid).validated) {
stream._output = getTask(uid).resolvedOutput
stream.destroy()
finish(uid)
} else {
actions.resolveInput(getTask(uid))
.then(uid => actions.createAction(getTask(uid), actionCreator))
Expand All @@ -66,15 +65,21 @@ const task = (store, actions) => (props, actionCreator) => (passedOutput) => {
.then(uid => actions.runValidators(getTask(uid), 'after'))
.then((uid) => {
if (getTask(uid).validated) {
stream._output = getTask(uid).resolvedOutput
stream.destroy()
finish(uid)
}
})
.catch(err => console.log(err))
}
})
.catch(err => console.log(err))

function finish(uid) {
stream._output = getTask(uid).resolvedOutput
stream.destroy()
// TODO not this
stream.emit('destroy')
}

return stream
}

Expand Down
15 changes: 9 additions & 6 deletions lib/checksummer.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ const checksummer = (path) => new Promise((resolve, reject) => {
currentObj = JSON.parse(fs.readFileSync('waterwheel.json', 'utf-8'))
} catch (err) {
// Probably file does not exist...
// console.log(err)
console.log('Assuming b/c nonexistence..')
// console.log(err)
}

const writeableObj = Object.assign({}, currentObj, objDetails)

return fs.writeFileAsync('waterwheel.json', JSON.stringify(writeableObj, null, 2))
.then(() => {
// console.log(`Wrote info for ${path} to waterwheel.json`)
resolve()
})
fs.writeFileSync('waterwheel.json', JSON.stringify(writeableObj, null, 2))

console.log(`Wrote info for ${path} to waterwheel.json`)
resolve()
// .then(() => {
// // console.log(`Wrote info for ${path} to waterwheel.json`)
// resolve()
// })
})
.catch(reject)
})
Expand Down
12 changes: 12 additions & 0 deletions lib/reducers/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const stringify = require('json-stable-stringify')
const defaultConfig = require('./config.js').defaultState
const { hash, tab } = require('../utils.js')
const chalk = require('chalk')
const _ = require('lodash')

// Actions
const CHECK_RESUMABLE = 'task/check-resumable'
Expand Down Expand Up @@ -239,6 +240,16 @@ reducer.resolveInput = (task) => (dispatch) => new Promise((resolve, reject) =>

const { input } = task

if (_.isNull(input)) {
dispatch({
type: TASK_LOG,
uid: task.uid,
content: tab(1) + 'Ignoring input as is null'
})

return resolve(task.uid)
}

applicator(input, (item) => matchToFs(item, null))
.then((resolvedInput) => {
dispatch({
Expand Down Expand Up @@ -373,6 +384,7 @@ reducer.catchTask = (task, stream) => (dispatch) => new Promise((resolve, reject

stream.on('close', () => {
console.log(' '+ 'Received close internally')
resolve(task.uid)
})

stream.on('data', function(data) {
Expand Down

0 comments on commit 8e43d1b

Please sign in to comment.