# Running R2R (Ready to Run) workflows

In this tutorial, you will learn how to run any of the existing READY2RUN Omics Workflows.

## Prerequisites
### Python requirements
* Python >= 3.8
* Packages:
  * boto3 >= 1.26.19
  * botocore >= 1.29.19

### AWS requirements

#### AWS CLI
You will need the AWS CLI installed and configured in your environment. Supported AWS CLI versions are:

* AWS CLI v2 >= 2.9.3 (Recommended)
* AWS CLI v1 >= 1.27.19

#### Output buckets
You will need an S3 bucket **in the same region** you are running this tutorial in, to store workflow outputs.

## Policy setup
This notebook runs under the IAM role that was created or selected when the notebook was created.<br>
Run the following code snippet to cross-check the role name.

In [171]:
import boto3

boto3.client('sts').get_caller_identity()['Arn']

'arn:aws:sts::869795389844:assumed-role/AmazonSageMaker-ExecutionRole-20230514T131742/SageMaker'

We need to extend this role with additional permissions so that the actions executed in the following cells do not fail.<br>
Here is a sample policy that can be added to the role. Note that this is a sample policy, intended only for the needs of this workshop.<br>
In a production environment, the policy should be much more restrictive.

In [172]:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "iam:GetPolicy",
                "iam:CreatePolicy",
                "iam:DeletePolicy",
                "iam:ListPolicyVersions",
                "iam:ListEntitiesForPolicy",
                "iam:CreateRole",
                "iam:DeleteRole",
                "iam:DeletePolicyVersion",
                "iam:AttachRolePolicy",
                "iam:DetachRolePolicy",
                "iam:ListAttachedRolePolicies",
                "omics:*"
            ],
            "Resource": "*"
        }
    ]
}
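One way to attach the sample policy above is as an inline policy on the notebook's execution role. The sketch below assumes you run it from a principal that is allowed to call `iam:PutRolePolicy` (for example your own administrator session, since the notebook role does not have that permission yet). The helper names `role_name_from_assumed_role_arn` and `attach_inline_policy` are hypothetical, as is any policy name you pass in.

```python
import json


def role_name_from_assumed_role_arn(arn: str) -> str:
    """Extract the IAM role name from an STS assumed-role ARN (hypothetical helper).

    Example: arn:aws:sts::123456789012:assumed-role/MyRole/SessionName -> "MyRole"
    """
    # The resource part follows the fifth colon: "assumed-role/<role>/<session>"
    resource = arn.split(":", 5)[5]
    kind, role_name, _session = resource.split("/", 2)
    if kind != "assumed-role":
        raise ValueError(f"not an assumed-role ARN: {arn}")
    return role_name


def attach_inline_policy(iam_client, role_name: str, policy_name: str, policy: dict) -> None:
    """Attach `policy` to `role_name` as an inline policy (hypothetical helper)."""
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy),  # IAM expects the document as a JSON string
    )
```

Usage, with a policy name of your choice: `attach_inline_policy(boto3.client('iam'), role_name_from_assumed_role_arn(arn), 'omics-r2r-workshop-policy', sample_policy)`. Note that an assumed-role ARN does not include the role's IAM path, which `put_role_policy` does not need anyway.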


## Environment setup

Reset the environment in case you are re-running this tutorial, then load the helper functions from the helper notebook.

In [173]:
%reset -f
%run 200-omics_helper_functions.ipynb

Import libraries

In [174]:
import json
from datetime import datetime
import glob
import io
import os
from pprint import pprint
from textwrap import dedent
from time import sleep
from urllib.parse import urlparse
from zipfile import ZipFile, ZIP_DEFLATED

import boto3
import botocore.exceptions

## Create a service IAM role
To use Amazon Omics, you need to create an IAM role that grants the service permissions to access resources in your account. We'll do this below using the `recreate_role` helper loaded from the helper notebook.

> **Note**: this step is fully automated in the Omics Workflows console when you create a run.

In [175]:
omics_role_name = 'omics-r2r-tutorial-service-role'
omics_role_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Principal": {
            "Service": "omics.amazonaws.com"
        },
        "Effect": "Allow",
        "Action": "sts:AssumeRole"
    }]
}

# delete role (if it exists) and create a new one
omics_role = recreate_role(omics_role_name, omics_role_trust_policy)
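The `recreate_role` helper comes from the helper notebook and is not shown here. A minimal sketch of the delete-then-create idea, assuming the helper works roughly this way, could look like the function below; `recreate_role_sketch` is a hypothetical name, and the real helper may differ.

```python
import json


def recreate_role_sketch(iam_client, role_name: str, trust_policy: dict) -> dict:
    """Delete `role_name` if it exists, then create it with `trust_policy`.

    Hypothetical stand-in for the helper notebook's recreate_role().
    """
    try:
        # IAM refuses to delete a role that still has managed policies attached
        # (inline policies and instance profiles, not handled here, block it too).
        attached = iam_client.list_attached_role_policies(RoleName=role_name)
        for policy in attached["AttachedPolicies"]:
            iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy["PolicyArn"])
        iam_client.delete_role(RoleName=role_name)
    except iam_client.exceptions.NoSuchEntityException:
        pass  # the role did not exist yet, so there is nothing to clean up

    # The trust policy must be passed as a JSON string
    return iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
```

This sketch reads only the first page of `list_attached_role_policies`, which is enough for a tutorial role with two attached policies.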


After creating the role, we need to attach policies that grant it permissions. In this case, we are allowing read/write access to all S3 buckets in the account. This is fine for this tutorial, but in a real-world setting you will want to scope this down to only the necessary resources. We are also adding permissions to write to CloudWatch Logs, which is where any output sent to `STDOUT` or `STDERR` is collected.

In [176]:
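s3_policy_name = "omics-r2r-tutorial-s3-access-policy"
s3_policy_permissions = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:Get*",
                "s3:List*",
            ],
            "Resource": [
                # bucket-level ARNs: required for bucket actions such as s3:ListBucket
                "arn:aws:s3:::*",
                # object-level ARNs: required for s3:GetObject / s3:PutObject
                "arn:aws:s3:::*/*",
            ]
        }
    ]
}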

AWS_ACCOUNT_ID = boto3.client('sts').get_caller_identity()['Account']

logs_policy_name = "omics-r2r-tutorial-logs-access-policy"
logs_policy_permissions = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": [
                f"arn:aws:logs:*:{AWS_ACCOUNT_ID}:log-group:/aws/omics/WorkflowLog:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogStreams",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            "Resource": [
                f"arn:aws:logs:*:{AWS_ACCOUNT_ID}:log-group:/aws/omics/WorkflowLog:log-stream:*"
            ]
        }
    ]
}

s3_policy = recreate_policy(s3_policy_name, s3_policy_permissions)
logs_policy = recreate_policy(logs_policy_name, logs_policy_permissions)

# attach policies to role
iam_client = boto3.client("iam")
iam_client.attach_role_policy(RoleName=omics_role['Role']['RoleName'], PolicyArn=s3_policy['Policy']['Arn'])
iam_client.attach_role_policy(RoleName=omics_role['Role']['RoleName'], PolicyArn=logs_policy['Policy']['Arn'])
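To scope the S3 permissions down as suggested above, one option is to generate the policy from an explicit list of buckets. This is a minimal sketch under assumptions: `scoped_s3_policy` is a hypothetical helper, the bucket names are placeholders, and whether a given workflow also needs `s3:ListBucket` on its input buckets should be checked against the Amazon Omics documentation.

```python
def scoped_s3_policy(bucket_names, writable_prefixes=()):
    """Build an S3 policy limited to specific buckets (hypothetical helper).

    - s3:ListBucket applies to bucket ARNs (arn:aws:s3:::bucket)
    - s3:GetObject / s3:PutObject apply to object ARNs (arn:aws:s3:::bucket/key)
    """
    statements = [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{b}" for b in bucket_names],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": [f"arn:aws:s3:::{b}/*" for b in bucket_names],
        },
    ]
    if writable_prefixes:
        # Only these "bucket/prefix" locations may be written to
        statements.append({
            "Effect": "Allow",
            "Action": ["s3:PutObject"],
            "Resource": [f"arn:aws:s3:::{p.rstrip('/')}/*" for p in writable_prefixes],
        })
    return {"Version": "2012-10-17", "Statement": statements}
```

For example, `scoped_s3_policy(["my-input-bucket", "my-output-bucket"], ["my-output-bucket/results"])` allows reads from both buckets but writes only under `results/`.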


## Getting the list of READY2RUN workflows
Using the Omics client, we can list the READY2RUN workflows available in the current region.
Here we print each workflow's `id` and `name` for a quick overview.

In [177]:
omics = boto3.client('omics')
r2r_workflows = omics.list_workflows(type="READY2RUN")
r2r_workflows_items = r2r_workflows['items']

for r2r_workflow_item in r2r_workflows_items:
    print(r2r_workflow_item['id'], '\t', r2r_workflow_item['name'])

6094971 	 AlphaFold for 601-1200 residues
3768383 	 GATK-BP fq2bam
7330987 	 NVIDIA Parabricks Germline DeepVariant WGS for up to 30X
5562080 	 GATK-BP Somatic WES bam2vcf
2174942 	 scRNAseq with STARsolo
9500764 	 GATK-BP Germline fq2vcf for 30x genome
4523502 	 NVIDIA Parabricks BAM2FQ2BAM WGS for up to 5X
2009847 	 Sentieon Germline BAM WES for up to 300x
4484039 	 Sentieon Germline FASTQ WES for up to 100x
3021525 	 NVIDIA Parabricks Germline HaplotypeCaller WGS for up to 30X
4885129 	 AlphaFold for up to 600 residues
6914655 	 Sentieon LongRead for PacBio HiFi
1305211 	 Sentieon Somatic WES
5221318 	 NVIDIA Parabricks BAM2FQ2BAM WGS for up to 30X
2647398 	 NVIDIA Parabricks FQ2BAM WGS for up to 5X
3585800 	 NVIDIA Parabricks Germline DeepVariant WGS for up to 50X
8211545 	 NVIDIA Parabricks FQ2BAM WGS for up to 50X
8422905 	 Sentieon LongRead for ONT
2374431 	 Bases2Fastq for 2x150
4974161 	 NVIDIA Parabricks FQ2BAM WGS for up to 30X
7866315 	 scRNAseq with Salmon Alevin-fry
41373
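`list_workflows` returns results one page at a time, with a `nextToken` when more pages exist, so a single call may not return every workflow. A minimal sketch that follows the token is shown below; `list_all_items` is a hypothetical helper that works with any callable shaped like `omics.list_workflows` or `omics.list_runs`. boto3 may also provide a built-in paginator for this operation, which you can check with `omics.can_paginate('list_workflows')`.

```python
def list_all_items(list_fn, **kwargs):
    """Follow `nextToken` until every page of a list_* call has been read.

    `list_fn` returns {'items': [...], 'nextToken': '...'}, with no
    nextToken on the last page.
    """
    items = []
    while True:
        page = list_fn(**kwargs)
        items.extend(page.get("items", []))
        token = page.get("nextToken")
        if not token:
            return items
        # Ask for the next page, keeping the original filters (e.g. type=...)
        kwargs = {**kwargs, "nextToken": token}
```

Usage: `all_r2r = list_all_items(omics.list_workflows, type="READY2RUN")`.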

To showcase the execution of a READY2RUN workflow, we select the workflow with ID `6094971` (AlphaFold for 601-1200 residues) for demo purposes.

In [185]:
workflow = [r2r_workflow_item for r2r_workflow_item in r2r_workflows_items if r2r_workflow_item["id"] == "6094971" ][0]
pretty_print(workflow)

{
  "arn": "arn:aws:omics:eu-west-1::workflow/6094971",
  "creationTime": "2023-05-15 00:00:00+00:00",
  "id": "6094971",
  "name": "AlphaFold for 601-1200 residues",
  "status": "ACTIVE",
  "type": "READY2RUN"
}


We get the full details of this workflow in order to examine its list of parameters.

In [186]:
workflow_details = omics.get_workflow(id=workflow['id'], type="READY2RUN")

The workflow's full set of parameters is described in its parameter template:

In [187]:
pretty_print(workflow_details['parameterTemplate'])

{
  "fasta_path": {
    "description": "The S3 or Omics Storage URI of the input file in FASTA format. Only single-chain inputs are supported at this time."
  }
}
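Comparing the parameters you plan to pass against the `parameterTemplate` before calling `start_run` catches misnamed parameters early. The sketch below assumes that an entry is required unless it has `"optional": True`, which matches how the templates in this tutorial are written; `check_parameters` is a hypothetical helper.

```python
def check_parameters(template: dict, params: dict) -> list:
    """Return a list of problems with `params` given a workflow parameterTemplate."""
    problems = []
    # Every non-optional template entry must be supplied
    for name, spec in template.items():
        if not spec.get("optional", False) and name not in params:
            problems.append(f"missing required parameter '{name}'")
    # Every supplied parameter must exist in the template
    for name in params:
        if name not in template:
            problems.append(f"unknown parameter '{name}'")
    return problems
```

Usage: `check_parameters(workflow_details['parameterTemplate'], {"fasta_path": input_uri})` should return an empty list.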


This workflow has only one parameter, `fasta_path`, whose description is shown in the output above.<br>
We can now run it like any other Amazon Omics workflow.

Get the region this notebook is operating in, then start a run of the workflow.

In [None]:

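# Get the region this notebook is operating in
region = boto3.session.Session().region_name
print(f"region: {region}")

## NOTE: replace these S3 URIs with ones you have access to.
## 'fasta_path' expects the URI of a single FASTA file; point input_uri at the
## FASTA file you want to fold (the sample location below is in us-east-1).
input_uri = "s3://aws-genomics-static-us-east-1/omics-tutorials/data/workflows/r2r/6094971/"
output_uri = "s3://ktzouvan-omics-ireland/results"

run = omics.start_run(
    workflowId=workflow['id'],
    workflowType='READY2RUN',
    name="Sample workflow run",
    roleArn=omics_role['Role']['Arn'],  # service role created above
    parameters={
        "fasta_path": input_uri  # parameter name from the parameterTemplate
    },
    outputUri=output_uri,
)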

print(f"running workflow {workflow['id']}, starting run {run['id']}")
try:
    waiter = omics.get_waiter('run_running')
    waiter.wait(id=run['id'], WaiterConfig={'Delay': 30, 'MaxAttempts': 60})

    print(f"run {run['id']} is running")

    waiter = omics.get_waiter('run_completed')
    waiter.wait(id=run['id'], WaiterConfig={'Delay': 60, 'MaxAttempts': 60})

    print(f"run {run['id']} completed")
except botocore.exceptions.WaiterError as e:
    print(e)
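The waiters above raise `WaiterError` both when a run fails and when they give up waiting. If you would rather inspect the final status yourself, a manual polling loop is an alternative. This is a minimal sketch: `wait_for_run` is a hypothetical helper, and the set of terminal statuses is an assumption based on the Omics run lifecycle (`COMPLETED`, `FAILED`, `CANCELLED`, `DELETED`). The `sleep` argument is injectable so the loop can be tested without waiting.

```python
import time

# Run statuses after which get_run should no longer change (assumption)
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED", "DELETED"}


def wait_for_run(get_run, run_id, delay=60, max_attempts=60, sleep=time.sleep):
    """Poll get_run(id=run_id) until the run reaches a terminal status.

    Returns the final get_run response; raises TimeoutError otherwise.
    """
    for attempt in range(max_attempts):
        response = get_run(id=run_id)
        if response["status"] in TERMINAL_STATUSES:
            return response
        if attempt < max_attempts - 1:
            sleep(delay)  # no point sleeping after the last check
    raise TimeoutError(f"run {run_id} not finished after {max_attempts} checks")
```

Usage: `final = wait_for_run(omics.get_run, run['id'])`, then inspect `final['status']` and, for failed runs, `final.get('statusMessage')`.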

Once the run completes, we can verify its status either by listing it:

In [None]:
[_ for _ in omics.list_runs()['items'] if _['id'] == run['id']]

or getting its full details:

In [None]:
omics.get_run(id=run['id'])

We can verify that the correct output was generated by listing the `outputUri` for the workflow run:

In [None]:
s3uri = urlparse(omics.get_run(id=run['id'])['outputUri'])
boto3.client('s3').list_objects_v2(
    Bucket=s3uri.netloc, 
    Prefix='/'.join([s3uri.path[1:], run['id']])
)['Contents']
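The cell above builds the listing prefix as `<outputUri path>/<run id>`. A slightly more defensive version normalises slashes (so `s3://bucket/results` and `s3://bucket/results/` behave the same) and ends the prefix with `/`, so that run `123` does not also match objects of run `1234`. This is a sketch; `run_output_location` is a hypothetical helper that assumes the same `<outputUri>/<runId>/` layout the cell above relies on.

```python
from urllib.parse import urlparse


def run_output_location(output_uri: str, run_id: str):
    """Return (bucket, prefix) under which a run's outputs are expected."""
    parsed = urlparse(output_uri)
    base = parsed.path.strip("/")  # drop leading and trailing slashes
    prefix = f"{base}/{run_id}/" if base else f"{run_id}/"
    return parsed.netloc, prefix
```

Usage: `bucket, prefix = run_output_location(omics.get_run(id=run['id'])['outputUri'], run['id'])`, then `boto3.client('s3').list_objects_v2(Bucket=bucket, Prefix=prefix).get('Contents', [])`, which returns an empty list instead of raising `KeyError` when nothing has been written yet.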

Workflows typically have multiple tasks. We can list workflow tasks with:

In [None]:
tasks = omics.list_run_tasks(id=run['id'])
pprint(tasks['items'])

and get specific task details with:

In [None]:
task = omics.get_run_task(id=run['id'], taskId=tasks['items'][0]['taskId'])
pprint(task)

After running the cell above, you should see that each task has an associated CloudWatch Logs log stream. It captures any text the task sent to `STDOUT` or `STDERR`. These outputs are helpful for debugging task failures and can be retrieved with:

In [None]:
events = boto3.client('logs').get_log_events(
    logGroupName="/aws/omics/WorkflowLog",
    logStreamName=f"run/{run['id']}/task/{task['taskId']}"
)
for event in events['events']:
    print(event['message'])
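A single `get_log_events` call returns only one page of events, so long task logs can be cut off. CloudWatch Logs signals the end of a stream by returning the same `nextForwardToken` that was passed in. The sketch below follows the token until that happens; `read_all_log_messages` is a hypothetical helper name.

```python
def read_all_log_messages(logs_client, group, stream):
    """Read every event of a CloudWatch Logs stream, oldest first.

    The end of the stream is reached when nextForwardToken stops changing.
    """
    messages = []
    kwargs = {"logGroupName": group, "logStreamName": stream, "startFromHead": True}
    while True:
        page = logs_client.get_log_events(**kwargs)
        messages.extend(event["message"] for event in page["events"])
        token = page.get("nextForwardToken")
        if token is None or token == kwargs.get("nextToken"):
            return messages
        kwargs["nextToken"] = token
```

Usage: `read_all_log_messages(boto3.client('logs'), "/aws/omics/WorkflowLog", f"run/{run['id']}/task/{task['taskId']}")`.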

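## Using run groups

Run groups let you cap the resources available to a set of runs. The cell below creates a run group limited to 100 CPUs (`maxCpus`) and a maximum run time of 600 minutes (`maxDuration`).

In [None]: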
run_group = omics.create_run_group(
    name="TestRunGroup",
    maxCpus=100,
    maxDuration=600,
)

omics.get_run_group(id=run_group['id'])

One way to use a run group is to run multiple iterations of a workflow, each with different input values. Below we'll define a simple Nextflow workflow that takes string parameters we can easily vary between iterations.

In [None]:
os.makedirs('workflows/nf/sample', exist_ok=True)

nf = dedent('''
nextflow.enable.dsl = 2

params.greeting = 'hello'
params.addressee = null

if (!params.addressee) exit 1, "required parameter 'addressee' missing"

process Greet {
    publishDir '/mnt/workflow/pubdir'
    input:
        val greeting
        val addressee
    
    output:
        path "output", emit: output_file
    
    script:
        """
        echo "${greeting} ${addressee}" | tee output
        """
}

workflow {
    Greet(params.greeting, params.addressee)
}

''').strip()

with open('workflows/nf/sample/main.nf', 'wt') as f:
    f.write(nf)

We'll use the `create_workflow` helper function, loaded from the helper notebook, to create an Omics workflow from this definition:

In [None]:
workflow = create_workflow(
    'workflows/nf/sample',
    parameters={
        "greeting": {
            "description": "(string) greeting to use",
            "optional": True
        },
        "addressee": {
            "description": "(string) who to greet"
        }
    },
    name="GreetingsNF",
    description="Greetings Nextflow workflow",
    main="main.nf"
)
pprint(workflow)
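`omics.create_workflow` accepts the workflow definition as zipped bytes through its `definitionZip` parameter, with `main` naming the entry-point file inside the archive. The helper notebook's `create_workflow` is not shown here; assuming it packages the directory in a similar way, a minimal sketch of that packaging step is below. `zip_workflow_dir` is a hypothetical helper name.

```python
import io
import os
from zipfile import ZipFile, ZIP_DEFLATED


def zip_workflow_dir(workflow_dir: str) -> bytes:
    """Zip every file under workflow_dir into in-memory bytes.

    Archive paths are relative to workflow_dir, so main.nf sits at the
    archive root and can be referenced as main="main.nf".
    """
    buffer = io.BytesIO()
    with ZipFile(buffer, mode="w", compression=ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(workflow_dir):
            for name in files:
                path = os.path.join(root, name)
                zf.write(path, arcname=os.path.relpath(path, workflow_dir))
    return buffer.getvalue()
```

The resulting bytes could then be passed as `omics.create_workflow(name=..., engine='NEXTFLOW', definitionZip=zip_workflow_dir('workflows/nf/sample'), main='main.nf', parameterTemplate=...)`.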

We can now run this workflow within our run group. We'll start several runs of the workflow concurrently, each with different inputs to distinguish them, to see how the run group works:

In [None]:
rg_runs = []
run_inputs = [
    {"greeting": "Hello", "addressee": "Amazon"},
    {"greeting": "Bonjour", "addressee": "Omics"},
    {"greeting": "Hola", "addressee": "Workflows"},
]

for run_num, run_input in enumerate(run_inputs):
    run = omics.start_run(
        workflowId=workflow['id'],
        name=f"{workflow['name']} - {run_num} :: {run_input}",
        roleArn=omics_role['Role']['Arn'],
        parameters=run_input,
        outputUri=output_uri,
        
        runGroupId=run_group['id'],  # <-- here is where we specify the run group
    )
    
    print(f"({run_num}) workflow {workflow['id']}, run group {run_group['id']}, run {run['id']}, input {run_input}")
    rg_runs += [run]

We can now list all the runs in the run group and should see them all transition from `PENDING` to `STARTING` at once.

(Run the following cell multiple times.)

In [None]:
[(_['id'], _['status']) for _ in omics.list_runs(runGroupId=run_group['id'])['items']]
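Instead of reading the raw list each time, you can summarise it. This minimal sketch counts runs per status and reports whether every run has finished; `summarize_runs` is a hypothetical helper, and the terminal-status set is the same assumption about the Omics run lifecycle used for manual polling (`COMPLETED`, `FAILED`, `CANCELLED`, `DELETED`).

```python
from collections import Counter

# Run statuses after which a run should no longer change (assumption)
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED", "DELETED"}


def summarize_runs(runs):
    """Return (per-status counts, whether every run has reached a terminal status)."""
    counts = Counter(run["status"] for run in runs)
    finished = all(run["status"] in TERMINAL_STATUSES for run in runs)
    return counts, finished
```

Usage: `counts, done = summarize_runs(omics.list_runs(runGroupId=run_group['id'])['items'])`.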