<h1><center>Building Reproducible and Scalable Workflows on OSCAR: Nextflow</center></h1>
<p><center>Presenter: Paul Cao</center>
 <center>Center for Computation and Visualization</center>
 <center>Center for Computational Biology of Human Disease - Computational Biology Core</center></p>

Resources for help @brown <br> 

COBRE CBHD Computational Biology Core
- Office hours
- https://cbc.brown.edu
- slack channel on ccv-share
- cbc-help@brown.edu <br>

Center for Computation and Visualization
- Office hours
- https://ccv.brown.edu
- ccv-share slack channel
- https://docs.ccv.brown.edu
- support@ccv.brown.edu

## What is Nextflow?  

Nextflow is a workflow management tool that allows users to easily write data-intensive computational **pipelines**. These pipelines, also called workflows, typically have the following key features:

- Sequential processing of files
- Usually requires more than one tool
- Multiple programming languages
- Most times each sample is processed individually
- Compute resource intensive
  - For example, alignment could take 16 CPUs, 60 GB of RAM, 4-24 hours, and 30 GB of disk space per sample

## Why do we care about these pipelines? 

### Reason 1: Reproducibility 

<img src="./img/reproduce.png" width="700"/>

The journal Nature published a survey that found that more than 70% of researchers have tried and failed to reproduce another scientist's experiments. This trend is hugely problematic because we then can't trust the findings from many studies enough to use them to make data-driven decisions. In short, we need tools and standards that help address the reproducibility crisis in science! 

Pipelines created with Snakemake and Nextflow incorporate version control and state-of-the-art software tools, known as containers, to manage all software dependencies and create stable environments that can be trusted to reproduce analyses reliably and accurately. 

***Reproducibility is important for producing trustworthy research!***

### Reason 2: Portability

#### What if we need to perform analyses with more resources?

This type of scenario would require us to move our analyses to a different environment, for example, a High Performance Computing (HPC) cluster environment. 

An important feature of Snakemake and Nextflow workflow management tools is that they enable users to easily scale any pipeline written on a personal computer to then run on an HPC cluster such as OSCAR, the HPC cluster we use at Brown University. So now we can run our pipelines using high performance resources without having to change workflow definitions or hard-code a pipeline to a specific setup. As a result, **the code stays constant** across varying infrastructures, thereby allowing portability, easy collaboration, and avoiding lock-in. 

***In short, we can easily move our multi-step analyses (i.e., pipelines) to any place we need them!***

## So Let's See How All This Works! 

### Our Starting Point

Say we have samples from RNASeq that we need to process on OSCAR by performing the following set of actions: 

<img src="./img/rnaseq_flowchart.png" width="700"/>

<h2><center>What do you do??</center></h2>

## A Naive Approach

One solution would be to write a bunch of shell scripts that use various software tools to process the data in the ways we need. 

For example, if we need to run FastQC, trim the reads, run FastQC again on the trimmed reads, and then align them, we could write one shell script per step, so 4 shell scripts in total, each with its own inputs and outputs. This would look something like the following: 

**Script 1: Fastqc**

```
#!/bin/bash
#SBATCH -t 48:00:00
#SBATCH -n 32
#SBATCH -J rrbs_fastqc
#SBATCH --mem=198GB
#SBATCH --mail-type=ALL
#SBATCH --mail-user=jordan_lawson@brown.edu

source /gpfs/runtime/cbc_conda/bin/activate_cbc_conda
conda activate fedulov_rrbs
for sample in `ls /gpfs/data/cbc/fedulov_alexey/porcine_rrbs/Sequencing_Files/*fastq.gz`
do
align_dir="/gpfs/data/cbc/fedulov_alexey/porcine_rrbs" 
fastqc -o ${align_dir}/fastqc $sample
done
```

**Script 2: Trimming** 

```
#!/bin/bash
#SBATCH -t 48:00:00
#SBATCH -n 32
#SBATCH -J trimmomatic_update
#SBATCH --mem=198GB
#SBATCH --mail-type=ALL
#SBATCH --mail-user=jordan_lawson@brown.edu

source /gpfs/runtime/cbc_conda/bin/activate_cbc_conda

for sample in `ls /gpfs/data/cbc/fedulov_alexey/porcine_rrbs/trim_galore/*_trimmed.fq.gz`
do
    dir="/gpfs/data/cbc/fedulov_alexey/porcine_rrbs/trimmomatic"
    base=$(basename $sample "_trimmed.fq.gz")
    trimmomatic SE  -threads 8 -trimlog ${dir}/${base}_SE.log $sample ${dir}/${base}_tr.fq.gz ILLUMINACLIP:/gpfs/data/cbc/cbc_conda_v1/envs/cbc_conda/opt/trimmomatic-0.36/adapters/TruSeq3-SE.fa:2:30:5:6:true SLIDINGWINDOW:4:20 MINLEN:35
done 
```


**Script 3: Fastqc on trimmed reads**

```
#!/bin/bash
#SBATCH -t 24:00:00
#SBATCH -n 8
#SBATCH -J retrim_fastqc_update
#SBATCH --mem=16GB
#SBATCH --mail-type=ALL
#SBATCH --mail-user=jordan_lawson@brown.edu

source /gpfs/runtime/cbc_conda/bin/activate_cbc_conda
conda activate fedulov_rrbs
for sample in `ls /gpfs/data/cbc/fedulov_alexey/porcine_rrbs/trimmomatic/*_tr.fq.gz`
do
trim_qc_dir="/gpfs/data/cbc/fedulov_alexey/porcine_rrbs"
fastqc -o ${trim_qc_dir}/trimmomatic_qc $sample
done
```

**Script 4: Alignment**

```
#!/bin/bash
#SBATCH -t 24:00:00
#SBATCH -N 1
#SBATCH -n 16
#SBATCH -J bismark_align_update_redo
#SBATCH --mem=160GB
#SBATCH --mail-type=ALL
#SBATCH --mail-user=jordan_lawson@brown.edu
#SBATCH --array=1-18
#SBATCH -e /gpfs/data/cbc/fedulov_alexey/porcine_rrbs/logs/bismark_align_%a_%A_%j.err
#SBATCH -o /gpfs/data/cbc/fedulov_alexey/porcine_rrbs/logs/bismark_align_%a_%A_%j.out

source /gpfs/runtime/cbc_conda/bin/activate_cbc_conda
conda activate fedulov_rrbs
input=($(ls /gpfs/data/cbc/fedulov_alexey/porcine_rrbs/trimmomatic/*_tr.fq.gz)) # using the round brackets indicates that this is a bash array
bismark -o /gpfs/data/cbc/fedulov_alexey/porcine_rrbs/alignments --bowtie2 --genome /gpfs/data/shared/databases/refchef_refs/S_scrofa/primary/bismark_index --un --pbat ${input[$((SLURM_ARRAY_TASK_ID -1))]}
```

## Problems with the Naive Approach 

Using multiple shell scripts to create a makeshift pipeline will work, but it is **inefficient**, can **get complicated fast**, and there are a few **challenges you have to manage**, such as: 

* Making sure you have the appropriate software and all dependencies for each step in the analysis - this can be a lot to stay on top of if you have a pipeline with a lot of steps! (imagine a 10 step process)
* **Portability!** Building and running on different machines is much more work
* Specifying where your output will go 
* Calling in the appropriate input (which is often the output from a previous step) 
* Handling where log files go 
* More labor intensive - we have to stay on top of jobs, monitor when each step finishes, and then launch the next one
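
To make this bookkeeping concrete, here is a minimal sketch of a hand-rolled driver that wires each step's output into the next step's input and keeps one log per step. It is only an illustration: the helper `run_step`, the file names, and the stand-in commands (`tr` and `sort` in place of real trimming and alignment tools) are all hypothetical.

```bash
#!/bin/bash
# Hypothetical hand-written "pipeline": every step needs its input, output,
# and log file spelled out, and the chain must stop when a step fails.

run_step() {
  # usage: run_step <step_name> <input> <output> <command...>
  step_name="$1"; step_in="$2"; step_out="$3"; shift 3
  # stdin comes from the previous step's output; stderr goes to a per-step log
  if "$@" < "$step_in" > "$step_out" 2> "logs/${step_name}.log"; then
    echo "${step_name}: ok"
  else
    echo "${step_name}: FAILED (see logs/${step_name}.log)"
    return 1
  fi
}

workdir="$(mktemp -d)"
cd "$workdir" || exit 1
mkdir -p logs
printf 'b\na\nc\n' > raw.txt

# Stand-in commands: `tr` plays the role of trimming, `sort` of alignment.
run_step trim  raw.txt     trimmed.txt tr 'a-z' 'A-Z' &&
run_step align trimmed.txt aligned.txt sort
```

Even in this toy form, every new step adds another input path, output path, and log file to keep consistent by hand, which is the work a workflow manager takes over.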

## A Smarter Approach: Using Workflow Managers! 

A much more efficient way to process your data, and one that handles the issues above, is to use workflow management tools such as Snakemake and Nextflow. Let's now learn how to use them...

## Tutorial: Using workflow management tools on OSCAR 

Workflow management tools are software that allow you to write more efficient, portable computational pipelines. As a result, you are able to optimize your workflows while maintaining reproducibility and rigor. Note that there are many workflow management tools available to researchers, but the two tools we will focus on learning about today are **Snakemake** and **Nextflow**. Let's first start with Nextflow...

### Step 1: The Setup

Let's first discuss setting up our environment on OSCAR so that we can get Nextflow up and running. 

**At this point, I am going to open my terminal on Open OnDemand (OOD) so that I can walk you through and show you how each of these steps and files below look. Feel free to open your terminal as well and follow along. To do so, you can go to OOD at https://ood.ccv.brown.edu and under the Clusters tab at the top select the >_OSCAR Shell Access option. All files used today can be found on GitHub in the folder at: https://github.com/compbiocore/workflows_on_OSCAR**

#### Step 1a: Set up Nextflow Configuration Script using `compbiocore/workflows_on_OSCAR`:

```bash
[pcao5@node1322 ~]$ cd ~/
[pcao5@node1322 ~]$ git clone https://github.com/compbiocore/workflows_on_OSCAR.git
[pcao5@node1322 ~]$ git clone https://github.com/compbiocore/workflows_workshop.git
```

#### Step 1b: Install `compbiocore/workflows_on_OSCAR` package:

```bash
bash ~/workflows_on_OSCAR/install_me/install.sh && source ~/.bashrc
```


For the 1st installation prompt, input `Nextflow`:

```bash
Welcome to a program designed to help you easily set up and run workflow management systems on OSCAR!

Please type which software you wish to use: Nextflow or Snakemake? Nextflow
```

For the 2nd installation prompt, input your GitHub username (e.g., `paulcao-brown`):

```bash
Nextflow software detected, initializing configuration...
What is your GitHub user name? paulcao-brown
What is your GitHub token (we will keep this secret) - [Hit Enter when Done]?
```

#### Step 1c: Create a new GitHub Token and enter it:
<img src="./img/GBGDQhY.png" width="700"/>

#### Step 1d: Complete the Installation 


```bash
Currently the Nextflow default for HPC resources is: memory = 5.GB time = 2.h cpus = 2 
Do you want to change these default resources for your Nextflow pipeline [Yes or No]? No
Keeping defaults!

OUTPUT MESSAGE:

                ******************************************************************
                 NEXTFLOW is now set up and configured and ready to run on OSCAR!
                ******************************************************************
                

Your default resources for Nextflow are: memory = 5.GB time = 2.h cpus = 2 


                To further customize your pipeline for efficiency, you can enter the following 
                label '<LabelName>' options right within processes in your Nextflow .nf script:
                1. label 'OSCAR_small_job' (comes with memory = 4 GB, time = 1 hour, cpus = 1)
                2. label 'OSCAR_medium_job' (comes with memory = 8 GB, time = 16 hours, cpus = 2)
                3. label 'OSCAR_large_job' (comes with memory = 16 GB, time = 24 hours, cpus = 2)
                

README:

Please see https://github.com/compbiocore/workflows_on_OSCAR for further details on how to add the above label options to your workflow.

Note the setup is designed such that pipelines downloaded from nf-core with their own resource specs within the .nf script will override your defaults.

To run Nextflow commands, you must first type and run the nextflow_start command.

To further learn how to easily run your Nextflow pipelines on OSCAR, use the Nextflow template shell script located in your ~/nextflow_setup directory.
```

### Step 2: Run a 'Hello World' Example

#### 2a. hello_world.nf:
```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.name = "World"

process sayHello {
  input:
    val name
  output:
    stdout
  script:
    """
    echo 'Hello ${name}!'
    """
}

workflow {
  sayHello(params.name) | view
}
```

#### 2b. Launch the Workflow:
```bash
cd workflows_workshop/workflows
nextflow run hello_world.nf
```

#### 2c. Workflow Output:
![](./img/8bNSCPv.png)

#### 2d. Launch the Workflow (Custom Parameter):
```bash
nextflow run hello_world.nf --name "Blueno Bear"
```

#### 2e. Workflow Output (Custom Parameter):
![](https://i.imgur.com/YpjvNoe.png)

#### 2f. Inspect the Command Under the Hood (e.g., `52/992fb7`):
```bash
cd work/52/992fb76ea549e001e25b749b615f66/
ls -la
```

##### Nextflow Generated Commands:

![](./img/JK1Ly1s.png)


##### The Actual Command Run:
```bash
cat .command.sh

#!/bin/bash -ue
echo 'Hello Blueno Bear!'
```

##### Slurm Wrapper Command:
```bash 
cat .command.run

#!/bin/bash
#SBATCH -J nf-sayHello
#SBATCH -o /gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66/.command.log
#SBATCH --no-requeue
#SBATCH --signal B:USR2@30
#SBATCH -c 2
#SBATCH -t 02:00:00
#SBATCH --mem 5120M
#SBATCH -p batch
NXF_CHDIR=/gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66
# NEXTFLOW TASK: sayHello
...
/bin/bash -ue /gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66/.command.sh &
...
```

##### Standard Out/Error:
```bash 
cat .command.log
## SLURM PROLOG ###############################################################
##    Job ID : 10159219
##  Job Name : nf-sayHello
##  Nodelist : node1322
##      CPUs : 4
##  Mem/Node : 5120 MB
## Directory : /gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66
##   Job Started : Wed Jun  7 10:34:20 EDT 2023
###############################################################################

Hello Blueno Bear!
```
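
The short hash in the run output (e.g., `52/992fb7`) is a prefix of the task directory under `work/`. As a small convenience, the sketch below expands such a prefix into the full directory with a shell glob. The helper name `find_task_dir` is hypothetical, and the demo builds a throwaway directory tree rather than using a real Nextflow run.

```bash
#!/bin/bash
# Hypothetical helper: expand a short task hash such as 52/992fb7 into the
# full task directory under a Nextflow work directory.

find_task_dir() {
  # usage: find_task_dir <work_dir> <short_hash>
  for candidate in "$1"/"$2"*/; do
    # the glob stays unexpanded when nothing matches, so test for a directory
    if [ -d "$candidate" ]; then
      printf '%s\n' "${candidate%/}"
      return 0
    fi
  done
  echo "no task directory matches $2" >&2
  return 1
}

# Demo on a scratch directory tree that mimics the layout shown above.
demo_work="$(mktemp -d)"
mkdir -p "$demo_work/52/992fb76ea549e001e25b749b615f66"
task_dir="$(find_task_dir "$demo_work" 52/992fb7)"
echo "$task_dir"
ls -la "$task_dir"
```

In a real run you would pass your own `work` directory and the hash printed next to the process name.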

### Step 3: Run a 'Word Count' Example (2 Step Workflow) 

#### 3a. count_words.nf:
```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.name = "World"

process sayHello {
  input:
    val name
  output:
    path "hello.txt"
  script:
    """
    echo 'Hello ${name}!' > hello.txt
    """
}

process countWords {
  input:
    path(file_in)
  output:
    stdout

  script:
   """
   cat ${file_in}
   wc -w ${file_in} | awk '{print \$1}'
   """
}

workflow {
  countWords(sayHello(params.name)) | view
}
```

#### 3b. Launch the Workflow:
```bash
nextflow run count_words.nf --name "Blueno Bear"
```

#### 3c. Workflow Output:
![](./img/649M0u8.png)
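
It can help to try the two script bodies by hand before wiring them into processes. The sketch below runs the same commands as `sayHello` and `countWords` in a scratch directory; the function names `say_hello` and `count_words` are hypothetical wrappers. Note that in plain shell the awk field is written `$1`, whereas inside a Nextflow `script:` block it is escaped as `\$1` so that Nextflow does not try to interpolate it.

```bash
#!/bin/bash
# The script bodies of sayHello and countWords, run by hand in a scratch
# directory instead of through Nextflow.

say_hello() {
  # usage: say_hello <name> ; writes hello.txt like the sayHello process
  printf 'Hello %s!\n' "$1" > hello.txt
}

count_words() {
  # usage: count_words <file> ; prints only the number, as countWords does
  wc -w "$1" | awk '{print $1}'
}

cd "$(mktemp -d)" || exit 1
say_hello "Blueno Bear"
cat hello.txt
word_count="$(count_words hello.txt)"
echo "$word_count"
```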

In the next iteration, we want to save the word count to a text file, so we make the following changes to `count_words.nf`:
- add the directive `publishDir "${params.out_dir}/", mode: 'copy'` to the `countWords` process
- change the `countWords` script to redirect its output to a file: `wc -w ${file_in} | awk '{print \$1}' > count_words.txt`
- declare `path("count_words.txt")` as the output of `countWords` instead of `stdout`

#### 3d. count_words_and_save.nf

```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.name = "World"

process sayHello {
  input:
    val name
  output:
    path "hello.txt"
  script:
    """
    echo 'Hello ${name}!' > hello.txt
    """
}

process countWords {
  input:
    path(file_in)
  output:
    path("count_words.txt")

  publishDir "${params.out_dir}/", mode: 'copy'

  script:
   """
   wc -w ${file_in} | awk '{print \$1}' > count_words.txt
   """
}

workflow {
  countWords(sayHello(params.name))
}
```

#### 3e. Launch the Workflow
```bash
nextflow run count_words_and_save.nf --name "Blueno Bear" --out_dir words_out
```

##### Workflow Output (Custom Parameter):
![](./img/nrrpAGV.png)

##### Inspect the output directory `words_out`:
```bash
cat words_out/count_words.txt

3
```
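
Conceptually, `publishDir` with `mode: 'copy'` copies the declared output from the task's work directory into the output directory, leaving the original in place. The sketch below imitates that effect in plain shell. It is a conceptual stand-in, not how Nextflow implements publishing, and the helper `publish_copy` and the task directory `work/ab/123456` are hypothetical.

```bash
#!/bin/bash
# Conceptual stand-in for publishDir with mode: 'copy': the result is copied
# out of the task's work directory, and the original stays where it was.

publish_copy() {
  # usage: publish_copy <task_output_file> <out_dir>
  mkdir -p "$2" && cp "$1" "$2"/
}

cd "$(mktemp -d)" || exit 1
mkdir -p work/ab/123456                      # hypothetical task directory
echo 3 > work/ab/123456/count_words.txt      # pretend countWords wrote this
publish_copy work/ab/123456/count_words.txt words_out
cat words_out/count_words.txt
```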

### Step 4. Conditional Logic

Next, we demonstrate how conditional logic works in Nextflow workflows. We will extend the previous `count_words.nf` with two boolean flags in our pipeline: `--reverse` (reverse the text in our final output) and `--count_letters` (if the flag is set, count letters instead of words in the final output).

##### Add the `reverse` branch to the body of the workflow:
```bash
params.reverse = false

workflow {
  hello_result = null

  if (params.reverse) {
    hello_result = reverse(sayHello(params.name))
  } else {
    hello_result = sayHello(params.name)
  }

  count(hello_result) | view
}
```

##### Add the `count_letters` branch to the `count` process:
```bash
params.count_letters = false
... 

process count {
  input: 
    path(file_in)
  output:
    stdout
  
  script:
   if (params.count_letters) {
    """
    cat ${file_in}
    wc -m ${file_in} | awk '{print \$1}'
    """
   } else {
    """
    cat ${file_in}
    wc -w ${file_in} | awk '{print \$1}'
    """
   }
}
```

#### count_conditional.nf

```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2 

params.name = "World"
params.count_letters = false
params.reverse = false

process sayHello {
  input: 
    val name
  output:
    path "hello.txt"
  script:
    """
    echo 'Hello ${name}!' > hello.txt
    """
}

process count {
  input: 
    path(file_in)
  output:
    stdout
  
  script:
   if (params.count_letters) {
    """
    cat ${file_in}
    wc -m ${file_in} | awk '{print \$1}'
    """
   } else {
    """
    cat ${file_in}
    wc -w ${file_in} | awk '{print \$1}'
    """
   }
}

process reverse {
  input: 
    path(file_in)
  output:
    path "reverse_hello.txt"
  
  script:
    """
    cat ${file_in} | rev > "reverse_hello.txt"
    """
}

workflow {
  hello_result = null

  if (params.reverse) {
    hello_result = reverse(sayHello(params.name))
  } else {
    hello_result = sayHello(params.name)
  }

  count(hello_result) | view
}
```

##### Example Outputs:
```bash
nextflow run workflows/count_conditional.nf

Hello World!
2

nextflow run workflows/count_conditional.nf --reverse
!dlroW olleH
2

nextflow run workflows/count_conditional.nf --reverse --count_letters
!dlroW olleH
13
```
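
The same branching can be sketched in plain shell to check the numbers above: `Hello World!` has 2 words, and 13 characters once the trailing newline is included. The function `count_conditional` is a hypothetical wrapper, and the reversal uses a small awk loop as a stand-in for `rev` so that the sketch depends only on awk.

```bash
#!/bin/bash
# Shell sketch of the conditional workflow: two flags pick which commands run.

count_conditional() {
  # usage: count_conditional <name> <reverse: true|false> <count_letters: true|false>
  printf 'Hello %s!\n' "$1" > hello.txt
  result=hello.txt

  if [ "$2" = "true" ]; then
    # awk stand-in for `rev`: print each line with its characters reversed
    awk '{ out = ""; for (i = length($0); i > 0; i--) out = out substr($0, i, 1); print out }' \
      hello.txt > reverse_hello.txt
    result=reverse_hello.txt
  fi

  cat "$result"
  if [ "$3" = "true" ]; then
    wc -m "$result" | awk '{print $1}'   # characters, including the newline
  else
    wc -w "$result" | awk '{print $1}'   # whitespace-separated words
  fi
}

cd "$(mktemp -d)" || exit 1
count_conditional World false false
count_conditional World true true
```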

### Step 5. Parsing a Sample Sheet

In real production bioinformatics workflows, we usually cannot get by with just a list of FASTA files or raw or aligned reads. We also need to carry along the important metadata associated with each sample's sequencing data and provide that context to the analysis programs (e.g., differential gene analysis needs to distinguish between `treatment` and `control` groups; temporal analysis needs to distinguish between samples taken at different time points). 

We will demonstrate this concept with a toy counting workflow that takes a samplesheet of documents, where each row specifies whether to count the document's text by letters or by words (determined by the `count_letters` column).

##### Make a simple samplesheet:
Our toy sample sheet: 

```bash
document,count_letters
samplesheet/hello1.txt,F
samplesheet/hello2.txt,T
```

##### Create `documents_ch`, a channel of documents parsed from the samplesheet above:

Using `Channel.fromPath(params.samplesheet).splitCsv(header:true)`, we create a "channel" (or "stream") of documents, where each entry is a tuple of the document file itself and a boolean flag indicating whether to count it by letters or by words:
`[ document, count_letters ]`.

###### count_samplesheet.nf:
```bash
...
// convert each row in the samplesheet into a tuple of File object and a Boolean of count_letter
def get_document_info(LinkedHashMap sample) {
    document  = sample.document ? file(sample.document, checkIfExists: true) : null
    count_letters = (sample.count_letters == "T") || (sample.count_letters == "true") ? true : false   

    return [ document, count_letters ]
}
...
workflow {
     // create a Channel of documents to count either by word or by letter, from the samplesheet
     Channel.fromPath(params.samplesheet).splitCsv(header:true)
            .map { get_document_info(it) }.set { documents_ch }
            
     // launch a sub-workflow of COUNT_DOCUMENT on each document
     COUNT_DOCUMENT(documents_ch)
}
```
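
For readers more at home in shell, the row-to-tuple conversion can be sketched as a loop over the CSV. This is only an analogy for `splitCsv` plus `get_document_info`: the function `parse_samplesheet` is hypothetical, and unlike the Nextflow version it does not check that each document exists.

```bash
#!/bin/bash
# Shell sketch of get_document_info: turn each samplesheet row into a
# "document count_letters" pair with the flag normalized to true/false.

parse_samplesheet() {
  # usage: parse_samplesheet <samplesheet.csv>
  # tail -n +2 skips the header row (document,count_letters)
  tail -n +2 "$1" | while IFS=, read -r document count_letters; do
    case "$count_letters" in
      T|true) flag=true ;;
      *)      flag=false ;;
    esac
    printf '%s %s\n' "$document" "$flag"
  done
}

cd "$(mktemp -d)" || exit 1
cat > samplesheet.csv <<'EOF'
document,count_letters
samplesheet/hello1.txt,F
samplesheet/hello2.txt,T
EOF
parse_samplesheet samplesheet.csv
```

Each printed line corresponds to one `[ document, count_letters ]` entry flowing through the channel.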


##### Define the sub-workflow `COUNT_DOCUMENT`, which processes each individual document and counts it according to its metadata flag:

Instead of referring to a global variable `params.count_letters` as we did previously, we refer to the metadata of the document currently being processed, `tuple file(file_in), val(count_letters)`. For our samplesheet, the entries are `[(samplesheet/hello1.txt,F), (samplesheet/hello2.txt,T)]`: for `T` we count by letters, and for `F` by words.

```bash
process count_document {
  input: 
    tuple file(file_in), val(count_letters)
  output:
    stdout
  
  script:
   if (count_letters) {
    """
    cat ${file_in}
    wc -m ${file_in} | awk '{print \$1}'
    """
   } else {
    """
    cat ${file_in}
    wc -w ${file_in} | awk '{print \$1}'
    """
   }
}

workflow COUNT_DOCUMENT {
    take:
        input_ch

    main:
        count_document(input_ch) | view
}
```


##### count_samplesheet.nf:
```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2 

process count_document {
  input: 
    tuple file(file_in), val(count_letters)
  output:
    stdout
  
  script:
   if (count_letters) {
    """
    cat ${file_in}
    wc -m ${file_in} | awk '{print \$1}'
    """
   } else {
    """
    cat ${file_in}
    wc -w ${file_in} | awk '{print \$1}'
    """
   }
}

// convert each row in the samplesheet into a tuple of File object and a Boolean of count_letter
def get_document_info(LinkedHashMap sample) {
    document  = sample.document ? file(sample.document, checkIfExists: true) : null
    count_letters = (sample.count_letters == "T") || (sample.count_letters == "true") ? true : false   

    return [ document, count_letters ]
}

workflow COUNT_DOCUMENT {
    take:
        input_ch

    main:
        count_document(input_ch) | view
}

workflow {
     // create a Channel of documents to count either by word or by letter, from the samplesheet
     Channel.fromPath(params.samplesheet).splitCsv(header:true)
            .map { get_document_info(it) }.set { documents_ch }
    
    // launch a sub-workflow of COUNT_DOCUMENT on each document
     COUNT_DOCUMENT(documents_ch)
}
```


##### Example Run:
```bash
nextflow run workflows/count_samplesheet.nf --samplesheet samplesheet/samplesheet.csv
```

##### Example Outputs:

```bash
N E X T F L O W  ~  version 23.04.2
Launching `workflows/count_samplesheet.nf` [dreamy_fourier] DSL2 - revision: c7971cd787
executor >  local (2)
[3f/9df991] process > COUNT_DOCUMENT:count_document (1) [100%] 2 of 2 ✔
Count Me (using Letters)24

Count Me (using Words)4
```

### Step 6. Asynchronous and synchronous job execution (or scatter-gather pattern)

The scatter-gather pattern is an important pattern in pipelining. The idea is to split ("scatter") the entries of a FASTA file or the rows of a samplesheet so that they, and any downstream jobs, are executed in parallel and asynchronously (on as many nodes as possible on `OSCAR`). 

Then, at the end of the pipeline, we aggregate ("gather") these results; for example: 
 - In RNASeq, to sum up or contrast the gene expressions of the `wild-type` replicates against the `treatment` replicates
 - In metagenomics workflows, to summarize the overall distribution or diversity of all the samples in a data-set.
 
 
In our toy example of document counting, we would like to demonstrate this concept by simply gathering all of the individual counts (whether by letter or by words) and summing all of them together.


###### Add a variable `count_results`, a synchronous collection (`collect()`) of all the scattered, asynchronous runs of `COUNT_DOCUMENT`:

By defining a variable to hold `COUNT_DOCUMENT(documents_ch).collect()`, we are essentially making `count_results` the placeholder for the gather step's input. In other words, we wait for all `COUNT_DOCUMENT` runs to finish and for all individual document counts to be gathered into `count_results`, and then run the final gather step, `sum_all_results(count_results)`. 

###### count_and_summarize.nf:
```bash
process sum_all_results {
  input: 
    path (files)
  output:
    stdout

  script:
    """
    cat ${files} > combined.txt
    awk '{ sum += \$1 } END { print sum }' combined.txt
    """
}

workflow {
    ... 
    // gather all of the results of each document counting sub-workflow
    count_results = COUNT_DOCUMENT(documents_ch).collect()
    
    // and feed this collection of results into a summing function
    sum_all_results(count_results) | view
}
```
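
The same idea can be sketched in plain shell with background jobs: each document is counted on its own (scatter), `wait` blocks until every count has finished, and only then are the per-document counts summed (gather). This is an analogy rather than what Nextflow does internally. The file contents below are assumptions inferred from the Step 5 example output (24 letters and 4 words), written without a trailing newline.

```bash
#!/bin/bash
# Shell sketch of scatter-gather: count each document in a background job
# (scatter), wait for all of them, then sum the per-document counts (gather).

count_document() {
  # usage: count_document <file> <count_letters: true|false>
  # writes <basename>_count.txt, mirroring the count_document process
  out="$(basename "$1" .txt)_count.txt"
  if [ "$2" = "true" ]; then
    wc -m "$1" | awk '{print $1}' > "$out"
  else
    wc -w "$1" | awk '{print $1}' > "$out"
  fi
}

sum_all_results() {
  # usage: sum_all_results <count_file>...
  cat "$@" | awk '{ sum += $1 } END { print sum }'
}

cd "$(mktemp -d)" || exit 1
# Assumed file contents, written without a trailing newline
printf '%s' 'Count Me (using Words)'   > hello1.txt
printf '%s' 'Count Me (using Letters)' > hello2.txt

count_document hello1.txt false &   # scatter: each count runs on its own
count_document hello2.txt true  &
wait                                # barrier: block until every count is done

total="$(sum_all_results hello1_count.txt hello2_count.txt)"
echo "$total"
```

The `wait` line plays the role that `collect()` plays in the workflow: nothing downstream starts until all scattered results are available.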

###### count_and_summarize.nf (full): 
```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2 

process count_document {
  input: 
    tuple file(file_in), val(count_letters)
  output:
    file "${file_in.baseName}_count.txt"
  
  script:
   if (count_letters) {
    """
    wc -m ${file_in} | awk '{print \$1}' > ${file_in.baseName}_count.txt
    """
   } else {
    """
    wc -w ${file_in} | awk '{print \$1}' > ${file_in.baseName}_count.txt
    """
   }
}

process sum_all_results {
  input: 
    path (files)
  output:
    stdout

  script:
    """
    cat ${files} > combined.txt
    awk '{ sum += \$1 } END { print sum }' combined.txt
    """
}

// convert each row in the samplesheet into a tuple of File object and a Boolean of count_letter
def get_document_info(LinkedHashMap sample) {
    document  = sample.document ? file(sample.document, checkIfExists: true) : null
    count_letters = (sample.count_letters == "T") || (sample.count_letters == "true") ? true : false   

    return [ document, count_letters ]
}

workflow COUNT_DOCUMENT {
    take:
        input_ch

    main:
        word_or_letter_counts = count_document(input_ch)
    
    emit:
        word_or_letter_counts
}

workflow {
     // create a Channel of documents to count either by word or by letter, from the samplesheet
     Channel.fromPath(params.samplesheet).splitCsv(header:true)
            .map { get_document_info(it) }.set { documents_ch }
    
    // launch a sub-workflow of COUNT_DOCUMENT on each document
    count_results = COUNT_DOCUMENT(documents_ch).collect()
    sum_all_results(count_results) | view
}
```

##### Example Run:
```bash
nextflow run workflows/count_and_summarize.nf --samplesheet samplesheet/samplesheet.csv
```

##### Example Outputs:
```bash
N E X T F L O W  ~  version 23.04.2
Launching `workflows/count_and_summarize.nf` [zen_baekeland] DSL2 - revision: 478f5774ff
executor >  local (3)
[12/8215ea] process > COUNT_DOCUMENT:count_document (1) [100%] 2 of 2 ✔
[ff/fbd4b5] process > sum_all_results                   [100%] 1 of 1 ✔
28
```

### Step 7. Putting it all together in a real-world RNASeq example

We have learned all of the essential patterns involved in creating pipelines, and now we can begin to put them together in a simple RNASeq pipeline. Below is the code for a full RNASeq pipeline; don't worry too much about understanding every step. 

Our purpose here is to review the pipeline patterns we just learned as applied to a real-world example. Each application of a pattern is called out after the code. 

###### rnaseq_simple.nf:

```bash
workflow PROCESS_SAMPLE {
    take:
        input_ch
        reference_genome

    main:
        fastqc(input_ch)

        trimmed_reads = trimmomatic(input_ch)

        fastqcs = fastqc2(trimmed_reads).collect()
        fastqc_screens = fastq_screen(trimmed_reads).collect()

        aligned_bams = star(trimmed_reads, reference_genome)
        marked_duplicate_bams = mark_duplicate(aligned_bams)

        qualimaps = qualimap(marked_duplicate_bams).collect()

        htseq_counts = htseq_count(marked_duplicate_bams).collect()
        feature_counts = feature_count(marked_duplicate_bams)

        multiqc(fastqcs, fastqc_screens, qualimaps, htseq_counts)
}

// Function to resolve files
def get_sample_info(LinkedHashMap sample) {
    read1  = sample.read1 ? file(sample.read1, checkIfExists: true) : null
    read2 = sample.read2 ? file(sample.read2, checkIfExists: true) : null

    return [ sample.sample_id, read1, read2 ]
}

workflow {
     reference_genome = params.reference_genome

     if (params.reference_genome_fasta != "") {
        reference_genome = build_star_index()
     }

     Channel.fromPath(params.samplesheet).splitCsv(header:true)
            .map { get_sample_info(it) }.set { samples_ch }

     PROCESS_SAMPLE (samples_ch, reference_genome)
}
```

###### Pattern 1: Conditional Logic

If `--reference_genome_fasta` is specified, we build an index for this reference genome (so that our raw reads can be aligned against this index more efficiently):

```bash
reference_genome = params.reference_genome

if (params.reference_genome_fasta != "") {
    reference_genome = build_star_index()
}

... // use reference_genome everywhere
```

###### Pattern 2: Parsing the Samplesheet
We parse a samplesheet of RNASeq samples, where each row is `(sample_id, read1, read2)`, and carry this metadata through the rest of the workflow.

```bash

// Function to resolve files
def get_sample_info(LinkedHashMap sample) {
    read1  = sample.read1 ? file(sample.read1, checkIfExists: true) : null
    read2 = sample.read2 ? file(sample.read2, checkIfExists: true) : null

    return [ sample.sample_id, read1, read2 ]
}

...

Channel.fromPath(params.samplesheet).splitCsv(header:true)
            .map { get_sample_info(it) }.set { samples_ch }
PROCESS_SAMPLE (samples_ch, reference_genome)
```

###### Pattern 3: Scatter-Gather using `collect()`
We process all of the tasks for each RNASeq sample asynchronously, such as:
- QCing the reads with `fastqc`
- Screening the reads for contamination/sequencing artifacts with `fastq_screen`
- Aligning the reads with the `star` aligner and marking duplicates on the aligned reads with the `mark_duplicate` tool
- Counting the features (individual gene expression) on the aligned reads with `htseq`

Then, finally, we gather all of these results into an awesome data visualization/summarization tool, `multiqc`. 

```bash
        fastqcs = fastqc2(trimmed_reads).collect()
        fastqc_screens = fastq_screen(trimmed_reads).collect()

        ...

        qualimaps = qualimap(marked_duplicate_bams).collect()
        htseq_counts = htseq_count(marked_duplicate_bams).collect()

        multiqc(fastqcs, fastqc_screens, qualimaps, htseq_counts)
```

### Step 9: How to Import Secrets


Store a token or secret value with `nextflow secrets`: 
```commandline
nextflow secrets set dscov_secret_location RI
```

###### secret.nf:
```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2


process sayHello {
  input:
    secret 'dscov_secret_location'
  output:
    stdout
  script:
    """
    curl https://api.weather.gov/alerts/active?area=\$dscov_secret_location
    """
}

workflow {
  sayHello() | view
}
```

##### Example Run:
```bash
nextflow run workflows/secret.nf
```

##### Example output:
![](./img/secret_output.png)

##### Workflow logs (will not reveal the secret value/token/key...):
```bash
#!/bin/bash -ue
curl https://api.weather.gov/alerts/active?area=$dscov_secret_location
```
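
The reason the logged script is safe to share is that it only contains the variable name; the value is supplied through the environment when the task runs. The sketch below reproduces that separation in plain shell. It is an illustration only: it uses `echo` instead of `curl` so that it needs no network access, and the file name `command.sh` is a stand-in for Nextflow's `.command.sh`.

```bash
#!/bin/bash
# Shell sketch: the generated script refers to the secret by variable name,
# so the value only exists in the environment of the running task.

cd "$(mktemp -d)" || exit 1

# Quoted heredoc delimiter: $dscov_secret_location is written literally.
cat > command.sh <<'EOF'
#!/bin/bash -ue
echo "https://api.weather.gov/alerts/active?area=$dscov_secret_location"
EOF

# Supply the value only through the environment at run time.
resolved_url="$(dscov_secret_location=RI bash command.sh)"
echo "$resolved_url"

# The script on disk still contains the variable name, not the value.
grep -c 'area=RI' command.sh || true
```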



### Step 10: Dynamically Size VM Memory

##### dynamic_memory.nf:
```bash
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.name = "World"

process sayHello {
  input:
    val name
  output:
    tuple path("hello.txt"), stdout
  script:
    """
    echo 'Hello ${name}!' > hello.txt
    echo \$(( \$( stat -c '%s' hello.txt ) * 3 )) ### get the size of the output file and multiply it by 3
    """
}

process countWords {
  input:
    tuple path(file_in), val(file_size)
  output:
    path("count_words.txt")
  memory "${file_size}"

  publishDir "${params.out_dir}/", mode: 'copy'

  script:
   """
   wc -w ${file_in} | awk '{print \$1}' > count_words.txt
   """
}

workflow {
  countWords(sayHello(params.name))
}
```

##### Example Run:
```bash
nextflow run workflows/dynamic_memory.nf
```

##### Slurm Wrapper Command:
```bash 
cat .command.run

#!/bin/bash
#SBATCH -J nf-sayHello
#SBATCH -o /gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66/.command.log
#SBATCH --no-requeue
#SBATCH --signal B:USR2@30
#SBATCH -c 2
#SBATCH -t 02:00:00
#SBATCH --mem 0M
#SBATCH -p batch
NXF_CHDIR=/gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66
# NEXTFLOW TASK: sayHello
...
/bin/bash -ue /gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66/.command.sh &
...
```
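
The sizing rule inside `sayHello` is simple arithmetic: take the size of `hello.txt` in bytes and multiply it by 3. The sketch below reproduces that calculation outside Nextflow, using `wc -c` rather than the GNU-specific `stat -c '%s'` so it also runs on other systems; the function name `size_times_three` is hypothetical. For `Hello World!` plus a newline this gives 13 x 3 = 39, which is far below one megabyte. Assuming Nextflow reads a bare number as bytes, that would explain the `--mem 0M` line in the wrapper above.

```bash
#!/bin/bash
# Shell sketch of the sizing rule in sayHello: request three times the size
# (in bytes) of the file that the next step will read.

size_times_three() {
  # usage: size_times_three <file>
  # wc -c is used instead of GNU-only `stat -c '%s'` to stay portable
  bytes="$(wc -c < "$1")"
  echo $(( bytes * 3 ))
}

cd "$(mktemp -d)" || exit 1
printf 'Hello %s!\n' World > hello.txt
requested_bytes="$(size_times_three hello.txt)"
echo "$requested_bytes"
```

For real data you would typically add a unit and a sensible floor so that small inputs do not request almost no memory.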

### Step 12: Using Singularity
```
process countWords {
  container 'cowmoo/rnaseq_pipeline:latest'
  containerOptions '--bind /oscar/data/shared/databases/refchef_refs:/oscar/data/shared/databases/refchef_refs'

  input:
    tuple path(file_in), val(file_size)
  output:
    path("count_words.txt")
  memory "${file_size}"

  publishDir "${params.out_dir}/", mode: 'copy'

  script:
   """
   wc -w ${file_in} | awk '{print \$1}' > count_words.txt
   """
}

workflow {
  countWords(sayHello(params.name))
}
```


##### Slurm Wrapper Command:
```bash 
cat .command.run

#!/bin/bash
#SBATCH -J nf-sayHello
#SBATCH -o /gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66/.command.log
#SBATCH --no-requeue
#SBATCH --signal B:USR2@30
#SBATCH -c 2
#SBATCH -t 02:00:00
#SBATCH --mem 512M
#SBATCH -p batch
NXF_CHDIR=/gpfs/data/cbc/workflow_workshop/work/52/992fb76ea549e001e25b749b615f66
# NEXTFLOW TASK: sayHello
...
/bin/bash -ue 
singularity exec -B /oscar/data/cbc/ -B "$PWD" --bind /oscar/data/shared/databases/refchef_refs:/oscar/data/shared/databases/refchef_refs -/oscar/data/cbc/sanghyun_lee/work/singularity/cowmoo-rnaseq_pipeline-latest.img /bin/bash -c "cd $PWD; /bin/bash -ue /oscar/data/cbc/sanghyun_lee/work//52/992fb76ea549e001e25b749b615f66/.command.sh"

...
```
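
Comparing the `container` directive with the wrapper above, the cached image file name looks like the container name with `/` and `:` replaced by `-` and `.img` appended. The sketch below applies that rule. It is inferred from this one log line rather than taken from Nextflow's documentation, and the function name `container_to_image_name` is hypothetical, so treat it only as a reading aid for finding the image in your Singularity cache directory.

```bash
#!/bin/bash
# Sketch of the naming rule inferred from the wrapper output:
# cowmoo/rnaseq_pipeline:latest -> cowmoo-rnaseq_pipeline-latest.img

container_to_image_name() {
  # usage: container_to_image_name <container>
  # replace every "/" and ":" with "-" and append the .img suffix
  printf '%s.img\n' "$(printf '%s' "$1" | sed 's|[/:]|-|g')"
}

image_name="$(container_to_image_name 'cowmoo/rnaseq_pipeline:latest')"
echo "$image_name"
```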


### Step 13: Coming Soon: Curated Workflows

![](./img/nfcore_screenshot.png)

https://nf-co.re/pipelines