In [None]:
import os
from textwrap import dedent

os.makedirs('workflows/nf/sample', exist_ok=True)

nf = dedent('''
nextflow.enable.dsl = 2

params.greeting = 'hello'
params.addressee = null

if (!params.addressee) exit 1, "required parameter 'addressee' missing"

process Greet {
    publishDir '/mnt/workflow/pubdir'
    input:
        val greeting
        val addressee
    
    output:
        path "output", emit: output_file
    
    script:
        """
        echo "${greeting} ${addressee}" | tee output
        """
}

workflow {
    Greet(params.greeting, params.addressee)
}

''').strip()

with open('workflows/nf/sample/main.nf', 'wt') as f:
    f.write(nf)


In [1]:
import glob
import io
from zipfile import ZipFile, ZIP_DEFLATED

# everything in this folder will get bundled into a zip
workflow_root_dir = 'workflows/nf/sample/'

print("creating workflow zip bundle:")
buffer = io.BytesIO()
with ZipFile(buffer, mode='w', compression=ZIP_DEFLATED) as zf:
    for file in glob.iglob(os.path.join(workflow_root_dir, '**/*'), recursive=True):
        if os.path.isfile(file):
            arcname = file.replace(os.path.join(workflow_root_dir, ''), '')
            print(f".. adding: {file} -> {arcname}")
            zf.write(file, arcname=arcname)


creating workflow zip bundle:


NameError: name 'os' is not defined

In [None]:
response = omics.create_workflow(
    name="GreetingsNF",
    description="Greetings Nextflow workflow",
    definitionZip=buffer.getvalue(),  # this argument needs bytes
    main="main.nf",
    parameterTemplate={
        "greeting": {"description": "(string) greeting to use"},
        "addressee": {"description": "(string) who to greet"}
    },
)

workflow_greetings = response
response


In [None]:
print(f"waiting for workflow {workflow_greetings['id']} to become ACTIVE")
workflow_greetings = omics.get_workflow(id=workflow_greetings['id'])
while workflow_greetings['status'] in ('CREATING', 'UPDATING'):
    time.sleep(5)
    workflow_greetings = omics.get_workflow(id=workflow_greetings['id'])

if workflow_greetings['status'] == 'ACTIVE':
    print(f"workflow {workflow_greetings['id']} ready for use")
else:
    print(f"workflow {workflow_greetings['id']} {workflow_greetings['status']}")


In [None]:
definition_uri = "s3://aws-genomics-static-us-east-1/omics-workshop/gatkbestpractices.wdl.zip"


In [None]:
parameters = {
    "sample_name": {"description": "sample name"},
    "fastq_1": {"description": "path to fastq1"},
    "fastq_2": {"description": "path to fastq2"},
    "ref_fasta": {"description": "path to reference fasta"},
    "readgroup_name": {"description": "readgroup name"},
    "library_name": {"description": "library name"},
    "platform_name": {"description": "platform name, e.g. Illumina"},
    "run_date": {"description": "sequencing run date"},
    "sequencing_center": {"description": "name of sequencing center"},
    "dbSNP_vcf": {"description": "dbsnp vcf"},
    "Mills_1000G_indels_vcf": {"description": "Mills 1000 genomes gold indels vcf"},
    "known_indels_vcf": {"description": "known indels vcf"},
    "scattered_calling_intervals_archive": {"description": "tar gzip of scatter intervals"},
    "gatk_docker": {"description": "docker uri in private ECR of GATK"},
    "gotc_docker": {"description": "docker uri in private ECR of Genomes in the Cloud"}
}


In [None]:
response = omics.create_workflow(
    name="GATK",
    description="GATK best practices variant discovery",
    definitionUri=definition_uri,  
    main="main.wdl",
    parameterTemplate=parameters,
)

workflow_gatk = response

print(f"waiting for workflow {workflow_gatk['id']} to become ACTIVE")
workflow_gatk = omics.get_workflow(id=workflow_gatk['id'])
while workflow_gatk['status'] in ('CREATING', 'UPDATING'):
    time.sleep(5)
    workflow_gatk = omics.get_workflow(id=workflow_gatk['id'])

if workflow_gatk['status'] == 'ACTIVE':
    print(f"workflow {workflow_gatk['id']} ready for use")
else:
    print(f"workflow {workflow_gatk['id']} {workflow_gatk['status']}")


## Running Worlflows

In [None]:
response = omics.start_run(
    workflowId=workflow_greetings['id'],
    name="Greetings workflow run",
    roleArn=OMICS_JOB_ROLE_ARN,
    parameters={
        "greeting": "Hello", 
        "addressee": "Amazon"},
    outputUri=f's3://{OMICS_OUTPUT_BUCKET}/output/greetings',
)

run_greetings = response
response


In [None]:
run_greetings = omics.get_run(id=run_greetings['id'])
run_greetings


In [None]:

sample_name = 'NA12878_20K'
gatk_container_uri = f"{AWS_ACCOUNT_ID}.dkr.ecr.{AWS_REGION}.amazonaws.com/gatk:4.1.9.0"
gotc_container_uri = f"{AWS_ACCOUNT_ID}.dkr.ecr.{AWS_REGION}.amazonaws.com/genomes-in-the-cloud:2.4.7-1603303710"

response = omics.start_run(
    workflowId=workflow_gatk['id'],
    name=f"GATK variant discovery - {sample_name}",
    roleArn=OMICS_JOB_ROLE_ARN,
    parameters={
        "sample_name": sample_name,
        "fastq_1": f"s3://{OMICS_WORKSHOP_BUCKET}/data/fastq/GIAB_NIST_NA12878_HG001_HiSeq_300x__L002_R1_001.fastq.gz",
        "fastq_2": f"s3://{OMICS_WORKSHOP_BUCKET}/data/fastq/GIAB_NIST_NA12878_HG001_HiSeq_300x__L002_R2_001.fastq.gz",
        "ref_fasta": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.fasta",
        "readgroup_name": "NA12878",
        "library_name": "Solexa-NA12878",
        "platform_name": "Illumina",
        "run_date": "2016-09-01T02:00:00+0200",
        "sequencing_center": "ABCD",
        "ref_fasta": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.fasta",
        "dbSNP_vcf": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.dbsnp138.vcf",
        "Mills_1000G_indels_vcf": "s3://broad-references/hg38/v0/Mills_and_1000G_gold_standard.indels.hg38.vcf.gz",
        "known_indels_vcf": "s3://broad-references/hg38/v0/Homo_sapiens_assembly38.known_indels.vcf.gz",
        "scattered_calling_intervals_archive": f"s3://{OMICS_WORKSHOP_BUCKET}/intervals.tar.gz",
        "gatk_docker": gatk_container_uri,
        "gotc_docker": gotc_container_uri
    },
    outputUri=f's3://{OMICS_OUTPUT_BUCKET}/output/gatk',
)

run_gatk = response
response
