In [1]:
import awswrangler as wr
import os
import pandas as pd
from numpy import random

import logging
import boto3

## Pipeline parameters

In [15]:
# Where pipeline results are written, and how this run is identified.
pipeline_output_path = "s3://genomics-workflow-core/Results/GenomeMining"
project = "IMG"
prefix = "20220707"

seedfile_name = f"{prefix}_seedfile.csv"

# S3 location that is scanned for input genomes, and the suffix used to select them.
input_s3_path = "s3://maf-versioned/GenomeMining/Genomes/IMG/"
file_extension = "tar.gz"

# S3 URIs always use "/" as the separator. `os.path.join` follows the host
# OS convention (backslashes on Windows), so the URI is assembled explicitly.
seedfile = "/".join(
    [pipeline_output_path.rstrip("/"), project, prefix, "00_seedfile", seedfile_name]
)


### Testing parameters

In [3]:
# The smoke test writes under its own project name but reuses the run prefix.
test_project = "00_TEST"
test_prefix = prefix
test_seedfile_name = f"test.{seedfile_name}"
# S3 URIs always use "/" as the separator. `os.path.join` follows the host
# OS convention (backslashes on Windows), so the URI is assembled explicitly.
test_seedfile = "/".join(
    [pipeline_output_path.rstrip("/"), test_project, test_prefix, test_seedfile_name]
)

In [4]:
# List the objects under the input location whose key ends with the
# configured file extension.
file_paths = wr.s3.list_objects(path=input_s3_path, suffix=file_extension)

In [5]:
# Number of objects returned by the listing; displayed as a sanity check.
total_samples = len(file_paths)
total_samples

78357

In [6]:
# Pick a few distinct genomes for the smoke test.
# - Sampling without replacement guarantees the requested number of unique
#   genomes (drawing with replacement could return the same index twice).
# - A fixed seed makes the selection reproducible on a re-run.
n_test_samples = min(3, total_samples)
test_rng = random.default_rng(seed=20220707)
random_idx = test_rng.choice(total_samples, size=n_test_samples, replace=False)
random_idx


array([  510, 48149,  2578])

In [7]:
# Resolve the sampled indices to their S3 paths once, then derive the genome
# id from each file name by dropping the ".tar.gz" extension.
test_genome_paths = [file_paths[idx] for idx in random_idx]
test_seed_df = pd.DataFrame(
    {
        "genome_id": [
            os.path.basename(genome_path).replace(".tar.gz", "")
            for genome_path in test_genome_paths
        ],
        "genome_path": test_genome_paths,
    }
).drop_duplicates()
test_seed_df

Unnamed: 0,genome_id,genome_path
0,2508501034,s3://maf-versioned/GenomeMining/Genomes/IMG/25...
1,2784132081,s3://maf-versioned/GenomeMining/Genomes/IMG/27...
2,2522572164,s3://maf-versioned/GenomeMining/Genomes/IMG/25...


In [8]:
# Write the test seedfile to S3 as CSV, without the DataFrame index column.
wr.s3.to_csv(df=test_seed_df, path=test_seedfile, index=False)

{'paths': ['s3://genomics-workflow-core/Results/GenomeMining/00_TEST/20220707/test.20220707_seedfile.csv'],
 'partitions_values': {}}

In [9]:
def submit_batch_job(
    project: str,
    prefix: str,
    seedfile: str = None,
    genome: str = None,
    branch: str = "main",
    job_queue: str = "priority-maf-pipelines",
    job_definition: str = "nextflow-production",
    job_name_suffix: str = None,
    aws_profile: str = None,
    dry_run: bool = False,
) -> dict:
    """Submit a nf-genome-mining job to AWS Batch

    Exactly one input source is expected: either `seedfile` or `genome`.

    Args:
        project (_str_): name of the project
        prefix (_str_): name of the sample/batch
        seedfile (_str_): s3 path to seedfile (incompatible with `genome`). Defaults to "None".
        genome (_str_): s3 path to individual genome tarball (incompatible with `seedfile`). Defaults to "None".
        branch (_str_, optional): Branch of nf-genome-mining to use. Defaults to "main".
        job_queue (_str_, optional): name of the queue for the head node. Defaults to "priority-maf-pipelines".
        job_definition (_str_, optional): nextflow job definition. Doesn't usually change. Defaults to "nextflow-production".
        job_name_suffix (_str_, optional): AWS Batch job name identifier. Defaults to `prefix`. Job name becomes nf-gm-`job_name_suffix`.
        aws_profile (_str_, optional): if a non-default aws profile should be used to submit jobs. Defaults to "None".
        dry_run (_bool_, optional): don't submit the job, just log (at INFO level) what the submission command
            would look like. No AWS session is created in this mode. Defaults to "False".
    Returns:
        _dict_: a response object that contains details of the job submission from AWS
        (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.submit_job).
        `None` when `dry_run` is True.
    Raises:
        ValueError: if `prefix` is None, or if neither `seedfile` nor `genome` is provided.
    """
    # Validate with real exceptions: `assert` statements are stripped under
    # `python -O`, and the pipeline needs an input source to do anything.
    if prefix is None:
        raise ValueError("`prefix` cannot be empty")
    if seedfile is None and genome is None:
        raise ValueError("Both `seedfile` and `genome` cannot be empty")
    if seedfile is not None and genome is not None:
        # Keep the historical precedence (seedfile wins) but make it visible.
        logging.warning(
            "Both `seedfile` and `genome` were provided; using `seedfile` and ignoring `genome`"
        )

    if job_name_suffix is None:
        job_name_suffix = prefix

    # Set the pipeline flags for the analysis
    command = [
        "FischbachLab/nf-genome-mining",
        "-r",
        branch,
        "--project",
        project,
        "--prefix",
        prefix,
    ]

    # Select the input source
    if seedfile is not None:
        command += ["--seedfile", seedfile]
    else:
        command += ["--genome", genome]

    # A dry run must not require AWS credentials, so return before any
    # session/client is created.
    if dry_run:
        logging.info("command: '%s'", " ".join(command))
        return None

    # `profile_name=None` selects the default profile, so no branching is needed.
    session = boto3.session.Session(profile_name=aws_profile)
    batch = session.client("batch")

    # Submit job
    response = batch.submit_job(
        jobName=f"nf-gm-{job_name_suffix}",
        jobQueue=job_queue,
        jobDefinition=job_definition,
        containerOverrides={"command": command},
    )
    return response

### Test run

In [17]:
# Submit the smoke-test job against the test seedfile, using a job name
# suffix derived from the test prefix.
test_job_name_suffix = f"{test_prefix}-7"
response = submit_batch_job(
    project=test_project,
    prefix=test_prefix,
    seedfile=test_seedfile,
    job_name_suffix=test_job_name_suffix,
)
response

{'ResponseMetadata': {'RequestId': 'a800b34f-317f-411b-8fb0-77fcb4e405c2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 07 Jul 2022 23:21:37 GMT',
   'content-type': 'application/json',
   'content-length': '166',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'a800b34f-317f-411b-8fb0-77fcb4e405c2',
   'access-control-allow-origin': '*',
   'x-amz-apigw-id': 'U62UTGI3PHcFtUw=',
   'access-control-expose-headers': 'X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date',
   'x-amzn-trace-id': 'Root=1-62c76a81-694c84ad7e94c6832fab33c7'},
  'RetryAttempts': 0},
 'jobArn': 'arn:aws:batch:us-west-2:458432034220:job/8e7a5373-82f6-4a52-aeb0-eb2c779f5e8b',
 'jobName': 'nf-gm-20220707-7',
 'jobId': '8e7a5373-82f6-4a52-aeb0-eb2c779f5e8b'}

## Actual run

In [16]:
# Derive one genome id per listed object from its file name by dropping the
# ".tar.gz" extension.
genome_ids = [
    os.path.basename(genome_path).replace(".tar.gz", "")
    for genome_path in file_paths
]
seedfile_df = pd.DataFrame(
    {"genome_id": genome_ids, "genome_path": file_paths}
).drop_duplicates()

# Write the full seedfile to S3 as CSV, without the DataFrame index column.
wr.s3.to_csv(df=seedfile_df, path=seedfile, index=False)

{'paths': ['s3://genomics-workflow-core/Results/GenomeMining/IMG/20220707/00_seedfile/20220707_seedfile.csv'],
 'partitions_values': {}}

In [18]:
# Submit the full run against the complete seedfile; branch, queue, job
# definition and job name suffix are left at the function's defaults.
response = submit_batch_job(project=project, prefix=prefix, seedfile=seedfile)
response

{'ResponseMetadata': {'RequestId': '05757453-cf61-49f3-8482-8c1d4c678363',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 07 Jul 2022 23:44:45 GMT',
   'content-type': 'application/json',
   'content-length': '164',
   'connection': 'keep-alive',
   'x-amzn-requestid': '05757453-cf61-49f3-8482-8c1d4c678363',
   'access-control-allow-origin': '*',
   'x-amz-apigw-id': 'U65tEG1cvHcFvIA=',
   'access-control-expose-headers': 'X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date',
   'x-amzn-trace-id': 'Root=1-62c76fed-4169a82b58455dab3fbba7fb'},
  'RetryAttempts': 0},
 'jobArn': 'arn:aws:batch:us-west-2:458432034220:job/3a58fc19-7ab5-4d2c-81a1-0f98820c3bdd',
 'jobName': 'nf-gm-20220707',
 'jobId': '3a58fc19-7ab5-4d2c-81a1-0f98820c3bdd'}