
Task Architecture #1

thejmazz opened this issue Jun 23, 2016 · 5 comments


thejmazz commented Jun 23, 2016

A Task is the basic building block of pipelines in Waterwheel. It needs to handle streams, synchronous functions, and asynchronous functions. There needs to be a standard way to declare that a Task is over. As well, Tasks need to be joinable into new Tasks, and parallelizable into new Tasks. You should be able to join two Tasks so that one streams into the other without additional config.

Tasks can be:

  • BlockingTask:
    • input: anything - Array or Object of primitive, File, Stream
    • output: will not provide a Readable Stream, instead ending with or without a return value
  • StreamingTask:
    • input: anything
    • output: Readable Stream
  • AsyncTask:
    • input: anything
    • output: Promise preferred, or callback

A File is just a class that holds a value. It is used (at the moment) in the task action creator for an input instanceof File check, to determine whether the file pattern (e.g. **/*.sra) should be resolved with globby. This way the task stream/promise creator can use a resolved input variable with the actual filename.
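As a minimal sketch (illustrative names; `glob` stands in for globby and is injected here for clarity), the File wrapper and the instanceof check could look like:

```javascript
// File is a thin wrapper so the task creator can tell glob patterns
// apart from plain values (sketch; not Waterwheel's actual implementation)
class File {
  constructor (value) {
    this.value = value
  }
}

// Resolve a task input: File patterns get globbed, anything else passes through.
// `glob` stands in for globby (e.g. globby.sync), injected for testability.
function resolveInput (input, glob) {
  if (input instanceof File) {
    return glob(input.value)
  }
  return input
}

// usage with a stubbed glob
const stubGlob = pattern => pattern === '**/*.sra' ? ['reads/a.sra', 'reads/b.sra'] : []
resolveInput(new File('**/*.sra'), stubGlob) // ['reads/a.sra', 'reads/b.sra']
resolveInput('plain value', stubGlob)        // 'plain value'
```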

We will refer to joining:

  • join(task1, task2, taskn) -> Task

parallelization:

  • parallel(task1, [task2, task3], [task4, task5]) -> Task

and forking:

  • consider this pipeline:
    [pipeline diagram: new doc 6_1]
    The seqtk merge task can be forked and provided to two other tasks: filter kmc and filter khmer. This can be done by defining a filter task with an array of task action creators. Then, since the kmc and khmer variants produce the same output, just via a different tool, the pipeline will automatically duplicate the bwa mem | samtools view | samtools sort; samtools index; samtools mpileup | bcftools call section of the pipeline for each of them.

collectively as the orchestration of Tasks.

One way to enable the orchestration of Tasks is with callbacks. The task creator takes an object defining input and output, plus a function describing the task action creator, and returns a function of next. The join and parallel methods then programmatically assign a function as the next callback to achieve their goals.
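A rough sketch of the callback approach, under the assumption that a task is a function of next (all names here are illustrative, not the current implementation):

```javascript
// task(props, actionCreator) returns a function of `next` (sketch)
const task = (props, actionCreator) => (next) => {
  try {
    next(null, actionCreator(props))
  } catch (err) {
    next(err)
  }
}

// join chains tasks by wiring each one's `next` programmatically:
// when one task finishes, the next in line is started
const join = (...tasks) => (next) => {
  const run = (i, lastResult) => {
    if (i === tasks.length) return next(null, lastResult)
    tasks[i]((err, result) => err ? next(err) : run(i + 1, result))
  }
  run(0, undefined)
}

const t1 = task({ input: 1 }, ({ input }) => input + 1)
const t2 = task({ input: 5 }, ({ input }) => input * 10)
join(t1, t2)((err, result) => console.log(result)) // 50
```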

Another way is with Promises. These are perhaps more elegant than callbacks, and can reject when things go wrong.
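The Promise variant might look like this sketch (illustrative names; running a task yields a Promise, and a thrown error becomes a rejection):

```javascript
// task(props, actionCreator) -> a runnable that yields a Promise
const task = (props, actionCreator) => () =>
  Promise.resolve().then(() => actionCreator(props))

// join runs tasks sequentially; any rejection short-circuits the chain
const join = (...tasks) => () =>
  tasks.reduce((chain, t) => chain.then(() => t()), Promise.resolve())

const t1 = task({ input: 2 }, ({ input }) => input + 1)
const t2 = task({ input: 4 }, ({ input }) => input * 10)

join(t1, t2)()
  .then(result => console.log(result)) // 40
  .catch(err => console.error('a task failed:', err))
```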

Another way to do this can be through events. The Task can emit an event once the return Object from the task creator has completed. This has the advantage of helping define a standard way to declare that tasks are over. However, it could get messy listening for the same event when doing joins and parallels. That can be mitigated by emitting a taskFinish event with some data that includes, for example, a task uuid.

thejmazz (Member Author) commented:

@maxogden @mafintosh, your thoughts on using callbacks, promises (would be nice since they can reject when a task fails), or an event emitter as the backbone for the Task object? Currently it is a callback provided via a curried function.

@thejmazz thejmazz mentioned this issue Jun 24, 2016

Could you add a couple of code examples showcasing how you would use this API in a simple application?

thejmazz (Member Author) commented:

Using this dataset with 1809 samples:

It is an expression profiling dataset, i.e. transcriptomic: "what genes are active, when?", which is done by sampling active RNAs and counting them. As per the central dogma, DNA -> RNA -> protein, so by investigating the RNAs in a cell over time we can hypothesize some things.

// Get a stream of data which has description, run SRA ID, other metadata

// task(props, actionCreator)(next)
// curries a next callback for now; need to investigate promises/events/streams as alternatives
const sraStream = Task({
  input: {
    db: 'sra',
    term: 'SRP061902'
  },
  // declares this task is a readable stream
  output: stdout()
// then pass the "task action creator", which uses props
}, ({ input }) =>, input.term))

// some notes:
// allow task action creator to return only a stream, or also Promise/callback, event emitter (stream)
// I think generic JS promises should be allowed, one might want to run some http requests on data, etc
// forcing everything to be a stream may be too much overhead for consumers
// but perhaps the task objects themselves can still return a stream/event emitter wrapping these callbacks/promises. so might only emit a single 'data' and 'end', or a bunch if it's an actual stream

// Example of forking
// somewhat forced, but lets say we want to fork the stream of samples data to
// 1. an analysis pipeline (the heavy stuff)
// 2. a metadata analysis - key words in experiment summaries, common terms, gene names, etc

// So, we have a forking in the pipeline
// but we don't want to force the user to have extra mental overhead for handling these
// the idea is that if everything is a Task, waterwheel can automatically orchestrate them

const metadataAnalysis = Task({
  input: stdin(), // declares this task can be joined from another task that outputs stdout()
  output: stdout() // declares this task is a duplex stream. perhaps emits values to be written to disk
}, () => myMetadataDuplexStream())

const getSamples = Task({
  input: stdin(),
  output: file('**/*.sra') // the next task needs the sample SRA to be fully written to disk, so can't duplex here
  // this illustrates we will need to mix: readable, writable, duplex, promises, cbs
}, ({ input }) => {
  // give this action creator each data event or the whole stream?
  // also need to handle how each data event returns a stream
  // task returns a stream of streams? or because file() it is ok to not handle these
  input.on('data', ({ id }) =>'sra', id))
  // this essentially downloads a bunch of files
  // the output pattern is globbed and can be given to the next task
})

// this will be off, I have not done RNA-seq pipelines yet, but it still illustrates the API
const alignRNAs = Task({
  input: file('**/*.sra'),
  output: file('someAlignmentFiles')
}, ({ input }) => shell(`some-tool ${input}`))

const doStuffWithAlignedRNAs = Task({ ... }) 

const indexReference = Task({ ... }) // alignment needs an indexed reference

// Example of  parallel, and joins

// need to think out this API call way more
// need to think out this API call way more
const pipeline = (
  // everything in the array gets a fork of this
  fork(getSamples, [
    metadataAnalysis,
    // how to make sure alignRNAs goes after indexReference?
    // this syntax is not describing that
    join(getSamples, alignRNAs, ..., doStuffWithAlignedRNAs)
  ])
)

// So at a high level:
// Task: stream, promise, callback (perhaps kill callback in favour of promise)
// Tasks can be orchestrated no matter what they are
// 1. linear joins that pass info between
// 2. forks
// 3. parallel
// 4. what else?

Something along those lines. Let me know your questions or points of confusion. I'd like to get a stable version of these features in the next few days, using toy streams (it does not need to be bioinformatics to show task orchestration).

thejmazz (Member Author) commented:

After thinking about it a bit more, I'm leaning towards Task returning a stream/event-emitter sort of object. Callbacks fn(done) and Promises could be wrapped, and the Task can emit a taskDone event after the done() callback is called or the promise resolves, respectively. If the action creator returns a stream, the task can then be that stream. Or perhaps instead of taskDone, use the duplex stream events: end and finish.

Essentially: Task returns a fully malleable (readable, writable, duplex) stream. It wraps Promises/callbacks into tiny streams that emit end once.

Task(props, actionCreator) -> stream/E.E

  • props: Object with input, output
  • actionCreator: returns a stream, Promise, or callback of style fn(err, data)

writable stream events: close, drain, error, finish, pipe, unpipe
readable: close, data, end, error

Should Task emit its own events? Or use readable/writable/duplex stream events?

thejmazz (Member Author) commented:

thejmazz commented Jul 4, 2016

This discussion is largely resolved, and implementation details will be discussed in new issues. Closing for now.
