In [10]:
import boto3
import re
import os
import json
import time
import base64
from botocore.exceptions import ClientError

In [2]:
# --- AWS service clients (default session / region) ---
ec2_client = boto3.client("ec2")
batch_client = boto3.client('batch')

# --- Container image run by every batch job ---
job_image = 'phoenixajalogan/kmer-ska-batch'

# --- Key pair attached to the compute instances for SSH access ---
ec2KeyPair = "phoenixlogan"

# --- Names of the AWS resources created by the cells below ---
launch_template = "phoenixlogan-template-ska"
compute_environment = "phoenixlogan-compute"
job_queue = "phoenixlogan-queue"
job_definition = "phoenixlogan-ska-def"

# --- Compute environment settings ---
# root_volume_size is passed as the EBS "VolumeSize" of the launch template
root_volume_size = 2000
instance_types = ["optimal"]

# --- Per-job resources requested in the job definition ---
job_vcpus = 15
job_memory = 40000

In [3]:
# Create the EC2 launch template that sets the root EBS volume for the
# instances, and record the API response under logs/.
#
# The API is called *before* the log file is opened: opening with "w" first
# would truncate a good log from an earlier run whenever the call fails
# (e.g. "Launch template name already in use.").
try:
    print("Creating launch template...")
    launch_template_response = ec2_client.create_launch_template(
        LaunchTemplateName=launch_template,
        LaunchTemplateData={
            "BlockDeviceMappings": [
                {
                    "DeviceName": "/dev/xvda",
                    "Ebs": {
                        "DeleteOnTermination": True,
                        "VolumeSize": root_volume_size,
                        "VolumeType": "gp2"
                    }
                }
            ],
        }
    )
    # make sure the log directory exists before writing into it
    os.makedirs("logs", exist_ok=True)
    with open("logs/launch_template.json", "w") as f:
        # default=str serialises non-JSON values (e.g. datetimes) in the response
        json.dump(launch_template_response, f, default=str, indent=4)
    print("Finished creating launch template.")
except ClientError as e:
    # Best-effort: report the AWS error (such as an already-existing template)
    # and carry on so the notebook can be re-run.
    print(e.response["Error"]["Message"])

Creating launch template...
Launch template name already in use.


In [4]:
# Create the managed AWS Batch compute environment that uses the launch
# template named by `launch_template`, and record the API response under logs/.
#
# The API is called *before* the log file is opened: opening with "w" first
# would truncate a good log from an earlier run whenever the call fails
# (e.g. "Object already exists").
try:
    print("Creating compute environment...")
    compute_resources = {
        'type': 'EC2',
        'minvCpus': 0,
        'maxvCpus': 256,
        'instanceTypes': instance_types,
        'subnets': [
            # subnets for us-west-2a, us-west-2b, us-west-2c
            "subnet-672e832e",
            "subnet-04119a63",
            "subnet-4347451b",
        ],
        'securityGroupIds': [
            'sg-3195a049',
        ],
        "ec2KeyPair": ec2KeyPair,
        'instanceRole': 'ecsInstanceRole2',
        # NOTE(review): bidPercentage / spotIamFleetRole look like Spot
        # settings; with type 'EC2' they presumably have no effect - confirm.
        'bidPercentage': 100,
        'spotIamFleetRole': 'arn:aws:iam::423543210473:role/aws-ec2-spot-fleet-role',
        'launchTemplate': {
            'launchTemplateName': launch_template
        }
    }
    compute_environment_response = batch_client.create_compute_environment(
        computeEnvironmentName=compute_environment,
        type='MANAGED',
        state='ENABLED',
        computeResources=compute_resources,
        serviceRole='arn:aws:iam::423543210473:role/AWSBatchServiceRole'
    )
    # make sure the log directory exists before writing into it
    os.makedirs("logs", exist_ok=True)
    with open("logs/compute_environment.json", "w") as f:
        json.dump(compute_environment_response, f, default=str, indent=4)
    print("Finished creating compute environment.")

except ClientError as e:
    # Best-effort: report the AWS error (such as an already-existing
    # environment) and carry on so the notebook can be re-run.
    print(e.response["Error"]["Message"])

Creating compute environment...
Object already exists


In [16]:
# Wait for the compute environment to report status VALID, then create the job
# queue on top of it and record the API response under logs/.
try:
    print("Creating job queue...")
    n_tries = 5
    sleep_time = 30
    # n_tries sleeps at most, followed by one final status check so the last
    # sleep is not wasted.
    for attempt in range(n_tries + 1):
        environments = batch_client.describe_compute_environments(
            computeEnvironments=[compute_environment]
        )["computeEnvironments"]
        # An unknown environment yields an empty list; treat it as "not ready"
        # instead of failing on tuple unpacking.
        status = environments[0]['status'] if environments else None
        if status == 'VALID':
            break
        if attempt == n_tries:
            print(f"Compute environment status is still {status!r} after "
                  f"{n_tries} tries; attempting to create the job queue anyway.")
            break
        print("Waiting for compute environment...",
              f"(Try {attempt+1}/{n_tries})")
        time.sleep(sleep_time)

    # Call the API before opening the log file: opening with "w" first would
    # truncate a good log from an earlier run whenever the call fails.
    job_queue_response = batch_client.create_job_queue(
        jobQueueName=job_queue,
        state='ENABLED',
        priority=5,
        computeEnvironmentOrder=[
            {
                'order': 5,
                'computeEnvironment': compute_environment
            },
        ]
    )
    # make sure the log directory exists before writing into it
    os.makedirs("logs", exist_ok=True)
    with open("logs/job_queue.json", "w") as f:
        json.dump(job_queue_response, f, default=str, indent=4)
    print("Finished creating job queue.")

except ClientError as e:
    # Best-effort: report the AWS error (such as an already-existing queue)
    # and carry on so the notebook can be re-run.
    print(e.response["Error"]["Message"])

In [6]:
# Register the container job definition and record the API response under
# logs/ (a later cell reads "jobDefinitionArn" back from this file).
print("Creating job definition...")
# Call the API before opening the log file: opening with "w" first would leave
# an empty, unreadable log behind if registration fails.
job_definition_response = batch_client.register_job_definition(
    jobDefinitionName=job_definition,
    type='container',
    containerProperties={
        "image": job_image,
        "vcpus": job_vcpus,
        "memory": job_memory,
        # Ref::<name> placeholders are filled from the `parameters`
        # passed to submit_job.
        "command": [
            "create_sketch.py",
            "Ref::bucket_name",
            "Ref::key1",
            "Ref::key2",
            "Ref::size",
            "Ref::ksize",
            "Ref::outbucket",
        ],
        # expose the host's /scratch directory inside the container
        "volumes": [
            {"host": {"sourcePath": "/scratch"},
             "name": "scratch"},
        ],
        "mountPoints": [
            {"containerPath": "/scratch",
             "sourceVolume": "scratch"},
        ],
        "jobRoleArn": "arn:aws:iam::423543210473:role/simpleBatchJob",
        "privileged": True
    }
)
# make sure the log directory exists before writing into it
os.makedirs("logs", exist_ok=True)
with open("logs/job_definition.json", "w") as f:
    json.dump(job_definition_response, f, default=str, indent=4)
print("Finished creating job definition.")

Creating job definition...
Finished creating job definition.


In [7]:
def files_list(keys, bucket_name, max_samples=75):
    """List the read-1 file names found under the given S3 prefixes.

    Objects are only listed, never downloaded. Each prefix is printed as it
    is processed.

    Args:
        keys (list[str]): S3 key prefixes to search.
        bucket_name (str): name of the S3 bucket to search.
        max_samples (int | None): maximum number of names to return; ``None``
            returns every match. Defaults to 75.

    Returns:
        list[str]: base names (without the prefix) of every object whose base
        name matches ``.+(R1).+``, in listing order, capped at
        ``max_samples`` entries.
    """
    first_reads = re.compile(r".+(R1).+")

    # The bucket handle does not depend on the prefix, so build it once.
    seq_bucket = boto3.resource("s3").Bucket(bucket_name)

    sample_names = []
    for key in keys:
        print(key)
        for obj in seq_bucket.objects.filter(Prefix=key):
            matched = first_reads.match(os.path.basename(obj.key))
            if matched:
                sample_names.append(matched.group(0))

    return sample_names[:max_samples]

In [14]:

# Load the job-definition ARN recorded when the definition was registered.
with open("logs/job_definition.json") as f:
    jobDefinition = json.load(f)["jobDefinitionArn"]

# The job-queue ARN is hard-coded instead of being read from
# logs/job_queue.json, so that file is no longer opened here (it may be
# missing or empty if the queue already existed when it was created).
jobQueue = "arn:aws:batch:us-west-2:423543210473:job-queue/phoenixlogan-queue"

# Map each S3 prefix to the read-1 file names found under it.
seq_buckets = {
    "sequences/CMS001_fastq.gz": files_list(["sequences/CMS001_fastq.gz"], "czbiohub-mosquito"),
    "sequences/CMS002_fastq.gz": files_list(["sequences/CMS002_fastq.gz"], "czbiohub-mosquito")
}

# Full S3 keys. S3 keys always use "/" as separator, so join explicitly
# rather than with os.path.join, whose separator depends on the local OS.
all_file_paths = [
    f"{prefix}/{name}"
    for prefix, names in seq_buckets.items()
    for name in names
]
all_file_paths

sequences/CMS001_fastq.gz
sequences/CMS002_fastq.gz


In [13]:
# s3 = boto3.resource("s3")
# maca_bucket = s3.Bucket(name='czb-maca')
# maca_files = [fp.key for fp in maca_bucket.objects.filter(Prefix='Plate_seq/3_month/170907_A00111_0051_BH2HWLDMXX')]
# maca_files[0:50]
# seq_buckets = {
#     "Plate_seq/3_month/170907_A00111_0051_BH2HWLDMXX/fastqs": files_list(["Plate_seq/3_month/170907_A00111_0051_BH2HWLDMXX/fastqs"], "czb-maca"), 
# }
# seq_buckets

In [27]:
import pandas as pd

# Sample sheet: the "id" and "read1" columns are used below; "read1" holds
# s3://czb-seqbot/... URIs.
path_locations = pd.read_csv("metadata_formatted.csv")

# Strip the bucket URI so only the object key remains.
fps = [uri.replace("s3://czb-seqbot/", "") for uri in list(path_locations["read1"])]
ids = list(path_locations["id"])

# Materialised as a list: a bare zip() is exhausted after one pass, which
# would make a re-run of the submission loop silently submit nothing.
ids_and_fps = list(zip(ids, fps))


In [28]:
# Submit one batch job per sample from the metadata sheet and save each
# submit_job response under logs/jobs/.

seq_bucket_name = "czb-seqbot"
size = "10000000_seqbot"

# The per-job logs are written right after submission; create the directory
# up front so a missing folder cannot abort the loop after a job was submitted.
os.makedirs("logs/jobs", exist_ok=True)

# Pair ids with paths directly from the lists so the cell can be re-run
# (a zip iterator would be exhausted after one pass).
for sample, fp in zip(ids, fps):
    identifier = f"{sample}_{size}"
    print(identifier)
    response = batch_client.submit_job(
        jobName=identifier,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        # These values fill the Ref:: placeholders of the job definition's
        # command. NOTE(review): "sketch_size" has no matching placeholder in
        # the command registered in this notebook - confirm it is needed.
        parameters={
            "bucket_name": seq_bucket_name,
            "key1": fp,
            # the mate file is assumed to differ only by R1 -> R2 in its key
            "key2": fp.replace("R1", "R2"),
            "size": size,
            "ksize": "15",
            "outbucket": "phoenixlogan-sketches",
            "sketch_size": '100000000'
        }
    )
    prefix = sample.replace(".fastq.gz", "")
    with open(f"logs/jobs/{prefix}.json", "w") as f:
        json.dump(response, f, indent=4, default=str)

CMS_001_RNA_A_S1_10000000_seqbot
CMS_002_RNA_A_S1_10000000_seqbot
CMS_003_RNA_A_S2_10000000_seqbot
CMS_004_RNA_A_S2_10000000_seqbot
CMS_005_RNA_A_S3_10000000_seqbot
CMS_006_RNA_A_S5_10000000_seqbot
CMS_007_RNA_A_S12_10000000_seqbot
CMS_008_RNA_A_S3_10000000_seqbot
CMS_009_RNA_A_S13_10000000_seqbot
CMS_010_RNA_A_S1_10000000_seqbot
CMS_011_RNA_A_S4_10000000_seqbot
CMS_012_RNA_A_S4_10000000_seqbot
CMS_013_RNA_A_S5_10000000_seqbot
CMS_014_RNA_A_S5_10000000_seqbot
CMS_015_RNA_A_S13_10000000_seqbot
CMS_016_RNA_A_S6_10000000_seqbot
CMS_017_RNA_A_S6_10000000_seqbot
CMS_018_RNA_A_S14_10000000_seqbot
CMS_019_RNA_A_S14_10000000_seqbot
CMS_020_RNA_A_S15_10000000_seqbot
CMS_021_RNA_A_S16_10000000_seqbot
CMS_022_RNA_A_S6_10000000_seqbot
CMS_023_RNA_A_S17_10000000_seqbot
CMS_024_RNA_A_S15_10000000_seqbot
CMS_025_RNA_A_S7_10000000_seqbot
CMS_026_RNA_A_S18_10000000_seqbot
CMS_027_RNA_A_S16_10000000_seqbot
CMS_028_RNA_A_S17_10000000_seqbot
CMS_029_RNA_A_S18_10000000_seqbot
CMS_030_RNA_A_S7_10000000_seqb

In [25]:
# Submit one batch job per read-1 file in `all_file_paths` and save each
# submit_job response under logs/jobs/.

seq_bucket_name = "czbiohub-mosquito"
size = "10000000_csv"

# The per-job logs are written right after submission; create the directory
# up front so a missing folder cannot abort the loop after a job was submitted.
os.makedirs("logs/jobs", exist_ok=True)

for fp in all_file_paths:
    # sample name = file name without directory and without ".fastq.gz"
    sample = os.path.basename(fp).replace('.fastq.gz', '')
    identifier = f"{sample}_{size}"
    print(identifier)
    response = batch_client.submit_job(
        jobName=identifier,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        # These values fill the Ref:: placeholders of the job definition's
        # command. NOTE(review): "sketch_size" has no matching placeholder in
        # the command registered in this notebook - confirm it is needed.
        parameters={
            "bucket_name": seq_bucket_name,
            "key1": fp,
            # the mate file is assumed to differ only by R1 -> R2 in its key
            "key2": fp.replace("R1", "R2"),
            "size": size,
            "ksize": "15",
            "outbucket": "phoenixlogan-sketches",
            "sketch_size": '100000000'
        }
    )
    # `sample` already has the ".fastq.gz" suffix removed, so it is the log name
    with open(f"logs/jobs/{sample}.json", "w") as f:
        json.dump(response, f, indent=4, default=str)

CMS_001_RNA_A_S1_R1_001_10000000
CMS_002_RNA_A_S1_R1_001_10000000
CMS_003_RNA_A_S2_R1_001_10000000
CMS_004_RNA_A_S2_R1_001_10000000
CMS_005_RNA_A_S3_R1_001_10000000
CMS_006_RNA_A_S5_R1_001_10000000
CMS_007_RNA_A_S12_R1_001_10000000
CMS_008_RNA_A_S3_R1_001_10000000
CMS_009_RNA_A_S13_R1_001_10000000
CMS_010_RNA_A_S1_R1_001_10000000
CMS_011_RNA_A_S4_R1_001_10000000
CMS_012_RNA_A_S4_R1_001_10000000
CMS_013_RNA_A_S5_R1_001_10000000
CMS_014_RNA_A_S5_R1_001_10000000
CMS_015_RNA_A_S13_R1_001_10000000
CMS_016_RNA_A_S6_R1_001_10000000
CMS_017_RNA_A_S6_R1_001_10000000
CMS_018_RNA_A_S14_R1_001_10000000
CMS_019_RNA_A_S14_R1_001_10000000
CMS_020_RNA_A_S15_R1_001_10000000
CMS_021_RNA_A_S16_R1_001_10000000
CMS_022_RNA_A_S6_R1_001_10000000
CMS_023_RNA_A_S17_R1_001_10000000
CMS_024_RNA_A_S15_R1_001_10000000
CMS_025_RNA_A_S7_R1_001_10000000
CMS_026_RNA_A_S18_R1_001_10000000
CMS_027_RNA_A_S16_R1_001_10000000
CMS_028_RNA_A_S17_R1_001_10000000
CMS_029_RNA_A_S18_R1_001_10000000
CMS_030_RNA_A_S7_R1_001_100000

In [17]:
# Scratch check: display every S3 key collected in all_file_paths.
all_file_paths

['sequences/CMS001_fastq.gz/CMS_001_RNA_A_S1_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_002_RNA_A_S1_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_003_RNA_A_S2_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_004_RNA_A_S2_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_005_RNA_A_S3_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_006_RNA_A_S5_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_007_RNA_A_S12_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_008_RNA_A_S3_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_009_RNA_A_S13_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_010_RNA_A_S1_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_011_RNA_A_S4_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_012_RNA_A_S4_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_013_RNA_A_S5_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_014_RNA_A_S5_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_015_RNA_A_S13_R1_001.fastq.gz',
 'sequences/CMS001_fastq.gz/CMS_016_RNA_A_S6_R1_001.

In [21]:
# Scratch check: keep the first collected S3 key in `fp` for the inspection cells.
fp = all_file_paths[0]


In [29]:
# Scratch check: number of S3 keys collected.
len(all_file_paths)

140

In [24]:
# Scratch check: the sample-name derivation (file name minus ".fastq.gz") applied to `fp`.
os.path.basename(fp).replace('.fastq.gz', '')

'CMS_001_RNA_A_S1_R1_001'

In [30]:
# Scratch check: show the first collected S3 key.
all_file_paths[0]

'sequences/CMS001_fastq.gz/CMS_001_RNA_A_S1_R1_001.fastq.gz'