<a href="https://colab.research.google.com/github/garciafranciscomartn/nextflow_intro_rsg/blob/main/Modulo3_Workflows_M%C3%B3dulos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#*Workflows* y Módulos
Ahora que conocemos las maneras de manejar y conectar archivos, vamos a agregar pasos a nuestro pipeline aprendiendo cómo conectar los procesos en el *workflow*.
Primero vamos a agregar un proceso en donde se ejecute la herramienta [Fastp](http://www.usadellab.org/cms/?page=trimmomatic). Brevemente, esta es una herramienta que podemos usar para limpiar secuencias, eliminando lecturas cortas, bases de mala calidad, etc.
El proceso lo vamos a llamar *runFastp* y sería el siguiente:


```
process runFastp {
   container "community.wave.seqera.io/library/fastp_fastqc_multiqc:46d8231a252ab2c8"


   publishDir "${params.outdir}", mode: 'copy'


   input:
   path fastq_file


   output:
   path "*_trimmed.fastq.gz"




   script:
   """
   fastp -i ${fastq_file} \\
         -o ${fastq_file.baseName}_trimmed.fastq.gz \\
         --qualified_quality_phred 15 \\
         --unqualified_percent_limit 40 \\
         --length_required 36
   """
}


```
Vamos a explicar un poco que hace el script de fastp para que después podamos manipularlo:
-   -i {fastq_file} es nuestro input
-   -o {fastq_file}.baseName_trimmed.fastq.gz es el output, donde utilizamos .basename. Esta es una propiedad de los objetos Path en Nextflow que devuelve el nombre del archivo sin la extensión final.
- --qualified_quality_phred 15 es el cutoff de la calidad de las bases para el filtrado de ellas
-   --unqualified_percent_limit es el límite de porcentaje de bases por debajo del cutoff que se permite por lectura
-   --length_required es el largo mínimo en bases que tiene que tener la lectura para no ser descartada.




Sin pensar mucho podemos ver una gran variedad de parámetros que tendríamos que modificar la forma en que se emplean para que tengan valores por default y que sean manipulables. Pero eso lo dejamos para más adelante. Primero veamos cómo conectar este proceso con el anterior.


Modifiquemos el workflow para que conectemos los pasos ubicando este paso antes del de fastqc para poder ver la calidad de las muestras una vez ejecutado Fastp. Para esto necesitamos que el output de runFastp se use como input en el proceso runFastQC. Convenientemente, Nextflow empaqueta automáticamente la salida de un proceso en un canal llamado .out. Así que la salida de runFastQC es un canal llamado runFastQC.out, que podemos conectar directamente a runFastp().
```


workflow {


   // Creamos un canal con el input de archivos FASTQ
   input_ch = Channel.fromPath(params.fastq)
       .splitCsv()
       .flatten()
       .map { filename -> file(filename)}

   // Ejecutar Fastp para el procesamiento de los datos
   runFastp(input_ch)
   // Ejecutar control de calidad
   runFastQC(runFastp.out)


}


```
Ahora vamos a agregar etiquetas *emit* a nuestras declaraciones de salida. Esto nos permitirá seleccionar las salidas por el nombre que definimos para poder identificarlas sobre el output generado, para esto basta con agregar la sentencia , ```emit: nombre``` en cada output declarado.
Con los outputs mejor nombrados nuestro pipeline quedaria asi:


```
#!/usr/bin/env nextflow
/*
* Usar FASTQC para generar un reporte de control de calidad
*/
/*
* Parámetros
*/
params.fastq = "test_1.fastq.gz"
params.outdir = "results"


process runFastQC {
   container "community.wave.seqera.io/library/fastp_fastqc_multiqc:46d8231a252ab2c8"

   publishDir "${params.outdir}", mode: 'copy'

   input:
   path fastq_file

   output:
   path "*.html", emit: fastqc_report
   path "*.zip", emit: fastqc_zip

   script:
   """
   fastqc ${fastq_file}
   """
}


process runFastp {
   container "community.wave.seqera.io/library/fastp_fastqc_multiqc:46d8231a252ab2c8"

   publishDir "${params.outdir}", mode: 'copy'

   input:
   path fastq_file

   output:
   path "*_trimmed.fastq.gz", emit: trimmed_fastq

   script:
   """
   fastp -i ${fastq_file} \\
         -o ${fastq_file.baseName}_trimmed.fastq.gz \\
         --qualified_quality_phred 15 \\
         --unqualified_percent_limit 40 \\
         --length_required 36
   """
}


workflow {


   // Creamos un canal con el input de archivos FASTQ
   input_ch = Channel.fromPath(params.fastq)
       .splitCsv()
       .flatten()
       .map { filename -> file(filename)}

   // Ejecutar Fastp para el procesamiento de los datos
   runFastp(input_ch)
   // Ejecutar control de calidad
   runFastQC(runFastp.out.trimmed_fastq)


}
```


#### Agregamos el paso de MultiQC


Ahora vamos a agregar un tercer paso a nuestro workflow que ya incluye FastP (preprocesamiento) y FastQC (control de calidad). Este nuevo paso utilizará MultiQC para generar un reporte consolidado de todos los análisis de calidad.


Cuando tenemos un proceso que se ejecuta múltiples veces (como FastQC que se ejecuta una vez por cada archivo), a veces queremos recopilar todas las salidas y usarlas en un único proceso que realice algún tipo de análisis o resumen. Para facilitarnos esto vamos a hacer uso de otro operador, el operador [collect()](https://www.nextflow.io/docs/latest/reference/operator.html#collect). Este operador recopila todos los elementos de un canal de origen en una lista y los emite como un solo elemento.


Para eso primero vamos a agregar el proceso runMultiQC
```
process runMultiQC {
   container "community.wave.seqera.io/library/fastp_fastqc_multiqc:46d8231a252ab2c8"


   publishDir "${params.outdir}", mode: 'copy'


   input:
   path '*'


   output:
   path "multiqc_report.html", emit: report


   script:
   """
   multiqc . -n multiqc_report.html
   """
}
```
Es importante notar que multiqc particularmente, se encarga de analizar todos los archivos que identifique como análisis en la carpeta que se le otorga como input. La función de collect en este caso va a ser pasar todos los archivos .zip generados por fastqc y generar los links simbólicos a esos archivos en la carpeta donde se está ejecutando multiqc.
Entonces en el workflow lo aplicamos de esta forma :
```
workflow {

   // Creamos un canal con el input de archivos FASTQ
   input_ch = Channel.fromPath(params.fastq)
       .splitCsv()
       .flatten()
       .map { filename -> file(filename)}

   // Ejecutar Fastp para el procesamiento de los datos
   runFastp(input_ch)

   // Ejecutar control de calidad en archivos procesados
   runFastQC(runFastp.out.trimmed_fastq)

   // Recopilar todos los reportes ZIP de FastQC para MultiQC
   multiqc_input = runFastQC.out.fastqc_zip.collect()
   //Agregamos el operador view para visualizar que archivos posee el canal de input
   multiqc_input.view()
   // Ejecutar MultiQC
   runMultiQC(multiqc_input)

}
```
Como no queremos repetir los pasos que ya se completaron correctamente vamos a usar la opción de Nextflow llamada -resume que te permite hacer justamente eso. Al usarlo, cualquier proceso que ya se haya ejecutado con el mismo código, configuración y entradas será omitido. Esto significa que Nextflow solo ejecutará los procesos que hayas agregado o modificado desde la última ejecución, o a los que les estás proporcionando nuevas configuraciones o entradas.


Hay dos ventajas clave al usar esta opción:


-   Durante el desarrollo del pipeline, podés iterar más rápido, ya que solo necesitás ejecutar el/los proceso(s) en los que estás trabajando activamente para probar tus cambios.
-   En producción, si algo sale mal, en muchos casos podés corregir el problema y relanzar el pipeline, y este continuará desde el punto donde falló. Esto puede ahorrarte mucho tiempo y recursos de cómputo. Para usarlo, simplemente agregamos -resume a nuestro comando


```

nextflow run hola_fastqc.nf --fastq fastq.csv -resume

```

Y vemos el indicador cached en el proceso que ya se había corrido, y también podemos ver que el sub directorio donde se realiza es el mismo.


---


## Manejo de Módulos
Cuando comenzamos a desarrollar nuestro flujo de trabajo, pusimos todo en un solo archivo de código. Pero a la hora de revisar o corregir el mismo terminamos trabajando con un archivo sumamente extenso. Para evitar esto vamos a guardar los procesos en **Módulos**


En Nextflow, un módulo es una definición de proceso individual que está encapsulada por sí sola en un archivo de código independiente. Para usar un módulo en un flujo de trabajo, simplemente agregamos una línea de importación en tu archivo de código del flujo; luego podés integrar el proceso en el flujo de trabajo de la misma manera que lo harías normalmente.


Separar los procesos en módulos individuales permite reutilizar definiciones de procesos en múltiples flujos de trabajo sin generar múltiples copias del código. Esto hace que el código sea más compartible, flexible y mantenible.


**Aclaración** - Es una buena práctica guardar tus módulos en un directorio específico. Podés llamarlo como quieras, pero la convención es llamarlo modules/.


```
mkdir modules


```


En su forma más simple, convertir un proceso existente en un módulo es básicamente una operación de copiar y pegar. Vamos a crear un archivo para el módulo, copiar el código relevante y luego eliminarlo del archivo principal del flujo de trabajo.


Después, solo necesitaremos agregar una declaración de importación para que Nextflow sepa incluir el código correspondiente en tiempo de ejecución.


#### Vamos a comenzar con el proceso runFastQC


Primero creamos un archivo vacío llamado fastqc.nf para tener lugar donde poner el código del proceso:
```
touch modules/fastqc.nf
```
Movemos el código del proceso runFastQC al archivo del módulo. Para eso copiamos toda la definición del proceso desde el archivo original al archivo del módulo, asegurándose de incluir también la línea #!/usr/bin/env nextflow.
El archivo modules/fastqc.nf deberia ser así:
```
#!/usr/bin/env nextflow
/*
* Usar FASTQC para generar un reporte de control de calidad
*/


process runFastQC {
   container "community.wave.seqera.io/library/fastp_fastqc_multiqc:46d8231a252ab2c8"


   publishDir "${params.outdir}", mode: 'copy'


   input:
   path fastq_file


   output:
   path "*.html", emit: fastqc_report
   path "*.zip", emit: fastqc_zip


   script:
   """
   fastqc ${fastq_file}
   """
}

```
Una vez hecho esto, podemos eliminar el proceso del script original.
Ahora necesitamos incluirlo en este. y para eso usamos la declaración de importación antes del bloque workflow. La sintaxis para importar un módulo local es bastante sencilla:
```
include { <NOMBRE_DEL_MÓDULO> } from '<ruta_al_módulo>'
```
Insertamos esto antes del bloque workflow:


```


/*
* Módulos
*/

include { runFastQC } from './modules/fastqc.nf'


workflow {
   ...
}


```


Ejecutamos el script para verificar que funciona igual que antes
Como estamos ejecutando el flujo con el mismo código y entradas que antes, usamos la opción -resume:
```
nextflow run hola_fastqc.nf --fastq fastq.csv -resume

```
¡Funciono!
Vemos también que Nextflow reconoce que el trabajo sigue siendo el mismo, incluso si el código está dividido en varios archivos.


Hagamos lo mismo con los otros dos procesos, runFastP y runMultiqc.
Para eso de nuevo, creamos los archivos:
```
touch modules/fastp.nf
touch modules/multiqc.nf
```
El script hola_fastqc.nf deberia quedar de la siguiente forma
```


// Incluir módulos


include { runFastQC } from './modules/fastqc.nf'
include { runFastp } from './modules/fastp.nf'
include { runMultiQC } from './modules/multiqc.nf'




workflow {
 ...
}


```
Y en la carpeta modules deberíamos haber guardado los archivos para los procesos fastp y multiqc


Volvemos a correr todo con la opción -resume y debería funcionar igual que en el paso anterior.
Antes de pasar al punto siguiente vamos a ver la utilidad de nombrar los módulos


#### Alias de módulos


Al incluir un componente de módulo, es posible especificar un alias de nombre usando la declaración as. Esto permite incluir e invocar el mismo componente varias veces usando nombres diferentes.


Veamos por ejemplo que sucede si quisiéramos modificar el workflow y agregar un paso previo para ver la calidad de las lecturas antes de ser procesadas por Fastp.
Intuitivamente, agregaremos un paso igual al de runFastQC pero usamos como input el mismo canal que para runFastp (que serian las lecturas crudas)
el workflow quedaria asi:


```


   // Ejecutar control de calidad en archivos sin procesar
   runFastQC(input_ch)


   // Ejecutar Fastp para el procesamiento de los datos
   runFastp(input_ch)


   // Ejecutar control de calidad en archivos procesados
   runFastQC(runFastp.out.trimmed_fastq)


   // Recopilar todos los reportes ZIP de FastQC para MultiQC
   multiqc_input = runFastQC.out.fastqc_zip.collect()
   //Agregamos el operador view para visualizar que archivos posee el canal de input
   multiqc_input.view()
   // Ejecutar MultiQC
   runMultiQC(multiqc_input)


```


Al ejecutarlo nuevamente, vemos que nos da el siguiente error:
```
Process 'runFastQC' has been already used -- If you need to reuse the same component, include it with a different name or include it in a different workflow context
```


Lo que nos dice es que no podemos usar dos procesos con el mismo nombre, y que si necesitamos hacer esto podemos usarlo con un nombre diferente. Para lograr esto hay que importar el módulo con un alias. Esto se logra así:


```
include { runFastQC as runRawFastQC } from './modules/fastqc.nf'
include { runFastQC as runPostFastQC} from './modules/fastqc.nf'
```


Y luego tenemos que modificar la sección del workflow, con los alias nuevos:


```


    // Ejecutar control de calidad en archivos sin procesar
    runRawFastQC(input_ch)


    // Ejecutar Fastp para el procesamiento de los datos
    runFastp(input_ch)


    // Ejecutar control de calidad en archivos procesados
    runPostFastQC(runFastp.out.trimmed_fastq)


    // Recopilar todos los reportes ZIP de FastQC para MultiQC
    multiqc_input = runPostFastQC.out.fastqc_zip.collect()
    //Agregamos el operador view para visualizar que archivos posee el canal de input
    multiqc_input.view()
    // Ejecutar MultiQC
    runMultiQC(multiqc_input)


```


Ahora vemos que se ejecutan ambos pasos.
Por último, vamos a agregar el uso de otro operador mas a nuestro pipeline, el operador [mix](https://www.nextflow.io/docs/latest/reference/operator.html#mix).
Este permite combinar múltiples canales en uno solo. Es especialmente útil cuando necesitas procesar datos de diferentes fuentes o etapas del pipeline de manera conjunta. Para eso toma dos o más canales y los fusiona en un canal único, preservando todos los elementos de los canales originales. Los elementos se emiten en el orden en que llegan, sin ninguna sincronización específica.
Para usarlo se aplica antes del operador collect, con lo que nuestro workflow quedaría:
```


   // Combinar ambos outputs para MultiQC
   multiqc_input = runRawFastQC.out.fastqc_zip
       .mix(runPostFastQC.out.fastqc_zip)
       .collect()


```
El patrón mix().collect() es una práctica estándar en Nextflow para combinar outputs de múltiples procesos antes de pasarlos a un proceso que necesita todos los archivos juntos.


---



