# Part 2: Training and Hosting your Algorithm in Amazon SageMaker

Once you have your container packaged, you can use it to train and serve models. Let's do that with the algorithm we made above.

## Set up the environment

Here we specify a bucket to use and the role that will be used for working with SageMaker.

In [178]:
# Imports used throughout the notebook
import json
import os
import re
from datetime import datetime

import boto3
import numpy as np
import pandas as pd
from sagemaker import get_execution_role

In [48]:
BUCKET_NAME = 'bgc-data'
PREFIX = 'sagemaker/lstm-example/'
DATA_PREFIX = os.path.join(PREFIX, 'data')
SAMPLES_PREFIX = os.path.join(DATA_PREFIX, 'samples')
VALIDATION_PREFIX = os.path.join(DATA_PREFIX, 'validation')
FILES_PREFIX = os.path.join(DATA_PREFIX, 'files')
MODEL_PREFIX = os.path.join(PREFIX, 'model')
ROLE = 'arn:aws:iam::487322236248:role/bgc-sagemaker-role'

## Create the session

The session remembers our connection parameters to SageMaker. We'll use it to perform all of our SageMaker operations.

In [171]:
import sagemaker as sage
from time import gmtime, strftime

sess = None
s3 = None

def reload_aws_session():
    global s3, sess
    sess = sage.Session()
    s3 = sess.boto_session.resource('s3')

reload_aws_session()

## Upload the data for training

When training large models with huge amounts of data, you'll typically use big data tools, like Amazon Athena, AWS Glue, or Amazon EMR, to create your data in S3. For the purposes of this example, we upload local files instead: the positive and negative training samples, the validation splits, and the pfam2vec embedding files.

We can use the tools provided by the SageMaker Python SDK to upload the data to the bucket defined above.

In [59]:
#sess.upload_data('../data/training/positive/CF_bgcs.csv', key_prefix=SAMPLES_PREFIX, bucket=BUCKET_NAME)

In [60]:
#sess.upload_data('../data/training/negative/geneswap_negatives.csv', key_prefix=SAMPLES_PREFIX, bucket=BUCKET_NAME)

In [38]:
#sess.upload_data('../data/features/pfam2vec-experiments/pfam2vec_top.bin', key_prefix=FILES_PREFIX, bucket=BUCKET_NAME)

In [None]:
sess.upload_data('../data/evaluation/labelled-bootstrap/splits', key_prefix=VALIDATION_PREFIX, bucket=BUCKET_NAME)

In [35]:
# No leading slash here: os.path.join would otherwise discard FILES_PREFIX
PFAM2VEC_PATH = os.path.join(FILES_PREFIX, 'pfam2vec')
sess.upload_data('../data/features/pfam2vec-experiments/iterations/', key_prefix=PFAM2VEC_PATH, bucket=BUCKET_NAME)
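
A note on building S3 key prefixes: `os.path.join` follows local filesystem rules, so a component that starts with `/` discards everything before it, and on Windows the separator would be a backslash. The sketch below uses `posixpath.join`, which always joins with `/`. The helper name `s3_key` is hypothetical and is not part of boto3 or the SageMaker SDK.

```python
import posixpath


def s3_key(*parts):
    """Join key components with '/', ignoring stray leading/trailing slashes.

    Hypothetical helper for illustration only.
    """
    cleaned = [p.strip('/') for p in parts if p and p.strip('/')]
    return posixpath.join(*cleaned) if cleaned else ''


# A component with a leading slash makes a plain join drop the earlier parts:
print(posixpath.join('sagemaker/lstm-example/data/files', '/pfam2vec'))

# The helper keeps the full prefix:
print(s3_key('sagemaker/lstm-example/', 'data', 'files', '/pfam2vec'))
```

With a helper like this, the prefixes defined at the top of the notebook could be built without depending on the operating system's path rules.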

## Submit jobs

In [54]:
def s3_copy(sourcekey, targetkey, files=None):
    print('Copying "{}" to "{}"'.format(sourcekey, targetkey))
    if not files:
        copy_source = {
            'Bucket': BUCKET_NAME,
            'Key': sourcekey
        }
        s3.meta.client.copy(copy_source, BUCKET_NAME, targetkey)
        return
    for file in files:
        copy_source = {
            'Bucket': BUCKET_NAME,
            'Key': os.path.join(sourcekey, file)
        }
        print(' Copying "{}"'.format(file))
        s3.meta.client.copy(copy_source, BUCKET_NAME, os.path.join(targetkey, file))

In order to use SageMaker to fit our algorithm, we'll create an `Estimator` that defines how to use the container to train. This includes the configuration we need to invoke SageMaker training:

* The __container name__. This is constructed as in the shell commands above.
* The __role__. As defined above.
* The __instance count__ which is the number of machines to use for training.
* The __instance type__ which is the type of machine to use for training.
* The __output path__ determines where the model artifact will be written.
* The __session__ is the SageMaker session object that we defined above.

Then we use fit() on the estimator to train against the data that we uploaded above.

In [205]:
def run_training(model_name, model_config, data_path, model_path, mode='cpu', wait=False):
    if mode == 'cpu':
        # Compute optimized 16 CPUs $1.114/hour: ml.c4.4xlarge
        # Compute optimized 36 CPUs $2.227/hour: ml.c4.8xlarge
        instance_type = 'ml.m5.large'  # alternative: 'ml.c4.4xlarge'
        image_name='bgc-model-cpu'
    elif mode == 'gpu':
        # GPU 1xV100 $4.284/hour: ml.p3.2xlarge
        instance_type = 'ml.p3.2xlarge'
        image_name='bgc-model-gpu'
    else:
        raise ValueError('Invalid mode')
    account = sess.boto_session.client('sts').get_caller_identity()['Account']
    region = sess.boto_session.region_name
    image = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account, region, image_name)
    print('Image:', image)
    input_path = "s3://{}/{}".format(BUCKET_NAME, data_path)
    output_path = "s3://{}/{}".format(BUCKET_NAME, model_path)
    
    hyperparameters = {k: json.dumps(v) for k, v in model_config.items()}
    model = sage.estimator.Estimator(image,
                           ROLE, 1, instance_type, 
                           output_path=output_path,
                           sagemaker_session=sess, 
                           hyperparameters=hyperparameters)

    print('Fitting on data folder: {}'.format(input_path))
    job_name = '{}-{}'.format(model_name, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))
    model.fit(input_path, wait=wait, job_name=job_name)

    return job_name
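
`run_training` encodes every top-level config value with `json.dumps` because SageMaker passes hyperparameters to the training container as strings (inside the container they appear in `/opt/ml/input/config/hyperparameters.json`). The sketch below shows the round trip. The function names are hypothetical, and how the container in this project actually decodes the values is an assumption.

```python
import json


def encode_hyperparameters(config):
    """Serialize each top-level value to a JSON string, as run_training does."""
    return {key: json.dumps(value) for key, value in config.items()}


def decode_hyperparameters(raw):
    """Inverse step, as it might be done inside the container (assumed)."""
    return {key: json.loads(value) for key, value in raw.items()}


config = {
    'type': 'KerasRNN',
    'build_params': {'batch_size': 64, 'stateful': True},
}
encoded = encode_hyperparameters(config)

# Every value is now a plain string; nested dicts become JSON text.
print(encoded)

# Decoding restores the original structure, including booleans and ints.
restored = decode_hyperparameters(encoded)
print(restored == config)
```

Encoding per key rather than as one blob keeps each top-level section (`build_params`, `fit_params`, `input_params`) visible as a separate hyperparameter in the SageMaker console.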

In [206]:
def copy_files_and_run_job(model_name, model_config, training_files, validation_files, pfam2vec_file, wait=False):
    job_prefix = os.path.join('sagemaker', 'bootstrap')
    job_data_path = os.path.join(job_prefix, 'data', model_name)
    job_model_path = os.path.join(job_prefix, 'models')

    objects_to_delete = s3.meta.client.list_objects(Bucket=BUCKET_NAME, Prefix=job_data_path)
    delete_keys = [{'Key' : k} for k in [obj['Key'] for obj in objects_to_delete.get('Contents', [])]]
    
    print('Deleting {} existing files: {}'.format(len(delete_keys), delete_keys))
    if delete_keys:
        # Delete from the same bucket that was listed above
        s3.meta.client.delete_objects(Bucket=BUCKET_NAME, Delete={'Objects': delete_keys})

    job_samples_path = os.path.join(job_data_path, 'samples')
    s3_copy(SAMPLES_PREFIX, job_samples_path, files=training_files)
    
    if validation_files:
        job_validation_path = os.path.join(job_data_path, 'validation')
        s3_copy(VALIDATION_PREFIX, job_validation_path, files=validation_files)

    job_pfam2vec_path = os.path.join(job_data_path, 'files', 'pfam2vec.bin')
    pfam2vec_source = os.path.join(FILES_PREFIX, pfam2vec_file)
    s3_copy(pfam2vec_source, job_pfam2vec_path)

    job_name = run_training(model_name, model_config, data_path=job_data_path, model_path=job_model_path, mode='cpu', wait=wait)
    model_path = os.path.join(job_model_path, job_name)
    
    return {
        'submitted': datetime.now(),
        'model_path': model_path,
        'job_name': job_name
    }
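
The cleanup step in `copy_files_and_run_job` lists and deletes in a single call each, which is fine for a handful of files. S3 list calls return at most 1000 keys per response and `delete_objects` accepts at most 1000 keys per request, so a larger prefix needs paging and batching. A minimal sketch, assuming a boto3 S3 client is passed in; the function names `iter_keys` and `delete_prefix` are hypothetical.

```python
def iter_keys(client, bucket, prefix):
    """Yield every key under prefix, following continuation tokens."""
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        page = client.list_objects_v2(**kwargs)
        for obj in page.get('Contents', []):
            yield obj['Key']
        if not page.get('IsTruncated'):
            break
        kwargs['ContinuationToken'] = page['NextContinuationToken']


def delete_prefix(client, bucket, prefix, batch_size=1000):
    """Delete all objects under prefix in batches; return the number deleted."""
    keys = list(iter_keys(client, bucket, prefix))
    for start in range(0, len(keys), batch_size):
        batch = [{'Key': key} for key in keys[start:start + batch_size]]
        client.delete_objects(Bucket=bucket, Delete={'Objects': batch})
    return len(keys)
```

In this notebook it could be called as `delete_prefix(s3.meta.client, BUCKET_NAME, job_data_path)`.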

In [207]:
test_config = {
  "type": "KerasRNN",
  "build_params": {
    "batch_size": 64,
    "hidden_size": 128,
    "stateful": True
  },
  "fit_params": {
    "timesteps": 256,
    "validation_size": 0,
    "num_epochs": 1,
    "gpus": 0,
    "verbose": 1,
    "learning_rate": 0.0001,
    "positive_weight": 1
  },
  "input_params": {
    "features": [
      {
        "type": "ProteinBorderTransformer"
      },
      {
        "type": "Pfam2VecTransformer",
        "vector_path": "/opt/ml/input/data/training/files/pfam2vec.bin"
      }
    ]
  }
}

pfam2vec_file=os.path.join('pfam2vec', 'pfam2vec_corpus-1e-02_skipgram_100dim_5win_8iter.bin')
validation_files = ['split_0_train.csv']

reload_aws_session()

test_job = copy_files_and_run_job(
    model_name='lstm-test', 
    model_config=test_config, 
    training_files=['CF_bgcs.csv', 'geneswap_negatives.csv'],
    validation_files=validation_files, 
    pfam2vec_file=pfam2vec_file,
    wait=True
)
test_job

Deleting 4 existing files: [{'Key': 'sagemaker/bootstrap/data/lstm-test/files/pfam2vec.bin'}, {'Key': 'sagemaker/bootstrap/data/lstm-test/samples/CF_bgcs.csv'}, {'Key': 'sagemaker/bootstrap/data/lstm-test/samples/geneswap_negatives.csv'}, {'Key': 'sagemaker/bootstrap/data/lstm-test/validation/split_0_train.csv'}]
Copying "sagemaker/lstm-example/data/samples" to "sagemaker/bootstrap/data/lstm-test/samples"
 Copying "CF_bgcs.csv"
 Copying "geneswap_negatives.csv"
Copying "sagemaker/lstm-example/data/validation" to "sagemaker/bootstrap/data/lstm-test/validation"
 Copying "split_0_train.csv"
Copying "sagemaker/lstm-example/data/files/pfam2vec/pfam2vec_corpus-1e-02_skipgram_100dim_5win_8iter.bin" to "sagemaker/bootstrap/data/lstm-test/files/pfam2vec.bin"


INFO:sagemaker:Creating training-job with name: lstm-test-2018-08-16-01-36-04


Image: 487322236248.dkr.ecr.us-east-1.amazonaws.com/bgc-model-cpu:latest
Fitting on data folder: s3://bgc-data/sagemaker/bootstrap/data/lstm-test
.............................................
Running with python sys.version_info(major=3, minor=6, micro=5, releaselevel='final', serial=0)
Starting the training.
Loaded config:
{'build_params': {'batch_size': 64, 'hidden_size': 128, 'stateful': True}, 'fit_params': {'timesteps': 256, 'validation_size': 0, 'num_epochs': 1, 'gpus': 0, 'verbose': 1, 'learning_rate': 0.0001, 'positive_weight': 1}, 'type': 'KerasRNN', 'input_params': {'features': [{'type': 'ProteinBorderTransformer'}, {'type': 'Pfam2VecTransformer', 'vector_path': '/opt/ml/input/data/training/files/pfam2vec.bin'}]}}
Loaded model:
{'build_params': {'batch_size': 64, 'hidden_size': 128, 'stateful': True},
 'fit_params': {'gpus': 0,
                'learning_rate': 0.0001,
                'num_epochs': 1,
                'positive_

ValueError: Error training lstm-test-2018-08-16-01-36-04: Failed Reason: ClientError: Please use an instance type with more memory, or reduce the size of training data processed on an instance.

## Configurations

In [110]:
from sklearn.model_selection import ParameterGrid

In [187]:
grid_params = ParameterGrid({
    'lstm' : [128, 256],
    'pfamdim' : [50, 100, 200, 300],
    'pfamiter' : [8, 32, 64],
    'posweight' : [1, 16.415],
    'split': [0, 1, 2, 3, 4],
    'training_files': [ ['CF_bgcs.csv', 'geneswap_negatives.csv'] ]
})
print('Total', len(grid_params))
grid_params = ParameterGrid({
    'lstm' : [128],
    'pfamdim' : [100],
    'pfamiter' : [8],
    'posweight' : [1],
    'split': [0, 1, 2, 3, 4],
    'training_files': [ ['CF_bgcs.csv', 'geneswap_negatives.csv'] ]
})
tasks = list(grid_params)
for t in tasks[:5]:
    print(t)
len(tasks)

Total 240
{'lstm': 128, 'pfamdim': 100, 'pfamiter': 8, 'posweight': 1, 'split': 0, 'training_files': ['CF_bgcs.csv', 'geneswap_negatives.csv']}
{'lstm': 128, 'pfamdim': 100, 'pfamiter': 8, 'posweight': 1, 'split': 1, 'training_files': ['CF_bgcs.csv', 'geneswap_negatives.csv']}
{'lstm': 128, 'pfamdim': 100, 'pfamiter': 8, 'posweight': 1, 'split': 2, 'training_files': ['CF_bgcs.csv', 'geneswap_negatives.csv']}
{'lstm': 128, 'pfamdim': 100, 'pfamiter': 8, 'posweight': 1, 'split': 3, 'training_files': ['CF_bgcs.csv', 'geneswap_negatives.csv']}
{'lstm': 128, 'pfamdim': 100, 'pfamiter': 8, 'posweight': 1, 'split': 4, 'training_files': ['CF_bgcs.csv', 'geneswap_negatives.csv']}


5
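
The total of 240 for the full grid is the product of the option counts: 2 × 4 × 3 × 2 × 5 = 240. The sketch below reproduces the expansion with `itertools.product` from the standard library, using the same values as the first `ParameterGrid` above. `expand_grid` is a hypothetical stand-in for illustration, not how scikit-learn implements it, and the single-valued `training_files` entry is left out because it does not change the count.

```python
from itertools import product

grid = {
    'lstm': [128, 256],
    'pfamdim': [50, 100, 200, 300],
    'pfamiter': [8, 32, 64],
    'posweight': [1, 16.415],
    'split': [0, 1, 2, 3, 4],
}


def expand_grid(grid):
    """Return one dict per combination, iterating keys in sorted order."""
    names = sorted(grid)
    value_lists = [grid[name] for name in names]
    return [dict(zip(names, values)) for values in product(*value_lists)]


combos = expand_grid(grid)
print(len(combos))
print(combos[0])
```

Since each combination becomes one training job, the count is worth checking before submitting the whole grid.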

In [194]:
def format_task_arg(k, v):
    formatted = str(v)
    if isinstance(v, list):
        if v == ['CF_bgcs.csv', 'geneswap_negatives.csv']:
            formatted = 'bgc-blastn'
            k = ''
        elif v == ['CF_bgcs.csv', 'geneswap_negatives.first_neg.csv']:
            formatted = 'first_neg'
            k = ''
        else:
            raise ValueError('No shortcut for value {} = {}'.format(k, v))
        
    return re.sub('[^a-zA-Z0-9-]+', '-', formatted+k)
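
The formatted arguments end up in the training job name, which SageMaker restricts to letters, digits, and hyphens, with a maximum of 63 characters (per the `CreateTrainingJob` API reference at the time of writing; treat the constants below as assumptions to verify). Long model names combined with the timestamp suffix added in `run_training` can exceed that limit. A minimal pre-submission check, with the hypothetical helper `is_valid_job_name`:

```python
import re

# Assumed from the CreateTrainingJob documentation; verify before relying on it.
JOB_NAME_PATTERN = re.compile(r'[a-zA-Z0-9](-*[a-zA-Z0-9])*')
MAX_JOB_NAME_LENGTH = 63


def is_valid_job_name(name):
    """Check length and character rules for a training job name."""
    if len(name) > MAX_JOB_NAME_LENGTH:
        return False
    return JOB_NAME_PATTERN.fullmatch(name) is not None


timestamp = '2018-08-16-01-36-04'
short_name = 'lstm-test-' + timestamp
long_name = '128lstm-100pfamdim-8pfamiter-1posweight-0split-bgc-blastn-' + timestamp

for name in (short_name, long_name):
    print(len(name), is_valid_job_name(name))
```

Running the check before calling `fit()` surfaces a naming problem locally instead of as an API validation error.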

In [147]:
jobs = {}

In [172]:
reload_aws_session()

In [195]:
for task in tasks:
    model_name = '-'.join(format_task_arg(k, task[k]) for k in sorted(list(task)))
    print(model_name)
    model_config = {
      "type": "KerasRNN",
      "build_params": {
        "batch_size": 64,
        "hidden_size": task['lstm'],
        "stateful": True
      },
      "fit_params": {
        "timesteps": 256,
        "validation_size": 0,
        "num_epochs": 250,
        "early_stop_monitor": "val_auc_roc",
        "early_stop_min_delta": 0.0005,
        "early_stop_patience": 20,
        "early_stop_mode": "max",
        "gpus": 0,
        "learning_rate": 0.0001,
        "positive_weight": task['posweight']
      },
      "input_params": {
        "features": [
          {
            "type": "ProteinBorderTransformer"
          },
          {
            "type": "Pfam2VecTransformer",
            "vector_path": "/opt/ml/input/data/training/files/pfam2vec.bin"
          }
        ]
      }
    }
    
    pfam2vec_file=os.path.join('pfam2vec', 'pfam2vec_corpus-1e-02_skipgram_{}dim_5win_{}iter.bin'.format(task['pfamdim'], task['pfamiter']))
    validation_files = ['split_{}_train.csv'.format(task['split'])]
    jobs[model_name] = copy_files_and_run_job(
        model_name=model_name, 
        model_config=model_config, 
        training_files=task['training_files'],
        validation_files=validation_files, 
        pfam2vec_file=pfam2vec_file
    )

print('Submitted {} jobs!'.format(len(jobs)))

128lstm-100pfamdim-8pfamiter-1posweight-0split-bgc-blastn
128lstm-100pfamdim-8pfamiter-1posweight-1split-bgc-blastn
128lstm-100pfamdim-8pfamiter-1posweight-2split-bgc-blastn
128lstm-100pfamdim-8pfamiter-1posweight-3split-bgc-blastn
128lstm-100pfamdim-8pfamiter-1posweight-4split-bgc-blastn
Submitted 2 jobs!
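
Because these jobs are submitted with `wait=False`, the loop returns before training finishes. One way to check on them later is the `describe_training_job` call of the SageMaker API, whose response includes a `TrainingJobStatus` field. The sketch below assumes `jobs` has the shape built above (model name mapped to a dict with a `job_name` entry); the function names are hypothetical.

```python
# Statuses after which a job will not change any more.
TERMINAL_STATES = {'Completed', 'Failed', 'Stopped'}


def job_statuses(client, jobs):
    """Map each model name to the current status of its training job."""
    statuses = {}
    for model_name, info in jobs.items():
        response = client.describe_training_job(TrainingJobName=info['job_name'])
        statuses[model_name] = response['TrainingJobStatus']
    return statuses


def unfinished(statuses):
    """Return the model names whose jobs are still running or stopping."""
    return sorted(name for name, status in statuses.items()
                  if status not in TERMINAL_STATES)
```

In this notebook the client would be `sess.sagemaker_client`, for example `unfinished(job_statuses(sess.sagemaker_client, jobs))`.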


In [104]:
jobs['128lstm_100pfamdim_8pfamiter_1posweight']['job_name']

'bgc-model-cpu-2018-08-15-16-05-01-246'

Fitting on data folder: s3://bgc-data/sagemaker/job1/data
Result will be saved to: s3://bgc-data/sagemaker/job1/model/ 
