# Running a Nextflow pipeline in Betteromics

## Table Of Contents:
* [Introduction](#Introduction)
    * [Prerequisites](#Prerequisites)
* [Getting started with an example](#Getting-started-with-an-example)
    * [Data](#Data)
    * [Nextflow example](#Nextflow-example)
* [Running the example above in Betteromics](#Running-the-example-above-in-Betteromics)
    * [Option 1: Execute the Nextflow pipeline within Flyte](#Execute-the-Nextflow-pipeline-within-Flyte)
    * [Option 2: Conversion of the Nextflow pipeline to Flyte](#Conversion-of-the-Nextflow-pipeline-to-Flyte)
    * [Registering and running the new workflow on Betteromics](#Registering-and-running-the-new-workflow-on-Betteromics)


# Introduction
In this tutorial, we will guide you through the process of running a simple Nextflow pipeline on the Betteromics platform. Once your Nextflow pipeline is on Betteromics, users can run it in a web UI and easily analyze the inputs and outputs of each run.

## Prerequisites

> **Note:**
You must have access to a Betteromics environment other than https://public-demo.betteromics.com/ in order to run Betteromics CLI commands. If you need help accessing your Betteromics environment, contact Betteromics Support or reach out to us on our [website](https://betteromics.com/) (click "Try it" and "Request a Tour").
    
    
Before you begin, make sure you have the following installed:

* Nextflow: Install [Nextflow](https://nextflow.io/) using `curl -s https://get.nextflow.io | bash`
* Flytekit: Install [Flytekit](https://pypi.org/project/flytekit/) using `pip install flytekit`

Throughout this notebook, we will use some of the utilities provided by the Betteromics CLI. You can learn more about the commands used here in the [CLI example notebook](https://github.com/betteromics/betteromics-examples/blob/main/cli_example.ipynb).

# Getting started with an example

Let's start by understanding the Nextflow script that we plan to run in Betteromics. Betteromics natively supports Flyte as a workflow engine, but as you will see in this notebook, it can integrate with other common engines like Nextflow and Cromwell. Let's look at an example together:

## Data
For this example, we use `gene_counts.tsv`, which contains Ensembl gene IDs, gene names, sample IDs, and the expression count of each gene in each sample.

You can access the full data in [this Google Sheet](https://docs.google.com/spreadsheets/d/1sRZwJ55ay6IHbj51sDSFupmJv86Ceg9W9EOkuJiL8t0). The content of the file looks like the excerpt below. Download the sheet as a TSV and save it as `gene_counts.tsv`.


| Gene ID          | Gene name   | Sample ID | Count |
|------------------|-------------|-----------|-------|
| ENSG00000138395  | CDK15       | sample_1  | 0     |
| ENSG00000151952  | TMEM132D    | sample_1  | 1     |
| ENSG00000155506  | LARP1       | sample_1  | 3     |
| ENSG00000162616  | DNAJB4      | sample_1  | 753   |
| ENSG00000162621  | LRRC53      | sample_1  | 4     |
| ENSG00000179420  | OR6W1P      | sample_1  | 74    |
| ENSG00000184661  | CDCA2       | sample_1  | 2     |
| ENSG00000229663  | DPY19L4P1   | sample_3  | 4046  |
| ENSG00000229753  | RPS27AP15   | sample_3  | 1     |
| ENSG00000234222  | LIX1L-AS1   | sample_3  | 31    |
| ENSG00000237950  | LINC02918   | sample_3  | 0     |
| ENSG00000239731  | RN7SL825P   | sample_3  | 3     |
| ENSG00000252929  | RNU6-218P   | sample_3  | 3     |
| ENSG00000263432  | RN7SL689P   | sample_3  | 111   |
| ENSG00000266509  | MIR3934     | sample_3  | 1     |
| ENSG00000269439  | PGLS-DT     | sample_3  | 2     |
| ENSG00000214200  | TPM3P2      | sample_8  | 1     |
| ENSG00000220267  | ACTBP8      | sample_8  | 3     |
| ENSG00000226693  | NXNP1       | sample_8  | 2     |
| ENSG00000229663  | DPY19L4P1   | sample_8  | 86    |
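
The Nextflow script used later in this tutorial reads columns by name (`row.gene_id`, `row.gene_name`, `row.sample_id`, `row.count`), so the header row of the downloaded TSV needs to carry those names. As a minimal sketch, assuming the file's header uses exactly those four names rather than the display labels in the table above, the check below validates a header with Python's standard `csv` module. The helper name `missing_columns` and the in-memory sample are illustrative only.

```python
import csv
import io

# Column names that the Nextflow script accesses via row.<name>.
EXPECTED_COLUMNS = ["gene_id", "gene_name", "sample_id", "count"]


def missing_columns(tsv_handle):
    """Return the expected columns that are absent from the TSV header."""
    reader = csv.DictReader(tsv_handle, delimiter="\t")
    header = reader.fieldnames or []
    return [name for name in EXPECTED_COLUMNS if name not in header]


# In-memory sample built from the first rows of the table above
# (assumption: the real file uses these header names).
sample = io.StringIO(
    "gene_id\tgene_name\tsample_id\tcount\n"
    "ENSG00000138395\tCDK15\tsample_1\t0\n"
    "ENSG00000151952\tTMEM132D\tsample_1\t1\n"
)
print(missing_columns(sample))  # an empty list means the header is usable
```

To check the real file, pass `open("gene_counts.tsv", newline="")` instead of the in-memory sample.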


Let's get started by uploading this data to the Betteromics platform. You can do this either in the web UI or
on your **terminal** with the CLI command `create-resource`, as shown below (make sure you are logged in to your environment):

<pre>
betteromics --context {your_environment} create-resource --origin local gene_counts.tsv --name gene_counts.tsv
</pre>

If the upload succeeds, you will see a message like the one below, where `{your_environment}` is the name of your Betteromics environment:
<pre>
File successfully uploaded.
View it on betteromics here: https://{your_environment}.betteromics.com/resources/RESXXX
</pre>


Please visit the link provided above and get familiar with the resources tab on our platform. On the Resource Metadata page, you will see a field called `Location`, which is the path to the data on S3. We will need this S3 path (referred to as `{YOUR_S3_PATH}` below) throughout this example.
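
Since the `Location` value is an S3 path, it can help to confirm its shape before passing it to Nextflow. The sketch below splits such a path into bucket and key using only the standard library. The function name `split_s3_path` and the example bucket and key are hypothetical placeholders, not values from a real Betteromics environment.

```python
from urllib.parse import urlparse


def split_s3_path(s3_path):
    """Split an 's3://bucket/key' URI into (bucket, key); raise on other shapes."""
    parsed = urlparse(s3_path)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_path!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical location, for illustration only.
bucket, key = split_s3_path("s3://example-bucket/resources/gene_counts.tsv")
print(bucket, key)
```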

## Nextflow example

Throughout this tutorial, we will use template workflows provided by Betteromics. These workflow templates are available in the [Betteromics GitHub repository](https://github.com/betteromics).

In [1]:
betteromics_github_repo = "https://github.com/betteromics/betteromics-examples.git"

On your **terminal**, run the cookiecutter command below to get the Nextflow template (called `nextflow_example`) provided by Betteromics. You can find the Nextflow pipeline under `nextflow_example/gene_count_example.nf`. This script reads gene names and counts from a file on S3 (the `gene_counts.tsv` we uploaded above), passed as an argument, and writes a row to the output file if its count is at least 10.

In [2]:
!cookiecutter $betteromics_github_repo --directory="workflow-templates/nextflow_example" --no-input 

In [3]:
!ls nextflow_example

gene_count_example.nf


Let's take a look at our Nextflow script:

In [None]:
# %load nextflow_example/gene_count_example.nf
/* This script reads gene names and counts from a file (gene_counts.tsv, stored on S3) and writes each row to
the output file if its count is greater than or equal to 10. */

params.outdir = './nextflow_example'
params.gene_counts = null
MINIMUM_COUNT = 10

// Check if the gene_counts parameter is provided
if (params.gene_counts == null) {
    error "Please provide the --gene_counts parameter with the S3 path."
}

Channel
    .fromPath( params.gene_counts ) 
    .splitCsv( header: true, sep: '\t' )
    .map { row -> tuple( row.gene_id, row.gene_name, row.sample_id, row.count ) }
    .set { gene_counts_run_ch }
    
process filter_low_gene_counts {

    input:
    tuple val(gene_id), val(gene_name), val(sample_id), val(count)
    
    output:
    path "out.txt"
        
    shell:
    """
    if [[ ! -f out.txt ]]; then
        echo "gene_id\tgene_name\tsample_id\tcount" > out.txt
    fi
    
    echo "$gene_id\t$gene_name\t$sample_id\t$count" | awk -F '\\t' '\$4 >= $MINIMUM_COUNT {print \$0}' >> out.txt
    """
}


workflow {
    filter_low_gene_counts(gene_counts_run_ch)
    .collectFile(
        name: 'output.tsv',
        storeDir:params.outdir,
        keepHeader: true,
        )
}



You can run the Nextflow script above with the following command. The script collects the results emitted by the `filter_low_gene_counts` process and writes them to a single file called `output.tsv`.

In [5]:
%%capture 
#Feel free to comment out the command above to see the logs/progress
!nextflow run nextflow_example/gene_count_example.nf -plugins nf-amazon --gene_counts {YOUR_S3_PATH}

In [6]:
%%capture

# Clean up project cache and work directories (Force clean)
!nextflow clean -f ;

Let's look at the output file (`nextflow_example/output.tsv`)

In [7]:
!column -t nextflow_example/output.tsv | head

gene_id          gene_name  sample_id  count
ENSG00000115266  APC2       sample_5   198
ENSG00000242123  RPL36AP47  sample_3   187
ENSG00000168090  COPS6      sample_7   44
ENSG00000220267  ACTBP8     sample_1   103
ENSG00000162616  DNAJB4     sample_1   753
ENSG00000171208  NETO2      sample_8   70
ENSG00000266509  MIR3934    sample_1   30
ENSG00000269439  PGLS-DT    sample_5   22
ENSG00000234222  LIX1L-AS1  sample_7   32
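
One way to sanity-check the result is to confirm that every row in `output.tsv` meets the threshold. The sketch below does this with the standard library. It assumes a tab-separated file with the header shown in the listing above, and `rows_below_threshold` is an illustrative helper name.

```python
import csv
import io

MINIMUM_COUNT = 10  # same threshold as in gene_count_example.nf


def rows_below_threshold(tsv_handle, minimum=MINIMUM_COUNT):
    """Return rows whose count is below the threshold (expected: none)."""
    reader = csv.DictReader(tsv_handle, delimiter="\t")
    return [row for row in reader if int(row["count"]) < minimum]


# A few rows copied from the listing above, tab-separated as in output.tsv.
sample_output = io.StringIO(
    "gene_id\tgene_name\tsample_id\tcount\n"
    "ENSG00000115266\tAPC2\tsample_5\t198\n"
    "ENSG00000168090\tCOPS6\tsample_7\t44\n"
    "ENSG00000269439\tPGLS-DT\tsample_5\t22\n"
)
print(rows_below_threshold(sample_output))  # prints [] when all rows pass
```

To check the real output, pass `open("nextflow_example/output.tsv", newline="")` instead of the in-memory sample.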


# Running the example above in Betteromics

As mentioned earlier, Betteromics workflows are powered by Flyte. To run the Nextflow script above on the Betteromics platform, we can execute Nextflow as a subprocess call within a Flyte task (Option 1). This approach requires no changes to the existing Nextflow code.

Alternatively, you can convert the Nextflow pipeline to Flyte entirely (Option 2) to gain tighter integration, with features like better monitoring, caching of intermediate results, and built-in workflow versioning.

## Option 1

### Execute the Nextflow pipeline within Flyte


### Initialize a Flyte project

You can create a new Flyte project that runs Nextflow as a subprocess call by using our workflow template for this example:

In [8]:
!betteromics init remote-config nextflow_subprocess_example \
                        --git-url $betteromics_github_repo \
                        --flyte-template workflow-templates/nextflow_subprocess_example

Created workflow scaffold in nextflow_subprocess_example. When you are ready, run the register action to package and upload your changes to Betteromics.


The command above will initialize some files inside the workflow project (called `nextflow_subprocess_example`). You can customize and expand these files to tailor the project to your specific requirements.

If this is your first time working with Flyte workflows, take a moment to review the example files provided to get familiar with the content. 

Let's take a look at `nextflow_subprocess_example/workflows/nf_subprocess.py`, the Flyte workflow that will run Nextflow as a subprocess.

In [None]:
# %load nextflow_subprocess_example/workflows/nf_subprocess.py
"""
This module defines a Flyte workflow for an existing Nextflow script.
"""

import subprocess
from typing import NamedTuple
import os
from pathlib import Path
import flytekit
from flytekit import task, workflow
from flytekit.types.file import FlyteFile


def subp_run(cmd):
    """
    Run a subprocess command and print its output and error messages.

    Parameters:
    - cmd (list): A list representing the command to be executed.

    Raises:
    - subprocess.CalledProcessError: If the subprocess command returns a non-zero exit code.

    Returns:
    - None
    """
    try:
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
            for line in proc.stdout:
                print("Output:", line, end="", flush=True)

            for line in proc.stderr:
                print("Error:", line, end="", flush=True)

            # Wait until process finishes and get the returncode
            retcode = proc.wait()

            if retcode:
                raise subprocess.CalledProcessError(retcode, cmd)
    except subprocess.CalledProcessError as e:
        print("Command failed with error code:", e.returncode, flush=True)
        raise


@task
def nextflow_run(input_gene_counts_file: FlyteFile) -> FlyteFile:
    """
    Run a Nextflow pipeline using the provided gene counts file path on S3.

    Parameters:
    - input_gene_counts_file (FlyteFile): Input gene counts file in FlyteFile format (S3).

    Returns:
    - FlyteFile: Output file generated by the Nextflow pipeline (rows with counts greater than or equal to MINIMUM_COUNT)
    """
    working_dir = flytekit.current_context().working_directory
    output_path = Path(os.path.join(working_dir, "output.tsv"))

    subp_run(
        [
            "nextflow",
            "run",
            "workflows/gene_count_example.nf",
            "-plugins",
            "nf-amazon",
            "--gene_counts",
            input_gene_counts_file,
            "--outdir",
            working_dir,
        ]
    )
    return FlyteFile(path=output_path)


output = NamedTuple(
    "output",
    output=FlyteFile,
)


@workflow
def gene_count(input_gene_counts_file: FlyteFile) -> output:
    """
    Runs a Nextflow script that ingests a gene counts file (TSV) on S3 and returns the rows with count >= MINIMUM_COUNT, as defined in the Nextflow script.

    Parameters:
    - input_gene_counts_file (FlyteFile): The gene_counts.tsv file on S3. (see S3 location in resources)

    Returns:
    - output (FlyteFile): subset of original dataframe generated by nextflow
    """
    return output(output=nextflow_run(input_gene_counts_file=input_gene_counts_file))


We will discuss how to run this Flyte workflow later.
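
One detail worth noting in `nf_subprocess.py`: the `subp_run` helper reads `stdout` to the end before it starts reading `stderr`. With two separate pipes, a child process that writes a large amount to `stderr` first can fill that pipe and stall. A minimal alternative sketch, which is not the template's code, merges `stderr` into `stdout` so that a single loop drains both. The name `run_streaming` is illustrative.

```python
import subprocess
import sys


def run_streaming(cmd):
    """Run cmd, echo its combined stdout/stderr line by line, return the lines."""
    lines = []
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout: one pipe to drain
        text=True,
    ) as proc:
        for line in proc.stdout:
            print("Output:", line, end="", flush=True)
            lines.append(line.rstrip("\n"))
        retcode = proc.wait()
    if retcode:
        raise subprocess.CalledProcessError(retcode, cmd)
    return lines


# Demonstration with the current Python interpreter instead of Nextflow.
run_streaming([sys.executable, "-c", "print('hello from child')"])
```

The trade-off is that output and error lines are no longer labeled separately, which may or may not matter for your logs.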

## Option 2
### Conversion of the Nextflow pipeline to Flyte

As in Option 1, you can initialize your project by running the following command:

In [10]:
!betteromics init remote-config native_flyte_example \
                        --git-url $betteromics_github_repo \
                        --flyte-template workflow-templates/native_flyte_example

Created workflow scaffold in native_flyte_example. When you are ready, run the register action to package and upload your changes to Betteromics.


We should now have a Flyte workflow and the necessary files initialized, e.g., `native_flyte_example/workflows/native_flyte.py`. Below, the [example Nextflow pipeline](#Nextflow-example) is fully converted to a Flyte workflow.

In [None]:
# %load native_flyte_example/workflows/native_flyte.py
"""
This module defines a Flyte workflow for filtering genes based on a minimum count threshold (MINIMUM_COUNT).
"""
from typing import NamedTuple
import pandas as pd
import os
import flytekit
from flytekit import task, workflow
from flytekit.types.file import FlyteFile

MINIMUM_COUNT = 10


@task
def filter_low_gene_counts(gene_counts_file: FlyteFile) -> FlyteFile:
    """
    Task for filtering gene counts based on a minimum count threshold (MINIMUM_COUNT).

    Parameters:
    - gene_counts_file (FlyteFile): Input gene counts file in FlyteFile format (S3).

    Returns:
    - FlyteFile: Output gene counts file containing rows with counts greater than or equal to MINIMUM_COUNT.
    """
    working_dir = flytekit.current_context().working_directory
    output_path = os.path.join(working_dir, "output.tsv")
    gene_counts = pd.read_csv(gene_counts_file, sep="\t")
    gene_counts = gene_counts[gene_counts["count"] >= MINIMUM_COUNT]
    gene_counts.to_csv(output_path, sep="\t", index=False)

    return FlyteFile(path=output_path)


output = NamedTuple(
    "output",
    output=FlyteFile,
)


@workflow
def gene_count(gene_counts_file: FlyteFile) -> output:
    """Ingests s3 file path with gene counts (tsv) and returns a subset of a dataframe if count>=MINIMUM_COUNT

    Args:
        gene_counts_file (FlyteFile): The gene_counts.tsv file on S3. (see S3 location in resources)

    Returns:
        output (FlyteFile): subset of original dataframe
    """
    return output(output=filter_low_gene_counts(gene_counts_file=gene_counts_file))
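
Both workflows declare their return type with the keyword-argument form `NamedTuple("output", output=FlyteFile)`. That call form is deprecated as of Python 3.13, so on newer interpreters the class syntax may be preferable. The sketch below shows the equivalent class-based declaration, using `str` as a stand-in for `FlyteFile` so that it runs without flytekit. The class name `GeneCountOutput` is illustrative, and whether to switch depends on the Python and flytekit versions in your environment.

```python
from typing import NamedTuple


class GeneCountOutput(NamedTuple):
    """Class-syntax equivalent of NamedTuple("output", output=...)."""

    output: str  # stand-in for FlyteFile in this sketch


result = GeneCountOutput(output="output.tsv")
print(result.output)   # field access by name
print(result._fields)  # tuple of declared field names
```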


# Registering and running the new workflow on Betteromics

You can now register your workflow using the command below.

<pre>
betteromics --context {YOUR_ENVIRONMENT} register {PROJECT_DIRECTORY} {WORKFLOW_NAME} {WORKFLOW_MODULE_PATH}
</pre>


On your **terminal** run the following depending on the option you chose earlier:

For the Nextflow subprocess example (Option 1):
<pre>
betteromics --context {your_environment} register nextflow_subprocess_example "Nextflow Subprocess Example" "workflows.nf_subprocess.gene_count"
</pre>

Or, for the native Flyte example (Option 2):

<pre>
betteromics --context {your_environment} register native_flyte_example "Native Flyte Example" "workflows.native_flyte.gene_count"
</pre>
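
The two commands above differ only in their last three arguments. As a rough sketch, the hypothetical helper `register_command` below assembles the argument list from the values shown in this tutorial. It only builds and prints the command and does not call the Betteromics CLI. `my-env` is a placeholder environment name.

```python
import shlex


def register_command(environment, project_dir, workflow_name, entry_point):
    """Build the argv for `betteromics register`, mirroring the commands above."""
    return [
        "betteromics", "--context", environment,
        "register", project_dir, workflow_name, entry_point,
    ]


cmd = register_command(
    "my-env",  # placeholder: your Betteromics environment
    "native_flyte_example",
    "Native Flyte Example",
    "workflows.native_flyte.gene_count",
)
print(shlex.join(cmd))  # shell-quoted form of the command
```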


Once your workflow is registered, you can see it in the web UI and execute it after providing the necessary input data.
You can choose your input data from the resource dropdown in the web UI, using any of the following sources:
* Local file upload
* From S3 path
* Google Drive
* Existing Resources
* Benchling notebook

