

# Parallel Analysis - Setup and Run

This notebook sets up and runs a parallel analysis on AWS.

You can learn more at [RGLab/scamp](https://github.com/RGLab/scamp)

The steps are:

* Create the code
* Configure AWS connection
* Configure the data you want to process
* Upload the data to AWS (to S3)
* Configure the processing (code to run, and the CPU, memory, and storage to use)
* Kick off the processing


### Step A - Create the Code

1. Import the necessary libraries
2. Create the functions we'll use

In [2]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

import boto3
import os.path
import sys
import datetime

In [3]:
# from https://stackoverflow.com/questions/1392413/calculating-a-directorys-size-using-python 
def get_folder_size(local_directory):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(local_directory):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total_size += os.path.getsize(fp)
    return total_size / 1000000.0  # bytes -> MB (decimal megabytes)

In [4]:
def get_cpu_needed(analysis_size_mb):
    # check the largest threshold first; with the smallest first, every
    # size over 100 MB would stop at that branch and always get 8 CPUs
    if analysis_size_mb > 1000:
        return 32
    elif analysis_size_mb > 500:
        return 16
    elif analysis_size_mb > 100:
        return 8
    return 4

In [5]:
def get_memory_needed(analysis_size_mb, cpu_count):
    # 2 GB of RAM per 4 CPUs (0.5 GB per CPU); analysis_size_mb is currently unused
    return 2.0 * cpu_count / 4.0
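The two sizing rules can also be written as one lookup over a threshold table, which keeps the tiers in a single place and avoids the ordering mistakes an `if`/`elif` chain invites. A minimal sketch; the names `SIZE_TIERS`, `DEFAULT_CPUS`, `GB_PER_CPU` and `size_resources` are hypothetical, and the tiers simply mirror the intent of `get_cpu_needed` and `get_memory_needed` (0.5 GB of RAM per vCPU).

```python
# Hypothetical table-driven version of get_cpu_needed / get_memory_needed.
# Each entry is (size threshold in MB, vCPUs); a size strictly greater than
# the threshold gets that many vCPUs. Entries are checked largest first.
SIZE_TIERS = [
    (1000, 32),
    (500, 16),
    (100, 8),
]
DEFAULT_CPUS = 4
GB_PER_CPU = 2.0 / 4.0  # same ratio as get_memory_needed: 2 GB per 4 vCPUs


def size_resources(analysis_size_mb):
    """Return (cpu_count, mem_gb) for an analysis of the given size in MB."""
    cpu_count = DEFAULT_CPUS
    for threshold_mb, cpus in SIZE_TIERS:
        if analysis_size_mb > threshold_mb:
            cpu_count = cpus
            break
    return cpu_count, cpu_count * GB_PER_CPU


for size_mb in (50, 250, 750, 2500):
    print(size_mb, "MB ->", size_resources(size_mb))
```

Adding a tier then means adding one row to the table rather than reordering branches.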

In [18]:
def get_directories_and_sizes(parent_directory):
    dir_sizes = {}
    for analysis_dir in os.listdir(parent_directory):
        analysis_dir_path = os.path.join(parent_directory, analysis_dir)
        if os.path.isdir(analysis_dir_path):
            dir_sizes[analysis_dir_path] = get_folder_size(analysis_dir_path)
    return dir_sizes

In [7]:
# based on https://gist.github.com/feelinc/d1f541af4f31d09a2ec3
from botocore.exceptions import ClientError

def upload_analysis(input_dir, s3_bucket, s3_prefix, client):
    """Upload every file under input_dir to s3://s3_bucket/s3_prefix/,
    skipping keys that already exist. `client` is a boto3 S3 *resource*."""
    s3 = client.meta.client  # low-level client behind the S3 resource
    for root, dirs, files in os.walk(input_dir):
        for filename in files:
            # construct the full local path
            local_path = os.path.join(root, filename)

            # construct the S3 key; S3 keys always use '/', whatever the OS separator
            relative_path = os.path.relpath(local_path, input_dir)
            s3_path = "/".join([s3_prefix.rstrip("/"), relative_path.replace(os.sep, "/")])

            print('Searching "%s" in "%s"' % (s3_path, s3_bucket))
            try:
                s3.head_object(Bucket=s3_bucket, Key=s3_path)
                print("Path found on S3! Skipping %s..." % s3_path)
            except ClientError:
                # head_object raises ClientError (e.g. 404 Not Found) when the key is missing
                print("Uploading %s..." % s3_path)
                s3.upload_file(local_path, s3_bucket, s3_path)

### Step B - Configure AWS connection

1. Configure AWS account
2. Configure AWS credentials (e.g. access key and secret key)
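Before going further it can help to confirm which account and identity the chosen profile actually resolves to. A minimal sketch using the STS `GetCallerIdentity` call, which boto3 exposes as `get_caller_identity()`; it assumes `session` is a `boto3.session.Session` like the one created in this step, and the helper name `describe_identity` is hypothetical.

```python
def describe_identity(session):
    """Return (account_id, arn) for the credentials behind a boto3 session.

    `session` is assumed to be a boto3.session.Session. STS GetCallerIdentity
    needs no IAM permissions, so it works for any valid credentials.
    """
    identity = session.client("sts").get_caller_identity()
    return identity["Account"], identity["Arn"]


# Usage (requires boto3 and valid credentials):
#   import boto3
#   account, arn = describe_identity(boto3.session.Session(profile_name="hackathon"))
#   print("Authenticated as {} in account {}".format(arn, account))
```

If the profile name is wrong, boto3 raises an error when the session is created, before this call is made; if the credentials are invalid or expired, the STS call itself fails.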

In [8]:
profile_widget = widgets.Text(
    value='',
    placeholder='default / sandbox',
    description='AWS Profile:',
    disabled=False
)
display(profile_widget)


In [11]:
profile_name = profile_widget.value
print("Using '{}' profile".format(profile_name))

Using 'hackathon' profile


In [None]:
# use the profile entered above (None falls back to the default profile)
cortex_session = boto3.session.Session(profile_name=profile_name or None)

### Step C - Configure the data you want to process

1. Set the input directory
2. Review the list of analyses and confirm that it is correct
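A mistyped path in the directory widget otherwise surfaces later as a bare `FileNotFoundError` from `os.listdir`. A minimal sketch of an up-front check that also lists the analysis subfolders in a stable order for review; the helper name `list_analysis_dirs` is hypothetical.

```python
import os


def list_analysis_dirs(parent_directory):
    """Return the sorted names of the subfolders of parent_directory.

    Raises ValueError with a readable message if the path is empty,
    is not a directory, or has no subfolders to analyse.
    """
    if not parent_directory:
        raise ValueError("No input directory was entered")
    if not os.path.isdir(parent_directory):
        raise ValueError("{} is not a directory".format(parent_directory))
    names = sorted(
        name for name in os.listdir(parent_directory)
        if os.path.isdir(os.path.join(parent_directory, name))
    )
    if not names:
        raise ValueError("{} contains no analysis folders".format(parent_directory))
    return names
```

Plain files in the parent directory are ignored, matching what `get_directories_and_sizes` does.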


In [13]:
input_dir_widget = widgets.Text(
    value='',
    placeholder='/Users/dnambi/hackathon/input',
    description='Directory:',
    disabled=False
)
display(input_dir_widget)


In [16]:
parent_directory = input_dir_widget.value
print("The directory to load for all analysis is: {}".format(parent_directory))

The directory to load for all analysis is: /Users/dnambi/Downloads/ExampleAnalysis


In [20]:
analysis_dict = get_directories_and_sizes(parent_directory)
print("The analyses to run are {}".format(list(analysis_dict.keys())))

### Step D - Upload the data to AWS (to S3)

In [24]:
s3_bucket_widget = widgets.Text(
    value='fh-hdc-cytometry-hackathon',
    placeholder='',
    description='S3 bucket:',
    disabled=False
)
display(s3_bucket_widget)


In [20]:
s3_client = cortex_session.resource('s3')
local_dir = '/Users/dnambi/Downloads/LZLabs'
bucket = s3_bucket_widget.value
prefix = 'dnambi/test/LZLabs'
upload_analysis(input_dir=local_dir, s3_bucket=bucket, s3_prefix=prefix, client=s3_client)
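The cell above uploads a single hard-coded folder. To upload every analysis found in Step C, each local folder needs its own S3 prefix. A minimal sketch that derives one prefix per folder from the folder's name; `plan_uploads` is a hypothetical helper, and the base prefix in the usage comment is borrowed from the cell above.

```python
import os


def plan_uploads(analysis_dirs, base_prefix):
    """Map each local analysis folder to an S3 key prefix.

    The prefix is base_prefix + '/' + the folder's own name, so
    /data/input/run1 with base 'dnambi/test' -> 'dnambi/test/run1'.
    """
    base = base_prefix.rstrip("/")
    plan = {}
    for local_dir in sorted(analysis_dirs):
        folder_name = os.path.basename(os.path.normpath(local_dir))
        plan[local_dir] = "{}/{}".format(base, folder_name)
    return plan


# Usage with the notebook's objects (analysis_dict comes from Step C):
#   for local_dir, prefix in plan_uploads(analysis_dict, "dnambi/test").items():
#       upload_analysis(input_dir=local_dir, s3_bucket=bucket, s3_prefix=prefix, client=s3_client)
```

Because all analysis folders share one parent directory, their names are unique and the prefixes cannot collide.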

### Step E - Configure the processing

1. Set the code location (GitHub repo)
2. Set the startup command
3. Configure CPU and memory for each analysis
4. Configure storage for each analysis

#### Steps E1 and E2 - set GitHub repo and analysis command

In [23]:
github_widget = widgets.Text(
    value='',
    placeholder='https://github.com/RGLab/scamp',
    description='S3 bucket:',
    disabled=False
)
display(github_widget)


In [22]:
run_command_widget = widgets.Textarea(
    value='',
    placeholder='Rscript parallel.r --dir /data/input',
    description='R command to run',
    disabled=False
)
display(run_command_widget)


In [None]:
github_repo = github_widget.value
startup_command = run_command_widget.value

print("GitHub repo: {}".format(github_repo))
print("Startup command: {}".format(startup_command))
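The repo and command captured here are not yet passed to `create_batch_job` in Step F. One hypothetical way to pass them is as extra container environment variables, validated first. A minimal sketch; the variable names `GITHUB_REPO` and `STARTUP_COMMAND` and the helper `code_environment` are invented here, and the container's `startup.sh` (not shown in this notebook) would have to read them.

```python
from urllib.parse import urlparse


def code_environment(github_repo, startup_command):
    """Build AWS Batch environment entries for the code location and command.

    GITHUB_REPO and STARTUP_COMMAND are hypothetical variable names; the
    container's startup script would need to read them.
    """
    parsed = urlparse(github_repo)
    if parsed.scheme != "https" or parsed.netloc != "github.com":
        raise ValueError("Expected an https://github.com/... URL, got {!r}".format(github_repo))
    if not startup_command.strip():
        raise ValueError("The startup command is empty")
    return [
        {"name": "GITHUB_REPO", "value": github_repo},
        {"name": "STARTUP_COMMAND", "value": startup_command.strip()},
    ]
```

The returned list has the same `{"name": ..., "value": ...}` shape as the `environment` entries in `create_batch_job`, so it could be appended to them.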

#### Step E3 - Configure CPU and memory for each analysis

In [None]:
# get info for analysis
analysis_info = {}
for analysis_dir in os.listdir(parent_directory):
    analysis_dir_path = os.path.join(parent_directory, analysis_dir)
    if os.path.isdir(analysis_dir_path):
        folder_size_mb = get_folder_size(analysis_dir_path)
        cpu_count_needed = get_cpu_needed(folder_size_mb)
        mem_gb_needed = get_memory_needed(folder_size_mb, cpu_count_needed)
        print ("Analysis {} is {} MB and needs {} CPU and {} GB of RAM".format(analysis_dir_path,folder_size_mb, cpu_count_needed, mem_gb_needed))
        analysis_params = {"size_mb": folder_size_mb, "cpu_count": cpu_count_needed, "mem_gb": mem_gb_needed}
        analysis_info[analysis_dir_path] = analysis_params
print (analysis_info)
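`analysis_info` records memory in GB, while the `memory` container override that AWS Batch accepts (passed as `mem_mb` in Step F) is in MiB. A minimal sketch of the conversion for each analysis, assuming 1 GB here means 1024 MiB and rounding up so a job never gets less than requested; `to_batch_resources` is a hypothetical helper.

```python
import math


def to_batch_resources(analysis_info):
    """Convert {path: {"size_mb", "cpu_count", "mem_gb"}} into
    {path: {"vcpus": int, "memory": int MiB}} for AWS Batch overrides.

    Assumes 1 GB = 1024 MiB and rounds memory up to a whole MiB.
    """
    resources = {}
    for path, params in analysis_info.items():
        resources[path] = {
            "vcpus": int(params["cpu_count"]),
            "memory": int(math.ceil(params["mem_gb"] * 1024)),
        }
    return resources
```

For example, an analysis with 4 CPUs and 2.0 GB becomes `{"vcpus": 4, "memory": 2048}`.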

In [None]:
analysis_params = {} 
# key is the analysis folder name
# value is a dict of input size, CPU info, memory info, storage info

#### Step E4 - Configure storage for each analysis

In [None]:
# N/A: the data sets are small enough that storage doesn't matter

### Step F - Kick off the processing

1. Name the batch
   * Add the ability to email someone when it's done?
2. Kick off the analysis (start the batch)
3. Confirm it has started correctly
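The email idea in step 1 could be covered by an Amazon SNS topic with an email subscription. A minimal sketch, assuming such a topic already exists (the ARN in the usage comment is a placeholder, not a real topic) and that `sns_client` comes from `session.client("sns")`; `notify_batch_done` is a hypothetical helper. SNS subjects must be under 100 characters, so the sketch truncates the subject.

```python
def notify_batch_done(sns_client, topic_arn, batch_id, job_statuses):
    """Publish a short summary of a finished batch to an SNS topic.

    job_statuses maps job name -> final AWS Batch status string.
    Email subscribers of the topic receive the message. Returns the subject.
    """
    failed = sorted(name for name, status in job_statuses.items() if status != "SUCCEEDED")
    subject = "Batch {} finished: {} of {} jobs failed".format(
        batch_id, len(failed), len(job_statuses))[:99]  # SNS subjects must be < 100 chars
    lines = ["{}: {}".format(name, job_statuses[name]) for name in sorted(job_statuses)]
    sns_client.publish(
        TopicArn=topic_arn,
        Subject=subject,
        Message="\n".join(lines),
    )
    return subject


# Usage (placeholder ARN):
#   notify_batch_done(cortex_session.client("sns", region_name="us-west-2"),
#                     "arn:aws:sns:us-west-2:123456789012:batch-done", batch_id, statuses)
```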

#### Step F1 - Name the batch

In [42]:
# reuse the session from Step B so the Batch client uses the same profile
batch_client = cortex_session.client('batch', region_name='us-west-2')

In [45]:
def create_batch_job(r_script_s3_url,
                     s3_input_dir,
                     s3_output_dir,
                     cpu_count,
                     mem_mb,
                     job_name,
                     job_queue,
                     job_definition,
                     client,
                     batch_file_s3_url='s3://fh-hdc-cytometry-hackathon/startup.sh'):
    """Submit one AWS Batch job that runs startup.sh with the given
    vCPU and memory (MiB) overrides and S3 locations."""
    # environment variables passed to the container
    environment = [
        {"name": "BATCH_FILE_TYPE", "value": "script"},
        {"name": "BATCH_FILE_S3_URL", "value": batch_file_s3_url},
        {"name": "R_SCRIPT_S3_URL", "value": r_script_s3_url},
        {"name": "S3_INPUT_DIR", "value": s3_input_dir},
        {"name": "S3_OUTPUT_DIR", "value": s3_output_dir},
    ]
    response = client.submit_job(
        jobName=job_name,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        containerOverrides={
            'vcpus': cpu_count,
            'memory': mem_mb,
            'command': ['startup.sh'],
            'environment': environment,
        },
    )
    return response

In [48]:
job_queue = 'dnambi-hackathon-queue'
job_definition = 'fetch_and_run_1:5'
job_name = 'dnambi-boto3-test-job-2'
r_script_s3_url = 's3://fh-hdc-cytometry-hackathon/r-code/analysis.r'
s3_input_dir = 's3://fh-hdc-cytometry-hackathon/input'
s3_output_dir = 's3://fh-hdc-cytometry-hackathon/output'
cpu_count = 2
mem_mb = 512


response = create_batch_job (r_script_s3_url = r_script_s3_url
                     , s3_input_dir = s3_input_dir
                     , s3_output_dir = s3_output_dir
                     , cpu_count = cpu_count
                     , mem_mb = mem_mb
                     , job_name = job_name
                     , job_queue = job_queue
                     , job_definition = job_definition
                     , client = batch_client)

In [49]:
print (response)

{'ResponseMetadata': {'RequestId': '9ed24ef6-fa3a-11e8-a8ec-ab5a643eca6f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Fri, 07 Dec 2018 16:10:31 GMT', 'content-type': 'application/json', 'content-length': '84', 'connection': 'keep-alive', 'x-amzn-requestid': '9ed24ef6-fa3a-11e8-a8ec-ab5a643eca6f', 'x-amz-apigw-id': 'Ri06qHNyvHcFS3g=', 'x-amzn-trace-id': 'Root=1-5c0a9b77-c3a675ca30505e0e7b94929f;Sampled=0'}, 'RetryAttempts': 0}, 'jobName': 'dnambi-boto3-test-job-2', 'jobId': 'bdf51e4e-f621-46be-8025-c0336e471845'}


In [25]:
batch_name = 'dnambi-test-analysis'

Batch ID is 2018-12-01-dnambi-test-analysis


In [None]:
widgets.DatePicker(
    description='Pick a Date',
    disabled=False
)

In [None]:
batch_start_date = datetime.datetime.now().date()
print ("Batch ID is {}-{}".format(batch_start_date, batch_name))
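Each analysis will need its own AWS Batch job name, and Batch limits job names to 128 characters of letters, digits, hyphens and underscores, starting with a letter or digit. A minimal sketch that derives one name per analysis folder from the batch ID printed above; `job_name_for` is a hypothetical helper, and replacing disallowed characters with `_` is an arbitrary choice.

```python
import os
import re


def job_name_for(batch_id, analysis_dir):
    """Build a valid AWS Batch job name from a batch ID and an analysis folder.

    Characters other than letters, digits, '-' and '_' become '_'; leading
    '-'/'_' are dropped so the name starts with a letter or digit, and the
    result is trimmed to 128 characters.
    """
    folder = os.path.basename(os.path.normpath(analysis_dir))
    name = re.sub(r"[^A-Za-z0-9_-]", "_", "{}-{}".format(batch_id, folder))
    name = name.lstrip("_-")[:128]
    if not name:
        raise ValueError("Cannot build a job name from {!r}".format(analysis_dir))
    return name
```

For example, the batch ID `2018-12-07-dnambi-test-analysis` and a folder named `sample 01.v2` give `2018-12-07-dnambi-test-analysis-sample_01_v2`.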

#### Step F2 - Kick off the analysis processing
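This step is not implemented yet. A minimal sketch of what it could look like: one `submit_job` call per analysis, reusing the container overrides and environment variable names from `create_batch_job` but with per-analysis vCPU, memory and S3 locations. It assumes `client` is the Batch client from Step F1 and that `resources` maps each local analysis folder to `{"vcpus": int, "memory": <MiB>}`; the helper `submit_all` and the `<base>/<folder name>` S3 layout are hypothetical.

```python
import os


def submit_all(client, resources, batch_id, job_queue, job_definition,
               r_script_s3_url, s3_input_base, s3_output_base, batch_file_s3_url):
    """Submit one AWS Batch job per analysis folder; return {folder path: jobId}.

    resources maps each local analysis folder to {"vcpus": int, "memory": int MiB}.
    Each job reads s3_input_base/<folder name> and writes s3_output_base/<folder name>,
    a hypothetical layout that assumes the upload step used the same folder names.
    """
    job_ids = {}
    for local_dir in sorted(resources):
        folder = os.path.basename(os.path.normpath(local_dir))
        # same environment variables that create_batch_job sets, one set per analysis
        env = [
            ("BATCH_FILE_TYPE", "script"),
            ("BATCH_FILE_S3_URL", batch_file_s3_url),
            ("R_SCRIPT_S3_URL", r_script_s3_url),
            ("S3_INPUT_DIR", "{}/{}".format(s3_input_base.rstrip("/"), folder)),
            ("S3_OUTPUT_DIR", "{}/{}".format(s3_output_base.rstrip("/"), folder)),
        ]
        response = client.submit_job(
            jobName="{}-{}".format(batch_id, folder),  # not sanitized against Batch naming rules
            jobQueue=job_queue,
            jobDefinition=job_definition,
            containerOverrides={
                "vcpus": resources[local_dir]["vcpus"],
                "memory": resources[local_dir]["memory"],
                "command": ["startup.sh"],
                "environment": [{"name": k, "value": v} for k, v in env],
            },
        )
        job_ids[local_dir] = response["jobId"]
    return job_ids
```

Returning the job IDs keyed by folder makes it straightforward to check on each analysis afterwards.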

#### Step F3 - Confirm the processing has started
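This step is also still empty. One way to confirm the jobs were accepted and are progressing is to poll `describe_jobs` with the IDs returned by `submit_job` (the `jobId` field visible in the response printed earlier). A minimal sketch; `job_status_counts` is a hypothetical helper, and it assumes `client` is the Batch client from Step F1.

```python
from collections import Counter


def job_status_counts(client, job_ids):
    """Return a Counter of AWS Batch job statuses for the given job IDs.

    DescribeJobs accepts at most 100 job IDs per call, so IDs are sent in
    chunks. Statuses include SUBMITTED, PENDING, RUNNABLE, STARTING,
    RUNNING, SUCCEEDED and FAILED.
    """
    job_ids = list(job_ids)
    counts = Counter()
    for start in range(0, len(job_ids), 100):
        response = client.describe_jobs(jobs=job_ids[start:start + 100])
        for job in response["jobs"]:
            counts[job["status"]] += 1
    return counts


# Usage with the single job submitted above:
#   print(job_status_counts(batch_client, [response["jobId"]]))
```

Any `FAILED` count this early usually points at the job definition, queue or S3 paths rather than the analysis itself.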