## Setup: run Airflow alongside this notebook
* Open two terminals
* Activate conda environment on both, e.g., `conda activate notebook-env`
* Export the following environment variables in both terminals (setting them from a notebook cell does not affect the terminals)
    * `export FORWARDED_ALLOW_IPS='*'`
    * `export AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX=True`
    * `export AIRFLOW__CORE__LOAD_EXAMPLES=False`
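* Create an Airflow admin account with the following command. It prompts for a password. If the Airflow metadata database has not been initialized yet, run `airflow db init` first.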
    `airflow users create -u admin -f Admin -l User -e admin@email.org -r Admin`
* In the first terminal, start the webserver with a single worker
    `airflow webserver -w1`
* In the second terminal, start the Airflow scheduler
    `airflow scheduler`
* To open the Airflow UI, copy the notebook's URL and replace its `/lab/...` part with `/proxy/8080/` (the webserver listens on port 8080 by default)

In [1]:
# `export` inside a %%bash cell is lost as soon as that subprocess exits, so it
# had no effect. %env sets the variables in the kernel process instead, where
# they are inherited by `!` commands started from this notebook.
# They do NOT reach the separate terminals: export them there as well.
%env FORWARDED_ALLOW_IPS=*
%env AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX=True
%env AIRFLOW__CORE__LOAD_EXAMPLES=False

In [38]:
%%bash
# Create one tiny placeholder FASTQ per sample; each file holds only its sample ID.
mkdir -p /tmp/input
for sample_id in A B C; do
    echo "$sample_id" > "/tmp/input/sample_${sample_id}.fastq"
done

In [57]:
%%writefile ~/airflow/dags/test_pipeline_1.py
"""Toy Airflow DAG mimicking a map -> sort -> index -> genotype-call pipeline.

Every step is simulated with ``cat``, so the DAG runs without bioinformatics
tools. Each per-sample task echoes its output path as the last line of stdout.
With ``do_xcom_push=True``, BashOperator pushes that line to XCom, and the next
task pulls it to locate its input.
"""
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator


# [sample_id, fastq_path] pairs; one bwa_map -> samtools_sort -> samtools_index
# chain is created per entry.
samples = [
    ["A", "/tmp/input/sample_A.fastq"],
    ["B", "/tmp/input/sample_B.fastq"],
    ["C", "/tmp/input/sample_C.fastq"],
]

with DAG(
        dag_id="test_pipeline_1",
        schedule_interval=None,  # no schedule: runs only when triggered
        start_date=datetime(2022, 3, 4)) as dag:
    # Fan-in step: concatenates the sorted "BAM" of every sample into one VCF.
    # It pulls the paths pushed by the samtools_sort_* tasks. It runs only after
    # all samtools_index_* tasks (see the dependency chain at the end of the loop).
    genotype_calling = BashOperator(
        task_id="genotype_calling",
        dag=dag,
        params={
            # Derived from len(samples) rather than a hard-coded count, so that
            # editing `samples` keeps this fan-in in sync with the per-sample tasks.
            'prev_task_ids': [f"samtools_sort_{sample_index}" for sample_index in range(len(samples))]
        },
        bash_command="""
            mkdir -p /tmp/genotype_calling
            cat {{ ' '.join(ti.xcom_pull(task_ids=params.prev_task_ids)) }} > /tmp/genotype_calling/all.vcf
            """,
    )
    for sample_index in range(len(samples)):
        # "Map" the reads: copy the sample's FASTQ to a per-sample .bam and push its path.
        bwa_map = BashOperator(
            task_id=f"bwa_map_{sample_index}",
            dag=dag,
            do_xcom_push=True,
            params={
                'samples': samples,
                'sample_index': sample_index,
            },
            bash_command="""
                mkdir -p /tmp/mapped_reads
                fastq_file="{{ params.samples[params.sample_index][1] }}"
                sample_id="{{ params.samples[params.sample_index][0] }}"
                cat "$fastq_file" > "/tmp/mapped_reads/sample_${sample_id}.bam"
                echo "/tmp/mapped_reads/sample_${sample_id}.bam"
                """,
        )
        # "Sort" the mapped reads: input path comes from bwa_map_<i>'s XCom.
        samtools_sort = BashOperator(
            task_id=f"samtools_sort_{sample_index}",
            dag=dag,
            do_xcom_push=True,
            params={
                'samples': samples,
                'sample_index': sample_index,
                'prev_task_ids': f"bwa_map_{sample_index}",
            },
            bash_command="""
                mkdir -p /tmp/sorted_reads
                sample_id="{{ params.samples[params.sample_index][0] }}"
                mapped_bam="{{ ti.xcom_pull(task_ids=params.prev_task_ids) }}"
                cat "$mapped_bam" > "/tmp/sorted_reads/sample_sorted_${sample_id}.bam"
                echo "/tmp/sorted_reads/sample_sorted_${sample_id}.bam"
                """,
        )
        # "Index" the sorted reads: write <sorted_bam>.bai next to the sorted file.
        samtools_index = BashOperator(
            task_id=f"samtools_index_{sample_index}",
            dag=dag,
            do_xcom_push=True,
            params={
                'prev_task_ids': f"samtools_sort_{sample_index}",
            },
            bash_command="""
                sorted_bam="{{ ti.xcom_pull(task_ids=params.prev_task_ids) }}"
                cat "$sorted_bam" > "${sorted_bam}.bai"
                echo "${sorted_bam}.bai"
                """,
        )
        bwa_map >> samtools_sort >> samtools_index >> genotype_calling
        

Overwriting /home/studio-lab-user/airflow/dags/test_pipeline_1.py


In [59]:
# Shows the merged output written by the most recent test_pipeline_1 run.
# Trigger the DAG (e.g. from the Airflow UI) and wait for it to finish first.
# An explicit `!` is used so this does not depend on IPython automagic/aliases.
!cat /tmp/genotype_calling/all.vcf

A
B
C
