This notebook is based on the preprocessing already locally performed. Training and Validation sets of images and LST files have been developed and placed in the correct structure, on S3. 

At this point, we can now simply perform the multiclass image classification training.

# Import

## Libraries

In [8]:
import time as t
import os
import json
import sagemaker
import boto3
import pprint
import posixpath
from sagemaker.amazon.amazon_estimator import get_image_uri

# Set-Up

## Establish AWS Parameters
This step establishes parameters used for AWS connections and other global inputs.

In [9]:
sagemaker_algorithm_name = 'image-classification'
bucket = "dsba-6190-final-team-project"
prefix_1 = "channels"
prefix_file_type = "rec"
region = "us-east-1"
label = "image-classification-drivers"

### Sessions and Clients
With these inputs we'll establish the necessary sessions and clients to operate this notebook.

In [10]:
# Boto3 Session
boto_sess = boto3.session.Session(region_name=region)

# Sagemaker Session
sess_sage = sagemaker.session.Session(boto_session=boto_sess)

# IAM client
iam_client = boto_sess.client('iam')

# Sagemaker Client
sm_client = boto_sess.client('sagemaker')

# App Autoscale Client
scale_client = boto_sess.client("application-autoscaling")

### Role Definition
I will define the role for the actions in this notebook. If working outside the AWS environment, make sure the AWS credentials are properly located. Instead of hardcoding the role ARN into the notebook, we will use the boto3 tools to extract the role ARN based on a known role name.

In [11]:
# Create List of All Roles associated with AWS Credentials
role_list = iam_client.list_roles()['Roles']

# Define Key/Value for role name 
key, value = 'RoleName', 'dsba_6190_team_project'

#Initialize Role ARN variable
role_arn= ''

for item in role_list:
    if key in item and value == item[key]:
        role_arn = item['Arn']

# Verify Role ARN
print(role_arn)

arn:aws:iam::726963482731:role/dsba_6190_team_project


We'll also define the ARN for the Autoscaling Role we will use later.

In [12]:
# Define Key/Value for role name 
key, value = 'RoleName', 'AWSServiceRoleForApplicationAutoScaling_SageMakerEndpoint'

#Initialize Role ARN variable
role_arn_auto_sc= ''

for item in role_list:
    if key in item and value == item[key]:
        role_arn_auto_sc = item['Arn']

# Verify Role ARN
print(role_arn_auto_sc)

arn:aws:iam::726963482731:role/aws-service-role/sagemaker.application-autoscaling.amazonaws.com/AWSServiceRoleForApplicationAutoScaling_SageMakerEndpoint


## Import Sagemaker Model
This step imports the latest version of the Amazon Sagemaker Image Classification model.

In [13]:
training_image = get_image_uri(region_name = sess_sage.boto_region_name, 
                               repo_name = sagemaker_algorithm_name, 
                               repo_version ="latest")
print (training_image)

811284229777.dkr.ecr.us-east-1.amazonaws.com/image-classification:latest


# Model Training
Two different data sets have been uploaded to S3. One is the complete dataset. The other is a 10% sample of the dataset. The 10% sample is for troubleshooting training and deployment of the Sagemaker Image Classification algorithm.

There are only two differences between training the model with the sample or complete dataset:

* __Input Location__: We need to point the algorithm to different S3 locations. We will do this with the **prefix_dataset** variable, which will be defined at the beginning of each dataset's notebook section.
* __Number of Training Samples__: The number of training samples will be different for the complete and the sample. Thes values are available in the Jupyter Notebook used to split the data and upload to S3.

We will define the number of **training** samples for each dataset below. 

**Note**: *Currently this is a manual process. Future iterations of this process will automate this calculation.*

In [14]:
num_training_samples_complete = 15686
num_training_samples_10 = 1567

## Define Dataset
This section defines the parameters of the dataset. By setting the split prefix and dataset prefix, it will direct the algorithm to the correct training and validation inputs. 

There are two varables which require definition:

1. **Dataset**: The dataset is either the complete dataset, or it is the 10% sample dataset. The 10% sample was created for troubleshooting purposes. Final production will use the complete dataset.
2. **Train/Validation Split Method**: Two different methods were developed to split the training data into a training and validation set. See the image processing notebook for more detail.
 * im2rec: This method was a random split, using the **im2rec.py** tool
 * split_drivers: This method divided the drivers into a training and validation set. Then, all the images associated with each driver are put into image training and validation sets. Using this method, all of the images associated with a driver are in either the training or validation set. No driver appears in both sets.

In [15]:
# Define Lists and Dictionary
list_dataset = ["complete", "sample",]
list_split_method = ["split_random", "split_driver"]

training_sample_dict = {
    "sample-split_random" : num_training_samples_10,
    "sample-split_driver": num_training_samples_10, 
    "complete-split_random": num_training_samples_complete,
    "complete-split_driver": num_training_samples_complete    
}

# Define Data Inputs
prefix_dataset = list_dataset[0] #0 = complete / 1 = sample
prefix_split_type = list_split_method[0]  #0 = split_random / 1 = split_drivers

# Extract Number of Training Samples
key_training_sample = prefix_dataset + "-" +prefix_split_type
num_training_samples = training_sample_dict[key_training_sample]

print("The following are the inputs for the model:")
print("Split Method:\t\t\t{}".format(prefix_split_type))
print("Dataset:\t\t\t{}".format(prefix_dataset))
print("# of Training Samples:\t\t{}".format(num_training_samples))

The following are the inputs for the model:
Split Method:			split_random
Dataset:			sample
# of Training Samples:		1567


## Model Inputs

### Model Output Location

In [16]:
s3_output_location = 's3://{}/output'.format(bucket)
print(s3_output_location)

s3://dsba-6190-final-team-project/output


### Model Input Location

First we establish the data input channels. As we are using RecordIO data format, only two channels are required.

In [17]:
s3train = 's3://{}/{}/{}/{}/train/'.format(bucket, prefix_1, prefix_split_type, prefix_dataset)
s3validation = 's3://{}/{}/{}/{}/validation/'.format(bucket, prefix_1, prefix_split_type, prefix_dataset)

print("The input data is pulled from the following S3 locations:")
print("Training:\t{}".format(s3train))
print("Validation:\t{}".format(s3validation))

The input data is pulled from the following S3 locations:
Training:	s3://dsba-6190-final-team-project/channels/split_random/sample/train/
Validation:	s3://dsba-6190-final-team-project/channels/split_random/sample/validation/


Then we define the channels as inputs into the image classification model.

In [18]:
train_data = sagemaker.session.s3_input(s3train, 
                                        distribution='FullyReplicated', 
                                        content_type='application/x-recordio', 
                                        s3_data_type='S3Prefix')

validation_data = sagemaker.session.s3_input(s3validation, 
                                             distribution='FullyReplicated', 
                                             content_type='application/x-recordio', 
                                             s3_data_type='S3Prefix')

data_channels = {'train': train_data, 
                 'validation': validation_data}

## Train Model

## Set Up Instance Types

In [19]:
# Available Instances
available_instances =['ml.p2.xlarge',              ### $1.26/hr
                      'ml.p3.2xlarge'              ### 4.284 /hr
                     ]

# Initialize Instance
train_instance_type = available_instances[1]

# Print Check
print("This training session used the following instance: {}".format(train_instance_type))

This training session used the following instance: ml.p3.2xlarge


### Initialize
#### Parameters
The following steps define the algoritm parameters and hyperparameters.

In [20]:
dist_drive_ic = sagemaker.estimator.Estimator(training_image,
                                              role_arn, 
                                              train_instance_count=1, 
                                              train_instance_type=train_instance_type,
                                              train_volume_size = 50,
                                              train_max_run = 360000,
                                              input_mode= 'File',
                                              output_path=s3_output_location,
                                              sagemaker_session=sess_sage)

#### Hyper-Parameters

In [21]:
dist_drive_ic.set_hyperparameters(num_layers = 18,
                                  use_pretrained_model = 1,
                                  image_shape = "3,210,280", #RGB Pictures, 210 x 280
                                  num_classes = 10,
                                  mini_batch_size = 128,
                                  epochs = 5,
                                  learning_rate = 0.01,
                                  num_training_samples = num_training_samples,
                                  precision_dtype = 'float16')

### Run Model
With the data inputs defined, parameters and hyperparameters initialized, we can run the model.

In [22]:
dist_drive_ic.fit(inputs = data_channels, logs = True)

2020-04-23 19:19:45 Starting - Starting the training job...
2020-04-23 19:19:46 Starting - Launching requested ML instances.........
2020-04-23 19:21:22 Starting - Preparing the instances for training......
2020-04-23 19:22:40 Downloading - Downloading input data
2020-04-23 19:22:40 Training - Downloading the training image.....[34mDocker entrypoint called with argument(s): train[0m
[34m[04/23/2020 19:23:39 INFO 139816725485376] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/image_classification/default-input.json: {u'beta_1': 0.9, u'gamma': 0.9, u'beta_2': 0.999, u'optimizer': u'sgd', u'use_pretrained_model': 0, u'eps': 1e-08, u'epochs': 30, u'lr_scheduler_factor': 0.1, u'num_layers': 152, u'image_shape': u'3,224,224', u'precision_dtype': u'float32', u'mini_batch_size': 32, u'weight_decay': 0.0001, u'learning_rate': 0.1, u'momentum': 0}[0m
[34m[04/23/2020 19:23:39 INFO 139816725485376] Merging with provided configuration from /opt/ml/input/config/hyper

### Wait
We need to insert a waiting stage in the process flow to ensure the model training is complete before downstream processes start. This should happen automatically, but we will add this step for caution.

In [23]:
# Initialize Watier
waiter = sm_client.get_waiter('training_job_completed_or_stopped')

# Extract Training Job Name
training_job_name = dist_drive_ic._current_job_name

# Define Waiter Parameters
waiter_params = {
    'TrainingJobName' : training_job_name,
    'WaiterConfig'  : {
        'Delay' : 123,
        'MaxAttemps' : 123
    }
}

# Execute Waiter
waiter.wait(**waiter_params)

# Verify Status
response = sm_client.describe_training_job(
    TrainingJobName = training_job_name
)

print("Training Job Name:\t{}".format(response['TrainingJobName']))
print("Training Job Status:\t{}".format(response['TrainingJobStatus']))

Training Job Name:	image-classification-2020-04-23-19-19-43-941
Training Job Status:	Completed


## Model

### Establish Parameters

In [24]:
# Training Job
training_job_name = dist_drive_ic._current_job_name
print("Training Job Name: {}".format(training_job_name))
print()

# Extract Training Job Information
info = sm_client.describe_training_job(TrainingJobName=training_job_name)
print("Training Job Information:")
#print(info)
print()

# Define S3 Location for Model Artifacts
model_s3_loc = info['ModelArtifacts']['S3ModelArtifacts']
print("Model S3 Location: {}".format(model_s3_loc))

# Define Primary Container
primary_container = {
    'Image': training_image,
    'ModelDataUrl': model_s3_loc,
}

Training Job Name: image-classification-2020-04-23-19-19-43-941

Training Job Information:

Model S3 Location: s3://dsba-6190-final-team-project/output/image-classification-2020-04-23-19-19-43-941/output/model.tar.gz


### Create Model

In [25]:
timestamp = t.strftime('-%Y-%m-%d-%H-%M-%S', t.gmtime())
model_name = label + "-model" + timestamp

try:
    
    create_model_response = sm_client.create_model(
        ModelName = model_name,
        ExecutionRoleArn = role_arn,
        PrimaryContainer = primary_container)
    print("Initial creation of model")

except: 
    print("Model already created.")

print()
print("Model Name: {}".format(model_name))
print("Model ARN: {}".format(create_model_response['ModelArn']))

Initial creation of model

Model Name: image-classification-drivers-model-2020-04-23-19-24-33
Model ARN: arn:aws:sagemaker:us-east-1:726963482731:model/image-classification-drivers-model-2020-04-23-19-24-33


## Endpoint Configuration

### Establish Parameters

In [26]:
timestamp = t.strftime('-%Y-%m-%d-%H-%M-%S', t.gmtime())
endpoint_config_name = label + '-epc' + timestamp
variant_name = "AllTraffic"
print('Endpoint Configuration name:\t{}'.format(endpoint_config_name))

Endpoint Configuration name:	image-classification-drivers-epc-2020-04-23-19-24-34


### Create Endpoint Configuration

In [27]:
try:
    endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName = endpoint_config_name,
        ProductionVariants=[
            {
            'InstanceType':'ml.m4.xlarge',
            'InitialInstanceCount':1,
            'ModelName':model_name,
            'VariantName': variant_name,
            'InitialVariantWeight':1
            }
        ])
    print("Initial creation of endpoint configuration.")

except:
    print('Endpoint configuration already created')

print()
print('Endpoint configuration name:\t{}'.format(endpoint_config_name))
print('Endpoint configuration arn:\t{}'.format(endpoint_config_response['EndpointConfigArn']))

Initial creation of endpoint configuration.

Endpoint configuration name:	image-classification-drivers-epc-2020-04-23-19-24-34
Endpoint configuration arn:	arn:aws:sagemaker:us-east-1:726963482731:endpoint-config/image-classification-drivers-epc-2020-04-23-19-24-34


## Endpoint

### Establish Parameters

In [28]:
endpoint_name = label + '-endpoint'
print('Endpoint name: {}'.format(endpoint_name))

endpoint_params = {
    'EndpointName': endpoint_name,
    'EndpointConfigName': endpoint_config_name,
}

Endpoint name: image-classification-drivers-endpoint


### Create / Update Endpoint

When updating an endpoint, we cannot update if the endpoint configuration does not change. This should not be a problem when executed in the final proces flow, but for working on this process, we want to avoid this error. So, an **if** is added to the **try** portion of the code block to verify endpoint configuration changes.

In [33]:
try:
    repsonse = sm_client.describe_endpoint(EndpointName = endpoint_name)
    if response['EndpointConfigName'] == endpoint_config_name:
        print('Unable to Update. No change to Endpoint Configuration.')
    else:
        endpoint_response = sm_client.update_endpoint(**endpoint_params)
        print('Endpoint updated.')
        print()
except:
    endpoint_response = sm_client.create_endpoint(**endpoint_params)
    print("Initial creation of endpoint.")
    print()

print('EndpointArn:\t{}'.format(endpoint_response['EndpointArn']))

Endpoint updated.

EndpointArn:	arn:aws:sagemaker:us-east-1:726963482731:endpoint/image-classification-drivers-endpoint


### Wait
We need to insert a waiting stage in the process flow to ensure the endpoint is **InService** before we can apply the autoscaling policy. If this stage is not here, we may get an error when we try to apply the autoscaling policy to an endpoint that is still being constructed.

In [34]:
# Initialize Watier
waiter = sm_client.get_waiter('endpoint_in_service')

# Define Waiter Parameters
waiter_params = {
    'EndpointName' : endpoint_name,
    'WaiterConfig'  : {
        'Delay' : 123,
        'MaxAttemps' : 123
    }
}

# Execute Waiter
waiter.wait(**waiter_params)

# Verify Status
response = sm_client.describe_endpoint(
    EndpointName = endpoint_name
)

print('Endpoint Name:\t\t{}'.format(response['EndpointName']))
print('Endpoint Status:\t{}'.format(response['EndpointStatus']))

Endpoint Name:		image-classification-drivers-endpoint
Endpoint Status:	InService


# Auto Scaling

## Set Global Parameters

In [None]:
service_name_space = 'sagemaker'
resource_id = posixpath.join("endpoint", endpoint_name,"variant", variant_name)
scalable_dimension = 'sagemaker:variant:DesiredInstanceCount'
resource_id

## Register Scalable Target - Sagemaker Endpoint

### Define Parameters

In [None]:
scalable_target_params = {
    'ServiceNamespace' : service_name_space,
    'ResourceId' : resource_id,
    'ScalableDimension' : scalable_dimension,
    'MinCapacity' : 1,
    'MaxCapacity' : 4,
    'RoleARN' : role_arn_auto_sc
}

json_form = json.dumps(scalable_target_params, indent=3)
print(json_form)

### Apply To Object

In [None]:
response = scale_client.register_scalable_target(**scalable_target_params)
pprint.pprint(response)

## Put Scaling Policy

### Define Parameters

In [None]:
scaling_policy_params = {
    'PolicyName' : 'image-classification-driver-endpoint-scaling',
    'ServiceNamespace' : service_name_space,
    'ResourceId' : resource_id,
    'ScalableDimension' : scalable_dimension,
    'PolicyType' :  'TargetTrackingScaling',
    'TargetTrackingScalingPolicyConfiguration' : {
        'TargetValue' : 3e4,
        'PredefinedMetricSpecification' : {
            'PredefinedMetricType' : 'SageMakerVariantInvocationsPerInstance'
        },
        'ScaleInCooldown': 120,
        'ScaleOutCooldown': 300
    }
}

scale_policy_json_form = json.dumps(scaling_policy_params, indent=3)
print(scale_policy_json_form)

### Apply To Object

In [None]:
response = scale_client.put_scaling_policy(**scaling_policy_params)
pprint.pprint(response)

## Verify Scaling Policy Was Attached

In [None]:
scalable_policy_search_params = {
    'ServiceNamespace' : service_name_space,
}

response = scale_client.describe_scaling_policies(**scalable_policy_search_params)
pprint.pprint(response)