In [1]:
!pip install conducto

# Shared constants used by the cells below.

# Directory that every download/process command reads from and writes to.
data = "/conducto/data/pipeline"

# (name, url) pairs: `name` becomes both the node name and the file stem,
# `url` is the gzipped FASTA that gets downloaded for it.
genomes = [
    (
        "s_cerevisiae",
        "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/146/045/GCF_000146045.2_R64/GCF_000146045.2_R64_genomic.fna.gz",
    ),
    (
        "b_bruxellensis",
        "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/011/074/885/GCA_011074885.2_ASM1107488v2/GCA_011074885.2_ASM1107488v2_genomic.fna.gz",
    ),
    (
        "z_kombuchaensis",
        "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/003/705/955/GCA_003705955.1_ASM370595v1/GCA_003705955.1_ASM370595v1_genomic.fna.gz",
    ),
    (
        "d_albidus",
        "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/003/707/475/GCA_003707475.1_ASM370747v1/GCA_003707475.1_ASM370747v1_genomic.fna.gz",
    ),
    (
        "c_parapsilosos",
        "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/182/765/GCF_000182765.1_ASM18276v2/GCF_000182765.1_ASM18276v2_genomic.fna.gz",
    ),
    (
        "s_bombicola",
        "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/001/599/315/GCA_001599315.1_JCM_9596_assembly_v001/GCA_001599315.1_JCM_9596_assembly_v001_genomic.fna.gz",
    ),
]

# Same (name, url) layout as `genomes`; downloaded alongside them.
genes = [
    (
        "S288C",
        "https://sgd-prod-upload.s3.amazonaws.com/S000208654/orf_coding.20150113.fasta.gz",
    ),
]

# disable word wrap in outputs (puts long urls out of view)
# `display` and `HTML` are imported from the public IPython.display module;
# importing them from IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML("<style>div.output_area pre {white-space: pre;}</style>"))



[Structure the pipeline](https://www.conducto.com/docs/basics/pipeline-structure) with a [`Serial`](https://www.conducto.com/docs/basics/pipeline-structure#serial) node at the root, and give it some [`Parallel`](https://www.conducto.com/docs/basics/pipeline-structure#parallel) children. `/Process` won't start until `/Download` is complete, but their children will run in parallel.

In [2]:
import conducto as co

# Serial root: /Process is not started until /Download has completed.
root = co.Serial()
# Both stages are Parallel containers, so the children added to them later
# can run side by side.
for stage in ("Download", "Process"):
    root[stage] = co.Parallel()
print(root.pretty())

[1;34m/[0m
├─0 [1;34mDownload[0m
└─1 [1;34mProcess[0m


[`Exec`](https://www.conducto.com/docs/basics/pipeline-structure) nodes take shell commands and run them in the environment defined by [`Image`](https://www.conducto.com/docs/basics/images).

In [3]:
# Image used by the download steps; it requests the wget and gzip packages.
prep_img = co.Image(reqs_packages=["wget", "gzip"])
download_stage = root["Download"]
for name, url in genomes + genes:
    # One Serial node per dataset, so "Get" runs before "Decompress".
    download_stage[name] = co.Serial(image=prep_img)
    dataset = download_stage[name]
    dataset["Get"] = co.Exec(f"wget -O {data}/{name}.fna.gz {url}")
    dataset["Decompress"] = co.Exec(f"cd {data} && gunzip {name}.fna.gz")
print(root.pretty())

[1;34m/[0m
├─0 [1;34mDownload[0m
│ ├─ [1;34ms_cerevisiae[0m
│ │ ├─0 [1;36mGet[0m   wget -O /conducto/data/pipeline/s_cerevisiae.fna.gz https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/146/045/GCF_000146045.2_R64/GCF_000146045.2_R64_genomic.fna.gz
│ │ └─1 [1;36mDecompress[0m   cd /conducto/data/pipeline && gunzip s_cerevisiae.fna.gz
│ ├─ [1;34mb_bruxellensis[0m
│ │ ├─0 [1;36mGet[0m   wget -O /conducto/data/pipeline/b_bruxellensis.fna.gz https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/011/074/885/GCA_011074885.2_ASM1107488v2/GCA_011074885.2_ASM1107488v2_genomic.fna.gz
│ │ └─1 [1;36mDecompress[0m   cd /conducto/data/pipeline && gunzip b_bruxellensis.fna.gz
│ ├─ [1;34mz_kombuchaensis[0m
│ │ ├─0 [1;36mGet[0m   wget -O /conducto/data/pipeline/z_kombuchaensis.fna.gz https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/003/705/955/GCA_003705955.1_ASM370595v1/GCA_003705955.1_ASM370595v1_genomic.fna.gz
│ │ └─1 [1;36mDecompress[0m   cd /conducto/data/pipeline && gunzip z_kombuchaensis.

[`Exec`](https://www.conducto.com/docs/basics/pipeline-structure) nodes can also call functions. For this they need an environment with the current file and its dependencies.  

In this case it depends on some Python packages and [an image from DockerHub](https://hub.docker.com/r/ncbi/blast) which already has the tool that we need.

In [4]:
# Environment for every node under /Process.
root["Process"].image = co.Image(
    image="ncbi/blast",                 # base image pulled from DockerHub
    copy_dir=".",                       # include the current directory in the image
    reqs_py=["conducto", "biopython"],  # Python packages added on top
)

def process(target, genes, hits):
    """BLAST the `genes` sequences against a nucleotide database built from `target`.

    The NCBI BLAST+ executables are invoked directly through `subprocess`
    rather than through the Bio.Blast.Applications wrappers, which Biopython
    has deprecated in favour of building the command line and running it
    with `subprocess`.

    Args:
        target: path of the FASTA file that `makeblastdb` indexes.
        genes: path of the FASTA file holding the `blastn` query sequences.
        hits: path that `blastn` writes its results to (output format 5).

    Raises:
        subprocess.CalledProcessError: if either tool exits with a non-zero status.
        FileNotFoundError: if `makeblastdb` or `blastn` is not on the PATH.
    """
    # Local import keeps the function self-contained, as the original local import did.
    import subprocess

    # Build a nucleotide database called "tmp" in the current working directory.
    subprocess.run(
        ["makeblastdb", "-dbtype", "nucl", "-in", target, "-out", "tmp"],
        check=True,
    )
    # Query that database with the gene sequences and write the results to `hits`.
    subprocess.run(
        ["blastn", "-query", genes, "-outfmt", "5", "-db", "tmp", "-out", hits],
        check=True,
    )
    
# NOTE(review): this rebinds `process` to the one imported from a `pipeline`
# module, replacing the definition in this cell. Presumably this is so that
# co.Exec receives a function that lives in an importable file; confirm, and
# keep the two definitions in sync while the workaround is needed.
from pipeline import process  # hack

# One node per genome under /Process. Each calls process() with that genome's
# decompressed FASTA as the target, the S288C file as the query genes, and a
# per-genome .xml path for the hits.
process_stage = root["Process"]
for name, _ in genomes:
    process_stage[name] = co.Exec(
        process,
        f"{data}/{name}.fna",
        f"{data}/S288C.fna",
        f"{data}/{name}.xml",
    )

Finally, we'll add an interactive environment where we can explore the process output.  We give this node extra `cpu` and `mem` for easy exploration.

In [5]:
# Added to the Serial root after Download and Process; given extra cpu and mem
# for exploring the results.
root["Analyze"] = co.Exec(
    "analyze.ipynb",
    cpu=8,
    mem=32,
)
print(root.pretty())

[1;34m/[0m
├─0 [1;34mDownload[0m
│ ├─ [1;34ms_cerevisiae[0m
│ │ ├─0 [1;36mGet[0m   wget -O /conducto/data/pipeline/s_cerevisiae.fna.gz https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/146/045/GCF_000146045.2_R64/GCF_000146045.2_R64_genomic.fna.gz
│ │ └─1 [1;36mDecompress[0m   cd /conducto/data/pipeline && gunzip s_cerevisiae.fna.gz
│ ├─ [1;34mb_bruxellensis[0m
│ │ ├─0 [1;36mGet[0m   wget -O /conducto/data/pipeline/b_bruxellensis.fna.gz https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/011/074/885/GCA_011074885.2_ASM1107488v2/GCA_011074885.2_ASM1107488v2_genomic.fna.gz
│ │ └─1 [1;36mDecompress[0m   cd /conducto/data/pipeline && gunzip b_bruxellensis.fna.gz
│ ├─ [1;34mz_kombuchaensis[0m
│ │ ├─0 [1;36mGet[0m   wget -O /conducto/data/pipeline/z_kombuchaensis.fna.gz https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/003/705/955/GCA_003705955.1_ASM370595v1/GCA_003705955.1_ASM370595v1_genomic.fna.gz
│ │ └─1 [1;36mDecompress[0m   cd /conducto/data/pipeline && gunzip z_kombuchaensis.

When we launch this pipeline, we get a link to the Conducto web app where we can interact with it.

In [6]:
# Launch the pipeline; the recorded output below shows the link to the web app.
# NOTE(review): `_build` is underscore-prefixed, which by Python convention marks
# it as private. Confirm whether a public launch entry point should be used instead.
root._build()

Starting pipeline sbw-oox [Sun Feb 28 22:22:19 2021]
View at [1;4mhttps://test.conducto.io/app/p/sbw-oox[0m
