From ba46e67bf8e36d98cd8cff810cdf91f1247ba4f8 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Wed, 23 Aug 2017 14:20:02 +0100 Subject: [PATCH 01/35] added figures to pipelines in examples/pipelines/tests --- examples/pipelines/tests/README.md | 178 ++++++++++++++++++ examples/pipelines/tests/fork_fork.js | 2 +- .../pipelines/tests/fork_fork_new_test.js | 38 ---- examples/pipelines/tests/join_join.js | 14 +- examples/pipelines/tests/simple_fork.js | 5 +- 5 files changed, 186 insertions(+), 51 deletions(-) create mode 100644 examples/pipelines/tests/README.md delete mode 100644 examples/pipelines/tests/fork_fork_new_test.js diff --git a/examples/pipelines/tests/README.md b/examples/pipelines/tests/README.md new file mode 100644 index 0000000..eec0962 --- /dev/null +++ b/examples/pipelines/tests/README.md @@ -0,0 +1,178 @@ +# Graphs + +## simple_join.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_graph/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_graph/join_graph.png) + +## simple_junction.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_graph/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_graph/junction_graph.png) + + +## simple_fork.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_graph/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_graph/fork_graph.png) + + +## join_join.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_join/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_join/result.png) + +Note: this pipeline as `task5` twice and it should render two nodes in d3. 
+ +## junction_junction.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_junction/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_junction/result.png) + + +## fork_join.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_join/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_join/fork_join.png) + + +## fork_junction.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_junction/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_junction/fork_junction.png) + + +## fork_fork_without_join.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_without_join_index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_without_join.png) + + +## fork_fork.js + +### const pipeline + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline.png) + + +### const pipeline2 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline2_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline2.png) + + +### const pipeline3 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline3_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline3.png) + + +### const pipeline4 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline4_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline4_result_correct.png) + + +### const pipeline5 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_junction_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_junction.png) + + +### const pipeline6 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/4thlevel_fork_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/4thlevel_fork.png) + + +### const pipeline7_2 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_after_fork.png) + +#### actual graph + 
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_duplicated_tasks.png) + + +### const pipeline8_2 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/junction_after_fork.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_junction_duplicated.png) \ No newline at end of file diff --git a/examples/pipelines/tests/fork_fork.js b/examples/pipelines/tests/fork_fork.js index 8cab9d8..4131253 100644 --- a/examples/pipelines/tests/fork_fork.js +++ b/examples/pipelines/tests/fork_fork.js @@ -176,6 +176,6 @@ const pipeline8_2 = join( // edit this line to run the desired pipeline. // documentation on the usage of these pipelines may be found in the link below // https://github.com/bionode/GSoC17/blob/master/Journal/Week_11.md -pipeline2().then((results) => { +pipeline().then((results) => { console.log('RESULTSNEW: ', results) }) \ No newline at end of file diff --git a/examples/pipelines/tests/fork_fork_new_test.js b/examples/pipelines/tests/fork_fork_new_test.js deleted file mode 100644 index ca36b82..0000000 --- a/examples/pipelines/tests/fork_fork_new_test.js +++ /dev/null @@ -1,38 +0,0 @@ -'use strict' - -// === WATERMILL === -const { - task, - join, - junction, - fork -} = require('../../..') - -const task0 = task({name: 'task0'}, () => `echo "something0"`) - -const task1 = task({name: 'task1'}, () => `echo "something1"`) - -const task2 = task({name: 'task2'}, () => `echo "something2"`) - -const task3 = task({name: 'task3'}, () => `echo "something3"`) - -const task4 = task({name: 'task4'}, () => `echo "something4"`) - -const task5 = task({name: 'task5'}, () => `echo "something5"`) - -const task6 = task({name: 'task6'}, () => `echo "something6"`) - -const pipeline = join( - task1, - fork( - join( - task2, - fork(task4, task5) - ), - join(task3, - fork(task4, task5) - ) - ) -) - -pipeline() diff --git a/examples/pipelines/tests/join_join.js b/examples/pipelines/tests/join_join.js index f7c62d4..e78d141 100644 --- a/examples/pipelines/tests/join_join.js +++ b/examples/pipelines/tests/join_join.js @@ -8,19 +8,17 @@ const { fork } = require('../../..') -const task0 = task({name: 'coco'}, () => `echo "something0"`) +const task0 = task({name: 'task0'}, () => `echo "something0"`) -const task1 = task({name: 'nut'}, () => `echo "something1"`) +const task2 = task({name: 'task2'}, () => `echo "something2"`) -const task2 = task({name: 'foo'}, () => `echo "something2"`) +const task3 = task({name: 'task3'}, () => `echo "something3"`) -const task3 = task({name: 'bar'}, () => `echo "something3"`) +const task4 = task({name: 'task4'}, () => `echo "something4"`) -const task4 = task({name: 'test'}, () => `echo "something4"`) +const task5 = task({name: 'task5'}, () => `echo "something5"`) -const task5 = task({name: 'test1'}, () => `echo "something5"`) - -const task6 = task({name: 'test6'}, () => `echo "something6"`) +const task6 = task({name: 'task6'}, () => `echo "something6"`) const pipeline = join( task0, diff --git a/examples/pipelines/tests/simple_fork.js b/examples/pipelines/tests/simple_fork.js index d321c92..32de87a 100644 --- a/examples/pipelines/tests/simple_fork.js +++ b/examples/pipelines/tests/simple_fork.js @@ -16,9 +16,6 @@ const task2 = task({name: 'foo'}, () => `echo "something2"`) const task3 = task({name: 'bar'}, () => `echo "something3"`) -const task4 = task({name: 'test'}, () => 
`echo "something4"`) - - -const pipeline = join(task0, fork(task1, task2), task3, task4) +const pipeline = join(task0, fork(task1, task2), task3) pipeline() \ No newline at end of file From 49354f482d95e304b254a7b193572d2d8c4045b4 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Wed, 23 Aug 2017 15:14:02 +0100 Subject: [PATCH 02/35] edited task.md documentation --- docs/Task.md | 67 ++++++++++++++++++++------- examples/pipelines/tests/fork_fork.js | 2 +- 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/docs/Task.md b/docs/Task.md index b07c308..039f414 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -19,19 +19,42 @@ const { task } = watermill const myTask = task(props, operationCreator) ``` +### props + **props** is an object with the following structure: ```javascript const props = { - input: 'foo.txt', // valid input, see below - output: 'bar.txt', // valid output, see below + input: '*.txt', // valid input, see below + output: '*.txt', // valid output, see below name: 'My Task', + params: { output: 'bar.txt' }, //the actual output name that can be passed to + // operationCreator or even other params that you may wish to pass to + // operationCreator. alwaysRun: true // force rerun even if output already exists // other options, see options } ``` -*input* and *output* are required for tasks that deal with files. If either is not provided, it will be assumed the task is then a *streaming task* - i.e., it is a duplex stream with writable and/or readable portions. Consider: +*input* and *output* patterns are required for tasks that deal with files. +*params.output* allows to name the output files, while *output* is used to +check if output was properly resolved. + +```javascript +// example task with input/output files +const task = ({ + input: '*.txt', + output: '*.txt', + params: { + output: 'bar.txt' + } +}, ({ input, params }) => `cp ${input} ${params.output}` +) +``` + +If either (input and output) is +not provided, it will be assumed the task is then a *streaming task* - i.e., it +is a duplex stream with writable and/or readable portions. Consider: ```javascript const throughCapitalize = through(function (chunk, env, next) { @@ -72,7 +95,11 @@ const writeFile = ({ join(readFile, capitalize, writeFile) ``` -Of course, this could be written as one single task. This is somewhat simpler, but the power of splitting up the read, transform, and write portions of a task will become apparent once we can provide multiple sets of parameters to the transform and observe the effect, *without having to manually rewire input and output filenames*. As a single task the above would become: +Of course, this could be written as one single task. This is somewhat simpler, +but the power of splitting up the read, transform, and write portions of a task +will become apparent once we can provide multiple sets of parameters to the +transform and observe the effect, *without having to manually rewire input and +output filenames*. As a single task the above would become: TODO introduce single task version first @@ -90,6 +117,8 @@ const capitalize = task({ It is fine to run with no task name, a hashed one will be made for you. However, properly named tasks will help greatly reading pipeline output +### operationCreator + **operationCreator** is a function that will be provided with a **resolved props object**. `operationCreator` should return a **stream** or a **promise**. If the operation creator does not return a stream, it will be wrapped into a stream internally (e.g. `StreamFromPromise`). 
An example operation creator is ```javascript @@ -104,14 +133,16 @@ function operationCreator(resolvedProps) { > *Note* > > With assignment destructuring and arrow functions, you can write cleaner operation creators: +> +>```javascript +>const operationCreator = ({ input }) => intoStream(input).pipe(fs.createWriteStream('bar.txt')) +>``` -```javascript -const operationCreator = ({ input }) => intoStream(input).pipe(fs.createWriteStream('bar.txt')) -``` - -## Input and Output +### Input and Output -The `input` and `output` objects can be a **string glob pattern**, or a plain object of them. TODO an array will introduce task forking. The glob will be **resolved to an absolute path** when it is passed to the `operationCreator`. +The `input` and `output` objects can be a **string glob pattern**, or a plain +object of them. The glob will be +**resolved to an absolute path** when it is passed to the `operationCreator`. For example, @@ -141,13 +172,17 @@ will resolve to something like: ```javascript { input: { - reference: '/data/GCA_000988525.2_ASM98852v2_genomic.fna.gz', - reads: ['/data/ERR1229296_1.fastq.gz', '/data/ERR1229296_2.fastq.gz'] + reference: '/data//GCA_000988525.2_ASM98852v2_genomic.fna.gz', + reads: ['/data//ERR1229296_1.fastq.gz', '/data/ERR1229296_2.fastq.gz'] } } ``` -If `input` is not provided, the `operation` will be a duplexify stream with the writable portion set. Similarly, if `output` is not provided, the `operation` will be a duplexify stream with the readable portion set. If neither is provided, both the readable and writable portions will be set: the task becomes a *through task*. +If `input` is not provided, the `operation` will be a duplexify stream with the +writable portion set. Similarly, if `output` is not provided, the `operation` +will be a duplexify stream with the readable portion set. If neither is +provided, both the readable and writable portions will be set: the task becomes +a *through task*. An example *through task*: @@ -165,15 +200,15 @@ const filterSpecies = task({ ) ``` -## Resolution +### Resolution Input resolution is a **reduction over the task props**. -### Input Resolution +#### Input Resolution Match to filesystem if in first task in pipeline, otherwise glob patterns are matched to the **collection**. -### Output Resolution +#### Output Resolution The resolved values can be accessed from `myTask.resolvedOutput` after the task has emitted a `task.finish` event. diff --git a/examples/pipelines/tests/fork_fork.js b/examples/pipelines/tests/fork_fork.js index 4131253..8cab9d8 100644 --- a/examples/pipelines/tests/fork_fork.js +++ b/examples/pipelines/tests/fork_fork.js @@ -176,6 +176,6 @@ const pipeline8_2 = join( // edit this line to run the desired pipeline. 
// documentation on the usage of these pipelines may be found in the link below // https://github.com/bionode/GSoC17/blob/master/Journal/Week_11.md -pipeline().then((results) => { +pipeline2().then((results) => { console.log('RESULTSNEW: ', results) }) \ No newline at end of file From 5cdbcdeb32e8996eaa899a6f03928239b21d1477 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Wed, 23 Aug 2017 15:34:30 +0100 Subject: [PATCH 03/35] edited streamable tasks related documentation --- docs/Task.md | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/docs/Task.md b/docs/Task.md index 039f414..65b1eb2 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -23,6 +23,8 @@ const myTask = task(props, operationCreator) **props** is an object with the following structure: +#### Input / output tasks + ```javascript const props = { input: '*.txt', // valid input, see below @@ -52,9 +54,12 @@ const task = ({ ) ``` -If either (input and output) is +## Streamable tasks + +If either (input or output) is not provided, it will be assumed the task is then a *streaming task* - i.e., it -is a duplex stream with writable and/or readable portions. Consider: +is a duplex stream with writable and/or readable portions. Consider this +javascript function: ```javascript const throughCapitalize = through(function (chunk, env, next) { @@ -65,19 +70,19 @@ const throughCapitalize = through(function (chunk, env, next) { // then call next so that next chunk can be handled next() }) +``` +And the following tasks: + +```javascript const capitalize = task({ name: 'Capitalize Through Stream' }, // Here, input is a readable stream that came from the previous task // Let's return a through stream that takes the input and capitalizes it ({ input }) => input.pipe(throughCapitalize) ) -``` -You could connect `capitalize` to a readable and writable file stream with: - -```javascript -const readFile = ({ +const readFile = task({ input: '*.lowercase', name: 'Read from *.lowercase' }, ({ input }) => { @@ -86,11 +91,17 @@ const readFile = ({ rs.inFile = input }) -const writeFile = ({ +const writeFile = task({ output: '*.uppercase', name: 'Write to *.uppercase' }, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase'))) +``` +You could connect `capitalize` to a readable (`readFile`) and writable +(`writeFile`) file +stream with: + +```javascript // Can now connect the three: join(readFile, capitalize, writeFile) ``` From 97c929374c990303551b8691482bc30e5ef5e9e9 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Wed, 23 Aug 2017 17:05:49 +0100 Subject: [PATCH 04/35] added more information on I/O and shell commands within operationCreator --- docs/Internals.md | 24 +++++++---- docs/README.md | 11 ++--- docs/Task.md | 43 ++++++++++++------- .../tests/pipeline_after_pipeline.js | 42 ++++++++++++++++++ 4 files changed, 91 insertions(+), 29 deletions(-) create mode 100644 examples/pipelines/tests/pipeline_after_pipeline.js diff --git a/docs/Internals.md b/docs/Internals.md index 9153e15..45fd91f 100644 --- a/docs/Internals.md +++ b/docs/Internals.md @@ -2,20 +2,28 @@ This section documents how bionode-watermill functions internally. Read this if you would like to understand the tool better, or would like to contribute *middlewares*. 
-Waterwheel and [Gulp](https://github.com/gulpjs/gulp) are alike in some ways: +Watermill and [Gulp](https://github.com/gulpjs/gulp) are alike in some ways: - both provide an interface to run **tasks** in **series and parallel** -- both maintain a **registry of tasks** - Gulp uses [Undertaker](https://github.com/gulpjs/gulp) and Waterwheel uses a [Redux](https://github.com/reactjs/redux) store +- both maintain a **registry of tasks** - Gulp uses [Undertaker](https://github.com/gulpjs/gulp) and Watermill uses a [Redux](https://github.com/reactjs/redux) store - both use [async-done](https://github.com/gulpjs/async-done) to catch errors and completion from the task function Yet also differ in some respects: -- Undertaker uses [bach](https://github.com/gulpjs/bach) for composing async functions in serial and parallel, Waterwheel uses its own serial and parallel implementations, which differ in one important way: **objects are passed through between tasks** -- Waterwheel's `task` is not actually a "task function" (i.e. what is sent into async-done), but rather a wrapper around that which has an extensive lifecycle, producing the task function around the middle -- Gulp uses [vinyl-fs](https://github.com/gulpjs/vinyl-fs) to store applied transformations to a source stream of files (e.g. `gulp.src('*.lowercase').pipe(capitalize()).pipe(gulp.dest('uppercase')) )`) while Waterwheel does not: the data sets are too big to be stored in one Buffer -- In addition to serial and parallel, Waterwheel provides extra operators like fork - -Waterwheel was created to enable the development of data **pipelines with modular and swappable components**. As a result, **parameters and options are the first class citizens, while file/folder names are handled automatically**. The streaming nature lets you **mix streams and processes**: e.g. filter search results in a through stream, run a pipeline on each value. +- Undertaker uses [bach](https://github.com/gulpjs/bach) for composing async +functions in serial and parallel, Watermill uses its own serial and parallel +implementations, which differ in one important way: **objects are passed through between tasks** +- Watermill's `task` is not actually a "task function" (i.e. what is sent into +async-done), but rather a wrapper around that which has an extensive lifecycle, producing the task function around the middle +- Gulp uses [vinyl-fs](https://github.com/gulpjs/vinyl-fs) to store applied +transformations to a source stream of files (e.g. `gulp.src('*.lowercase') +.pipe(capitalize()).pipe(gulp.dest('uppercase')) )`) while Watermill does not: +the data sets are too big to be stored in one Buffer +- In addition to serial and parallel, Watermill provides extra operators like +fork + +Watermill was created to enable the development of data **pipelines with +modular and swappable components**. As a result, **parameters and options are the first class citizens, while file/folder names are handled automatically**. The streaming nature lets you **mix streams and processes**: e.g. filter search results in a through stream, run a pipeline on each value. 
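+
+As a rough sketch of that idea (the `*.results.txt` file, the task names and
+the `through2` dependency below are illustrative, not taken from the
+codebase), a through-stream filter can sit between a reading task and a
+writing task, with the whole chain driven by `join`:
+
+```javascript
+const { task, join } = require('bionode-watermill')
+const fs = require('fs')
+const through = require('through2')
+
+// Resolves *.results.txt from the filesystem and returns a readable stream
+const readResults = task({
+  input: '*.results.txt',
+  name: 'Read raw search results'
+}, ({ input }) => fs.createReadStream(input))
+
+// No input/output props: this becomes a streaming (through) task
+const filterHits = task({
+  name: 'Keep lines containing "hit"'
+}, ({ input }) => input.pipe(through(function (chunk, enc, next) {
+  // naive line filter; assumes line breaks align with chunk boundaries
+  this.push(chunk.toString().split('\n').filter(line => line.includes('hit')).join('\n'))
+  next()
+})))
+
+// Returns a writable stream, so the readable portion of the previous task
+// is piped into it
+const writeFiltered = task({
+  output: '*.filtered.txt',
+  name: 'Write filtered results'
+}, () => fs.createWriteStream('search.filtered.txt'))
+
+// File streams and a through stream mixed freely inside one join
+join(readResults, filterHits, writeFiltered)()
+```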
diff --git a/docs/README.md b/docs/README.md index 10f8601..c59ca9e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,9 +1,10 @@ ## Table of Contents * [Beginner Walkthrough](BeginnerWalkthrough.md) -* [Task](Task.md) - - [Lifecycle](TaskLifecycle.md) -* [Orchestration](Orchestration.md) -* [Comparison](Comparison.md) -* [Internals](Internals.md) +* API + * [Task](Task.md) + - [Lifecycle](TaskLifecycle.md) + * [Orchestration](Orchestration.md) + * [Comparison](Comparison.md) + * [Internals](Internals.md) diff --git a/docs/Task.md b/docs/Task.md index 65b1eb2..6aa3722 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -58,8 +58,7 @@ const task = ({ If either (input or output) is not provided, it will be assumed the task is then a *streaming task* - i.e., it -is a duplex stream with writable and/or readable portions. Consider this -javascript function: +is a duplex stream with writable and/or readable portions. Consider: ```javascript const throughCapitalize = through(function (chunk, env, next) { @@ -69,18 +68,20 @@ const throughCapitalize = through(function (chunk, env, next) { this.push(chunk.toString().toUpperCase()) // then call next so that next chunk can be handled next() -}) -``` -And the following tasks: +You could connect `capitalize` to a readable (`readFile`) and writable +(`writeFile`) file +stream with: -```javascript const capitalize = task({ name: 'Capitalize Through Stream' }, // Here, input is a readable stream that came from the previous task // Let's return a through stream that takes the input and capitalizes it ({ input }) => input.pipe(throughCapitalize) ) +``` + +```javascript const readFile = task({ input: '*.lowercase', @@ -95,13 +96,7 @@ const writeFile = task({ output: '*.uppercase', name: 'Write to *.uppercase' }, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase'))) -``` - -You could connect `capitalize` to a readable (`readFile`) and writable -(`writeFile`) file -stream with: -```javascript // Can now connect the three: join(readFile, capitalize, writeFile) ``` @@ -112,8 +107,6 @@ will become apparent once we can provide multiple sets of parameters to the transform and observe the effect, *without having to manually rewire input and output filenames*. As a single task the above would become: -TODO introduce single task version first - ```javascript const capitalize = task({ input: '*.lowercase', @@ -130,7 +123,11 @@ It is fine to run with no task name, a hashed one will be made for you. However, ### operationCreator -**operationCreator** is a function that will be provided with a **resolved props object**. `operationCreator` should return a **stream** or a **promise**. If the operation creator does not return a stream, it will be wrapped into a stream internally (e.g. `StreamFromPromise`). An example operation creator is +**operationCreator** is a function that will be provided with a **resolved +props object** and that is responsible for the execution of the task itself. +`operationCreator` should return a **stream** or a **promise**. If the operation +creator does not return a stream, it will be wrapped into a stream internally +(e.g. `StreamFromPromise`). An example operation creator is ```javascript function operationCreator(resolvedProps) { @@ -149,11 +146,25 @@ function operationCreator(resolvedProps) { >const operationCreator = ({ input }) => intoStream(input).pipe(fs.createWriteStream('bar.txt')) >``` +#### shell commands + +Unix shell commands can be executed within tasks by returning a string from +`operationCreator`. HOW?! 
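+
+A minimal sketch of that convention (the command and file name below are only
+illustrative):
+
+```javascript
+// Returning a string makes watermill run it as a shell child process
+// inside the task's working directory
+const makeFile = task({
+  output: '*.txt',
+  name: 'Create test.txt with a shell command'
+}, () => `touch test.txt`)
+```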
+ ### Input and Output The `input` and `output` objects can be a **string glob pattern**, or a plain object of them. The glob will be **resolved to an absolute path** when it is passed to the `operationCreator`. +Bionode-watermill manages input and output files and folders run by a `task`. + All inputs and outputs are saved within `data` folder (generated by running + bionode-watermill) in the current working directory. The input files of the + first task can be elsewhere, outside `data` folder. Inside the `data` + folder, other subdirectories are created (one per `task`) with `uid` (unique + id) of + its + respective task. This `uid` is generated by `bionode-watermill` given the + props of the task and their parent tasks. For example, @@ -164,7 +175,7 @@ For example, will resolve to something like: ```javascript -{ input: '/data/ERR1229296.sra' } +{ input: '/data//ERR1229296.sra' } ``` And diff --git a/examples/pipelines/tests/pipeline_after_pipeline.js b/examples/pipelines/tests/pipeline_after_pipeline.js new file mode 100644 index 0000000..7289937 --- /dev/null +++ b/examples/pipelines/tests/pipeline_after_pipeline.js @@ -0,0 +1,42 @@ +'use strict' + +// === WATERMILL === +const { + task, + join, + junction, + fork +} = require('../../..') + +const task0 = task({name: 'task0'}, () => `echo "something0"`) + +const task1 = task( + { + name: 'task1', + output: '*.txt' + }, () => `touch task1_middle.txt`) + +const task2 = task( + { + name: 'task2', + output: '*.txt' + }, () => `touch task2_middle.txt`) + +const task3 = task( + { + name: 'task3', + output: '*.txt', + input: '*.txt' + }, ({ input }) => `touch ${input}_final`) + +const task4 = task( + { + name: 'task4', + input: '*_middle.txt', + output: '*.txt' + }, ({ input }) => `echo "something4" >> ${input}`) + + +const pipeline1 = join(task0, fork(task1, task2), task3) + +pipeline1().then(task4) \ No newline at end of file From f66cb7238fe841fd19ee32919df7a929908f1c48 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Thu, 24 Aug 2017 10:53:02 +0100 Subject: [PATCH 05/35] updated orchestration docs --- docs/Orchestration.md | 188 ++++++++++++++++++++++++++++-------------- docs/Task.md | 30 +++++++ 2 files changed, 155 insertions(+), 63 deletions(-) diff --git a/docs/Orchestration.md b/docs/Orchestration.md index f1c7d1d..e9b6115 100644 --- a/docs/Orchestration.md +++ b/docs/Orchestration.md @@ -1,35 +1,13 @@ # Orchestration -A **task** is the fundamental **unit** for building pipelines. It +Bionode-watermill has three main orchestrators: **join**, **junction** and +**fork**. In current API, these orchestrators gather a group of tasks that +are described inside them and apply different effects on the pipeline, in +which they are used. -* has a **unique input/output pair** defined by glob pattern(s) and/or streams -* has a **single params object**. Params should be used when they can be applied - to a task with the same I/O but alter the output. e.g. 
command flags -* **emits** `task.finish` with a reference to that task object in state +## Join -*Example* - -```javascript -/* - * Usage: samtools index [-bc] [-m INT] [out.index] - * Options: - * -b Generate BAI-format index for BAM files [default] - * -c Generate CSI-format index for BAM files - * -m INT Set minimum interval size for CSI indices to 2^INT [14] - */ - -const samtoolsIndex = task({ - input: '*.bam', - output: '*.bai', - params: { format: 'b' } - name: 'samtools index' -}, ({ input, params }) => shell(`samtools index -${params.format} ${input}`)) - -samtoolsIndex() - .on('task.finish', (results) => console.log(results.resolvedOutput)) -``` - -A **join** is used to run a **sequence of tasks in order**. It +A `join` is used to run a **sequence of tasks in order**. It * must pass the `trajectory` between tasks (including the trajectory passed into `join`) * **emits** `join.finish` with a `trajectory` @@ -41,47 +19,131 @@ A **join** is used to run a **sequence of tasks in order**. It // task1 will resolve input to filesystem, // task2 will resolve input to files at collection nodes defined by its trajectory // (which will be created based on the trajectory task1 returns and join passes to task2) -const joined = join(task1, task2) - -joined() - .tasks((t1, t2) => { - t1.on('task.finish', (results) => { - anotherTask(results.trajectory)() - .on('task.finish', () => console.log('another task finished')) - }) - }) - .on('join.finish', (results) => console.log(results.trajectory)) -// Which looks like: -// task1 -> task2 -// task1 -> anotherTask - -// NOTE: This is only recommended for advanced or dynamic cases. Try to avoid -// retreiving task references from join or parallel. -// Another way to do it: -const pipeline = join(task1, parallel(task2, anotherTask), task3) -// However, in this case, task3 will only start after task2 and anotherTask finish -// task1 -> task2 ⌉ -// | -> task3 -// task1 -> anotherTask ⌋ -// Alternatively you could do: -const pipeline = join(task1, parallel(join(task2, task3), anotherTask)) -// task1 -> task2 -> task3 -// task1 -> anotherTask +const pipeline = join(task1, task2) +// executes the pipeline +pipeline() ``` -A **parallel** is used to run **a set of tasks simultaneously**. It +The above represented `pipeline` will first run `task1` and **only after +`task1`** is + finished, starts running `task2`. -* must pass the trajectory passed into `parallel` into **each task** -* **emits** `parallel.finish` with a list of trajections +## Junction + +A ``junction`` is used to run **a set of tasks simultaneously** and **waits +for the results** of all tasks within junction before running anythin else. +. It + +* must pass the trajectory passed into `junction` into **each task** +* **emits** `junction.finish` with a list of trajections * returns/emits reference to each task +*Example* + ```javascript -const parallelized = parallel(taskA, taskB) +// executes both taskA and taskB at the same time +// however one task may finish before the other depending on the tasks itself +const pipeline2 = junction(taskA, taskB) +// executes the pipeline +pipeline2() +``` -parallelized() - .tasks((a, b) => { /* do stuff if u want */ }) - .on('parallel.finish', (results) => { - console.log(results.trajectory) // an array - }) +The above represented `pipeline2` will run both tasks (`taskA` and `taskB`) at the +same time. + +## Fork + +A `fork` is used to run `a set of tasks simultaneously` but **it does not +wait for the results** from all tasks within `fork`. 
Instead, it will branch
+the pipeline in the sense that each task within `fork` will have their own
+set of upstream tasks. It
+
+* must **multiply each upstream task** (tasks proceeding the `fork`) as many
+times
+as the `fork` branches (number of tasks within `fork`).
+* currently uses `join` operator to create each branch.
+
+*Example*
+
+```javascript
+// executes task 1 and task 2 at the same time
+// However it does not wait for task1 or task2 to finish before executing
+// task3 after task1 and after task2
+const pipeline3 = join(fork(task1, task2), task3)
+// executes the pipeline
+pipeline3()
+```
+The above referenced `pipeline3` will run both tasks (`task1` and `task2`)
+simultaneously and will run upstream task (`task3`) twice, after `task1` and
+after
+`task2`, respectively.
+
+### Conceptual difference between junction and fork
+
+While `junction` is a tree whose branches all end up in the same leaf,
+`fork` is a tree whose branches each end up in their own leaf.
+
+Therefore, `junction` should be used every time the user wants to
+**wait** for the results from the tasks within `junction` and then perform
+other tasks:
+
+```javascript
+const pipeline4 = join(junction(taskA, taskB), taskC)
+```
+
+In `pipeline4` example `taskA` and `taskB` will be executed simultaneously but
+then `taskC` will be waiting for the `task.finish` event for both `taskA` and
+ `taskB`. This behavior is most useful when `taskC` inputs depend on both
+ tasks (`taskA` and `taskB`) outputs.
+
+ On the other hand, `fork` should be used everytime the user **do not want to
+ wait** for all tasks to finish before running upstream tasks:
+
+ ```javascript
+const pipeline5 = join(fork(task1, task2), task3)
+```
+
+In `pipeline5`, if, for instance, `task1` is finished but `task2` is not,
+`task3` will run in an independent branch after `task1`. Then, when `task2` is
+ finished, `task3` will be run again after `task2`. In this case we will end
+ up with two branches:
+
+ ```javascript
+task1 --> task3
+task2 --> task3
+```
+
+### Concurrency limitations of fork and junction
+
+`junction` and `fork` may start running many processes at once and consume a
+lot of RAM, since they fire several tasks at the same time. Therefore, users
+must take this into account.
+
+Imagine that `fork` (or `junction`) branches the pipeline into 8 independent
+branches:
+
+```javascript
+const pipeline6 = join(
+  fork(task1, task2, task3, task4, task5, task6, task7, task8),
+  finalTask
+)
+```
+
+Now imagine that each task uses 1 CPU and you only have 4 CPUs available:
+this will end up consuming more CPUs than intended. So, in the current API,
+users must handle this manually:
+
+```javascript
+// first run
+const pipeline6 = join(
+  fork(task1, task2, task3, task4),
+  finalTask
+)
+// and then
+const pipeline6_2 = join(
+  fork(task5, task6, task7, task8),
+  finalTask
+)
+```
diff --git a/docs/Task.md b/docs/Task.md
index 6aa3722..0a5b78d 100644
--- a/docs/Task.md
+++ b/docs/Task.md
@@ -259,3 +259,33 @@ The resolved values can be accessed from `myTask.resolvedOutput` after the task
 has emitted a `task.finish` event.
   alignment: 'reads.bam'
 }
}
```
+
+To sum up, a **task** is the fundamental **unit** for building pipelines. It
+
+* has a **unique input/output pair** defined by glob pattern(s) and/or streams
+* has a **single params object**. Params should be used when they can be applied
+  to a task with the same I/O but alter the output. e.g.
command flags +* **emits** `task.finish` with a reference to that task object in state + +*Example* + +```javascript +/* +* Usage: samtools index [-bc] [-m INT] [out.index] +* Options: +* -b Generate BAI-format index for BAM files [default] +* -c Generate CSI-format index for BAM files +* -m INT Set minimum interval size for CSI indices to 2^INT [14] +*/ + +const samtoolsIndex = task({ + input: '*.bam', + output: '*.bai', + params: { format: 'b' } + name: 'samtools index' +}, ({ input, params }) => shell(`samtools index -${params.format} ${input}`)) + + +samtoolsIndex() + .on('task.finish', (results) => console.log(results.resolvedOutput)) +``` \ No newline at end of file From f97830cb63b3adfd4e97597d5638f11f1f72e2c4 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Thu, 24 Aug 2017 11:02:50 +0100 Subject: [PATCH 06/35] updated documentation on shell commands execution --- docs/Task.md | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/docs/Task.md b/docs/Task.md index 0a5b78d..1aebb56 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -11,19 +11,17 @@ const task = watermill.task const { task } = watermill ``` -## API - `task` takes two parameters: **props** and **operationCreator**: ```javascript const myTask = task(props, operationCreator) ``` -### props +## props **props** is an object with the following structure: -#### Input / output tasks +### Input / output tasks ```javascript const props = { @@ -54,7 +52,7 @@ const task = ({ ) ``` -## Streamable tasks +### Streamable tasks If either (input or output) is not provided, it will be assumed the task is then a *streaming task* - i.e., it @@ -121,7 +119,7 @@ const capitalize = task({ It is fine to run with no task name, a hashed one will be made for you. However, properly named tasks will help greatly reading pipeline output -### operationCreator +## operationCreator **operationCreator** is a function that will be provided with a **resolved props object** and that is responsible for the execution of the task itself. @@ -133,7 +131,7 @@ creator does not return a stream, it will be wrapped into a stream internally function operationCreator(resolvedProps) { const fs = require('fs') const intoStream = require('into-stream') - const ws = intoStream(resolvedProps.input).pipe( fs.createWriteStream('bar.txt') ) + const ws = intoStream(resolvedProps.input).pipe(fs.createWriteStream('bar.txt') ) return ws } ``` @@ -146,10 +144,23 @@ function operationCreator(resolvedProps) { >const operationCreator = ({ input }) => intoStream(input).pipe(fs.createWriteStream('bar.txt')) >``` -#### shell commands +### Shell commands + +Bionode-watermill allows users to both execute code in javascript within +`operationCreator` or run unix **shell commands** within +tasks by returning a string from +`operationCreator`: + +*Example* -Unix shell commands can be executed within tasks by returning a string from -`operationCreator`. HOW?! 
+```javascript +// creates a test.txt file +const operationCreator = () => `touch test.txt` +// or non ES6 code style +function operationCreator() { + return 'touch test.txt' +} +``` ### Input and Output From 93d46a8a6a0ca1b3f6ef09d251c8e508b4bec474 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Thu, 24 Aug 2017 11:52:00 +0100 Subject: [PATCH 07/35] improved beginner's walkthrough --- docs/BeginnerWalkthrough.md | 133 +++++++++++++++++++++++++----------- docs/Orchestration.md | 8 +-- docs/Task.md | 4 +- 3 files changed, 98 insertions(+), 47 deletions(-) diff --git a/docs/BeginnerWalkthrough.md b/docs/BeginnerWalkthrough.md index 57b5c72..ead8a3d 100644 --- a/docs/BeginnerWalkthrough.md +++ b/docs/BeginnerWalkthrough.md @@ -1,13 +1,13 @@ # Beginner Walkthrough -bionode-watermill can be used to create complicated bioinformatics pipelines, +Bionode-watermill can be used to create complex bioinformatics pipelines, but at its core are simple ideas. The first of these is the idea of a *task*. ## Task A `task` describes the idea that *some input* will be transformed into *some -output*. For our cases, we will consider input and output to both be a file. -Before jumping into the JavaScript, let's see what our first task is! +output*. First, we will consider input and output to both be a file. +Before jumping into JavaScript, let's see what our first task is! Our first simple pipeline will convert a file with lowercase characters into one with upper case characters. On your UNIX command line (on Windows, see @@ -27,27 +27,27 @@ cat alphabet.lowercase | tr '[:lower:]' '[:upper:]' > alphabet.uppercase cat alphabet.uppercase ``` -Okay, so how to the same thing with bionode-watermill? We can create a `task` +Okay, so how to do the same thing with bionode-watermill? We can create a `task` for it. A `task` is created by passing `watermill.task` two parameters -- object describing `input`, `output`, `name` -- the "operation creator", a function which will receive "resolved props" +- object describing `input`, `output`, `params`, `name` ("props") +- the "operation creator", a function which will receive "props" -For now, let's forget about resolved props and just translate the simple pipeline +For now, let's forget about props and just translate the simple pipeline above into a `task`: ```javascript // pipeline.js // Make sure to "npm install bionode-watermill" first -const watermill = require('bionode-watermill') +const { task } = require('bionode-watermill') -const uppercaser = watermill.task({ +const uppercaser = task({ input: '*.lowercase', output: '*.uppercase', name: 'Uppercase *.lowercase -> *.uppercase' -}, function(resolvedProps) { - const input = resolvedProps.input +}, function(props) { + const input = props.input const output = input.replace(/lowercase$/, 'uppercase') return `cat ${input} | tr '[:lower:]' '[:upper:]' > ${output}` @@ -65,12 +65,12 @@ node pipeline.js ``` What's going on here? Behind the scenes bionode-watermill will start looking for -files matching the [glob][node-glob] pattern `*.alphabet`. The file you created +files matching the [glob][node-glob] pattern `*.lowercase`. The file you created earlier will be found, *and its absolute path will be stored*. Then a unique folder to run the task is created within the `data` folder - this prevents us from worrying about file overwriting in larger pipelines. Within the folder for -the instance of this task, a symlink to the `*.alphabet` file is created. 
-Finally, `resolvedProps`, which holds the absolute path of `*.alphabet` inside +the instance of this task, a symlink to the `*.lowercase` file is created. +Finally, `props`, which holds the absolute path of `*.lowercase` inside `input` (the same as it was for "props", just now the glob pattern is a valid file path) is passed to our "operation creator". The operation creator returns a String (using ES6 template literals). When watermill finds a string returned from a task, @@ -78,11 +78,12 @@ it will run it as a shell child process within that task instance's folder. The log output will have each line prefixed with a hash: this is the ID of the instance of that task, and is the name of the folder in `data` which was made -for this task, look inside there and you will find the `alphabet.uppercase` file. +for this task, look inside there and you will find the `*.uppercase` file. [node-glob]: https://github.com/isaacs/node-glob There are two ways we can improve the readability of our task: + - assignment destructuring ```javascript @@ -98,17 +99,17 @@ const { input } = obj ```javascript // Without => -function actionCreator(resolvedProps) { +function operationCreator(props) { return '...' }.bind(this) // => will automatically ".bind(this)" (so that this is the same inside and outside the function) // With =>, can return object directly instead of having a function body -const actionCreator = (resolvedProps) => '...' +const operationCreator = (props) => '...' ``` -With those syntax features, our task looks like: +With those syntax features, our task looks will look like this: ```javascript -const uppercaser = watermill.task({ +const uppercaser = task({ input: '*.lowercase', output: '*.uppercase', name: 'Uppercase *.lowercase -> *.uppercase' @@ -131,7 +132,7 @@ this: ```javascript const fs = require('fs') -const watermill = require('bionode-watermill') +const { task } = require('bionode-watermill') const through = require('through2') const uppercaser = watermill.task({ @@ -159,16 +160,18 @@ const uppercaser = watermill.task({ watermill needs to be given a flowing stream in the operation creator in order to watch it finish* -## Operators +## Orchestrators -The next core idea of bionode-watermill is that of *operators*. Operators are +The next core idea of bionode-watermill is that of *orchestrators*. +Orchestrators are ways to combine tasks. The simplest of the operators is `join`. `join` takes any number of tasks as parameters, and will run each of the tasks sequentially. Where the first task will check the current working folder for files that match `input`, within a join lineage, downstream tasks will attempt to match their `input` to an upstream task's `output`. -A simple join pipeline is a download+extract one. For this we can download some +A simple join pipeline is a download + extract one. For this we can download +some reads from the NCBI Sequence Read Archive (SRA) and extract them with `fastq-dump` from sratools. @@ -200,47 +203,74 @@ The next operator you might use is `junction`. Junction lets you run a set of tasks in parallel. If `junction` is used within a `join`, the downstream tasks will have access to the outputs of each of the parallelized items from `junction`. 
-We can use `junction` to run our pipeline above over multiple samples: +We can use `junction` to run our pipeline above over multiple samples and +make a manifest file with all `.fna.gz` files that were downloaded: ```javascript -const pipeline = junction( +// task to create manifest file with all .fna.gz files downloaded +const listFiles = task({ + input: '*.fna.gz', // this is used to symlink files to this task directory + output: '*.txt', +}, ({ input}) => `ls | grep ".fna.gz" > listFiles.txt` +) + +const pipeline = join( + junction( join(getSamples('2492428'), fastqDump), join(getSamples('1274026'), fastqDump) + ), + listFiles ) ``` The last operator is `fork`. Fork lets you replace one task in your pipeline with more than one, and have the downstream tasks duplicated for each task you -pass into fork. This is useful for comparing tools or options within a tool. +pass into fork. This is useful for comparing tools, options within a tool or +even perform a couple of repetitive tasks for many samples from + a given pipeline. -For a simple example, we can run `ls` with different options before sending -that output to capitalize. What's important to note is that *we only define one -uppercaser task, yet it is ran on each task in the `fork`*. +For example, we can run use fork to fetch many samples using +`getSamples` and extract each one `fastqDump` and then uncompressed them all +using out new `gunzipIt` task: ```javascript -const lsMaker = (opts) => watermill.task({ - output: '*.lowercase' -}, () => `ls ${opts} > ls.lowercase`) - -// uppercaser task as above +const gunzipIt = task({ + input: '*.fna.gz', + output: '*.fa', +}, ({ input}) => `gunzip -c ${input} > ${input.split('.')[0]}.fa` +) const pipeline = join( - fork(lsMaker(''), lsMaker('-lh')), - uppercaser + fork( + join(getSamples('2492428'), fastqDump), + join(getSamples('1274026'), fastqDump) + ), + gunzipIt ) - -pipeline() ``` +What's important to note is that *we only define one +`gunzipIt` task, yet it is ran on each task in the `fork`*. Also, `gunzipIt` +will start running once each task within the `fork` gets finished and thus it + does not have to wait for all tasks within `fork` to finish. + ## Summary You've seen that bionode-watermill can be used to defined `tasks` that *resolve* their `input` and `output`. These tasks can be child processes (i.e. a program -on the command line) or just regular JavaScript functions. As well a brief introduction +on the command line) or just regular JavaScript functions. As well, a brief +introduction to the operators `join`, `junction`, and `fork` was made. -These are all the tools necessary to make a bioinformatics pipeline! See -this [variant calling pipeline](https://github.com/bionode/bionode-watermill/blob/master/examples/variant-calling-filtered/pipeline.js): +These are all the tools necessary to make a bioinformatics pipeline! + +Check out our [tutorial](https://github.com/bionode/bionode-watermill-tutorial) +if you want to get a feel on how it looks like and start assembling your own +pipelines. 
+ +Here are some **example pipelines** + +[Variant calling pipeline](https://github.com/bionode/bionode-watermill/blob/master/examples/variant-calling-filtered/pipeline.js): ```javascript const pipeline = join( @@ -257,6 +287,27 @@ const pipeline = join( ) ``` +[Mapping with bowtie2 and bwa](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) + +```javascript +const pipeline = join( + junction( + getReference, + join(getSamples,fastqDump) + ), + samtoolsFaidx, gunzipIt, /* since this will be common to both mappers, there + is no + need to be executed after fork duplicating the effort. */ + fork( + join(indexReferenceBwa, bwaMapper), + join(indexReferenceBowtie2, bowtieMapper) + ), + /* here the pipeline will break into two distinct branches, one for each + mapping approach used. */ + samtoolsView, samtoolsSort, samtoolsIndex, samtoolsDepth +) +``` + Now go out there and make something cool! If you would like to help out, see the diff --git a/docs/Orchestration.md b/docs/Orchestration.md index e9b6115..aa0989b 100644 --- a/docs/Orchestration.md +++ b/docs/Orchestration.md @@ -56,9 +56,9 @@ same time. A `fork` is used to run `a set of tasks simultaneously` but **it does not wait for the results** from all tasks within `fork`. Instead, it will branch the pipeline in the sense that each task within `fork` will have their own -set of upstream tasks. It +set of downstream tasks. It -* must **multiply each upstream task** (tasks proceeding the `fork`) as many +* must **multiply each downstream task** (tasks proceeding the `fork`) as many times as the `fork` branches (number of tasks within `fork`). * currently uses `join` operator to create each branch. @@ -75,7 +75,7 @@ pipeline3() ``` The above referenced `pipeline3` will run both tasks (`task1` and `task2`) -simultaneously and will run upstream task (`task3`) twice, after `task1` and +simultaneously and will run downstream task (`task3`) twice, after `task1` and after `task2`, respectively. @@ -99,7 +99,7 @@ then `taskC` will be waiting for the `task.finish` event for both `taskA` and tasks (`taskA` and `taskB`) outputs. 
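+
+A sketch of such a `taskC` (the glob patterns and the `cat` command are
+illustrative only):
+
+```javascript
+const taskC = task({
+  input: {
+    fromA: '*.A.txt', // assumed to match taskA's output
+    fromB: '*.B.txt'  // assumed to match taskB's output
+  },
+  output: '*.merged.txt',
+  name: 'Merge outputs of taskA and taskB'
+}, ({ input }) => `cat ${input.fromA} ${input.fromB} > AB.merged.txt`)
+```
+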
On the other hand, `fork` should be used everytime the user **do not want to - wait** for all tasks to finish before running upstream tasks: + wait** for all tasks to finish before running downstream tasks: ```javascript const pipeline5 = join(fork(task1, task2), task3) diff --git a/docs/Task.md b/docs/Task.md index 1aebb56..e38b507 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -148,7 +148,7 @@ function operationCreator(resolvedProps) { Bionode-watermill allows users to both execute code in javascript within `operationCreator` or run unix **shell commands** within -tasks by returning a string from +tasks by returning a string (using ES6 template literals) from `operationCreator`: *Example* @@ -158,7 +158,7 @@ tasks by returning a string from const operationCreator = () => `touch test.txt` // or non ES6 code style function operationCreator() { - return 'touch test.txt' + return `touch test.txt` } ``` From a6dc7b93cae8e404773d9af55c17d6774f4631cd Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Thu, 24 Aug 2017 11:57:18 +0100 Subject: [PATCH 08/35] removed unnecessary grep --- docs/BeginnerWalkthrough.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/BeginnerWalkthrough.md b/docs/BeginnerWalkthrough.md index ead8a3d..095f72a 100644 --- a/docs/BeginnerWalkthrough.md +++ b/docs/BeginnerWalkthrough.md @@ -211,7 +211,7 @@ make a manifest file with all `.fna.gz` files that were downloaded: const listFiles = task({ input: '*.fna.gz', // this is used to symlink files to this task directory output: '*.txt', -}, ({ input}) => `ls | grep ".fna.gz" > listFiles.txt` +}, ({ input}) => `ls *.fna.gz > listFiles.txt` ) const pipeline = join( From ec4fd8ac2f75a0ce9a724f0cd452b3bd3c0f7385 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Thu, 24 Aug 2017 12:04:07 +0100 Subject: [PATCH 09/35] added a minor indententation fix --- docs/BeginnerWalkthrough.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/BeginnerWalkthrough.md b/docs/BeginnerWalkthrough.md index 095f72a..b93248c 100644 --- a/docs/BeginnerWalkthrough.md +++ b/docs/BeginnerWalkthrough.md @@ -218,8 +218,8 @@ const pipeline = join( junction( join(getSamples('2492428'), fastqDump), join(getSamples('1274026'), fastqDump) - ), - listFiles + ), + listFiles ) ``` From 52bc65113263306365d8fc8c3a2930cde6c4defe Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Thu, 24 Aug 2017 15:22:20 +0100 Subject: [PATCH 10/35] added a multi-sample two-mappers example --- docs/README.md | 6 +- .../two-mappers/pipeline_multipleinput.js | 148 ++++++++++++++++++ 2 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 examples/pipelines/two-mappers/pipeline_multipleinput.js diff --git a/docs/README.md b/docs/README.md index c59ca9e..3822f1f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,10 +1,14 @@ ## Table of Contents * [Beginner Walkthrough](BeginnerWalkthrough.md) +* [Graph visualization tool] * API * [Task](Task.md) - [Lifecycle](TaskLifecycle.md) + - [Multiple input] * [Orchestration](Orchestration.md) + * [Forkception] + * [Uids] * [Comparison](Comparison.md) * [Internals](Internals.md) - +* Example Pipelines diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js new file mode 100644 index 0000000..ea4f24b --- /dev/null +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -0,0 +1,148 @@ +'use strict' + +// === WATERMILL === +const { + task, + join, + junction, + fork +} = require('../../..') + +// === MODULES 
=== + +const fs = require('fs') +const path = require('path') + +const request = require('request') + +// === CONFIG === + +// specifies the number of threads to use by mappers +const THREADS = parseInt(process.env.WATERMILL_THREADS) || 2 + +const config = { + name: 'Streptococcus pneumoniae', + sraAccession: ['ERR045788', 'ERR045787'], + referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/007/045/GCF_000007045.1_ASM704v1/GCF_000007045.1_ASM704v1_genomic.fna.gz' +} + +// === TASKS === + +// first lets get the reference genome for our mapping +const getReference = (referenceURL, name) => task({ + params: { url: referenceURL }, + output: '*_genomic.fna.gz', + name: `Download reference genome for ${name}` +}, ({ params, dir }) => { + const { url } = params + const outfile = url.split('/').pop() + + // essentially curl -O + return request(url).pipe(fs.createWriteStream(dir + '/' + outfile)) +}) + +//then get samples to work with +const getSamples = (sraAccession) => task({ + params: { + db: 'sra', + accession: sraAccession + }, + output: '**/*.sra', + dir: process.cwd(), // Set dir to resolve input/output from + name: `Download SRA ${config.sraAccession}` +}, ({ params }) => `bionode-ncbi download ${params.db} ${params.accession}` +) + +// extract the samples from fastq.gz +const fastqDump = task({ + input: '**/*.sra', + output: [1, 2].map(n => `*_${n}.fastq.gz`), + name: 'fastq-dump **/*.sra' +}, ({ input }) => `fastq-dump --split-files --skip-technical --gzip ${input}` +) + +// first lets uncompress the gz +const gunzipIt = task({ + input: '*_genomic.fna.gz', + output: '*.fa', + params: { output: 'uncompressed.fa' } + }, ({ params, input}) => `gunzip -c ${input} > ${params.output}` +) + +// then index using first bwa ... +const indexReferenceBwa = task({ + input: '*.fa', + output: { + indexFile: ['amb', 'ann', 'bwt', 'pac', 'sa'].map(suffix => + `bwa_index.${suffix}`), + //reference: 'bwa_index.fa' //names must match for bwa - between reference + // and index files + }, + //params: { output: 'bwa_index.fa' }, + name: 'bwa index bwa_index.fa -p bwa_index' +}, ({ input }) => `bwa index ${input} -p bwa_index`) + +// and bowtie2 + +const indexReferenceBowtie2 = task({ + input: '*.fa', + output: ['1.bt2', '2.bt2', '3.bt2', '4.bt2', 'rev.1.bt2', + 'rev.2.bt2'].map(suffix => `bowtie_index.${suffix}`), + params: { output: 'bowtie_index' }, + name: 'bowtie2-build -q uncompressed.fa bowtie_index' + }, ({ params, input }) => `bowtie2-build -q ${input} ${params.output}` + /* for bowtie we had to uncompress the .fna.gz file first before building + the reference */ +) + +// now use mappers with bwa + +const bwaMapper = task({ + input: { + //reference: '*.fa', + reads:[1, 2].map(n => `*_${n}.fastq.gz`), + indexFiles: ['amb', 'ann', 'bwt', 'pac', 'sa'].map(suffix => + `bwa_index.${suffix}`) //pass index files to bwa mem + }, + output: '*.sam', + params: { output: 'bwa_output.sam' }, + name: 'Mapping with bwa...' + }, ({ input, params }) => `bwa mem -t ${THREADS} bwa_index ${input.reads[0]} ${input.reads[1]} > ${params.output}` +) + +// and with bowtie2 + +const bowtieMapper = task({ + input: { + reference: '*_genomic.fna.gz', + reads:[1, 2].map(n => `*_${n}.fastq.gz`), + indexFiles: ['1.bt2', '2.bt2', '3.bt2', '4.bt2', 'rev.1.bt2', + 'rev.2.bt2'].map(suffix => `bowtie_index.${suffix}`) //pass index files to + // bowtie2 + }, + output: '*.sam', + params: { output: 'bowtie2_output.sam' }, + name: 'Mapping with bowtie2...' 
+}, ({ input, params }) => `bowtie2 -p ${THREADS} -x bowtie_index -1 ${input.reads[0]} -2 ${input.reads[1]} -S ${params.output}` +) + +// === PIPELINE === + +const pipeline = (sraAccession) => join( + junction( + getReference(config.referenceURL, config.name), + join(getSamples(sraAccession),fastqDump) + ), + gunzipIt, + fork( + join(indexReferenceBwa, bwaMapper), + join(indexReferenceBowtie2, bowtieMapper) + ) +) + +// actual run pipelines and return results +//pipeline().then(results => console.log('PIPELINE RESULTS: ', results)) +for (const sra of config.sraAccession) { + const pipelineMaster = pipeline(sra) + pipelineMaster() +} From c450ee7e46ae6dac6dc4c1d84db6af4eebd128c8 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Thu, 24 Aug 2017 16:50:58 +0100 Subject: [PATCH 11/35] edited badge for dev --- README.md | 2 +- examples/pipelines/two-mappers/pipeline_multipleinput.js | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index fcd79eb..28e2873 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # bionode-watermill -[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() [![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=master)](https://travis-ci.org/bionode/bionode-watermill) [![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) +[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() [![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=dev)](https://travis-ci.org/bionode/bionode-watermill) [![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) *Watermill: A Streaming Workflow Engine* diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js index ea4f24b..783dec5 100644 --- a/examples/pipelines/two-mappers/pipeline_multipleinput.js +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -22,7 +22,7 @@ const THREADS = parseInt(process.env.WATERMILL_THREADS) || 2 const config = { name: 'Streptococcus pneumoniae', - sraAccession: ['ERR045788', 'ERR045787'], + sraAccession: ['ERR045788', 'ERR016633'], referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/007/045/GCF_000007045.1_ASM704v1/GCF_000007045.1_ASM704v1_genomic.fna.gz' } @@ -49,7 +49,7 @@ const getSamples = (sraAccession) => task({ }, output: '**/*.sra', dir: process.cwd(), // Set dir to resolve input/output from - name: `Download SRA ${config.sraAccession}` + name: `Download SRA ${sraAccession}` }, ({ params }) => `bionode-ncbi download ${params.db} ${params.accession}` ) @@ -130,7 +130,8 @@ const bowtieMapper = task({ const pipeline = (sraAccession) => join( junction( - getReference(config.referenceURL, config.name), + getReference, // maybe this can done in anorther task outside this + // pipeline... 
then hardcode the path join(getSamples(sraAccession),fastqDump) ), gunzipIt, @@ -143,6 +144,7 @@ const pipeline = (sraAccession) => join( // actual run pipelines and return results //pipeline().then(results => console.log('PIPELINE RESULTS: ', results)) for (const sra of config.sraAccession) { + console.log("sample:", sra) const pipelineMaster = pipeline(sra) pipelineMaster() } From 692008915b16080b4346da28bb2b4a5e07cbbfd2 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 09:37:45 +0100 Subject: [PATCH 12/35] main readme updated as well as documentation improved --- README.md | 145 ++++++++++-------- docs/BeginnerWalkthrough.md | 31 ++++ docs/PreviousReadme.md | 101 ++++++++++++ .../two-mappers/pipeline_multipleinput.js | 6 + 4 files changed, 218 insertions(+), 65 deletions(-) create mode 100644 docs/PreviousReadme.md diff --git a/README.md b/README.md index 28e2873..0b1df1e 100644 --- a/README.md +++ b/README.md @@ -1,84 +1,98 @@ +

+ + bionode logo + +
+ bionode.io +

+ # bionode-watermill -[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() [![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=dev)](https://travis-ci.org/bionode/bionode-watermill) [![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) +> Node.js module to get data from the NCBI API (aka e-utils) + +[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) +[![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() +[![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=dev)](https://travis-ci.org/bionode/bionode-watermill) +[![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) +[![Gitter](https://img.shields.io/gitter/room/nwjs/nw.js.svg?style=flat-square)](https://gitter.im/bionode/bionode-watermill) + +## Table of Contents + +* [What is bionode-watermill](#what-is-bionode-watermill) + * [Main features](#main-features) + * [Who is this tool for?](#who-is-this-tool-for) +* [Installation](#installation) +* [Documentation](#documentation) +* [Tutorial](#tutorial) +* [Example pipelines](#example-pipelines) +* [Why bionode-watermill?](#why-bionode-watermill) +* [Contributing](#contributing) -*Watermill: A Streaming Workflow Engine* -[![NPM](https://nodei.co/npm/bionode-watermill.png?downloads=true&stars=true)](https://nodei.co/npm/bionode-watermill/) -- [CWL?](#cwl) -- [What is a task?](#what-is-a-task) -- [What are orchestrators?](#what-are-orchestrators) -- [Check out bionode-watermill tutorial!](#check-out-bionode-watermill-tutorial) -- [Example pipelines](#example-pipelines) -- [Why bionode-watermill?](#why-bionode-watermill) -- [Who is this tool for?](#who-is-this-tool-for) +## What is bionode-watermill -Watermill lets you *orchestrate* **tasks** using operators like **join**, **junction**, and **fork**. Each task has a [lifecycle](https://thejmazz.gitbooks.io/bionode-watermill/content/TaskLifecycle.html) where +**Bionode-watermill** is a workflow engine that lets you assemble and run +bioinformatic pipelines with ease and less overhead. Bionode-watermill +pipelines are +essentially node.js scripts in which [tasks](docs/BeginnerWalkthrough.md#task) are the modules that will be +assembled in the final *pipeline* using [orchestrators](docs/BeginnerWalkthrough.md#orchestrators). -1. Input [glob patterns](https://github.com/isaacs/node-glob) are resolved to absolute file paths (e.g. `*.bam` to `reads.bam`) -2. The **operation** is ran, passed resolved input, params, and other props -3. The operation completes. -4. Output glob patterns are resolved to absolute file paths. -5. Validators are ran over the output. Check for non-null files, can pass in custom validators. -6. Post-validations are ran. Add task and output to DAG. +### Main features -## CWL? +* Modularity +* Reusability +* Automated Input/Output handling +* Ability to run programs using Unix shell +* Node.js integration -Coming soon. +### Who is this tool for? -## What is a task? +Bionode-watermill is for **biologists** who understand it is important to +experiment with sample data, parameter values, and tools. 
Compared to other +workflow systems, the ease of swapping around parameters and tools is much +improved, allowing you to iteratively compare results and construct more +confident inferences. Consider the ability to construct your own +[Teaser](https://genomebiology.biomedcentral.com/articles/10.1186/s13059-015-0803-1) +for *your data* with a *simple syntax*, and getting utmost performance out of the box. -A `task` is the fundamental unit pipelines are built with. For more details, see [Task](https://thejmazz.gitbooks.io/bionode-watermill/content/Task.html). At a glance, a task is created by passing in **props** and an **operationCreator**, which will later be called with the resolved input. Consider this task which takes a "lowercase" file and creates an "uppercase" one: -```javascript -const uppercase = task({ - input: '*.lowercase', - output: '*.uppercase' -}, function(resolvedProps) { - const input = resolvedProps.input +Bionode-watermill is for **programmers** who desire an efficient and +easy-to-write methodology for developing complex and dynamic data pipelines, +while handling parallelization as much as possible. Bionode-watermill is an npm +module, and is accessible by anyone willing to learn a little JavaScript. This +is in contrast to other tools which develop their own DSL +(domain specific language), which is not useful outside the tool. By leveraging +the npm ecosystem and JavaScript on the client, Bionode-watermill can be built +upon for inclusion on web apis, modern web applications, as well as native +applications through [Electron](http://electron.atom.io/). Look forward to +seeing Galaxy-like applications backed by a completely configurable Node API. - return fs.createReadStream(input) - .pipe(through(function(chunk, enc, next) { - next(null, chunk.toString().toUpperCase()) - }) - .pipe(fs.createWriteStream(input.replace(/lowercase$/, 'uppercase'))) -}) -``` -A "task declaration" like above will not immediately run the task. Instead, the task declaration returns an "invocable task" that can either be called directly or used with an orchestration operator. Tasks can also be created to **run shell programs**: +## Installation -```javascript -const fastqDump = task({ - input: '**/*.sra', - output: [1, 2].map(n => `*_${n}.fastq.gz`), - name: 'fastq-dump **/*.sra' -}, ({ input }) => `fastq-dump --split-files --skip-technical --gzip ${input}` ) -``` +Local installation: -## What are orchestrators? +```npm install bionode-watermill``` -Orchestrators are functions which can take tasks as params in order to let you compose your pipeline from a high level view. This **separates task order from task declaration**. For more details, see [Orchestration](https://thejmazz.gitbooks.io/bionode-watermill/content/Orchestration.html). At a glance, here is a complex usage of `join`, `junction`, and `fork`: +Global installation: -```javascript -const pipeline = join( - junction( - join(getReference, bwaIndex), - join(getSamples, fastqDump) - ), - trim, mergeTrimEnds, - decompressReference, // only b/c mpileup did not like fna.gz - join( - fork(filterKMC, filterKHMER), - alignAndSort, samtoolsIndex, mpileupAndCall // 2 instances each of these - ) -) -``` +```npm install bionode-watermill -g``` -## Check out bionode-watermill tutorial! +## Documentation -- [Try out bionode-watermill tutorial](https://github.com/bionode/bionode-watermill-tutorial) +Our documentation is available [here](https://thejmazz.gitbooks.io/bionode-watermill/content/). 
+There you may find how to **use** bionode-watermill to construct and **run** +your +pipelines. Moreover, you will also find the description of the API to help +anyone +willing to **contribute**. + + +## Tutorial + +- [Try bionode-watermill tutorial!](https://github.com/bionode/bionode-watermill-tutorial) ## Example pipelines @@ -86,7 +100,7 @@ const pipeline = join( - [Simple capitalize task](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/capitalize/capitalize.js) - [Simple SNP calling](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-simple/pipeline.js) - [SNP calling with filtering and fork](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-filtered/pipeline.js) -- [Mapping with bowtie2 and bwa](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) +- [Mapping with bowtie2 and bwa (with tutorial)](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) ## Why bionode-watermill? @@ -94,8 +108,9 @@ const pipeline = join( compares the available tools to deal with NGS workflows, explaining the advantages of each one, including **bionode-watermill**. -## Who is this tool for? - -Bionode-watermill is for **programmers** who desire an efficient and easy-to-write methodology for developing complex and dynamic data pipelines, while handling parallelization as much as possible. Bionode-watermill is an npm module, and is accessible by anyone willing to learn a little JavaScript. This is in contrast to other tools which develop their own DSL (domain specific language), which is not useful outside the tool. By leveraging the npm ecosystem and JavaScript on the client, Bionode-watermill can be built upon for inclusion on web apis, modern web applications, as well as native applications through [Electron](http://electron.atom.io/). Look forward to seeing Galaxy-like applications backed by a completely configurable Node API. -Bionode-watermill is for **biologists** who understand it is important to experiment with sample data, parameter values, and tools. Compared to other workflow systems, the ease of swapping around parameters and tools is much improved, allowing you to iteratively compare results and construct more confident inferences. Consider the ability to construct your own [Teaser](https://genomebiology.biomedcentral.com/articles/10.1186/s13059-015-0803-1) for *your data* with a *simple syntax*, and getting utmost performance out of the box. +## Contributing +We welcome all kinds of contributions at all levels of experience, please +refer to +the [Issues section](https://github.com/bionode/bionode-watermill/issues). +Also, you can allways reach us on [gitter](https://gitter.im/bionode/bionode-watermill). diff --git a/docs/BeginnerWalkthrough.md b/docs/BeginnerWalkthrough.md index b93248c..07e1e5b 100644 --- a/docs/BeginnerWalkthrough.md +++ b/docs/BeginnerWalkthrough.md @@ -254,6 +254,37 @@ What's important to note is that *we only define one will start running once each task within the `fork` gets finished and thus it does not have to wait for all tasks within `fork` to finish. 
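+
+As a minimal sketch (task names here are placeholders, not tasks from this
+walkthrough), the task placed after a `fork` is instantiated once per branch,
+so each copy starts as soon as its own branch finishes:
+
+```javascript
+const pipeline = join(
+  getSamples,
+  fork(trimWithToolA, trimWithToolB),
+  mapReads
+)
+// mapReads is instantiated twice, giving two independent lineages:
+//   getSamples -> trimWithToolA -> mapReads
+//   getSamples -> trimWithToolB -> mapReads
+```
+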
+## Pipeline execution + +Once you have defined your **tasks** and their **orchestration** make sure +you call at the end of each `pipeline.js`: + +```javascript +pipeline() // makes the assembled pipeline executed +``` + +and then on your terminal: + +```bash +node pipeline.js +``` + +You can also use **google chrome inspector** if you want to debug something: + +```bash +node --inspect pipeline.js +``` + +>**[Only recommended for more advanced users]** +> +>You may even go further by using an environmental variable +>`REDUX_LOGGER=1`, which basically lets you log more information (*redux* +>*actions* and *states*) on the workflow within each `bionode-watermill` task: +> +>```bash +>REDUX_LOGGER=1 node --inspect pipeline.js +>``` + ## Summary You've seen that bionode-watermill can be used to defined `tasks` that *resolve* diff --git a/docs/PreviousReadme.md b/docs/PreviousReadme.md new file mode 100644 index 0000000..28e2873 --- /dev/null +++ b/docs/PreviousReadme.md @@ -0,0 +1,101 @@ +# bionode-watermill + +[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() [![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=dev)](https://travis-ci.org/bionode/bionode-watermill) [![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) + +*Watermill: A Streaming Workflow Engine* + +[![NPM](https://nodei.co/npm/bionode-watermill.png?downloads=true&stars=true)](https://nodei.co/npm/bionode-watermill/) + + +- [CWL?](#cwl) +- [What is a task?](#what-is-a-task) +- [What are orchestrators?](#what-are-orchestrators) +- [Check out bionode-watermill tutorial!](#check-out-bionode-watermill-tutorial) +- [Example pipelines](#example-pipelines) +- [Why bionode-watermill?](#why-bionode-watermill) +- [Who is this tool for?](#who-is-this-tool-for) + +Watermill lets you *orchestrate* **tasks** using operators like **join**, **junction**, and **fork**. Each task has a [lifecycle](https://thejmazz.gitbooks.io/bionode-watermill/content/TaskLifecycle.html) where + +1. Input [glob patterns](https://github.com/isaacs/node-glob) are resolved to absolute file paths (e.g. `*.bam` to `reads.bam`) +2. The **operation** is ran, passed resolved input, params, and other props +3. The operation completes. +4. Output glob patterns are resolved to absolute file paths. +5. Validators are ran over the output. Check for non-null files, can pass in custom validators. +6. Post-validations are ran. Add task and output to DAG. + +## CWL? + +Coming soon. + +## What is a task? + +A `task` is the fundamental unit pipelines are built with. For more details, see [Task](https://thejmazz.gitbooks.io/bionode-watermill/content/Task.html). At a glance, a task is created by passing in **props** and an **operationCreator**, which will later be called with the resolved input. Consider this task which takes a "lowercase" file and creates an "uppercase" one: + +```javascript +const uppercase = task({ + input: '*.lowercase', + output: '*.uppercase' +}, function(resolvedProps) { + const input = resolvedProps.input + + return fs.createReadStream(input) + .pipe(through(function(chunk, enc, next) { + next(null, chunk.toString().toUpperCase()) + }) + .pipe(fs.createWriteStream(input.replace(/lowercase$/, 'uppercase'))) +}) +``` + +A "task declaration" like above will not immediately run the task. 
Instead, the task declaration returns an "invocable task" that can either be called directly or used with an orchestration operator. Tasks can also be created to **run shell programs**: + +```javascript +const fastqDump = task({ + input: '**/*.sra', + output: [1, 2].map(n => `*_${n}.fastq.gz`), + name: 'fastq-dump **/*.sra' +}, ({ input }) => `fastq-dump --split-files --skip-technical --gzip ${input}` ) +``` + +## What are orchestrators? + +Orchestrators are functions which can take tasks as params in order to let you compose your pipeline from a high level view. This **separates task order from task declaration**. For more details, see [Orchestration](https://thejmazz.gitbooks.io/bionode-watermill/content/Orchestration.html). At a glance, here is a complex usage of `join`, `junction`, and `fork`: + +```javascript +const pipeline = join( + junction( + join(getReference, bwaIndex), + join(getSamples, fastqDump) + ), + trim, mergeTrimEnds, + decompressReference, // only b/c mpileup did not like fna.gz + join( + fork(filterKMC, filterKHMER), + alignAndSort, samtoolsIndex, mpileupAndCall // 2 instances each of these + ) +) +``` + +## Check out bionode-watermill tutorial! + +- [Try out bionode-watermill tutorial](https://github.com/bionode/bionode-watermill-tutorial) + +## Example pipelines + +- [Toy pipeline with shell/node](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/pids/pipeline.js) +- [Simple capitalize task](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/capitalize/capitalize.js) +- [Simple SNP calling](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-simple/pipeline.js) +- [SNP calling with filtering and fork](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-filtered/pipeline.js) +- [Mapping with bowtie2 and bwa](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) + +## Why bionode-watermill? + +[This blog post](https://jmazz.me/blog/NGS-Workflows) +compares the available tools to deal with NGS workflows, explaining the +advantages of each one, including **bionode-watermill**. + +## Who is this tool for? + +Bionode-watermill is for **programmers** who desire an efficient and easy-to-write methodology for developing complex and dynamic data pipelines, while handling parallelization as much as possible. Bionode-watermill is an npm module, and is accessible by anyone willing to learn a little JavaScript. This is in contrast to other tools which develop their own DSL (domain specific language), which is not useful outside the tool. By leveraging the npm ecosystem and JavaScript on the client, Bionode-watermill can be built upon for inclusion on web apis, modern web applications, as well as native applications through [Electron](http://electron.atom.io/). Look forward to seeing Galaxy-like applications backed by a completely configurable Node API. + +Bionode-watermill is for **biologists** who understand it is important to experiment with sample data, parameter values, and tools. Compared to other workflow systems, the ease of swapping around parameters and tools is much improved, allowing you to iteratively compare results and construct more confident inferences. Consider the ability to construct your own [Teaser](https://genomebiology.biomedcentral.com/articles/10.1186/s13059-015-0803-1) for *your data* with a *simple syntax*, and getting utmost performance out of the box. 
diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js index 783dec5..be3c096 100644 --- a/examples/pipelines/two-mappers/pipeline_multipleinput.js +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -1,5 +1,11 @@ 'use strict' +/* DISCLAIMER: This is a very experimental pipeline and currently it is not +* solved. If you have an idea on how to solve this problem please make a PR. +* The idea with this pipeline is for the two-mappers pipeline.js to be able +* to process several samples. +*/ + // === WATERMILL === const { task, From 9cfe06e02ebbb3ba0ba7db7afc1d583799e8d7cd Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 09:40:13 +0100 Subject: [PATCH 13/35] fixed error in main readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0b1df1e..7ee0ec8 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ # bionode-watermill -> Node.js module to get data from the NCBI API (aka e-utils) +> Bionode-watermill: A Streaming Workflow Engine [![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() From 50089a544f06eb311523087350b856fc7056237c Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 10:05:35 +0100 Subject: [PATCH 14/35] ammend to pipeline --- examples/pipelines/two-mappers/pipeline_multipleinput.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js index be3c096..582aa6a 100644 --- a/examples/pipelines/two-mappers/pipeline_multipleinput.js +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -35,10 +35,10 @@ const config = { // === TASKS === // first lets get the reference genome for our mapping -const getReference = (referenceURL, name) => task({ - params: { url: referenceURL }, +const getReference = task({ + params: { url: config.referenceURL }, output: '*_genomic.fna.gz', - name: `Download reference genome for ${name}` + name: `Download reference genome for ${config.name}` }, ({ params, dir }) => { const { url } = params const outfile = url.split('/').pop() @@ -136,7 +136,7 @@ const bowtieMapper = task({ const pipeline = (sraAccession) => join( junction( - getReference, // maybe this can done in anorther task outside this + getReference, // maybe this can done in another task outside this // pipeline... 
then hardcode the path join(getSamples(sraAccession),fastqDump) ), From bd0ada0db53e8d690e2270896043c644d81a8801 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Fri, 25 Aug 2017 12:01:42 +0100 Subject: [PATCH 15/35] edited multiinput pipeline example for two-mappers --- .../two-mappers/pipeline_multipleinput.js | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js index 582aa6a..4767a10 100644 --- a/examples/pipelines/two-mappers/pipeline_multipleinput.js +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -45,6 +45,7 @@ const getReference = task({ // essentially curl -O return request(url).pipe(fs.createWriteStream(dir + '/' + outfile)) + }) //then get samples to work with @@ -68,11 +69,13 @@ const fastqDump = task({ ) // first lets uncompress the gz -const gunzipIt = task({ - input: '*_genomic.fna.gz', - output: '*.fa', - params: { output: 'uncompressed.fa' } - }, ({ params, input}) => `gunzip -c ${input} > ${params.output}` +const gunzipIt = (referenceFile) => task({ + input: referenceFile, + output: '*.fna' + }, ({ input}) => { + console.log("gunzipit:", input) + return `gunzip -c ${input} > ${input.split('.')[0]}.fna` + } ) // then index using first bwa ... @@ -134,23 +137,21 @@ const bowtieMapper = task({ // === PIPELINE === -const pipeline = (sraAccession) => join( - junction( - getReference, // maybe this can done in another task outside this - // pipeline... then hardcode the path - join(getSamples(sraAccession),fastqDump) - ), - gunzipIt, - fork( - join(indexReferenceBwa, bwaMapper), - join(indexReferenceBowtie2, bowtieMapper) +// first gets reference +getReference().then(results => { + const pipeline = (sraAccession, referenceFile) => join( + getSamples(sraAccession), + fastqDump, + gunzipIt(referenceFile), + fork( + join(indexReferenceBwa, bwaMapper), + join(indexReferenceBowtie2, bowtieMapper) + ) ) -) - -// actual run pipelines and return results -//pipeline().then(results => console.log('PIPELINE RESULTS: ', results)) -for (const sra of config.sraAccession) { - console.log("sample:", sra) - const pipelineMaster = pipeline(sra) - pipelineMaster() -} +// then fetches the samples and executes the remaining pipeline + for (const sra of config.sraAccession) { + console.log("sample:", sra) + const pipelineMaster = pipeline(sra, results.resolvedOutput) + pipelineMaster() + } +}) From e4ccd438f66d6d4d2b1a7c885766f892f15907f7 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 14:28:29 +0100 Subject: [PATCH 16/35] added visualization tool api description and usage --- docs/D3Visualization.md | 72 ++++++++++++++++++++++++++++++++++++++ docs/README.md | 2 +- lib/reducers/collection.js | 2 +- 3 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 docs/D3Visualization.md diff --git a/docs/D3Visualization.md b/docs/D3Visualization.md new file mode 100644 index 0000000..d51e2b5 --- /dev/null +++ b/docs/D3Visualization.md @@ -0,0 +1,72 @@ +# Graph visualization + +Currently bionode-watermill has a tool that allows the user to visualize the +pipeline Directed Acyclic Graph (DAG). This tool is available while running +the scripts in `localhost:8084`. Visualization script is available in +`viz/index.html`. + +## Enabling visualization + +Just paste `localhost:8084` on your browser URL. 
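+
+For example (a sketch, assuming any pipeline script such as the ones under
+`examples/pipelines/`; `task1` and `task2` are placeholders), the graph is
+served while the script is running:
+
+```javascript
+// pipeline.js: while this runs (it hangs at the end, see "Canceling" below),
+// open http://localhost:8084 in a browser to watch the DAG
+const pipeline = join(task1, task2)
+pipeline()
+```
+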
+ +## Canceling + +Now, this tool hangs each script at its end and in order to exit it, just +hit: `ctrl + c` in the shell terminal where the pipeline script is running. +Hitting `ctrl + c` will close the connection between the browser and +`lib/reducers/collections.js`. + +## How it works + +This basically uses npm package `socket.io` on `lib/reducers/collection.js` +where graph object is read. This graph object follows graphson-like structure: + +```javascript +{ + "graph": { + "mode": "NORMAL", + "vertices": [ + //...Array of vertices + ], + "edges": [ + //...Array of edges + ] + } +} +``` + +Vertices represent each task on the current graph visualization (as blue +circles) and edges represent the flow between tasks (with arrows), i.e., +which tasks runs after other tasks attending to the orchestration shape. +For instance, if we have `join(task1, task2, task3)`, this will render 3 +vertices (blue circles) and 2 edges (arrows) like this: + +```javascript +task1 --> task2 --> task3 +``` + +## Node hovering + +On node hovering it is possible to obtain metadata information on tasks +`props`, such as: + +- minUid: The short `uid` that matches the filesystem under `./data/` +- Kind: Currently this differentiates if it is a `task` or a `junction`, +because `junction`s are added as a orange circle with no relevant proprieties, +just to make the required connection on the graph. +- Task name: The name given by the user under `props` object while defining a + task. +- Output(s): The resolved outputs (`resolvedOutput`) from the task run. +- Input(s): The resolved inputs (`resolvedInput`) from the task run. +- params: The `params` given to task `props` +- Output folder: The output folder relative path. +- Cmd: The actual operation passed to shell (when passed) or the +`operationCreator` function. + + +## Limitations + +* This visualization tool does not represent input/output flow, just + the pipeline flow. + * It is also unable to know what was ran from what is running and from + what is failing. diff --git a/docs/README.md b/docs/README.md index 3822f1f..2bddbd5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,7 +1,7 @@ ## Table of Contents * [Beginner Walkthrough](BeginnerWalkthrough.md) -* [Graph visualization tool] +* [Graph visualization tool](D3Visualization.md) * API * [Task](Task.md) - [Lifecycle](TaskLifecycle.md) diff --git a/lib/reducers/collection.js b/lib/reducers/collection.js index 9a18ab3..46d349f 100644 --- a/lib/reducers/collection.js +++ b/lib/reducers/collection.js @@ -11,7 +11,7 @@ const path = require('path') // define path to index.html, otherwise it will be relative to dir where // scripts is run -let scr = path.join(__dirname, '../../viz/index.html') +const scr = path.join(__dirname, '../../viz/index.html') // Loading the index file . html displayed to the client From 62b00b2090d3b1bf53a1193285c1fbd0ced7512c Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 14:55:20 +0100 Subject: [PATCH 17/35] Added multiple input handling --- docs/MultipleInput.md | 151 ++++++++++++++++++++++++++++++++++++++++++ docs/README.md | 2 +- 2 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 docs/MultipleInput.md diff --git a/docs/MultipleInput.md b/docs/MultipleInput.md new file mode 100644 index 0000000..cb32383 --- /dev/null +++ b/docs/MultipleInput.md @@ -0,0 +1,151 @@ +# Multiple inputs on tasks + +There are two ways multiple inputs can be given to tasks - within the **task +scope** or within the **pipeline scope**. 
+ +## Within task scope + +Within task scope multiple input handling can take into account +bionde-watermill capacities to fetch files using glob patterns (e.g. `*.fas`). +For instance, consider the following task: + +```javascript +const concatTask = task({ + input: '*.fas', + output: '*.fas', + params: {output: 'concat.fas'} +}, ( object ) => { + console.log('input_directory: ', object.dir) + console.log('input_variable: ', object, object.params) + return `cat ${object.input.join(' ')} > ${object.params.output}` + } +) +``` + +This `concatTask` has the ability to fetch several `*.fas` that are present +in the current working directory and pass it to the `operationCreator` +function in task (for further reference refer to [Task](Task.md)). However, +users need to be careful because these glob patterns are not the same as +shell wildcards. So they must be handled by javascript before passing them to + the shell: + + `object.input.join(' ')` - will transform the array of inputs into a string + where each `.fas` file will be separated by a space. + +## Within the pipeline scope + +Users may also have multiple samples that want to run through all the +pipeline. Imagine you have `n` fasta files and you want all of them to go +through the pipeline. Currently this will require some javascript around +tasks and pipelines. +See this example: + +First lets define a function that executes the desired pipeline for each +input file or sample: + +```javascript +// wrap every task and pipeline of a file in a single function (in this case +// 'pipeline' function +const fileFlow = (infile) => { + const concatTask = task({ + // notice that the input file (infile) is here passed as a param of the + // pipeline function + input: infile, + output: '*.fas', + params: {output: 'concat.fas'} + }, ( object ) => { + console.log('input_directory: ', object.dir) + console.log('input_variable: ', object, object.params) + return `cat ${object.input} > ${object.params.output}` + } + ) + + const task0 = task({name: 'coco'}, () => `echo "something0"`) + + const task1 = task({name: 'nut'}, () => `echo "something1"`) + + const task2 = task({name: 'foo'}, () => `echo "something2"`) + + // this returns the pipeline for a file + const pipelineAssemblage = join(concatTask, task0, fork(task1, task2)) + return pipelineAssemblage +} +``` + +Notice that it basically wrap task definition and pipeline call +(`pipelineAssemblage`) and returns it. + +Then is just a matter of cycling through each file in current working +directory (for example): + +```javascript +// checks for files in current working directory (where the script is executed) +fs.readdir(process.cwd(), (err, files) => { + files.forEach(infile => { + // checks file extension, in this case '.fas' + if (path.extname(infile) === '.fas') { + // executes the pipeline function for each infile + const pipelineMaster = fileFlow(infile) + pipelineMaster() + } + }) +}) +``` + +This will execute the function that wraps the tasks and pipeline +(`pipelineAssemblage`) and executes it each time a input file is given to +this function. This will render one graph for each input file provided for +the pipeline function (`fileFlow`). + +### Concurrency + +As you may imagine such behavior without any control may end up using more +resources (either CPU or memory) than we have. 
So one quick way to solve +this, would be to use `bluebird`: + +```javascript +const Promise = require('bluebird') +``` + +You can even pass a argument to be read from the shell: +```javascript +// makes concurrency the third argument of code in shell +const concurrency = parseFloat(process.argv[2] || "Infinity") +``` + +And call it like this: + +```shell +node pipeline.js 2 +``` + +This will tell that you want to have 2 files running at the same time. +Then is just a matter of returning a `bluebird` `Promise.map` with +`concurrency`: + +```javascript +// checks for files in current working directory (where the script is executed) +fs.readdir(process.cwd(), (err, files) => { + const desiredFiles = [] + files.forEach(infile => { + // checks file extension, in this case '.fas' and creates an array with + // just these files + if (path.extname(infile) === '.fas') { + desiredFiles.push(infile) + } + }) + // bluebird Promise.map is used here for concurrency where we can + // limit the flow of files into the pipeline. So, if concurrency is set + // to 1 it will first execute the first file, then the second and so on. + // But if 2 it will execute the first 2 files and then the other 2. + return Promise.map(desiredFiles, (infile) => { + const pipelineMaster = fileFlow(infile) + return pipelineMaster() + }, {concurrency}) +}) +``` + +Note that it is quite similar to the example above but returns a `Promise +.map`, which also return the pipeline executions, instead of just executing the +pipeline for each input file. This is one simple way to empower the user to +control the resources used by their scripts. \ No newline at end of file diff --git a/docs/README.md b/docs/README.md index 2bddbd5..bc522bc 100644 --- a/docs/README.md +++ b/docs/README.md @@ -5,7 +5,7 @@ * API * [Task](Task.md) - [Lifecycle](TaskLifecycle.md) - - [Multiple input] + - [Multiple input](MultipleInput.md) * [Orchestration](Orchestration.md) * [Forkception] * [Uids] From 297c3217e463401e6945285e627e8095423704a9 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 15:21:27 +0100 Subject: [PATCH 18/35] Added description on uid API --- docs/Forkception.md | 7 +++++++ docs/MultipleInput.md | 2 +- docs/README.md | 4 ++-- docs/Uid.md | 37 +++++++++++++++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 docs/Forkception.md create mode 100644 docs/Uid.md diff --git a/docs/Forkception.md b/docs/Forkception.md new file mode 100644 index 0000000..c87cf42 --- /dev/null +++ b/docs/Forkception.md @@ -0,0 +1,7 @@ +# Forkception + +Fork is a special case within bionode-watermill because it requires to know +everything that proceeds the tasks within the `fork`. In current API there is + no easy way to get graph shape before actually running the `pipeline.js` + scripts. Therefore, `fork` uses `join` to construct independent lineages for + each task within the `fork`. \ No newline at end of file diff --git a/docs/MultipleInput.md b/docs/MultipleInput.md index cb32383..398374d 100644 --- a/docs/MultipleInput.md +++ b/docs/MultipleInput.md @@ -1,4 +1,4 @@ -# Multiple inputs on tasks +# Multiple inputs There are two ways multiple inputs can be given to tasks - within the **task scope** or within the **pipeline scope**. 
diff --git a/docs/README.md b/docs/README.md index bc522bc..f0ea64b 100644 --- a/docs/README.md +++ b/docs/README.md @@ -7,8 +7,8 @@ - [Lifecycle](TaskLifecycle.md) - [Multiple input](MultipleInput.md) * [Orchestration](Orchestration.md) - * [Forkception] - * [Uids] + * [Forkception](Forkception.md) + * [Uids](Uid.md) * [Comparison](Comparison.md) * [Internals](Internals.md) * Example Pipelines diff --git a/docs/Uid.md b/docs/Uid.md new file mode 100644 index 0000000..135fbf2 --- /dev/null +++ b/docs/Uid.md @@ -0,0 +1,37 @@ +# Uid + +In bionode-watermill we use **uids**, that are basically hashes. These `uids` + are used to control if a task is already run or not. For instance, imagine + you have ran part of your pipeline before and it failed somewhere in the + middle. When you attempt to run your `pipeline.js` script again, + bionode-watermill will know that some of your tasks have already run and + will not run them again. Instead it will be able to crawl into the `./data` + folder too look for the inputs required for the tasks that still need to be + run. + + ## uid at the task level + + Uid at the task level is generate in two different ways: +* First, it generated given the `props` (`input`, `output` and `params`) +passed by +the user to the task +definition (this is generated under `lib/reducers/task.js`). Let's call this +`uid` the *task default* `uid` (before running). + +* Second, if we want a task to be ran twice in the same pipeline it cannot +have the same `uid` otherwise bionode-watermill will not allow the second +execution of the pipeline. However, it can properly solve this because there +is a second level of `uid` generation while the task is running. Therefore, +a task `uid` is modified on running to get its *final or run* `uid`. This new + `uid` is generated taking into account *task default* `uid` and its parent + tasks `uids` and making a unique hash of all these `uids`. This renders that + the same task can be ran twice in the pipeline **if** their `trajectory` is + different, i.e., **if** they have different parent tasks. + +## uid at the orchestrator level + +Orchestrators also have uids that are basically an hash of the `uids` of the +tasks contained in them. These `uids`are used in `lib/orchestrators/join.js` to +control the `uid` generation for `fork` because each *downstream task* after a +`fork` must be multiplied as many times as the tasks inside fork (for further +details on this see [Forkception](Forkception.md)). From 8343496fff9d19dd54473853126855471389908b Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 15:28:18 +0100 Subject: [PATCH 19/35] added an example pipeline to docs --- docs/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/README.md b/docs/README.md index f0ea64b..63dc4c5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -12,3 +12,5 @@ * [Comparison](Comparison.md) * [Internals](Internals.md) * Example Pipelines + * [two-mappers](../examples/pipelines/two-mappers/README.md) - a pipeline + that uses two mappers for benchmark purposes. 
From 3749f14bfdab2c5fb7b41062ce46fc0a3fba8cde Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 15:51:47 +0100 Subject: [PATCH 20/35] added forkception to docs and its current known limitation --- docs/Forkception.md | 70 ++++++++++++++++++++++++++- examples/pipelines/tests/fork_fork.js | 2 +- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/docs/Forkception.md b/docs/Forkception.md index c87cf42..6f7065f 100644 --- a/docs/Forkception.md +++ b/docs/Forkception.md @@ -4,4 +4,72 @@ Fork is a special case within bionode-watermill because it requires to know everything that proceeds the tasks within the `fork`. In current API there is no easy way to get graph shape before actually running the `pipeline.js` scripts. Therefore, `fork` uses `join` to construct independent lineages for - each task within the `fork`. \ No newline at end of file + each task within the `fork`. + + Currently `lib/orchestrator/fork.js` just collects the tasks inside `fork` + and then everything else is handled by `lib/orchestrator/join.js`, i.e., + the branching of the pipeline in several leaves. This branching requires + that `fork` *downstream tasks* are multiplied as many times as the number + of tasks inside `fork`. + + > For instance if fork contains 3 tasks the pipeline will have to branch + > 3 times from this point on: + > + >```javascript + >const pipeline = join(task0,fork(task1, task2, task3), task4) + >``` + > This will end up in three different branches (or lineages) with three + >leaves: + > `task0 --> task1 --> task4`, `task0 --> task2 --> task4` and `task0 --> + task3 --> task4`. + > + > Note: here task4 is a downstream task. + + Notice that task4 needed to be triplicated, since it will have different + results + depending on the results from its parent tasks (`task1`, `task2` or `task3`). + + Therefore, if a `fork` is found within a `join` wrapping the all + the pipeline a + bunch of rules were designed to make fork multiply each *downstream task* + as well as be able to properly work all other orchestrators that have their + own behavior (check [Orchestration](Orchestration.md)). + + ## Current limitations + + Take the following pipeline as an example: + + ```javascript +const pipeline2 = join( + task0, + fork(task1, task2), + task3, + fork(task4, task5), + task6) +``` + + This pipeline represents a further challenge since it requires to + duplica the second `fork` itself rather than their contained tasks, as all + other rules made for other orchestrators to work inside `fork`. The same is + true for `junction`, so if in the above example the second `fork` is in fact + a `junction`, the problem will remain the same. + +However, there is a workaround this issue that may be used for now: + +```javascript +const pipeline2 = join( + task0, + fork( + join(task1, task3, fork(task4, task5)), + join(task2, task3, fork(task4, task5)) + ), + task6 +) +``` + +In the above pice of code I just added `fork(task4, task5)` inside the +first `fork` in order to be able to get the same structure. To sum up, +the tasks between the two forks (`task3`) as well as the second `fork` (`fork +(task4, task5)`) have to be manually duplicated. + + \ No newline at end of file diff --git a/examples/pipelines/tests/fork_fork.js b/examples/pipelines/tests/fork_fork.js index 8cab9d8..0364e60 100644 --- a/examples/pipelines/tests/fork_fork.js +++ b/examples/pipelines/tests/fork_fork.js @@ -176,6 +176,6 @@ const pipeline8_2 = join( // edit this line to run the desired pipeline. 
// documentation on the usage of these pipelines may be found in the link below // https://github.com/bionode/GSoC17/blob/master/Journal/Week_11.md -pipeline2().then((results) => { +pipeline7_2().then((results) => { console.log('RESULTSNEW: ', results) }) \ No newline at end of file From 7cd0db9e72ea2267346dfdfc70eb4b8c940887ae Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 16:17:05 +0100 Subject: [PATCH 21/35] added desdcription on resolution of input and changed streamable tasks to the end --- docs/Task.md | 217 ++++++++++++++++++++++++++++----------------------- 1 file changed, 120 insertions(+), 97 deletions(-) diff --git a/docs/Task.md b/docs/Task.md index e38b507..48802d4 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -40,6 +40,7 @@ const props = { *params.output* allows to name the output files, while *output* is used to check if output was properly resolved. +*Example* ```javascript // example task with input/output files const task = ({ @@ -52,73 +53,6 @@ const task = ({ ) ``` -### Streamable tasks - -If either (input or output) is -not provided, it will be assumed the task is then a *streaming task* - i.e., it -is a duplex stream with writable and/or readable portions. Consider: - -```javascript -const throughCapitalize = through(function (chunk, env, next) { - // through = require('through2') - a helper to create through streams - // takes chunk, its encoding, and a callback to notify when complete pushing - // push a chunk to the readable portion of this through stream with - this.push(chunk.toString().toUpperCase()) - // then call next so that next chunk can be handled - next() - -You could connect `capitalize` to a readable (`readFile`) and writable -(`writeFile`) file -stream with: - -const capitalize = task({ - name: 'Capitalize Through Stream' -}, -// Here, input is a readable stream that came from the previous task -// Let's return a through stream that takes the input and capitalizes it -({ input }) => input.pipe(throughCapitalize) ) -``` - -```javascript - -const readFile = task({ - input: '*.lowercase', - name: 'Read from *.lowercase' -}, ({ input }) => { - const rs = fs.createReadStream(input) - // Add file information to stream object so we have it later - rs.inFile = input -}) - -const writeFile = task({ - output: '*.uppercase', - name: 'Write to *.uppercase' -}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase'))) - -// Can now connect the three: -join(readFile, capitalize, writeFile) -``` - -Of course, this could be written as one single task. This is somewhat simpler, -but the power of splitting up the read, transform, and write portions of a task -will become apparent once we can provide multiple sets of parameters to the -transform and observe the effect, *without having to manually rewire input and -output filenames*. As a single task the above would become: - -```javascript -const capitalize = task({ - input: '*.lowercase', - output: '*.uppercase', - name: 'Capitalize *.lowercase -> *.uppercase' -}, ({ input }) => - fs.createReadStream(input) - .pipe(throughCapitalize) - .pipe(fs.createWriteStream(input.swapExt('lowercase'))) -) -``` - -It is fine to run with no task name, a hashed one will be made for you. However, properly named tasks will help greatly reading pipeline output - ## operationCreator **operationCreator** is a function that will be provided with a **resolved @@ -127,6 +61,7 @@ props object** and that is responsible for the execution of the task itself. 
creator does not return a stream, it will be wrapped into a stream internally (e.g. `StreamFromPromise`). An example operation creator is +*Example* ```javascript function operationCreator(resolvedProps) { const fs = require('fs') @@ -162,7 +97,7 @@ function operationCreator() { } ``` -### Input and Output +## Input and Output The `input` and `output` objects can be a **string glob pattern**, or a plain object of them. The glob will be @@ -175,7 +110,7 @@ Bionode-watermill manages input and output files and folders run by a `task`. id) of its respective task. This `uid` is generated by `bionode-watermill` given the - props of the task and their parent tasks. + props of the task and their parent tasks (check [Uid](Uid.md)). For example, @@ -211,37 +146,54 @@ will resolve to something like: } ``` -If `input` is not provided, the `operation` will be a duplexify stream with the -writable portion set. Similarly, if `output` is not provided, the `operation` -will be a duplexify stream with the readable portion set. If neither is -provided, both the readable and writable portions will be set: the task becomes -a *through task*. +### Input Resolution -An example *through task*: +Match to filesystem if in first task in pipeline, using current working +directoty to search for the input files. ```javascript -const filterSpecies = task({ - name: 'Filter species by taxonomy', - params: { taxonomy: 'plantae' } -}, -({ input, params }) => input.pipe(through.obj(function (chunk, enc, next) { - if (chunk.taxonomy === params.taxonomy) { - this.push(chunk) - } - next() - })) -) +// this is our very first task glob pattern +'*.fas' +// Resolved Input +'./some.fas' ``` -### Resolution - -Input resolution is a **reduction over the task props**. +Otherwise glob patterns are matched + to the **collection**. **collection** allows bionode-watermill to search + within `./data` folder for the first files that match the glob pattern. Have + into account that it crawls the tasks tree from downstream to upstream + until it finds the desired match. + +```javascript +// now, this is not the first task +'*.fas' +// Resolved Input +'./data//some.fas +``` -#### Input Resolution +Notice the difference in the location of the inputs. + +Also and importantly, if you have multiple inputs to pass to a task, make +sure they are in the same directory. If it is the first task make sure these +inputs are all in the current working directory or specify on the input the +folder where they are (*hard-coding* it). Same for tasks that fetch inputs +from `./data`, assure that every file is within a folder. If the desired +outputs are spread through several folders and their file extension is the +same, you will have to add something to the glob pattern suffix. For example, + + ```javascript +// we have two fasta files in different directories +// fasta1.fas and fasta2.fas +const someTask = task({ + input: ['*1.fas', '*2.fas'] +}, operationCreator +) +``` -Match to filesystem if in first task in pipeline, otherwise glob patterns are matched to the **collection**. +So, if this is your case, make sure your glob pattern is unique enough for +not matching other undesired files. -#### Output Resolution +### Output Resolution The resolved values can be accessed from `myTask.resolvedOutput` after the task has emitted a `task.finish` event. 
@@ -250,13 +202,13 @@ The resolved values can be accessed from `myTask.resolvedOutput` after the task // Output '*.sra' // Resolved Output -'/data/human.sra' +'./data//human.sra' // Array of items // Output ['*.bam', '*.fastq.gz'] // Resolved Output -['/data/reads.bam', '/data/human.fastq.gz'] +['./data//reads.bam', './data//human.fastq.gz'] // Plain object of items // Output @@ -266,8 +218,8 @@ The resolved values can be accessed from `myTask.resolvedOutput` after the task } // Resolved Output { - reads: ['human_1.fastq.gz', 'human_2.fastq.gz'], - alignment: 'reads.bam' + reads: ['./data//human_1.fastq.gz', './data//human_2.fastq.gz'], + alignment: './data//reads.bam' } ``` @@ -299,4 +251,75 @@ const samtoolsIndex = task({ samtoolsIndex() .on('task.finish', (results) => console.log(results.resolvedOutput)) -``` \ No newline at end of file +``` + +## Streamable tasks potential + +**Warning**: the following content is just a conceptual idea for +bionode-watermill that **is still not implemented**. + + +If either (input or output) is +not provided, it will be assumed the task is then a *streaming task* - i.e., it +is a duplex stream with writable and/or readable portions. Consider: + +```javascript +const throughCapitalize = through(function (chunk, env, next) { + // through = require('through2') - a helper to create through streams + // takes chunk, its encoding, and a callback to notify when complete pushing + // push a chunk to the readable portion of this through stream with + this.push(chunk.toString().toUpperCase()) + // then call next so that next chunk can be handled + next() + +You could connect `capitalize` to a readable (`readFile`) and writable +(`writeFile`) file +stream with: + +const capitalize = task({ + name: 'Capitalize Through Stream' +}, +// Here, input is a readable stream that came from the previous task +// Let's return a through stream that takes the input and capitalizes it +({ input }) => input.pipe(throughCapitalize) ) +``` + +```javascript + +const readFile = task({ + input: '*.lowercase', + name: 'Read from *.lowercase' +}, ({ input }) => { + const rs = fs.createReadStream(input) + // Add file information to stream object so we have it later + rs.inFile = input +}) + +const writeFile = task({ + output: '*.uppercase', + name: 'Write to *.uppercase' +}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase'))) + +// Can now connect the three: +join(readFile, capitalize, writeFile) +``` + +Of course, this could be written as one single task. This is somewhat simpler, +but the power of splitting up the read, transform, and write portions of a task +will become apparent once we can provide multiple sets of parameters to the +transform and observe the effect, *without having to manually rewire input and +output filenames*. As a single task the above would become: + +```javascript +const capitalize = task({ + input: '*.lowercase', + output: '*.uppercase', + name: 'Capitalize *.lowercase -> *.uppercase' +}, ({ input }) => + fs.createReadStream(input) + .pipe(throughCapitalize) + .pipe(fs.createWriteStream(input.swapExt('lowercase'))) +) +``` + +It is fine to run with no task name, a hashed one will be made for you. 
However, properly named tasks will help greatly reading pipeline output From 3fb094ece63b4fece83069e6c4e8a6a47d2b5be8 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 16:19:34 +0100 Subject: [PATCH 22/35] fixed a typo --- docs/Forkception.md | 2 +- docs/Task.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/Forkception.md b/docs/Forkception.md index 6f7065f..be160cc 100644 --- a/docs/Forkception.md +++ b/docs/Forkception.md @@ -1,7 +1,7 @@ # Forkception Fork is a special case within bionode-watermill because it requires to know -everything that proceeds the tasks within the `fork`. In current API there is +everything that follows the tasks within the `fork`. In current API there is no easy way to get graph shape before actually running the `pipeline.js` scripts. Therefore, `fork` uses `join` to construct independent lineages for each task within the `fork`. diff --git a/docs/Task.md b/docs/Task.md index 48802d4..66bef56 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -168,7 +168,7 @@ Otherwise glob patterns are matched // now, this is not the first task '*.fas' // Resolved Input -'./data//some.fas +'./data//some.fas' ``` Notice the difference in the location of the inputs. From 385498cb05e147ff043307b5f1df053d12096b87 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 16:21:21 +0100 Subject: [PATCH 23/35] amended the explanation of collection --- docs/Task.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/Task.md b/docs/Task.md index 66bef56..a48bb81 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -160,7 +160,8 @@ directoty to search for the input files. Otherwise glob patterns are matched to the **collection**. **collection** allows bionode-watermill to search - within `./data` folder for the first files that match the glob pattern. Have + within `./data` folder for the **first** folder that has files that match the + glob pattern. Have into account that it crawls the tasks tree from downstream to upstream until it finds the desired match. From 7c2d33d0fb1577ac0c62c04c8d5c3b8ce630068c Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 16:39:15 +0100 Subject: [PATCH 24/35] fixed review comments --- docs/Forkception.md | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/docs/Forkception.md b/docs/Forkception.md index be160cc..92034fe 100644 --- a/docs/Forkception.md +++ b/docs/Forkception.md @@ -20,8 +20,8 @@ everything that follows the tasks within the `fork`. In current API there is >``` > This will end up in three different branches (or lineages) with three >leaves: - > `task0 --> task1 --> task4`, `task0 --> task2 --> task4` and `task0 --> - task3 --> task4`. + > `task0 --> task1 --> task4A`, `task0 --> task2 --> task4B` and `task0 --> + task3 --> task4C`. > > Note: here task4 is a downstream task. @@ -40,12 +40,13 @@ everything that follows the tasks within the `fork`. 
In current API there is Take the following pipeline as an example: ```javascript -const pipeline2 = join( +const pipeline = join( task0, - fork(task1, task2), - task3, - fork(task4, task5), - task6) + fork(task4, task3), + task5, + fork(task1, task2), + task6 +) ``` This pipeline represents a further challenge since it requires to @@ -60,16 +61,20 @@ However, there is a workaround this issue that may be used for now: const pipeline2 = join( task0, fork( - join(task1, task3, fork(task4, task5)), - join(task2, task3, fork(task4, task5)) + join(task4,task5,fork(task1,task2)), + join(task3,task5,fork(task1,task2)) ), task6 ) ``` -In the above pice of code I just added `fork(task4, task5)` inside the +*Diagram of pipeline and pipeline2* + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_after_fork.png) + +In the above piece of code I just added `fork(task1, task2)` inside the first `fork` in order to be able to get the same structure. To sum up, -the tasks between the two forks (`task3`) as well as the second `fork` (`fork -(task4, task5)`) have to be manually duplicated. +the tasks between the two forks (`task5`) as well as the second `fork` (`fork +(task1, task2)`) have to be manually duplicated. \ No newline at end of file From 7679b3cddec52b2fef92f95a59a4377c2067cbd5 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 16:46:22 +0100 Subject: [PATCH 25/35] added pipeline contributions to main readme --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 7ee0ec8..0713bf1 100644 --- a/README.md +++ b/README.md @@ -114,3 +114,8 @@ We welcome all kinds of contributions at all levels of experience, please refer to the [Issues section](https://github.com/bionode/bionode-watermill/issues). Also, you can allways reach us on [gitter](https://gitter.im/bionode/bionode-watermill). + +### Feel free to submit your pipeline to us + +Just make a PR for us, that adds a pipeline under `./examples/pipelines/`. +You can check some of the already existing examples [there](examples/pipelines). 
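As an aside to the Forkception workaround in the patch above: the manual duplication can be wrapped in a small helper so the tasks between two forks only have to be listed once. This is a hypothetical sketch — `forkAfterFork` is not part of the bionode-watermill API; it only composes the documented `join` and `fork` orchestrators into the same shape as `pipeline2`:

```javascript
// Hypothetical helper (not part of bionode-watermill): expands
// fork -> tasks -> fork into the duplicated lineages described in Forkception.md
const forkAfterFork = (firstForkTasks, betweenTasks, secondForkTasks) =>
  fork(
    ...firstForkTasks.map(firstTask =>
      join(firstTask, ...betweenTasks, fork(...secondForkTasks))
    )
  )

// pipeline2 from the patch above could then be written as:
const pipeline2 = join(
  task0,
  forkAfterFork([task4, task3], [task5], [task1, task2]),
  task6
)
```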
From 4ead3472e3512e9148609fbacbd92a23f2f7d658 Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 16:49:33 +0100 Subject: [PATCH 26/35] Corrected gh naming --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0713bf1..f55ebd1 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ # bionode-watermill -> Bionode-watermill: A Streaming Workflow Engine +> Bionode-watermill: A (Not Yet Streaming) Workflow Engine [![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() From d0931bd32e0525380894b6ed8d293c5855a891cf Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Fri, 25 Aug 2017 16:55:24 +0100 Subject: [PATCH 27/35] added npm badge --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f55ebd1..3c7d192 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,9 @@ [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() [![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=dev)](https://travis-ci.org/bionode/bionode-watermill) [![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) -[![Gitter](https://img.shields.io/gitter/room/nwjs/nw.js.svg?style=flat-square)](https://gitter.im/bionode/bionode-watermill) +[![Gitter](https://img.shields.io/gitter/room/nwjs/nw.js.svg)](https://gitter.im/bionode/bionode-watermill) + +[![NPM](https://nodei.co/npm/bionode-watermill.png?downloads=true&stars=true)](https://nodei.co/npm/bionode-watermill/) ## Table of Contents From e8e167e13872981e4706bc705019746fd8fd22fc Mon Sep 17 00:00:00 2001 From: Tiago Jesus Date: Sun, 27 Aug 2017 08:56:27 +0100 Subject: [PATCH 28/35] updated docs to have streams in feature list --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3c7d192..ac87415 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,8 @@ assembled in the final *pipeline* using [orchestrators](docs/BeginnerWalkthrough * Automated Input/Output handling * Ability to run programs using Unix shell * Node.js integration +* [Streamable tasks](docs/Task.md#streamable-tasks-potential) (still not +implemented - Issue [#79](https://github.com/bionode/bionode-watermill/issues/79)) ### Who is this tool for? @@ -120,4 +122,4 @@ Also, you can allways reach us on [gitter](https://gitter.im/bionode/bionode-wat ### Feel free to submit your pipeline to us Just make a PR for us, that adds a pipeline under `./examples/pipelines/`. -You can check some of the already existing examples [there](examples/pipelines). +You can check some of the already existing examples [here](examples/pipelines). 
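To put the feature-list bullets above ("Ability to run programs using Unix shell", "Node.js integration") in concrete terms: a task's operation can return either a shell command string or a Node.js stream, following the `task` usage shown throughout these docs. A minimal sketch — the file patterns and task names below are made up for illustration:

```javascript
const watermill = require('bionode-watermill')
const fs = require('fs')
const task = watermill.task

// Shell flavour: returning a string runs it as a Unix shell command
const compressReads = task({
  input: '*.fastq',
  output: '*.fastq.gz',
  name: 'Compress reads with gzip'
}, ({ input }) => `gzip -c ${input} > ${input}.gz`)

// Node.js flavour: returning a stream, as the download tasks elsewhere in
// these docs do, lets watermill watch the stream until it finishes
const copyReads = task({
  input: '*.fastq.gz',
  output: '*.copy.fastq.gz',
  name: 'Copy compressed reads'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(fs.createWriteStream(input.replace('.fastq.gz', '.copy.fastq.gz')))
)
```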
From 5d610e2c1e3dfa2d1493c5fd7dd3bda7d8db06af Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Sun, 27 Aug 2017 10:48:37 +0100 Subject: [PATCH 29/35] updated multiple input pipeline for two-mappers --- .../two-mappers/pipeline_multipleinput.js | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js index 4767a10..5da54f3 100644 --- a/examples/pipelines/two-mappers/pipeline_multipleinput.js +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -28,7 +28,7 @@ const THREADS = parseInt(process.env.WATERMILL_THREADS) || 2 const config = { name: 'Streptococcus pneumoniae', - sraAccession: ['ERR045788', 'ERR016633'], + sraAccession: ['ERR045788'],// 'ERR016633'], referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/007/045/GCF_000007045.1_ASM704v1/GCF_000007045.1_ASM704v1_genomic.fna.gz' } @@ -38,13 +38,14 @@ const config = { const getReference = task({ params: { url: config.referenceURL }, output: '*_genomic.fna.gz', + dir: process.cwd(), // this sets directory for outputs! name: `Download reference genome for ${config.name}` }, ({ params, dir }) => { const { url } = params const outfile = url.split('/').pop() // essentially curl -O - return request(url).pipe(fs.createWriteStream(dir + '/' + outfile)) + return request(url).pipe(fs.createWriteStream(outfile)) }) @@ -69,10 +70,10 @@ const fastqDump = task({ ) // first lets uncompress the gz -const gunzipIt = (referenceFile) => task({ - input: referenceFile, +const gunzipIt = task({ + input: process.cwd() + '/*_genomic.fna.gz', output: '*.fna' - }, ({ input}) => { + }, ({ input }) => { console.log("gunzipit:", input) return `gunzip -c ${input} > ${input.split('.')[0]}.fna` } @@ -139,10 +140,11 @@ const bowtieMapper = task({ // first gets reference getReference().then(results => { - const pipeline = (sraAccession, referenceFile) => join( + console.log("results:", results.resolvedOutput) + const pipeline = (sraAccession) => join( getSamples(sraAccession), fastqDump, - gunzipIt(referenceFile), + gunzipIt, fork( join(indexReferenceBwa, bwaMapper), join(indexReferenceBowtie2, bowtieMapper) @@ -150,8 +152,8 @@ getReference().then(results => { ) // then fetches the samples and executes the remaining pipeline for (const sra of config.sraAccession) { - console.log("sample:", sra) - const pipelineMaster = pipeline(sra, results.resolvedOutput) + console.log("sample:", sra, results.resolvedOutput) + const pipelineMaster = pipeline(sra) pipelineMaster() } }) From 034af26fb09870357061bca5e72fd1c7d74918c5 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Sun, 27 Aug 2017 10:56:25 +0100 Subject: [PATCH 30/35] fixed formatting on docs --- docs/Task.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/Task.md b/docs/Task.md index a48bb81..0946622 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -272,11 +272,14 @@ const throughCapitalize = through(function (chunk, env, next) { this.push(chunk.toString().toUpperCase()) // then call next so that next chunk can be handled next() +} +```` You could connect `capitalize` to a readable (`readFile`) and writable (`writeFile`) file stream with: +```javascript const capitalize = task({ name: 'Capitalize Through Stream' }, From 5c7a63ba241873aa8716db851e4110d1264a0d26 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Sun, 27 Aug 2017 12:36:09 +0100 Subject: [PATCH 31/35] added some minor changes to through task description and test --- docs/Task.md | 
5 +++-- examples/pipelines/capitalize/alphabet.source.txt | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/Task.md b/docs/Task.md index 0946622..3c31a53 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -272,7 +272,7 @@ const throughCapitalize = through(function (chunk, env, next) { this.push(chunk.toString().toUpperCase()) // then call next so that next chunk can be handled next() -} +}) ```` You could connect `capitalize` to a readable (`readFile`) and writable @@ -322,7 +322,8 @@ const capitalize = task({ }, ({ input }) => fs.createReadStream(input) .pipe(throughCapitalize) - .pipe(fs.createWriteStream(input.swapExt('lowercase'))) + .pipe(fs.createWriteStream(input.split('/').slice(0, -1).join('/') + + 'uppercase')) ) ``` diff --git a/examples/pipelines/capitalize/alphabet.source.txt b/examples/pipelines/capitalize/alphabet.source.txt index b0883f3..3b18e51 100644 --- a/examples/pipelines/capitalize/alphabet.source.txt +++ b/examples/pipelines/capitalize/alphabet.source.txt @@ -1 +1 @@ -abcdefghijklmnopqrstuvwxyz +hello world From 98597126c43ede2aa858c05dca6ee24c44bf428c Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Mon, 28 Aug 2017 11:22:12 +0100 Subject: [PATCH 32/35] fixed input patterns from current working directory not being properly recognized --- lib/lifecycle/resolve-input.js | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/lib/lifecycle/resolve-input.js b/lib/lifecycle/resolve-input.js index c4823fe..af99138 100644 --- a/lib/lifecycle/resolve-input.js +++ b/lib/lifecycle/resolve-input.js @@ -2,6 +2,7 @@ const _ = require('lodash') const chalk = require('chalk') +const fs = require('fs') const applicator = require('../utils/applicator.js') const matchToFs = require('../matchers/match-to-fs.js') @@ -60,6 +61,20 @@ const resolveInput = (taskState, DAG, logger) => new Promise((resolve, reject) = } else if (minimatch(path, item)) { console.log(tab(2) + `${chalk.magenta(item)} matched to ${chalk.blue(path)}`) return resolve(path) + } else { + // this checks wether the input is on current working directory + // and tries to match it to input pattern + fs.readdir(process.cwd(), (err, files) => { + if (err) { + throw new Error(err) + } + files.forEach(file => { + if (minimatch(file, item)) { + console.log(tab(2) + `${chalk.magenta(item)} matched to ${chalk.blue(file)}`) + return resolve(file) + } + }) + }) } } From 974d58d601af82b970ffd78b941290a0dc4dd918 Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Mon, 28 Aug 2017 11:48:37 +0100 Subject: [PATCH 33/35] added definitive multi input pipeline for two-mappers pipeline.js --- .../two-mappers/pipeline_multipleinput.js | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js index 5da54f3..1280f99 100644 --- a/examples/pipelines/two-mappers/pipeline_multipleinput.js +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -28,7 +28,7 @@ const THREADS = parseInt(process.env.WATERMILL_THREADS) || 2 const config = { name: 'Streptococcus pneumoniae', - sraAccession: ['ERR045788'],// 'ERR016633'], + sraAccession: ['ERR045788', 'ERR016633'], referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/007/045/GCF_000007045.1_ASM704v1/GCF_000007045.1_ASM704v1_genomic.fna.gz' } @@ -71,17 +71,15 @@ const fastqDump = task({ // first lets uncompress the gz const gunzipIt = task({ - input: process.cwd() + '/*_genomic.fna.gz', - output: '*.fna' - 
}, ({ input }) => { - console.log("gunzipit:", input) - return `gunzip -c ${input} > ${input.split('.')[0]}.fna` - } + input: '*_genomic.fna.gz', + output: '*.fna', + name: 'gunzipIt task' + }, ({ input }) => `gunzip -c ${input} > ${input.split('.').slice(0,2).join('.')}.fna` ) // then index using first bwa ... const indexReferenceBwa = task({ - input: '*.fa', + input: '*.fna', output: { indexFile: ['amb', 'ann', 'bwt', 'pac', 'sa'].map(suffix => `bwa_index.${suffix}`), @@ -89,17 +87,17 @@ const indexReferenceBwa = task({ // and index files }, //params: { output: 'bwa_index.fa' }, - name: 'bwa index bwa_index.fa -p bwa_index' + name: 'bwa index bwa_index.fna -p bwa_index' }, ({ input }) => `bwa index ${input} -p bwa_index`) // and bowtie2 const indexReferenceBowtie2 = task({ - input: '*.fa', + input: '*.fna', output: ['1.bt2', '2.bt2', '3.bt2', '4.bt2', 'rev.1.bt2', 'rev.2.bt2'].map(suffix => `bowtie_index.${suffix}`), params: { output: 'bowtie_index' }, - name: 'bowtie2-build -q uncompressed.fa bowtie_index' + name: 'bowtie2-build -q uncompressed.fna bowtie_index' }, ({ params, input }) => `bowtie2-build -q ${input} ${params.output}` /* for bowtie we had to uncompress the .fna.gz file first before building the reference */ @@ -140,7 +138,7 @@ const bowtieMapper = task({ // first gets reference getReference().then(results => { - console.log("results:", results.resolvedOutput) +// console.log("results:", results.resolvedOutput) const pipeline = (sraAccession) => join( getSamples(sraAccession), fastqDump, @@ -152,8 +150,9 @@ getReference().then(results => { ) // then fetches the samples and executes the remaining pipeline for (const sra of config.sraAccession) { - console.log("sample:", sra, results.resolvedOutput) + //console.log("sample:", sra, results.resolvedOutput) + console.log("cwd_check", process.cwd()) const pipelineMaster = pipeline(sra) - pipelineMaster() + pipelineMaster().then(results => console.log("Results: ", results)) } }) From 4e6f589561a3f98a08eb82126dc3c45cd98bbeff Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Mon, 28 Aug 2017 13:42:01 +0100 Subject: [PATCH 34/35] uptaded docs regarding multiple input api changes --- docs/BeginnerWalkthrough.md | 2 + docs/MultipleInput.md | 97 ++++++++++++++++++++++++++++++++++++- docs/Task.md | 10 ++-- 3 files changed, 105 insertions(+), 4 deletions(-) diff --git a/docs/BeginnerWalkthrough.md b/docs/BeginnerWalkthrough.md index 07e1e5b..db2bf8e 100644 --- a/docs/BeginnerWalkthrough.md +++ b/docs/BeginnerWalkthrough.md @@ -223,6 +223,8 @@ const pipeline = join( ) ``` +> For more details on multiple inputs check this [link](MultipleInput.md). + The last operator is `fork`. Fork lets you replace one task in your pipeline with more than one, and have the downstream tasks duplicated for each task you pass into fork. This is useful for comparing tools, options within a tool or diff --git a/docs/MultipleInput.md b/docs/MultipleInput.md index 398374d..da7b53b 100644 --- a/docs/MultipleInput.md +++ b/docs/MultipleInput.md @@ -148,4 +148,99 @@ fs.readdir(process.cwd(), (err, files) => { Note that it is quite similar to the example above but returns a `Promise .map`, which also return the pipeline executions, instead of just executing the pipeline for each input file. This is one simple way to empower the user to -control the resources used by their scripts. \ No newline at end of file +control the resources used by their scripts. 
+
+### Practical example
+
+Imagine we have a couple of samples we want to get from NCBI SRA and map them
+ against a given reference using both `bowtie2` and `bwa`.
+
+ So first we have to define a task that fetches the reference data; this
+ task must be run just once:
+
+ ```javascript
+ // some config variables that will be used in this pipeline
+const config = {
+  name: 'Streptococcus pneumoniae',
+  sraAccession: ['ERR045788', 'ERR016633'],
+  referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/007/045/GCF_000007045.1_ASM704v1/GCF_000007045.1_ASM704v1_genomic.fna.gz'
+}
+
+// === TASKS ===
+
+// first lets get the reference genome for our mapping
+const getReference = task({
+  params: { url: config.referenceURL },
+  output: '*_genomic.fna.gz',
+  dir: process.cwd(), // this sets directory for outputs!
+  name: `Download reference genome for ${config.name}`
+}, ({ params, dir }) => {
+  const { url } = params
+  const outfile = url.split('/').pop()
+
+  // essentially curl -O
+  return request(url).pipe(fs.createWriteStream(outfile))
+})
+```
+
+Then we need a way for all the other tasks to run after the `getReference`
+task:
+
+```javascript
+// task that defines how we get samples
+// then get samples to work with
+const getSamples = (sraAccession) => task({
+  params: {
+    db: 'sra',
+    accession: sraAccession
+  },
+  output: '**/*.sra',
+  dir: process.cwd(), // Set dir to resolve input/output from
+  name: `Download SRA ${sraAccession}`
+}, ({ params }) => `bionode-ncbi download ${params.db} ${params.accession}`
+)
+
+// ...other tasks...
+// the pipeline can then be called like this
+getReference().then(results => {
+  const pipeline = (sraAccession) => join(
+    getSamples(sraAccession),
+    fastqDump,
+    gunzipIt,
+    fork(
+      join(indexReferenceBwa, bwaMapper),
+      join(indexReferenceBowtie2, bowtieMapper)
+    )
+  )
+// then fetches the samples and executes the remaining pipeline
+  for (const sra of config.sraAccession) {
+    const pipelineMaster = pipeline(sra)
+    pipelineMaster().then(results => console.log("Results: ", results))
+  }
+})
+```
+
+Notice how the pipeline is run twice (given that we have two inputs in the
+`config.sraAccession` array). Also notice that we pass each accession as an
+argument to the `getSamples` task.
+
+This behavior results in a single vertex for the `getReference` task, whose
+outputs become available to each `pipeline` triggered by an SRA sample.
+So, we end up with one `pipeline` per sample as well (two in this case).
+
+> This is possible because we rely on the `getReference` task to save its
+outputs in the current working directory. The API then searches first for
+outputs generated within each bionode-watermill pipeline (`currentCollection`)
+ and then in the current working directory for a matching file (with the
+ expected glob pattern).
+>
+> This matching is done by the `matchee` function in
+`lib/lifecycle/resolve-input.js`.
+>
+> The fact that bionode-watermill also searches for inputs in the current
+working directory is what allows one pipeline to run after another and still
+reference the previous pipeline's outputs (as in the example above).
+Therefore, use it as you please, but take into account that **bionode-watermill
+cannot map every folder within your file system** and that **`currentCollection`
+only saves references to files generated within a pipeline run**.
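One practical note on the `for` loop above: it starts every per-sample pipeline at the same time. If `config.sraAccession` grows, the `Promise.map` pattern mentioned earlier in MultipleInput.md can cap how many pipelines run concurrently. This is only a sketch of that idea — it assumes the bluebird library is installed (bluebird and the `concurrency` value are not part of this patch):

```javascript
// Hypothetical alternative body for getReference().then(...) above,
// assuming bluebird is available and the other tasks are defined as in the patch
const Promise = require('bluebird')

getReference().then(results => {
  const pipeline = (sraAccession) => join(
    getSamples(sraAccession),
    fastqDump,
    gunzipIt,
    fork(
      join(indexReferenceBwa, bwaMapper),
      join(indexReferenceBowtie2, bowtieMapper)
    )
  )
  // run at most two per-sample pipelines at a time instead of all at once
  return Promise.map(
    config.sraAccession,
    sra => pipeline(sra)(),
    { concurrency: 2 }
  )
}).then(allResults => console.log('Results: ', allResults))
```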
diff --git a/docs/Task.md b/docs/Task.md index 3c31a53..47563a3 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -163,7 +163,10 @@ Otherwise glob patterns are matched within `./data` folder for the **first** folder that has files that match the glob pattern. Have into account that it crawls the tasks tree from downstream to upstream - until it finds the desired match. + until it finds the desired match and that this is **run specific**, i.e., + `currentCollection` can only reference outputs and inputs generated by the + pipeline. Otherwise, bionode-watermill will also look in the current working + directory for a file that matches the desired glob pattern. ```javascript // now, this is not the first task @@ -176,8 +179,7 @@ Notice the difference in the location of the inputs. Also and importantly, if you have multiple inputs to pass to a task, make sure they are in the same directory. If it is the first task make sure these -inputs are all in the current working directory or specify on the input the -folder where they are (*hard-coding* it). Same for tasks that fetch inputs +inputs are all in the current working directory. Same for tasks that fetch inputs from `./data`, assure that every file is within a folder. If the desired outputs are spread through several folders and their file extension is the same, you will have to add something to the glob pattern suffix. For example, @@ -194,6 +196,8 @@ const someTask = task({ So, if this is your case, make sure your glob pattern is unique enough for not matching other undesired files. +> For more details on multiple inputs check this [link](MultipleInput.md). + ### Output Resolution The resolved values can be accessed from `myTask.resolvedOutput` after the task has emitted a `task.finish` event. From 48c00667ad353745c7211d7c2587a6733b8dc5bf Mon Sep 17 00:00:00 2001 From: tiagofilipe12 Date: Mon, 28 Aug 2017 13:47:27 +0100 Subject: [PATCH 35/35] added headers to beginner's walkthrough --- docs/BeginnerWalkthrough.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/BeginnerWalkthrough.md b/docs/BeginnerWalkthrough.md index db2bf8e..3275dd8 100644 --- a/docs/BeginnerWalkthrough.md +++ b/docs/BeginnerWalkthrough.md @@ -164,7 +164,11 @@ const uppercaser = watermill.task({ The next core idea of bionode-watermill is that of *orchestrators*. Orchestrators are -ways to combine tasks. The simplest of the operators is `join`. `join` takes +ways to combine tasks. + +### Join + +The simplest of the operators is `join`. `join` takes any number of tasks as parameters, and will run each of the tasks sequentially. Where the first task will check the current working folder for files that match `input`, within a join lineage, downstream tasks will attempt to match their @@ -199,6 +203,8 @@ const pipeline = join(getSamples('2492428'), fastqDump) pipeline() +### Junction + The next operator you might use is `junction`. Junction lets you run a set of tasks in parallel. If `junction` is used within a `join`, the downstream tasks will have access to the outputs of each of the parallelized items from `junction`. @@ -225,6 +231,8 @@ const pipeline = join( > For more details on multiple inputs check this [link](MultipleInput.md). +### Fork + The last operator is `fork`. Fork lets you replace one task in your pipeline with more than one, and have the downstream tasks duplicated for each task you pass into fork. This is useful for comparing tools, options within a tool or