diff --git a/README.md b/README.md index fcd79eb..ac87415 100644 --- a/README.md +++ b/README.md @@ -1,84 +1,102 @@ +

+ + bionode logo + +
+ bionode.io +

+ # bionode-watermill -[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() [![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=master)](https://travis-ci.org/bionode/bionode-watermill) [![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) +> Bionode-watermill: A (Not Yet Streaming) Workflow Engine -*Watermill: A Streaming Workflow Engine* +[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) +[![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() +[![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=dev)](https://travis-ci.org/bionode/bionode-watermill) +[![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master) +[![Gitter](https://img.shields.io/gitter/room/nwjs/nw.js.svg)](https://gitter.im/bionode/bionode-watermill) [![NPM](https://nodei.co/npm/bionode-watermill.png?downloads=true&stars=true)](https://nodei.co/npm/bionode-watermill/) +## Table of Contents + +* [What is bionode-watermill](#what-is-bionode-watermill) + * [Main features](#main-features) + * [Who is this tool for?](#who-is-this-tool-for) +* [Installation](#installation) +* [Documentation](#documentation) +* [Tutorial](#tutorial) +* [Example pipelines](#example-pipelines) +* [Why bionode-watermill?](#why-bionode-watermill) +* [Contributing](#contributing) + + + -- [CWL?](#cwl) -- [What is a task?](#what-is-a-task) -- [What are orchestrators?](#what-are-orchestrators) -- [Check out bionode-watermill tutorial!](#check-out-bionode-watermill-tutorial) -- [Example pipelines](#example-pipelines) -- [Why bionode-watermill?](#why-bionode-watermill) -- [Who is this tool for?](#who-is-this-tool-for) +## What is bionode-watermill -Watermill lets you *orchestrate* **tasks** using operators like **join**, **junction**, and **fork**. Each task has a [lifecycle](https://thejmazz.gitbooks.io/bionode-watermill/content/TaskLifecycle.html) where +**Bionode-watermill** is a workflow engine that lets you assemble and run +bioinformatic pipelines with ease and less overhead. Bionode-watermill +pipelines are +essentially node.js scripts in which [tasks](docs/BeginnerWalkthrough.md#task) are the modules that will be +assembled in the final *pipeline* using [orchestrators](docs/BeginnerWalkthrough.md#orchestrators). -1. Input [glob patterns](https://github.com/isaacs/node-glob) are resolved to absolute file paths (e.g. `*.bam` to `reads.bam`) -2. The **operation** is ran, passed resolved input, params, and other props -3. The operation completes. -4. Output glob patterns are resolved to absolute file paths. -5. Validators are ran over the output. Check for non-null files, can pass in custom validators. -6. Post-validations are ran. Add task and output to DAG. +### Main features -## CWL? +* Modularity +* Reusability +* Automated Input/Output handling +* Ability to run programs using Unix shell +* Node.js integration +* [Streamable tasks](docs/Task.md#streamable-tasks-potential) (still not +implemented - Issue [#79](https://github.com/bionode/bionode-watermill/issues/79)) -Coming soon. +### Who is this tool for? -## What is a task? 
+Bionode-watermill is for **biologists** who understand it is important to +experiment with sample data, parameter values, and tools. Compared to other +workflow systems, the ease of swapping around parameters and tools is much +improved, allowing you to iteratively compare results and construct more +confident inferences. Consider the ability to construct your own +[Teaser](https://genomebiology.biomedcentral.com/articles/10.1186/s13059-015-0803-1) +for *your data* with a *simple syntax*, and getting utmost performance out of the box. -A `task` is the fundamental unit pipelines are built with. For more details, see [Task](https://thejmazz.gitbooks.io/bionode-watermill/content/Task.html). At a glance, a task is created by passing in **props** and an **operationCreator**, which will later be called with the resolved input. Consider this task which takes a "lowercase" file and creates an "uppercase" one: -```javascript -const uppercase = task({ - input: '*.lowercase', - output: '*.uppercase' -}, function(resolvedProps) { - const input = resolvedProps.input +Bionode-watermill is for **programmers** who desire an efficient and +easy-to-write methodology for developing complex and dynamic data pipelines, +while handling parallelization as much as possible. Bionode-watermill is an npm +module, and is accessible by anyone willing to learn a little JavaScript. This +is in contrast to other tools which develop their own DSL +(domain specific language), which is not useful outside the tool. By leveraging +the npm ecosystem and JavaScript on the client, Bionode-watermill can be built +upon for inclusion on web apis, modern web applications, as well as native +applications through [Electron](http://electron.atom.io/). Look forward to +seeing Galaxy-like applications backed by a completely configurable Node API. - return fs.createReadStream(input) - .pipe(through(function(chunk, enc, next) { - next(null, chunk.toString().toUpperCase()) - }) - .pipe(fs.createWriteStream(input.replace(/lowercase$/, 'uppercase'))) -}) -``` -A "task declaration" like above will not immediately run the task. Instead, the task declaration returns an "invocable task" that can either be called directly or used with an orchestration operator. Tasks can also be created to **run shell programs**: +## Installation -```javascript -const fastqDump = task({ - input: '**/*.sra', - output: [1, 2].map(n => `*_${n}.fastq.gz`), - name: 'fastq-dump **/*.sra' -}, ({ input }) => `fastq-dump --split-files --skip-technical --gzip ${input}` ) -``` +Local installation: -## What are orchestrators? +```npm install bionode-watermill``` -Orchestrators are functions which can take tasks as params in order to let you compose your pipeline from a high level view. This **separates task order from task declaration**. For more details, see [Orchestration](https://thejmazz.gitbooks.io/bionode-watermill/content/Orchestration.html). At a glance, here is a complex usage of `join`, `junction`, and `fork`: +Global installation: -```javascript -const pipeline = join( - junction( - join(getReference, bwaIndex), - join(getSamples, fastqDump) - ), - trim, mergeTrimEnds, - decompressReference, // only b/c mpileup did not like fna.gz - join( - fork(filterKMC, filterKHMER), - alignAndSort, samtoolsIndex, mpileupAndCall // 2 instances each of these - ) -) -``` +```npm install bionode-watermill -g``` -## Check out bionode-watermill tutorial! 
+## Documentation -- [Try out bionode-watermill tutorial](https://github.com/bionode/bionode-watermill-tutorial) +Our documentation is available [here](https://thejmazz.gitbooks.io/bionode-watermill/content/). +There you may find how to **use** bionode-watermill to construct and **run** +your +pipelines. Moreover, you will also find the description of the API to help +anyone +willing to **contribute**. + + +## Tutorial + +- [Try bionode-watermill tutorial!](https://github.com/bionode/bionode-watermill-tutorial) ## Example pipelines @@ -86,7 +104,7 @@ const pipeline = join( - [Simple capitalize task](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/capitalize/capitalize.js) - [Simple SNP calling](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-simple/pipeline.js) - [SNP calling with filtering and fork](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-filtered/pipeline.js) -- [Mapping with bowtie2 and bwa](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) +- [Mapping with bowtie2 and bwa (with tutorial)](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) ## Why bionode-watermill? @@ -94,8 +112,14 @@ const pipeline = join( compares the available tools to deal with NGS workflows, explaining the advantages of each one, including **bionode-watermill**. -## Who is this tool for? -Bionode-watermill is for **programmers** who desire an efficient and easy-to-write methodology for developing complex and dynamic data pipelines, while handling parallelization as much as possible. Bionode-watermill is an npm module, and is accessible by anyone willing to learn a little JavaScript. This is in contrast to other tools which develop their own DSL (domain specific language), which is not useful outside the tool. By leveraging the npm ecosystem and JavaScript on the client, Bionode-watermill can be built upon for inclusion on web apis, modern web applications, as well as native applications through [Electron](http://electron.atom.io/). Look forward to seeing Galaxy-like applications backed by a completely configurable Node API. +## Contributing +We welcome all kinds of contributions at all levels of experience, please +refer to +the [Issues section](https://github.com/bionode/bionode-watermill/issues). +Also, you can allways reach us on [gitter](https://gitter.im/bionode/bionode-watermill). + +### Feel free to submit your pipeline to us -Bionode-watermill is for **biologists** who understand it is important to experiment with sample data, parameter values, and tools. Compared to other workflow systems, the ease of swapping around parameters and tools is much improved, allowing you to iteratively compare results and construct more confident inferences. Consider the ability to construct your own [Teaser](https://genomebiology.biomedcentral.com/articles/10.1186/s13059-015-0803-1) for *your data* with a *simple syntax*, and getting utmost performance out of the box. +Just make a PR for us, that adds a pipeline under `./examples/pipelines/`. +You can check some of the already existing examples [here](examples/pipelines). 
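As a rough sketch of what a minimal example pipeline can look like (this mirrors the uppercaser task from the [Beginner Walkthrough](docs/BeginnerWalkthrough.md); the file names are illustrative only):

```javascript
// pipeline.js -- a minimal sketch adapted from the Beginner Walkthrough
const { task } = require('bionode-watermill')

// turns a *.lowercase file in the current working directory into a *.uppercase file
const uppercaser = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Uppercase *.lowercase -> *.uppercase'
}, ({ input }) =>
  `cat ${input} | tr '[:lower:]' '[:upper:]' > ${input.replace(/lowercase$/, 'uppercase')}`)

// a task declaration returns an invocable task
uppercaser()
```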
diff --git a/docs/BeginnerWalkthrough.md b/docs/BeginnerWalkthrough.md index 57b5c72..3275dd8 100644 --- a/docs/BeginnerWalkthrough.md +++ b/docs/BeginnerWalkthrough.md @@ -1,13 +1,13 @@ # Beginner Walkthrough -bionode-watermill can be used to create complicated bioinformatics pipelines, +Bionode-watermill can be used to create complex bioinformatics pipelines, but at its core are simple ideas. The first of these is the idea of a *task*. ## Task A `task` describes the idea that *some input* will be transformed into *some -output*. For our cases, we will consider input and output to both be a file. -Before jumping into the JavaScript, let's see what our first task is! +output*. First, we will consider input and output to both be a file. +Before jumping into JavaScript, let's see what our first task is! Our first simple pipeline will convert a file with lowercase characters into one with upper case characters. On your UNIX command line (on Windows, see @@ -27,27 +27,27 @@ cat alphabet.lowercase | tr '[:lower:]' '[:upper:]' > alphabet.uppercase cat alphabet.uppercase ``` -Okay, so how to the same thing with bionode-watermill? We can create a `task` +Okay, so how to do the same thing with bionode-watermill? We can create a `task` for it. A `task` is created by passing `watermill.task` two parameters -- object describing `input`, `output`, `name` -- the "operation creator", a function which will receive "resolved props" +- object describing `input`, `output`, `params`, `name` ("props") +- the "operation creator", a function which will receive "props" -For now, let's forget about resolved props and just translate the simple pipeline +For now, let's forget about props and just translate the simple pipeline above into a `task`: ```javascript // pipeline.js // Make sure to "npm install bionode-watermill" first -const watermill = require('bionode-watermill') +const { task } = require('bionode-watermill') -const uppercaser = watermill.task({ +const uppercaser = task({ input: '*.lowercase', output: '*.uppercase', name: 'Uppercase *.lowercase -> *.uppercase' -}, function(resolvedProps) { - const input = resolvedProps.input +}, function(props) { + const input = props.input const output = input.replace(/lowercase$/, 'uppercase') return `cat ${input} | tr '[:lower:]' '[:upper:]' > ${output}` @@ -65,12 +65,12 @@ node pipeline.js ``` What's going on here? Behind the scenes bionode-watermill will start looking for -files matching the [glob][node-glob] pattern `*.alphabet`. The file you created +files matching the [glob][node-glob] pattern `*.lowercase`. The file you created earlier will be found, *and its absolute path will be stored*. Then a unique folder to run the task is created within the `data` folder - this prevents us from worrying about file overwriting in larger pipelines. Within the folder for -the instance of this task, a symlink to the `*.alphabet` file is created. -Finally, `resolvedProps`, which holds the absolute path of `*.alphabet` inside +the instance of this task, a symlink to the `*.lowercase` file is created. +Finally, `props`, which holds the absolute path of `*.lowercase` inside `input` (the same as it was for "props", just now the glob pattern is a valid file path) is passed to our "operation creator". The operation creator returns a String (using ES6 template literals). When watermill finds a string returned from a task, @@ -78,11 +78,12 @@ it will run it as a shell child process within that task instance's folder. 
The log output will have each line prefixed with a hash: this is the ID of the instance of that task, and is the name of the folder in `data` which was made -for this task, look inside there and you will find the `alphabet.uppercase` file. +for this task, look inside there and you will find the `*.uppercase` file. [node-glob]: https://github.com/isaacs/node-glob There are two ways we can improve the readability of our task: + - assignment destructuring ```javascript @@ -98,17 +99,17 @@ const { input } = obj ```javascript // Without => -function actionCreator(resolvedProps) { +function operationCreator(props) { return '...' }.bind(this) // => will automatically ".bind(this)" (so that this is the same inside and outside the function) // With =>, can return object directly instead of having a function body -const actionCreator = (resolvedProps) => '...' +const operationCreator = (props) => '...' ``` -With those syntax features, our task looks like: +With those syntax features, our task looks will look like this: ```javascript -const uppercaser = watermill.task({ +const uppercaser = task({ input: '*.lowercase', output: '*.uppercase', name: 'Uppercase *.lowercase -> *.uppercase' @@ -131,7 +132,7 @@ this: ```javascript const fs = require('fs') -const watermill = require('bionode-watermill') +const { task } = require('bionode-watermill') const through = require('through2') const uppercaser = watermill.task({ @@ -159,16 +160,22 @@ const uppercaser = watermill.task({ watermill needs to be given a flowing stream in the operation creator in order to watch it finish* -## Operators +## Orchestrators -The next core idea of bionode-watermill is that of *operators*. Operators are -ways to combine tasks. The simplest of the operators is `join`. `join` takes +The next core idea of bionode-watermill is that of *orchestrators*. +Orchestrators are +ways to combine tasks. + +### Join + +The simplest of the operators is `join`. `join` takes any number of tasks as parameters, and will run each of the tasks sequentially. Where the first task will check the current working folder for files that match `input`, within a join lineage, downstream tasks will attempt to match their `input` to an upstream task's `output`. -A simple join pipeline is a download+extract one. For this we can download some +A simple join pipeline is a download + extract one. For this we can download +some reads from the NCBI Sequence Read Archive (SRA) and extract them with `fastq-dump` from sratools. @@ -196,51 +203,115 @@ const pipeline = join(getSamples('2492428'), fastqDump) pipeline() ``` +### Junction + The next operator you might use is `junction`. Junction lets you run a set of tasks in parallel. If `junction` is used within a `join`, the downstream tasks will have access to the outputs of each of the parallelized items from `junction`. 
-We can use `junction` to run our pipeline above over multiple samples:
+We can use `junction` to run our pipeline above over multiple samples and
+make a manifest file with all `.fna.gz` files that were downloaded:

```javascript
-const pipeline = junction(
+// task to create manifest file with all .fna.gz files downloaded
+const listFiles = task({
+  input: '*.fna.gz', // this is used to symlink files to this task directory
+  output: '*.txt',
+}, ({ input }) => `ls *.fna.gz > listFiles.txt`
+)
+
+const pipeline = join(
+  junction(
    join(getSamples('2492428'), fastqDump),
    join(getSamples('1274026'), fastqDump)
+  ),
+  listFiles
)
```

+> For more details on multiple inputs check this [link](MultipleInput.md).
+
+### Fork
+
The last operator is `fork`. Fork lets you replace one task in your pipeline
with more than one, and have the downstream tasks duplicated for each task you
-pass into fork. This is useful for comparing tools or options within a tool.
+pass into fork. This is useful for comparing tools, options within a tool, or
+even performing repetitive tasks for many samples in a given pipeline.

-For a simple example, we can run `ls` with different options before sending
-that output to capitalize. What's important to note is that *we only define one
-uppercaser task, yet it is ran on each task in the `fork`*.
+For example, we can use `fork` to fetch many samples using
+`getSamples`, extract each one with `fastqDump`, and then uncompress them all
+using our new `gunzipIt` task:

```javascript
-const lsMaker = (opts) => watermill.task({
-  output: '*.lowercase'
-}, () => `ls ${opts} > ls.lowercase`)
-
-// uppercaser task as above
+const gunzipIt = task({
+  input: '*.fna.gz',
+  output: '*.fa',
+}, ({ input }) => `gunzip -c ${input} > ${input.split('.')[0]}.fa`
+)

const pipeline = join(
-  fork(lsMaker(''), lsMaker('-lh')),
-  uppercaser
+  fork(
+    join(getSamples('2492428'), fastqDump),
+    join(getSamples('1274026'), fastqDump)
+  ),
+  gunzipIt
)
+```

-pipeline()
+What's important to note is that *we only define one
+`gunzipIt` task, yet it is run on each task in the `fork`*. Also, `gunzipIt`
+will start running as soon as each task within the `fork` finishes, and thus
+does not have to wait for all tasks within `fork` to finish.
+
+## Pipeline execution
+
+Once you have defined your **tasks** and their **orchestration**, make sure
+you call the pipeline at the end of each `pipeline.js`:
+
+```javascript
+pipeline() // executes the assembled pipeline
+```
+
+and then on your terminal:
+
+```bash
+node pipeline.js
```

+You can also use the **Google Chrome inspector** if you want to debug something:
+
+```bash
+node --inspect pipeline.js
+```
+
+>**[Only recommended for more advanced users]**
+>
+>You may even go further by using an environment variable,
+>`REDUX_LOGGER=1`, which lets you log more information (*redux*
+>*actions* and *states*) on the workflow within each `bionode-watermill` task:
+>
+>```bash
+>REDUX_LOGGER=1 node --inspect pipeline.js
+>```
+
## Summary

You've seen that bionode-watermill can be used to define `tasks` that *resolve*
their `input` and `output`. These tasks can be child processes (i.e. a program
-on the command line) or just regular JavaScript functions. As well a brief introduction
+on the command line) or just regular JavaScript functions. As well, a brief
+introduction
to the operators `join`, `junction`, and `fork` was made.

-These are all the tools necessary to make a bioinformatics pipeline!
See -this [variant calling pipeline](https://github.com/bionode/bionode-watermill/blob/master/examples/variant-calling-filtered/pipeline.js): +These are all the tools necessary to make a bioinformatics pipeline! + +Check out our [tutorial](https://github.com/bionode/bionode-watermill-tutorial) +if you want to get a feel on how it looks like and start assembling your own +pipelines. + +Here are some **example pipelines** + +[Variant calling pipeline](https://github.com/bionode/bionode-watermill/blob/master/examples/variant-calling-filtered/pipeline.js): ```javascript const pipeline = join( @@ -257,6 +328,27 @@ const pipeline = join( ) ``` +[Mapping with bowtie2 and bwa](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) + +```javascript +const pipeline = join( + junction( + getReference, + join(getSamples,fastqDump) + ), + samtoolsFaidx, gunzipIt, /* since this will be common to both mappers, there + is no + need to be executed after fork duplicating the effort. */ + fork( + join(indexReferenceBwa, bwaMapper), + join(indexReferenceBowtie2, bowtieMapper) + ), + /* here the pipeline will break into two distinct branches, one for each + mapping approach used. */ + samtoolsView, samtoolsSort, samtoolsIndex, samtoolsDepth +) +``` + Now go out there and make something cool! If you would like to help out, see the diff --git a/docs/D3Visualization.md b/docs/D3Visualization.md new file mode 100644 index 0000000..d51e2b5 --- /dev/null +++ b/docs/D3Visualization.md @@ -0,0 +1,72 @@ +# Graph visualization + +Currently bionode-watermill has a tool that allows the user to visualize the +pipeline Directed Acyclic Graph (DAG). This tool is available while running +the scripts in `localhost:8084`. Visualization script is available in +`viz/index.html`. + +## Enabling visualization + +Just paste `localhost:8084` on your browser URL. + +## Canceling + +Now, this tool hangs each script at its end and in order to exit it, just +hit: `ctrl + c` in the shell terminal where the pipeline script is running. +Hitting `ctrl + c` will close the connection between the browser and +`lib/reducers/collections.js`. + +## How it works + +This basically uses npm package `socket.io` on `lib/reducers/collection.js` +where graph object is read. This graph object follows graphson-like structure: + +```javascript +{ + "graph": { + "mode": "NORMAL", + "vertices": [ + //...Array of vertices + ], + "edges": [ + //...Array of edges + ] + } +} +``` + +Vertices represent each task on the current graph visualization (as blue +circles) and edges represent the flow between tasks (with arrows), i.e., +which tasks runs after other tasks attending to the orchestration shape. +For instance, if we have `join(task1, task2, task3)`, this will render 3 +vertices (blue circles) and 2 edges (arrows) like this: + +```javascript +task1 --> task2 --> task3 +``` + +## Node hovering + +On node hovering it is possible to obtain metadata information on tasks +`props`, such as: + +- minUid: The short `uid` that matches the filesystem under `./data/` +- Kind: Currently this differentiates if it is a `task` or a `junction`, +because `junction`s are added as a orange circle with no relevant proprieties, +just to make the required connection on the graph. +- Task name: The name given by the user under `props` object while defining a + task. +- Output(s): The resolved outputs (`resolvedOutput`) from the task run. +- Input(s): The resolved inputs (`resolvedInput`) from the task run. 
+- params: The `params` given to the task `props`.
+- Output folder: The output folder relative path.
+- Cmd: The actual operation passed to the shell (when passed) or the
+`operationCreator` function.
+
+
+## Limitations
+
+* This visualization tool does not represent input/output flow, just
+  the pipeline flow.
+  * It is also unable to distinguish what has already run from what is
+  running or failing.
diff --git a/docs/Forkception.md b/docs/Forkception.md
new file mode 100644
index 0000000..92034fe
--- /dev/null
+++ b/docs/Forkception.md
@@ -0,0 +1,80 @@
+# Forkception
+
+Fork is a special case within bionode-watermill because it requires knowing
+everything that follows the tasks within the `fork`. In the current API there is
+no easy way to get the graph shape before actually running the `pipeline.js`
+script. Therefore, `fork` uses `join` to construct independent lineages for
+each task within the `fork`.
+
+Currently `lib/orchestrator/fork.js` just collects the tasks inside `fork`
+and then everything else is handled by `lib/orchestrator/join.js`, i.e.,
+the branching of the pipeline into several leaves. This branching requires
+that `fork` *downstream tasks* are multiplied as many times as the number
+of tasks inside `fork`.
+
+> For instance, if fork contains 3 tasks the pipeline will have to branch
+> 3 times from this point on:
+>
+>```javascript
+>const pipeline = join(task0, fork(task1, task2, task3), task4)
+>```
+> This will end up in three different branches (or lineages) with three
+>leaves:
+> `task0 --> task1 --> task4A`, `task0 --> task2 --> task4B` and `task0 --> task3 --> task4C`.
+>
+> Note: here task4 is a downstream task.
+
+Notice that task4 needed to be triplicated, since it will have different
+results depending on the results of its parent tasks (`task1`, `task2` or `task3`).
+
+Therefore, when a `fork` is found within a `join` that wraps the whole
+pipeline, a set of rules was designed to make `fork` multiply each
+*downstream task* and to keep all the other orchestrators, each with its own
+behavior, working properly (check [Orchestration](Orchestration.md)).
+
+## Current limitations
+
+Take the following pipeline as an example:
+
+```javascript
+const pipeline = join(
+  task0,
+  fork(task4, task3),
+  task5,
+  fork(task1, task2),
+  task6
+)
+```
+
+This pipeline represents a further challenge, since it requires duplicating
+the second `fork` itself rather than the tasks it contains, which the rules
+made for the other orchestrators to work inside `fork` do not cover. The same is
+true for `junction`, so if in the above example the second `fork` is in fact
+a `junction`, the problem will remain the same.
+
+However, there is a workaround for this issue that may be used for now:

```javascript
+const pipeline2 = join(
+  task0,
+  fork(
+    join(task4, task5, fork(task1, task2)),
+    join(task3, task5, fork(task1, task2))
+  ),
+  task6
+)
+```
+
+*Diagram of pipeline and pipeline2*
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_after_fork.png)
+
+In the above piece of code I just added `fork(task1, task2)` inside the
+first `fork` in order to get the same structure. To sum up,
+the tasks between the two forks (`task5`) as well as the second `fork` (`fork
+(task1, task2)`) have to be manually duplicated.
+ + \ No newline at end of file diff --git a/docs/Internals.md b/docs/Internals.md index 9153e15..45fd91f 100644 --- a/docs/Internals.md +++ b/docs/Internals.md @@ -2,20 +2,28 @@ This section documents how bionode-watermill functions internally. Read this if you would like to understand the tool better, or would like to contribute *middlewares*. -Waterwheel and [Gulp](https://github.com/gulpjs/gulp) are alike in some ways: +Watermill and [Gulp](https://github.com/gulpjs/gulp) are alike in some ways: - both provide an interface to run **tasks** in **series and parallel** -- both maintain a **registry of tasks** - Gulp uses [Undertaker](https://github.com/gulpjs/gulp) and Waterwheel uses a [Redux](https://github.com/reactjs/redux) store +- both maintain a **registry of tasks** - Gulp uses [Undertaker](https://github.com/gulpjs/gulp) and Watermill uses a [Redux](https://github.com/reactjs/redux) store - both use [async-done](https://github.com/gulpjs/async-done) to catch errors and completion from the task function Yet also differ in some respects: -- Undertaker uses [bach](https://github.com/gulpjs/bach) for composing async functions in serial and parallel, Waterwheel uses its own serial and parallel implementations, which differ in one important way: **objects are passed through between tasks** -- Waterwheel's `task` is not actually a "task function" (i.e. what is sent into async-done), but rather a wrapper around that which has an extensive lifecycle, producing the task function around the middle -- Gulp uses [vinyl-fs](https://github.com/gulpjs/vinyl-fs) to store applied transformations to a source stream of files (e.g. `gulp.src('*.lowercase').pipe(capitalize()).pipe(gulp.dest('uppercase')) )`) while Waterwheel does not: the data sets are too big to be stored in one Buffer -- In addition to serial and parallel, Waterwheel provides extra operators like fork - -Waterwheel was created to enable the development of data **pipelines with modular and swappable components**. As a result, **parameters and options are the first class citizens, while file/folder names are handled automatically**. The streaming nature lets you **mix streams and processes**: e.g. filter search results in a through stream, run a pipeline on each value. +- Undertaker uses [bach](https://github.com/gulpjs/bach) for composing async +functions in serial and parallel, Watermill uses its own serial and parallel +implementations, which differ in one important way: **objects are passed through between tasks** +- Watermill's `task` is not actually a "task function" (i.e. what is sent into +async-done), but rather a wrapper around that which has an extensive lifecycle, producing the task function around the middle +- Gulp uses [vinyl-fs](https://github.com/gulpjs/vinyl-fs) to store applied +transformations to a source stream of files (e.g. `gulp.src('*.lowercase') +.pipe(capitalize()).pipe(gulp.dest('uppercase')) )`) while Watermill does not: +the data sets are too big to be stored in one Buffer +- In addition to serial and parallel, Watermill provides extra operators like +fork + +Watermill was created to enable the development of data **pipelines with +modular and swappable components**. As a result, **parameters and options are the first class citizens, while file/folder names are handled automatically**. The streaming nature lets you **mix streams and processes**: e.g. filter search results in a through stream, run a pipeline on each value. 
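As a sketch of what "mixing streams and processes" can look like, here is a through task adapted from the `filterSpecies` example used elsewhere in these docs (the `taxonomy` param and `through2` usage come from that example and are illustrative, not a fixed API):

```javascript
const { task } = require('bionode-watermill')
const through = require('through2')

// a "through task": no input/output globs, just a transform applied to the
// object stream coming from the previous task
const filterSpecies = task({
  name: 'Filter species by taxonomy',
  params: { taxonomy: 'plantae' }
}, ({ input, params }) =>
  input.pipe(through.obj(function (chunk, enc, next) {
    // only keep records whose taxonomy matches the configured param
    if (chunk.taxonomy === params.taxonomy) {
      this.push(chunk)
    }
    next()
  }))
)
```

Upstream and downstream tasks can then pipe values through such a task without touching any file names.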
diff --git a/docs/MultipleInput.md b/docs/MultipleInput.md new file mode 100644 index 0000000..da7b53b --- /dev/null +++ b/docs/MultipleInput.md @@ -0,0 +1,246 @@ +# Multiple inputs + +There are two ways multiple inputs can be given to tasks - within the **task +scope** or within the **pipeline scope**. + +## Within task scope + +Within task scope multiple input handling can take into account +bionde-watermill capacities to fetch files using glob patterns (e.g. `*.fas`). +For instance, consider the following task: + +```javascript +const concatTask = task({ + input: '*.fas', + output: '*.fas', + params: {output: 'concat.fas'} +}, ( object ) => { + console.log('input_directory: ', object.dir) + console.log('input_variable: ', object, object.params) + return `cat ${object.input.join(' ')} > ${object.params.output}` + } +) +``` + +This `concatTask` has the ability to fetch several `*.fas` that are present +in the current working directory and pass it to the `operationCreator` +function in task (for further reference refer to [Task](Task.md)). However, +users need to be careful because these glob patterns are not the same as +shell wildcards. So they must be handled by javascript before passing them to + the shell: + + `object.input.join(' ')` - will transform the array of inputs into a string + where each `.fas` file will be separated by a space. + +## Within the pipeline scope + +Users may also have multiple samples that want to run through all the +pipeline. Imagine you have `n` fasta files and you want all of them to go +through the pipeline. Currently this will require some javascript around +tasks and pipelines. +See this example: + +First lets define a function that executes the desired pipeline for each +input file or sample: + +```javascript +// wrap every task and pipeline of a file in a single function (in this case +// 'pipeline' function +const fileFlow = (infile) => { + const concatTask = task({ + // notice that the input file (infile) is here passed as a param of the + // pipeline function + input: infile, + output: '*.fas', + params: {output: 'concat.fas'} + }, ( object ) => { + console.log('input_directory: ', object.dir) + console.log('input_variable: ', object, object.params) + return `cat ${object.input} > ${object.params.output}` + } + ) + + const task0 = task({name: 'coco'}, () => `echo "something0"`) + + const task1 = task({name: 'nut'}, () => `echo "something1"`) + + const task2 = task({name: 'foo'}, () => `echo "something2"`) + + // this returns the pipeline for a file + const pipelineAssemblage = join(concatTask, task0, fork(task1, task2)) + return pipelineAssemblage +} +``` + +Notice that it basically wrap task definition and pipeline call +(`pipelineAssemblage`) and returns it. + +Then is just a matter of cycling through each file in current working +directory (for example): + +```javascript +// checks for files in current working directory (where the script is executed) +fs.readdir(process.cwd(), (err, files) => { + files.forEach(infile => { + // checks file extension, in this case '.fas' + if (path.extname(infile) === '.fas') { + // executes the pipeline function for each infile + const pipelineMaster = fileFlow(infile) + pipelineMaster() + } + }) +}) +``` + +This will execute the function that wraps the tasks and pipeline +(`pipelineAssemblage`) and executes it each time a input file is given to +this function. This will render one graph for each input file provided for +the pipeline function (`fileFlow`). 
+ +### Concurrency + +As you may imagine such behavior without any control may end up using more +resources (either CPU or memory) than we have. So one quick way to solve +this, would be to use `bluebird`: + +```javascript +const Promise = require('bluebird') +``` + +You can even pass a argument to be read from the shell: +```javascript +// makes concurrency the third argument of code in shell +const concurrency = parseFloat(process.argv[2] || "Infinity") +``` + +And call it like this: + +```shell +node pipeline.js 2 +``` + +This will tell that you want to have 2 files running at the same time. +Then is just a matter of returning a `bluebird` `Promise.map` with +`concurrency`: + +```javascript +// checks for files in current working directory (where the script is executed) +fs.readdir(process.cwd(), (err, files) => { + const desiredFiles = [] + files.forEach(infile => { + // checks file extension, in this case '.fas' and creates an array with + // just these files + if (path.extname(infile) === '.fas') { + desiredFiles.push(infile) + } + }) + // bluebird Promise.map is used here for concurrency where we can + // limit the flow of files into the pipeline. So, if concurrency is set + // to 1 it will first execute the first file, then the second and so on. + // But if 2 it will execute the first 2 files and then the other 2. + return Promise.map(desiredFiles, (infile) => { + const pipelineMaster = fileFlow(infile) + return pipelineMaster() + }, {concurrency}) +}) +``` + +Note that it is quite similar to the example above but returns a `Promise +.map`, which also return the pipeline executions, instead of just executing the +pipeline for each input file. This is one simple way to empower the user to +control the resources used by their scripts. + +### Pratical example + +Imagine we have a couple of samples we want to get from NCBI SRA and map them + against a given reference using both `bowtie2` and `bwa`. + + So first we have to define a task that needs to fetch the reference data and + this task must be run just once: + + ```javascript + // some config variables that will be used in this pipeline +const config = { + name: 'Streptococcus pneumoniae', + sraAccession: ['ERR045788', 'ERR016633'], + referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/007/045/GCF_000007045.1_ASM704v1/GCF_000007045.1_ASM704v1_genomic.fna.gz' +} + +// === TASKS === + +// first lets get the reference genome for our mapping +const getReference = task({ + params: { url: config.referenceURL }, + output: '*_genomic.fna.gz', + dir: process.cwd(), // this sets directory for outputs! + name: `Download reference genome for ${config.name}` +}, ({ params, dir }) => { + const { url } = params + const outfile = url.split('/').pop() + + // essentially curl -O + return request(url).pipe(fs.createWriteStream(outfile)) +}) +``` + +Then what we need is a way for all other tasks to occur after `getReference` +task: + +```javascript +// task that defines how we get samples +//then get samples to work with +const getSamples = (sraAccession) => task({ + params: { + db: 'sra', + accession: sraAccession + }, + output: '**/*.sra', + dir: process.cwd(), // Set dir to resolve input/output from + name: `Download SRA ${sraAccession}` +}, ({ params }) => `bionode-ncbi download ${params.db} ${params.accession}` +) + +// ...other tasks... 
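// For illustration only -- two of the omitted tasks could look like the
// definitions shown elsewhere in these docs; the bwa/bowtie2 index and
// mapper tasks are still left out here:
const fastqDump = task({
  input: '**/*.sra',
  output: [1, 2].map(n => `*_${n}.fastq.gz`),
  name: 'fastq-dump **/*.sra'
}, ({ input }) => `fastq-dump --split-files --skip-technical --gzip ${input}`)

const gunzipIt = task({
  input: '*.fna.gz',
  output: '*.fa',
}, ({ input }) => `gunzip -c ${input} > ${input.split('.')[0]}.fa`)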
+// the pipeline can be then called like this +getReference().then(results => { + const pipeline = (sraAccession) => join( + getSamples(sraAccession), + fastqDump, + gunzipIt, + fork( + join(indexReferenceBwa, bwaMapper), + join(indexReferenceBowtie2, bowtieMapper) + ) + ) +// then fetches the samples and executes the remaining pipeline + for (const sra of config.sraAccession) { + const pipelineMaster = pipeline(sra) + pipelineMaster().then(results => console.log("Results: ", results)) + } +}) +``` + +Notice how the pipeline is ran twice (given that we have two inputs in an +array (`config.sraAccession`)). And notice that we have passed an argument to +`getSamples` task which is each `config.sraAccession`. + +This behavior will result in one vertex with the `getReference` task, which +outputs become available to each `pipeline` that each sra sample triggers. +So, we end up with a `pipeline` for each sample as well (two in this case). + +> This is possible because we rely on the `getReference` task to save the +outputs on current woring directory. Then the api searches first for outputs +that are generated within each bionode-watermill pipeline `currentCollection` + and then in the current working directory for a matching file (with the + expected glob pattern). +> +> This matching is done by the `matchee` function in +`lib/lifecycle/resolve-input.js`. +> +> The fact that bionode-watermill also searches for inputs under current +working directory is what allows to run a pipeline after another and still +get reference to the previous pipeline (in the example given above). +Therefore, use it as you please but take into account that **bionode-watermill +cannot map every folder within your file system** and **`currentCollection` +just +saves reference to the files that were run within a pipeline**. diff --git a/docs/Orchestration.md b/docs/Orchestration.md index f1c7d1d..aa0989b 100644 --- a/docs/Orchestration.md +++ b/docs/Orchestration.md @@ -1,35 +1,13 @@ # Orchestration -A **task** is the fundamental **unit** for building pipelines. It +Bionode-watermill has three main orchestrators: **join**, **junction** and +**fork**. In current API, these orchestrators gather a group of tasks that +are described inside them and apply different effects on the pipeline, in +which they are used. -* has a **unique input/output pair** defined by glob pattern(s) and/or streams -* has a **single params object**. Params should be used when they can be applied - to a task with the same I/O but alter the output. e.g. command flags -* **emits** `task.finish` with a reference to that task object in state +## Join -*Example* - -```javascript -/* - * Usage: samtools index [-bc] [-m INT] [out.index] - * Options: - * -b Generate BAI-format index for BAM files [default] - * -c Generate CSI-format index for BAM files - * -m INT Set minimum interval size for CSI indices to 2^INT [14] - */ - -const samtoolsIndex = task({ - input: '*.bam', - output: '*.bai', - params: { format: 'b' } - name: 'samtools index' -}, ({ input, params }) => shell(`samtools index -${params.format} ${input}`)) - -samtoolsIndex() - .on('task.finish', (results) => console.log(results.resolvedOutput)) -``` - -A **join** is used to run a **sequence of tasks in order**. It +A `join` is used to run a **sequence of tasks in order**. It * must pass the `trajectory` between tasks (including the trajectory passed into `join`) * **emits** `join.finish` with a `trajectory` @@ -41,47 +19,131 @@ A **join** is used to run a **sequence of tasks in order**. 
It // task1 will resolve input to filesystem, // task2 will resolve input to files at collection nodes defined by its trajectory // (which will be created based on the trajectory task1 returns and join passes to task2) -const joined = join(task1, task2) - -joined() - .tasks((t1, t2) => { - t1.on('task.finish', (results) => { - anotherTask(results.trajectory)() - .on('task.finish', () => console.log('another task finished')) - }) - }) - .on('join.finish', (results) => console.log(results.trajectory)) -// Which looks like: -// task1 -> task2 -// task1 -> anotherTask - -// NOTE: This is only recommended for advanced or dynamic cases. Try to avoid -// retreiving task references from join or parallel. -// Another way to do it: -const pipeline = join(task1, parallel(task2, anotherTask), task3) -// However, in this case, task3 will only start after task2 and anotherTask finish -// task1 -> task2 ⌉ -// | -> task3 -// task1 -> anotherTask ⌋ -// Alternatively you could do: -const pipeline = join(task1, parallel(join(task2, task3), anotherTask)) -// task1 -> task2 -> task3 -// task1 -> anotherTask +const pipeline = join(task1, task2) +// executes the pipeline +pipeline() ``` -A **parallel** is used to run **a set of tasks simultaneously**. It +The above represented `pipeline` will first run `task1` and **only after +`task1`** is + finished, starts running `task2`. -* must pass the trajectory passed into `parallel` into **each task** -* **emits** `parallel.finish` with a list of trajections +## Junction + +A ``junction`` is used to run **a set of tasks simultaneously** and **waits +for the results** of all tasks within junction before running anythin else. +. It + +* must pass the trajectory passed into `junction` into **each task** +* **emits** `junction.finish` with a list of trajections * returns/emits reference to each task +*Example* + ```javascript -const parallelized = parallel(taskA, taskB) +// executes both taskA and taskB at the same time +// however one task may finish before the other depending on the tasks itself +const pipeline2 = junction(taskA, taskB) +// executes the pipeline +pipeline2() +``` -parallelized() - .tasks((a, b) => { /* do stuff if u want */ }) - .on('parallel.finish', (results) => { - console.log(results.trajectory) // an array - }) +The above represented `pipeline2` will run both tasks (`taskA` and `taskB`) at the +same time. + +## Fork + +A `fork` is used to run `a set of tasks simultaneously` but **it does not +wait for the results** from all tasks within `fork`. Instead, it will branch +the pipeline in the sense that each task within `fork` will have their own +set of downstream tasks. It + +* must **multiply each downstream task** (tasks proceeding the `fork`) as many +times +as the `fork` branches (number of tasks within `fork`). +* currently uses `join` operator to create each branch. + +*Example* + +```javascript +// executes task 1 and task 2 at the same time +// However it does not wait for task1 or task2 to finish before executing +// task3 after task1 and after task2 +const pipeline3 = join(fork(task1, task2), task3) +// executes the pipeline +pipeline3() ``` +The above referenced `pipeline3` will run both tasks (`task1` and `task2`) +simultaneously and will run downstream task (`task3`) twice, after `task1` and +after +`task2`, respectively. + +### Conceptual difference between junction and fork + +While `junction` is a tree with several branches but that end up in the same +leaf, `fork` is a tree with several branches that each one end up on its own +leaf. 
+
+Therefore, `junction` should be used every time the user wants to
+**wait** for the results from the tasks within `junction` and then perform
+other tasks:
+
+```javascript
+const pipeline4 = join(junction(taskA, taskB), taskC)
+```
+
+In the `pipeline4` example, `taskA` and `taskB` will be executed simultaneously, but
+`taskC` will wait for the `task.finish` event from both `taskA` and
+`taskB`. This behavior is most useful when `taskC`'s inputs depend on the
+outputs of both tasks (`taskA` and `taskB`).
+
+On the other hand, `fork` should be used every time the user **does not want to
+wait** for all tasks to finish before running downstream tasks:
+
+```javascript
+const pipeline5 = join(fork(task1, task2), task3)
+```
+
+In `pipeline5`, if for instance `task1` is finished but `task2` is not,
+`task3` will run in an independent branch after `task1`. Then, when `task2` is
+finished, `task3` will be run again after `task2`. In this case we will end
+up with two branches:
+
+```javascript
+task1 --> task3
+task2 --> task3
+```
+
+### Concurrency limitations of fork and junction
+
+`junction` and `fork` may start running an arbitrary number of processes and
+consume an arbitrary amount of RAM, since they fire several tasks at the same
+time. Therefore, users must take this into account.
+
+Imagine that `fork` (or `junction`) branches the pipeline into 8 independent
+branches:
+
+```javascript
+const pipeline6 = join(
+  fork(task1, task2, task3, task4, task5, task6, task7, task8),
+  finalTask
+)
+```
+
+Now imagine that each task uses 1 CPU and you only have 4 CPUs available;
+this will end up consuming more CPUs than are available. So, in the current
+API, users must handle this manually:
+
+```javascript
+// first run
+const pipeline6 = join(
+  fork(task1, task2, task3, task4),
+  finalTask
+)
+// and then
+const pipeline6_2 = join(
+  fork(task5, task6, task7, task8),
+  finalTask
+)
+```
diff --git a/docs/PreviousReadme.md b/docs/PreviousReadme.md
new file mode 100644
index 0000000..28e2873
--- /dev/null
+++ b/docs/PreviousReadme.md
@@ -0,0 +1,101 @@
+# bionode-watermill
+
+[![npm version](https://badge.fury.io/js/bionode-watermill.svg)](https://badge.fury.io/js/bionode-watermill) [![node](https://img.shields.io/badge/node-v6.x-blue.svg)]() [![Build Status](https://travis-ci.org/bionode/bionode-watermill.svg?branch=dev)](https://travis-ci.org/bionode/bionode-watermill) [![codecov.io](https://codecov.io/github/bionode/bionode-watermill/coverage.svg?branch=master)](https://codecov.io/github/bionode/bionode-watermill?branch=master)
+
+*Watermill: A Streaming Workflow Engine*
+
+[![NPM](https://nodei.co/npm/bionode-watermill.png?downloads=true&stars=true)](https://nodei.co/npm/bionode-watermill/)
+
+
+- [CWL?](#cwl)
+- [What is a task?](#what-is-a-task)
+- [What are orchestrators?](#what-are-orchestrators)
+- [Check out bionode-watermill tutorial!](#check-out-bionode-watermill-tutorial)
+- [Example pipelines](#example-pipelines)
+- [Why bionode-watermill?](#why-bionode-watermill)
+- [Who is this tool for?](#who-is-this-tool-for)
+
+Watermill lets you *orchestrate* **tasks** using operators like **join**, **junction**, and **fork**. Each task has a [lifecycle](https://thejmazz.gitbooks.io/bionode-watermill/content/TaskLifecycle.html) where
+
+1. Input [glob patterns](https://github.com/isaacs/node-glob) are resolved to absolute file paths (e.g. `*.bam` to `reads.bam`)
+2. The **operation** is ran, passed resolved input, params, and other props
+3. The operation completes.
+4. Output glob patterns are resolved to absolute file paths.
+5.
Validators are ran over the output. Check for non-null files, can pass in custom validators. +6. Post-validations are ran. Add task and output to DAG. + +## CWL? + +Coming soon. + +## What is a task? + +A `task` is the fundamental unit pipelines are built with. For more details, see [Task](https://thejmazz.gitbooks.io/bionode-watermill/content/Task.html). At a glance, a task is created by passing in **props** and an **operationCreator**, which will later be called with the resolved input. Consider this task which takes a "lowercase" file and creates an "uppercase" one: + +```javascript +const uppercase = task({ + input: '*.lowercase', + output: '*.uppercase' +}, function(resolvedProps) { + const input = resolvedProps.input + + return fs.createReadStream(input) + .pipe(through(function(chunk, enc, next) { + next(null, chunk.toString().toUpperCase()) + }) + .pipe(fs.createWriteStream(input.replace(/lowercase$/, 'uppercase'))) +}) +``` + +A "task declaration" like above will not immediately run the task. Instead, the task declaration returns an "invocable task" that can either be called directly or used with an orchestration operator. Tasks can also be created to **run shell programs**: + +```javascript +const fastqDump = task({ + input: '**/*.sra', + output: [1, 2].map(n => `*_${n}.fastq.gz`), + name: 'fastq-dump **/*.sra' +}, ({ input }) => `fastq-dump --split-files --skip-technical --gzip ${input}` ) +``` + +## What are orchestrators? + +Orchestrators are functions which can take tasks as params in order to let you compose your pipeline from a high level view. This **separates task order from task declaration**. For more details, see [Orchestration](https://thejmazz.gitbooks.io/bionode-watermill/content/Orchestration.html). At a glance, here is a complex usage of `join`, `junction`, and `fork`: + +```javascript +const pipeline = join( + junction( + join(getReference, bwaIndex), + join(getSamples, fastqDump) + ), + trim, mergeTrimEnds, + decompressReference, // only b/c mpileup did not like fna.gz + join( + fork(filterKMC, filterKHMER), + alignAndSort, samtoolsIndex, mpileupAndCall // 2 instances each of these + ) +) +``` + +## Check out bionode-watermill tutorial! + +- [Try out bionode-watermill tutorial](https://github.com/bionode/bionode-watermill-tutorial) + +## Example pipelines + +- [Toy pipeline with shell/node](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/pids/pipeline.js) +- [Simple capitalize task](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/capitalize/capitalize.js) +- [Simple SNP calling](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-simple/pipeline.js) +- [SNP calling with filtering and fork](https://github.com/bionode/bionode-watermill/blob/master/examples/pipelines/variant-calling-filtered/pipeline.js) +- [Mapping with bowtie2 and bwa](https://github.com/bionode/bionode-watermill/tree/master/examples/pipelines/two-mappers) + +## Why bionode-watermill? + +[This blog post](https://jmazz.me/blog/NGS-Workflows) +compares the available tools to deal with NGS workflows, explaining the +advantages of each one, including **bionode-watermill**. + +## Who is this tool for? + +Bionode-watermill is for **programmers** who desire an efficient and easy-to-write methodology for developing complex and dynamic data pipelines, while handling parallelization as much as possible. Bionode-watermill is an npm module, and is accessible by anyone willing to learn a little JavaScript. 
This is in contrast to other tools which develop their own DSL (domain specific language), which is not useful outside the tool. By leveraging the npm ecosystem and JavaScript on the client, Bionode-watermill can be built upon for inclusion on web apis, modern web applications, as well as native applications through [Electron](http://electron.atom.io/). Look forward to seeing Galaxy-like applications backed by a completely configurable Node API. + +Bionode-watermill is for **biologists** who understand it is important to experiment with sample data, parameter values, and tools. Compared to other workflow systems, the ease of swapping around parameters and tools is much improved, allowing you to iteratively compare results and construct more confident inferences. Consider the ability to construct your own [Teaser](https://genomebiology.biomedcentral.com/articles/10.1186/s13059-015-0803-1) for *your data* with a *simple syntax*, and getting utmost performance out of the box. diff --git a/docs/README.md b/docs/README.md index 10f8601..63dc4c5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,9 +1,16 @@ ## Table of Contents * [Beginner Walkthrough](BeginnerWalkthrough.md) -* [Task](Task.md) - - [Lifecycle](TaskLifecycle.md) -* [Orchestration](Orchestration.md) -* [Comparison](Comparison.md) -* [Internals](Internals.md) - +* [Graph visualization tool](D3Visualization.md) +* API + * [Task](Task.md) + - [Lifecycle](TaskLifecycle.md) + - [Multiple input](MultipleInput.md) + * [Orchestration](Orchestration.md) + * [Forkception](Forkception.md) + * [Uids](Uid.md) + * [Comparison](Comparison.md) + * [Internals](Internals.md) +* Example Pipelines + * [two-mappers](../examples/pipelines/two-mappers/README.md) - a pipeline + that uses two mappers for benchmark purposes. diff --git a/docs/Task.md b/docs/Task.md index b07c308..47563a3 100644 --- a/docs/Task.md +++ b/docs/Task.md @@ -11,92 +11,62 @@ const task = watermill.task const { task } = watermill ``` -## API - `task` takes two parameters: **props** and **operationCreator**: ```javascript const myTask = task(props, operationCreator) ``` +## props + **props** is an object with the following structure: +### Input / output tasks + ```javascript const props = { - input: 'foo.txt', // valid input, see below - output: 'bar.txt', // valid output, see below + input: '*.txt', // valid input, see below + output: '*.txt', // valid output, see below name: 'My Task', + params: { output: 'bar.txt' }, //the actual output name that can be passed to + // operationCreator or even other params that you may wish to pass to + // operationCreator. alwaysRun: true // force rerun even if output already exists // other options, see options } ``` -*input* and *output* are required for tasks that deal with files. If either is not provided, it will be assumed the task is then a *streaming task* - i.e., it is a duplex stream with writable and/or readable portions. Consider: +*input* and *output* patterns are required for tasks that deal with files. +*params.output* allows to name the output files, while *output* is used to +check if output was properly resolved. 
+*Example* ```javascript -const throughCapitalize = through(function (chunk, env, next) { - // through = require('through2') - a helper to create through streams - // takes chunk, its encoding, and a callback to notify when complete pushing - // push a chunk to the readable portion of this through stream with - this.push(chunk.toString().toUpperCase()) - // then call next so that next chunk can be handled - next() -}) - -const capitalize = task({ - name: 'Capitalize Through Stream' -}, -// Here, input is a readable stream that came from the previous task -// Let's return a through stream that takes the input and capitalizes it -({ input }) => input.pipe(throughCapitalize) ) -``` - -You could connect `capitalize` to a readable and writable file stream with: - -```javascript -const readFile = ({ - input: '*.lowercase', - name: 'Read from *.lowercase' -}, ({ input }) => { - const rs = fs.createReadStream(input) - // Add file information to stream object so we have it later - rs.inFile = input -}) - -const writeFile = ({ - output: '*.uppercase', - name: 'Write to *.uppercase' -}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase'))) - -// Can now connect the three: -join(readFile, capitalize, writeFile) -``` - -Of course, this could be written as one single task. This is somewhat simpler, but the power of splitting up the read, transform, and write portions of a task will become apparent once we can provide multiple sets of parameters to the transform and observe the effect, *without having to manually rewire input and output filenames*. As a single task the above would become: - -TODO introduce single task version first - -```javascript -const capitalize = task({ - input: '*.lowercase', - output: '*.uppercase', - name: 'Capitalize *.lowercase -> *.uppercase' -}, ({ input }) => - fs.createReadStream(input) - .pipe(throughCapitalize) - .pipe(fs.createWriteStream(input.swapExt('lowercase'))) +// example task with input/output files +const task = ({ + input: '*.txt', + output: '*.txt', + params: { + output: 'bar.txt' + } +}, ({ input, params }) => `cp ${input} ${params.output}` ) ``` -It is fine to run with no task name, a hashed one will be made for you. However, properly named tasks will help greatly reading pipeline output +## operationCreator -**operationCreator** is a function that will be provided with a **resolved props object**. `operationCreator` should return a **stream** or a **promise**. If the operation creator does not return a stream, it will be wrapped into a stream internally (e.g. `StreamFromPromise`). An example operation creator is +**operationCreator** is a function that will be provided with a **resolved +props object** and that is responsible for the execution of the task itself. +`operationCreator` should return a **stream** or a **promise**. If the operation +creator does not return a stream, it will be wrapped into a stream internally +(e.g. `StreamFromPromise`). 
An example operation creator is +*Example* ```javascript function operationCreator(resolvedProps) { const fs = require('fs') const intoStream = require('into-stream') - const ws = intoStream(resolvedProps.input).pipe( fs.createWriteStream('bar.txt') ) + const ws = intoStream(resolvedProps.input).pipe(fs.createWriteStream('bar.txt') ) return ws } ``` @@ -104,14 +74,43 @@ function operationCreator(resolvedProps) { > *Note* > > With assignment destructuring and arrow functions, you can write cleaner operation creators: +> +>```javascript +>const operationCreator = ({ input }) => intoStream(input).pipe(fs.createWriteStream('bar.txt')) +>``` + +### Shell commands + +Bionode-watermill allows users to both execute code in javascript within +`operationCreator` or run unix **shell commands** within +tasks by returning a string (using ES6 template literals) from +`operationCreator`: + +*Example* ```javascript -const operationCreator = ({ input }) => intoStream(input).pipe(fs.createWriteStream('bar.txt')) +// creates a test.txt file +const operationCreator = () => `touch test.txt` +// or non ES6 code style +function operationCreator() { + return `touch test.txt` +} ``` ## Input and Output -The `input` and `output` objects can be a **string glob pattern**, or a plain object of them. TODO an array will introduce task forking. The glob will be **resolved to an absolute path** when it is passed to the `operationCreator`. +The `input` and `output` objects can be a **string glob pattern**, or a plain +object of them. The glob will be +**resolved to an absolute path** when it is passed to the `operationCreator`. +Bionode-watermill manages input and output files and folders run by a `task`. + All inputs and outputs are saved within `data` folder (generated by running + bionode-watermill) in the current working directory. The input files of the + first task can be elsewhere, outside `data` folder. Inside the `data` + folder, other subdirectories are created (one per `task`) with `uid` (unique + id) of + its + respective task. This `uid` is generated by `bionode-watermill` given the + props of the task and their parent tasks (check [Uid](Uid.md)). For example, @@ -122,7 +121,7 @@ For example, will resolve to something like: ```javascript -{ input: '/data/ERR1229296.sra' } +{ input: '/data//ERR1229296.sra' } ``` And @@ -141,37 +140,63 @@ will resolve to something like: ```javascript { input: { - reference: '/data/GCA_000988525.2_ASM98852v2_genomic.fna.gz', - reads: ['/data/ERR1229296_1.fastq.gz', '/data/ERR1229296_2.fastq.gz'] + reference: '/data//GCA_000988525.2_ASM98852v2_genomic.fna.gz', + reads: ['/data//ERR1229296_1.fastq.gz', '/data/ERR1229296_2.fastq.gz'] } } ``` -If `input` is not provided, the `operation` will be a duplexify stream with the writable portion set. Similarly, if `output` is not provided, the `operation` will be a duplexify stream with the readable portion set. If neither is provided, both the readable and writable portions will be set: the task becomes a *through task*. +### Input Resolution -An example *through task*: +Match to filesystem if in first task in pipeline, using current working +directoty to search for the input files. 
```javascript -const filterSpecies = task({ - name: 'Filter species by taxonomy', - params: { taxonomy: 'plantae' } -}, -({ input, params }) => input.pipe(through.obj(function (chunk, enc, next) { - if (chunk.taxonomy === params.taxonomy) { - this.push(chunk) - } - next() - })) -) +// this is our very first task glob pattern +'*.fas' +// Resolved Input +'./some.fas' ``` -## Resolution +Otherwise glob patterns are matched + to the **collection**. **collection** allows bionode-watermill to search + within `./data` folder for the **first** folder that has files that match the + glob pattern. Have + into account that it crawls the tasks tree from downstream to upstream + until it finds the desired match and that this is **run specific**, i.e., + `currentCollection` can only reference outputs and inputs generated by the + pipeline. Otherwise, bionode-watermill will also look in current working + directory for a file that matches the desired glob pattern. + +```javascript +// now, this is not the first task +'*.fas' +// Resolved Input +'./data//some.fas' +``` -Input resolution is a **reduction over the task props**. +Notice the difference in the location of the inputs. + +Also and importantly, if you have multiple inputs to pass to a task, make +sure they are in the same directory. If it is the first task make sure these +inputs are all in the current working directory. Same for tasks that fetch inputs +from `./data`, assure that every file is within a folder. If the desired +outputs are spread through several folders and their file extension is the +same, you will have to add something to the glob pattern suffix. For example, + + ```javascript +// we have two fasta files in different directories +// fasta1.fas and fasta2.fas +const someTask = task({ + input: ['*1.fas', '*2.fas'] +}, operationCreator +) +``` -### Input Resolution +So, if this is your case, make sure your glob pattern is unique enough for +not matching other undesired files. -Match to filesystem if in first task in pipeline, otherwise glob patterns are matched to the **collection**. +> For more details on multiple inputs check this [link](MultipleInput.md). ### Output Resolution @@ -182,13 +207,13 @@ The resolved values can be accessed from `myTask.resolvedOutput` after the task // Output '*.sra' // Resolved Output -'/data/human.sra' +'./data//human.sra' // Array of items // Output ['*.bam', '*.fastq.gz'] // Resolved Output -['/data/reads.bam', '/data/human.fastq.gz'] +['./data//reads.bam', './data//human.fastq.gz'] // Plain object of items // Output @@ -198,7 +223,112 @@ The resolved values can be accessed from `myTask.resolvedOutput` after the task } // Resolved Output { - reads: ['human_1.fastq.gz', 'human_2.fastq.gz'], - alignment: 'reads.bam' + reads: ['./data//human_1.fastq.gz', './data//human_2.fastq.gz'], + alignment: './data//reads.bam' } ``` + +To sum up, a **task** is the fundamental **unit** for building pipelines. It + +* has a **unique input/output pair** defined by glob pattern(s) and/or streams +* has a **single params object**. Params should be used when they can be applied + to a task with the same I/O but alter the output. e.g. 
command flags +* **emits** `task.finish` with a reference to that task object in state + +*Example* + +```javascript +/* +* Usage: samtools index [-bc] [-m INT] [out.index] +* Options: +* -b Generate BAI-format index for BAM files [default] +* -c Generate CSI-format index for BAM files +* -m INT Set minimum interval size for CSI indices to 2^INT [14] +*/ + +const samtoolsIndex = task({ + input: '*.bam', + output: '*.bai', + params: { format: 'b' } + name: 'samtools index' +}, ({ input, params }) => shell(`samtools index -${params.format} ${input}`)) + + +samtoolsIndex() + .on('task.finish', (results) => console.log(results.resolvedOutput)) +``` + +## Streamable tasks potential + +**Warning**: the following content is just a conceptual idea for +bionode-watermill that **is still not implemented**. + + +If either (input or output) is +not provided, it will be assumed the task is then a *streaming task* - i.e., it +is a duplex stream with writable and/or readable portions. Consider: + +```javascript +const throughCapitalize = through(function (chunk, env, next) { + // through = require('through2') - a helper to create through streams + // takes chunk, its encoding, and a callback to notify when complete pushing + // push a chunk to the readable portion of this through stream with + this.push(chunk.toString().toUpperCase()) + // then call next so that next chunk can be handled + next() +}) +```` + +You could connect `capitalize` to a readable (`readFile`) and writable +(`writeFile`) file +stream with: + +```javascript +const capitalize = task({ + name: 'Capitalize Through Stream' +}, +// Here, input is a readable stream that came from the previous task +// Let's return a through stream that takes the input and capitalizes it +({ input }) => input.pipe(throughCapitalize) ) +``` + +```javascript + +const readFile = task({ + input: '*.lowercase', + name: 'Read from *.lowercase' +}, ({ input }) => { + const rs = fs.createReadStream(input) + // Add file information to stream object so we have it later + rs.inFile = input +}) + +const writeFile = task({ + output: '*.uppercase', + name: 'Write to *.uppercase' +}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase'))) + +// Can now connect the three: +join(readFile, capitalize, writeFile) +``` + +Of course, this could be written as one single task. This is somewhat simpler, +but the power of splitting up the read, transform, and write portions of a task +will become apparent once we can provide multiple sets of parameters to the +transform and observe the effect, *without having to manually rewire input and +output filenames*. As a single task the above would become: + +```javascript +const capitalize = task({ + input: '*.lowercase', + output: '*.uppercase', + name: 'Capitalize *.lowercase -> *.uppercase' +}, ({ input }) => + fs.createReadStream(input) + .pipe(throughCapitalize) + .pipe(fs.createWriteStream(input.split('/').slice(0, -1).join('/') + + 'uppercase')) +) +``` + +It is fine to run with no task name, a hashed one will be made for you. However, properly named tasks will help greatly reading pipeline output diff --git a/docs/Uid.md b/docs/Uid.md new file mode 100644 index 0000000..135fbf2 --- /dev/null +++ b/docs/Uid.md @@ -0,0 +1,37 @@ +# Uid + +In bionode-watermill we use **uids**, that are basically hashes. These `uids` + are used to control if a task is already run or not. For instance, imagine + you have ran part of your pipeline before and it failed somewhere in the + middle. 
When you attempt to run your `pipeline.js` script again, bionode-watermill
+will know that some of your tasks have already run and will not run them
+again. Instead, it will crawl the `./data` folder to look for the inputs
+required by the tasks that still need to be run.
+
+## uid at the task level
+
+The uid at the task level is generated in two different ways:
+* First, it is generated from the `props` (`input`, `output` and `params`)
+passed by the user to the task definition (this happens in
+`lib/reducers/task.js`). Let's call this `uid` the *task default* `uid`
+(before running).
+
+* Second, if we want a task to run twice in the same pipeline it cannot keep
+the same `uid`, otherwise bionode-watermill will skip the second execution.
+This is solved by a second level of `uid` generation while the task is
+running: the task `uid` is modified at run time to obtain its *final* (or
+*run*) `uid`. This new `uid` is a unique hash of the *task default* `uid`
+and the `uids` of its parent tasks. As a result, the same task can run twice
+in the pipeline **if** its `trajectory` is different, i.e., **if** it has
+different parent tasks.
+
+## uid at the orchestrator level
+
+Orchestrators also have uids, which are basically a hash of the `uids` of the
+tasks they contain. These `uids` are used in `lib/orchestrators/join.js` to
+control the `uid` generation for `fork`, because each *downstream task* after
+a `fork` must be multiplied as many times as there are tasks inside the fork
+(for further details see [Forkception](Forkception.md)).
diff --git a/examples/pipelines/capitalize/alphabet.source.txt b/examples/pipelines/capitalize/alphabet.source.txt
index b0883f3..3b18e51 100644
--- a/examples/pipelines/capitalize/alphabet.source.txt
+++ b/examples/pipelines/capitalize/alphabet.source.txt
@@ -1 +1 @@
-abcdefghijklmnopqrstuvwxyz
+hello world
diff --git a/examples/pipelines/tests/README.md b/examples/pipelines/tests/README.md
new file mode 100644
index 0000000..eec0962
--- /dev/null
+++ b/examples/pipelines/tests/README.md
@@ -0,0 +1,178 @@
+# Graphs
+
+## simple_join.js
+
+#### expected graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_graph/index.png)
+
+#### actual graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_graph/join_graph.png)
+
+## simple_junction.js
+
+#### expected graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_graph/index.png)
+
+#### actual graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_graph/junction_graph.png)
+
+
+## simple_fork.js
+
+#### expected graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_graph/index.png)
+
+#### actual graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_graph/fork_graph.png)
+
+
+## join_join.js
+
+#### expected graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_join/index.png)
+
+#### actual graph
+
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/join_join/result.png)
+
+Note: this pipeline has `task5` twice and it should render two nodes in d3.
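+
+For reference, a pipeline shaped roughly like the sketch below (not the
+exact `join_join.js` source, just an illustration of nesting `join` inside
+`join`) is what reuses a task in two different trajectories:
+
+```javascript
+// task5 is joined after two different parents, so it gets two distinct run
+// uids and should therefore appear as two separate nodes in the d3 graph
+const pipeline = join(
+  task0,
+  join(task2, task5),
+  join(task3, task5),
+  task6
+)
+```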
+ +## junction_junction.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_junction/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/junction_junction/result.png) + + +## fork_join.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_join/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_join/fork_join.png) + + +## fork_junction.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_junction/index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_junction/fork_junction.png) + + +## fork_fork_without_join.js + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_without_join_index.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_without_join.png) + + +## fork_fork.js + +### const pipeline + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline.png) + + +### const pipeline2 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline2_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline2.png) + + +### const pipeline3 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline3_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline3.png) + + +### const pipeline4 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline4_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_pipeline4_result_correct.png) + + +### const pipeline5 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_junction_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_junction.png) + + +### const pipeline6 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/4thlevel_fork_expected.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/4thlevel_fork.png) + + +### const pipeline7_2 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_after_fork.png) + +#### actual graph + 
+![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_fork_duplicated_tasks.png) + + +### const pipeline8_2 + +#### expected graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/junction_after_fork.png) + +#### actual graph + +![](https://github.com/bionode/GSoC17/blob/master/Experimental_code/Experimental_Pipelines/fork_fork/fork_junction_duplicated.png) \ No newline at end of file diff --git a/examples/pipelines/tests/fork_fork.js b/examples/pipelines/tests/fork_fork.js index 8cab9d8..0364e60 100644 --- a/examples/pipelines/tests/fork_fork.js +++ b/examples/pipelines/tests/fork_fork.js @@ -176,6 +176,6 @@ const pipeline8_2 = join( // edit this line to run the desired pipeline. // documentation on the usage of these pipelines may be found in the link below // https://github.com/bionode/GSoC17/blob/master/Journal/Week_11.md -pipeline2().then((results) => { +pipeline7_2().then((results) => { console.log('RESULTSNEW: ', results) }) \ No newline at end of file diff --git a/examples/pipelines/tests/fork_fork_new_test.js b/examples/pipelines/tests/fork_fork_new_test.js deleted file mode 100644 index ca36b82..0000000 --- a/examples/pipelines/tests/fork_fork_new_test.js +++ /dev/null @@ -1,38 +0,0 @@ -'use strict' - -// === WATERMILL === -const { - task, - join, - junction, - fork -} = require('../../..') - -const task0 = task({name: 'task0'}, () => `echo "something0"`) - -const task1 = task({name: 'task1'}, () => `echo "something1"`) - -const task2 = task({name: 'task2'}, () => `echo "something2"`) - -const task3 = task({name: 'task3'}, () => `echo "something3"`) - -const task4 = task({name: 'task4'}, () => `echo "something4"`) - -const task5 = task({name: 'task5'}, () => `echo "something5"`) - -const task6 = task({name: 'task6'}, () => `echo "something6"`) - -const pipeline = join( - task1, - fork( - join( - task2, - fork(task4, task5) - ), - join(task3, - fork(task4, task5) - ) - ) -) - -pipeline() diff --git a/examples/pipelines/tests/join_join.js b/examples/pipelines/tests/join_join.js index f7c62d4..e78d141 100644 --- a/examples/pipelines/tests/join_join.js +++ b/examples/pipelines/tests/join_join.js @@ -8,19 +8,17 @@ const { fork } = require('../../..') -const task0 = task({name: 'coco'}, () => `echo "something0"`) +const task0 = task({name: 'task0'}, () => `echo "something0"`) -const task1 = task({name: 'nut'}, () => `echo "something1"`) +const task2 = task({name: 'task2'}, () => `echo "something2"`) -const task2 = task({name: 'foo'}, () => `echo "something2"`) +const task3 = task({name: 'task3'}, () => `echo "something3"`) -const task3 = task({name: 'bar'}, () => `echo "something3"`) +const task4 = task({name: 'task4'}, () => `echo "something4"`) -const task4 = task({name: 'test'}, () => `echo "something4"`) +const task5 = task({name: 'task5'}, () => `echo "something5"`) -const task5 = task({name: 'test1'}, () => `echo "something5"`) - -const task6 = task({name: 'test6'}, () => `echo "something6"`) +const task6 = task({name: 'task6'}, () => `echo "something6"`) const pipeline = join( task0, diff --git a/examples/pipelines/tests/pipeline_after_pipeline.js b/examples/pipelines/tests/pipeline_after_pipeline.js new file mode 100644 index 0000000..7289937 --- /dev/null +++ b/examples/pipelines/tests/pipeline_after_pipeline.js @@ -0,0 +1,42 @@ +'use strict' + +// === WATERMILL === +const { + task, + join, + junction, + fork +} = require('../../..') + +const task0 = task({name: 
'task0'}, () => `echo "something0"`) + +const task1 = task( + { + name: 'task1', + output: '*.txt' + }, () => `touch task1_middle.txt`) + +const task2 = task( + { + name: 'task2', + output: '*.txt' + }, () => `touch task2_middle.txt`) + +const task3 = task( + { + name: 'task3', + output: '*.txt', + input: '*.txt' + }, ({ input }) => `touch ${input}_final`) + +const task4 = task( + { + name: 'task4', + input: '*_middle.txt', + output: '*.txt' + }, ({ input }) => `echo "something4" >> ${input}`) + + +const pipeline1 = join(task0, fork(task1, task2), task3) + +pipeline1().then(task4) \ No newline at end of file diff --git a/examples/pipelines/tests/simple_fork.js b/examples/pipelines/tests/simple_fork.js index d321c92..32de87a 100644 --- a/examples/pipelines/tests/simple_fork.js +++ b/examples/pipelines/tests/simple_fork.js @@ -16,9 +16,6 @@ const task2 = task({name: 'foo'}, () => `echo "something2"`) const task3 = task({name: 'bar'}, () => `echo "something3"`) -const task4 = task({name: 'test'}, () => `echo "something4"`) - - -const pipeline = join(task0, fork(task1, task2), task3, task4) +const pipeline = join(task0, fork(task1, task2), task3) pipeline() \ No newline at end of file diff --git a/examples/pipelines/two-mappers/pipeline_multipleinput.js b/examples/pipelines/two-mappers/pipeline_multipleinput.js new file mode 100644 index 0000000..1280f99 --- /dev/null +++ b/examples/pipelines/two-mappers/pipeline_multipleinput.js @@ -0,0 +1,158 @@ +'use strict' + +/* DISCLAIMER: This is a very experimental pipeline and currently it is not +* solved. If you have an idea on how to solve this problem please make a PR. +* The idea with this pipeline is for the two-mappers pipeline.js to be able +* to process several samples. +*/ + +// === WATERMILL === +const { + task, + join, + junction, + fork +} = require('../../..') + +// === MODULES === + +const fs = require('fs') +const path = require('path') + +const request = require('request') + +// === CONFIG === + +// specifies the number of threads to use by mappers +const THREADS = parseInt(process.env.WATERMILL_THREADS) || 2 + +const config = { + name: 'Streptococcus pneumoniae', + sraAccession: ['ERR045788', 'ERR016633'], + referenceURL: 'http://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/007/045/GCF_000007045.1_ASM704v1/GCF_000007045.1_ASM704v1_genomic.fna.gz' +} + +// === TASKS === + +// first lets get the reference genome for our mapping +const getReference = task({ + params: { url: config.referenceURL }, + output: '*_genomic.fna.gz', + dir: process.cwd(), // this sets directory for outputs! 
+ name: `Download reference genome for ${config.name}` +}, ({ params, dir }) => { + const { url } = params + const outfile = url.split('/').pop() + + // essentially curl -O + return request(url).pipe(fs.createWriteStream(outfile)) + +}) + +//then get samples to work with +const getSamples = (sraAccession) => task({ + params: { + db: 'sra', + accession: sraAccession + }, + output: '**/*.sra', + dir: process.cwd(), // Set dir to resolve input/output from + name: `Download SRA ${sraAccession}` +}, ({ params }) => `bionode-ncbi download ${params.db} ${params.accession}` +) + +// extract the samples from fastq.gz +const fastqDump = task({ + input: '**/*.sra', + output: [1, 2].map(n => `*_${n}.fastq.gz`), + name: 'fastq-dump **/*.sra' +}, ({ input }) => `fastq-dump --split-files --skip-technical --gzip ${input}` +) + +// first lets uncompress the gz +const gunzipIt = task({ + input: '*_genomic.fna.gz', + output: '*.fna', + name: 'gunzipIt task' + }, ({ input }) => `gunzip -c ${input} > ${input.split('.').slice(0,2).join('.')}.fna` +) + +// then index using first bwa ... +const indexReferenceBwa = task({ + input: '*.fna', + output: { + indexFile: ['amb', 'ann', 'bwt', 'pac', 'sa'].map(suffix => + `bwa_index.${suffix}`), + //reference: 'bwa_index.fa' //names must match for bwa - between reference + // and index files + }, + //params: { output: 'bwa_index.fa' }, + name: 'bwa index bwa_index.fna -p bwa_index' +}, ({ input }) => `bwa index ${input} -p bwa_index`) + +// and bowtie2 + +const indexReferenceBowtie2 = task({ + input: '*.fna', + output: ['1.bt2', '2.bt2', '3.bt2', '4.bt2', 'rev.1.bt2', + 'rev.2.bt2'].map(suffix => `bowtie_index.${suffix}`), + params: { output: 'bowtie_index' }, + name: 'bowtie2-build -q uncompressed.fna bowtie_index' + }, ({ params, input }) => `bowtie2-build -q ${input} ${params.output}` + /* for bowtie we had to uncompress the .fna.gz file first before building + the reference */ +) + +// now use mappers with bwa + +const bwaMapper = task({ + input: { + //reference: '*.fa', + reads:[1, 2].map(n => `*_${n}.fastq.gz`), + indexFiles: ['amb', 'ann', 'bwt', 'pac', 'sa'].map(suffix => + `bwa_index.${suffix}`) //pass index files to bwa mem + }, + output: '*.sam', + params: { output: 'bwa_output.sam' }, + name: 'Mapping with bwa...' + }, ({ input, params }) => `bwa mem -t ${THREADS} bwa_index ${input.reads[0]} ${input.reads[1]} > ${params.output}` +) + +// and with bowtie2 + +const bowtieMapper = task({ + input: { + reference: '*_genomic.fna.gz', + reads:[1, 2].map(n => `*_${n}.fastq.gz`), + indexFiles: ['1.bt2', '2.bt2', '3.bt2', '4.bt2', 'rev.1.bt2', + 'rev.2.bt2'].map(suffix => `bowtie_index.${suffix}`) //pass index files to + // bowtie2 + }, + output: '*.sam', + params: { output: 'bowtie2_output.sam' }, + name: 'Mapping with bowtie2...' 
+}, ({ input, params }) => `bowtie2 -p ${THREADS} -x bowtie_index -1 ${input.reads[0]} -2 ${input.reads[1]} -S ${params.output}` +) + +// === PIPELINE === + +// first gets reference +getReference().then(results => { +// console.log("results:", results.resolvedOutput) + const pipeline = (sraAccession) => join( + getSamples(sraAccession), + fastqDump, + gunzipIt, + fork( + join(indexReferenceBwa, bwaMapper), + join(indexReferenceBowtie2, bowtieMapper) + ) + ) +// then fetches the samples and executes the remaining pipeline + for (const sra of config.sraAccession) { + //console.log("sample:", sra, results.resolvedOutput) + console.log("cwd_check", process.cwd()) + const pipelineMaster = pipeline(sra) + pipelineMaster().then(results => console.log("Results: ", results)) + } +}) diff --git a/lib/lifecycle/resolve-input.js b/lib/lifecycle/resolve-input.js index c4823fe..af99138 100644 --- a/lib/lifecycle/resolve-input.js +++ b/lib/lifecycle/resolve-input.js @@ -2,6 +2,7 @@ const _ = require('lodash') const chalk = require('chalk') +const fs = require('fs') const applicator = require('../utils/applicator.js') const matchToFs = require('../matchers/match-to-fs.js') @@ -60,6 +61,20 @@ const resolveInput = (taskState, DAG, logger) => new Promise((resolve, reject) = } else if (minimatch(path, item)) { console.log(tab(2) + `${chalk.magenta(item)} matched to ${chalk.blue(path)}`) return resolve(path) + } else { + // this checks wether the input is on current working directory + // and tries to match it to input pattern + fs.readdir(process.cwd(), (err, files) => { + if (err) { + throw new Error(err) + } + files.forEach(file => { + if (minimatch(file, item)) { + console.log(tab(2) + `${chalk.magenta(item)} matched to ${chalk.blue(file)}`) + return resolve(file) + } + }) + }) } } diff --git a/lib/reducers/collection.js b/lib/reducers/collection.js index 9a18ab3..46d349f 100644 --- a/lib/reducers/collection.js +++ b/lib/reducers/collection.js @@ -11,7 +11,7 @@ const path = require('path') // define path to index.html, otherwise it will be relative to dir where // scripts is run -let scr = path.join(__dirname, '../../viz/index.html') +const scr = path.join(__dirname, '../../viz/index.html') // Loading the index file . html displayed to the client