# Train Multiple Models in Parallel Using SageMaker Training


## Learning Objectives
1. Kick off multiple SageMaker Training jobs in parallel for scalable parallel training.
2. Register SageMaker-built models in [the SageMaker Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html).
3. Use [SageMaker Experiments](https://aws.amazon.com/blogs/aws/amazon-sagemaker-experiments-organize-track-and-compare-your-machine-learning-trainings/) to organize and track the multiple models.

## Introduction

This notebook will demonstrate how SageMaker Training jobs can be used in a scalable fashion to kick off parallel training jobs. This has the advantage that jobs can be run in parallel without concerns about RAM or similar constraints.

The underlying dataset is a processed version of the data retrieved from here: https://healthdata.gov/dataset/medicare-hospital-spending-claim, where we seek to determine average hospital Medicare spending based on certain features.

In this workshop, we will deploy different models partitioned by state, where a separate model is built based on which state in the USA the hospital is located in. We will register the models in the SageMaker Model Registry.

In [None]:
%pip install sagemaker==2.74.0 sagemaker-experiments

First we will load needed libraries and create a location where models and data will be located

In [None]:
import boto3
import json
import logging
import matplotlib.pyplot as plt
import os
import pandas
from random import random
import s3fs
import sagemaker
from sagemaker import get_execution_role
from smexperiments.experiment import Experiment
from sagemaker.s3 import S3Downloader
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.s3 import S3Uploader, S3Downloader
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker
import sklearn
import sklearn.metrics as metrics
import time

logger = logging.getLogger("log")
# Configure the logger only once: the handler check keeps a re-run of this
# cell from attaching a duplicate handler.
if not logger.handlers:
    logger.setLevel(logging.INFO)
    # A level alone is not enough: when no handler is configured anywhere up
    # the logger hierarchy, records go to logging's last-resort handler, which
    # only emits WARNING and above, so INFO messages would be dropped.
    logger.addHandler(logging.StreamHandler())

In [None]:
# Execution role, SageMaker session, and the S3 location used by later cells
role = get_execution_role()
sagemaker_session = sagemaker.Session()
BUCKET = sagemaker_session.default_bucket()
# A random numeric suffix gives each run of the notebook its own S3 prefix
random_suffix = str(random()).replace(".", "")
PREFIX = f"multiple_models_{random_suffix}"
print(f"The S3 location is {BUCKET}/{PREFIX}")
WORKING_DIR = os.getcwd()

In [None]:
# Make sure the local data folder exists (no error if it is already there)
data_dir = os.path.join(WORKING_DIR, "data")
os.makedirs(data_dir, exist_ok=True)

In [None]:
# Copy the processed Medicare spending CSV from S3 into the local data folder
source_uri = "s3://aws-hcls-ml/workshop/immersion_day_workshop_data_DO_NOT_DELETE/data/processed_Medicare_Hospital_Spending_by_Claim.csv"
S3Downloader.download(s3_uri=source_uri, local_path="data", sagemaker_session=sagemaker_session)

In [None]:
# Load the dataset, name the target column "truth", and drop the facility identifier
df_1 = (
    pandas.read_csv("data/processed_Medicare_Hospital_Spending_by_Claim.csv")
    .rename(columns={"Avg_Hosp": "truth"})
    .drop(columns="Facility ID")
)
df_1.head()

Create a new SageMaker experiment to track our models

In [None]:
# The timestamp in the name keeps each run's experiment distinct
create_date = time.strftime("%Y-%m-%d-%H-%M-%S")
experiment_name = "medicare-hospital-spending-" + create_date
creator_tag = {"Key": "Creator", "Value": "Alejandro Rosalez"}
medicare_spend_experiment = Experiment.create(
    experiment_name=experiment_name,
    description="Predict Medicare hospital spending",
    tags=[creator_tag],
)

Specify a training script

In [None]:
# Make sure the folder that will hold the training script exists
scripts_dir = os.path.join(WORKING_DIR, "scripts")
os.makedirs(scripts_dir, exist_ok=True)

In [None]:
%%writefile scripts/train.py

import argparse
import joblib
import json
import logging
import numpy as np
import os
import pandas as pd
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from smexperiments.tracker import Tracker

logger = logging.getLogger("log")
# Configure the logger only once; the handler check prevents duplicates if
# this module is loaded more than once.
if not logger.handlers:
    logger.setLevel(logging.INFO)
    # A level alone is not enough: when no handler is configured anywhere up
    # the logger hierarchy, records go to logging's last-resort handler, which
    # only emits WARNING and above, so INFO messages would be dropped.
    logger.addHandler(logging.StreamHandler())

if __name__ == "__main__":
    # Training entry point: fit an MLP regressor on the "train" channel CSV,
    # log loss and test metrics to the experiment tracker, and save the model.
    parser = argparse.ArgumentParser()

    # SageMaker-specific arguments. Defaults are read from the SM_* environment variables.
    parser.add_argument("--output-data-dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    args = parser.parse_args()

    # Read the training CSV and hold out 20% of the rows for evaluation
    df = pd.read_csv(os.path.join(args.train, "train_data.csv"), engine="python")
    train_data, test_data = train_test_split(df, test_size=0.2)

    # The label is the "truth" column; every other column is a feature.
    # Selecting by name (instead of slicing columns up to the row count) keeps
    # the feature set complete and identical for both splits, however few rows
    # a split contains.
    train_y = train_data["truth"]
    train_X = train_data.drop(columns=["truth"])
    test_y = test_data["truth"]
    test_X = test_data.drop(columns=["truth"])

    # Now use scikit-learn's MLPRegressor to train the model.
    regr = MLPRegressor(random_state=1, max_iter=500).fit(train_X, train_y)
    print(regr.loss_curve_)

    # Evaluate model performance on test data
    print("Evaluating model")
    # Load the tracker for the current job; create a new one when loading
    # raises ValueError.
    try:
        my_tracker = Tracker.load()
    except ValueError:
        my_tracker = Tracker.create()

    try:
        # Log the training loss recorded at each iteration
        for epoch, value in enumerate(regr.loss_curve_):
            my_tracker.log_metric(metric_name="train:loss", value=value, iteration_number=epoch)

        # Evaluate test performance. mean_squared_error returns the MSE, so
        # take the square root to report an actual RMSE.
        test_predictions = regr.predict(test_X)
        rmse = float(np.sqrt(mean_squared_error(test_y, test_predictions)))
        my_tracker.log_metric(metric_name="test:rmse", value=rmse)
        print(f"RMSE: {rmse:.2f}")
    finally:
        # Close the tracker even if evaluation or metric logging fails
        my_tracker.close()

    # Save the fitted regressor where model_fn expects to find it
    joblib.dump(regr, os.path.join(args.model_dir, "model.joblib"))

def model_fn(model_dir):
    """Load and return the fitted model stored in ``model_dir``.

    The file name must match the one the training block passes to
    ``joblib.dump``.
    """
    model_path = os.path.join(model_dir, "model.joblib")
    return joblib.load(model_path)

def predict_fn(input_data, model):
    """Run ``model.predict`` on ``input_data`` and return the result as an array.

    A one-dimensional input is treated as a single sample and reshaped into
    one row first; any other input is handed to the model unchanged.
    """
    as_array = np.array(input_data)
    features = as_array.reshape(1, -1) if as_array.ndim == 1 else input_data
    return np.array(model.predict(features))

In [None]:
!echo "sagemaker-experiments\njoblib" > scripts/requirements.txt

In [None]:
# ### Uncomment to test training script locally
# df_1.to_csv("data/train_data.csv", index=False)
# os.makedirs(os.path.join(WORKING_DIR, "output"), exist_ok=True)
# !python scripts/train.py --output-data-dir "output" \
#     --model-dir "output" \
#     --train "data/"

# Perform Training
Now we will kick off the training jobs. Since we are creating multiple jobs, we will copy each file to a unique location in S3.

In [None]:
# Settings for the estimator that launches every training job
estimator_settings = {
    "entry_point": "train.py",
    "source_dir": "scripts",
    "instance_type": "ml.m5.large",
    "role": role,
    "py_version": "py3",
    "framework_version": "0.23-1",
    "enable_sagemaker_metrics": True,
    "sagemaker_session": sagemaker_session,
}
# NOTE: this rebinds the name `sklearn`, which the imports cell bound to the
# scikit-learn module; from here on it refers to the estimator.
sklearn = SKLearn(**estimator_settings)

In [None]:
# Sort the distinct states so the same ones are picked on every run, and slice
# rather than index so a dataset with fewer than 10 states does not raise
# IndexError.
the_states = sorted(set(df_1["State"].tolist()))
all_jobs_info = []
for the_state in the_states[:10]:  # create models for the first 10 states only
    # Write this state's rows to the local file that is uploaded below
    df_just_that_state = df_1[df_1["State"] == the_state]
    df_just_that_state.to_csv("data/train_data.csv", index=False)

    # The random suffix keeps trial, job, and S3 names unique across runs
    just_state_prefix = f"state-{str(the_state).zfill(2)}-" + str(random()).replace(".", "")
    train_s3_uri = f"s3://{BUCKET}/{PREFIX}/{just_state_prefix}/train_data.csv"

    # Create a trial and associate it to the experiment defined above
    rf_trial = Trial.create(trial_name=just_state_prefix, experiment_name=experiment_name)

    # Upload the training data to S3
    S3Uploader.upload(
        local_path="data/train_data.csv",
        desired_s3_uri=train_s3_uri,
        sagemaker_session=sagemaker_session,
    )

    # wait=False returns as soon as the job is submitted, so the jobs run in
    # parallel; only the job name is kept for later status lookups.
    job_name = "training-job-" + just_state_prefix
    sklearn.fit(
        {"train": train_s3_uri},
        experiment_config={
            "TrialName": just_state_prefix,
            "TrialComponentDisplayName": f"{just_state_prefix}-train",
        },
        wait=False,
        job_name=job_name,
    )
    all_jobs_info.append(job_name)

In [None]:
sm_client = boto3.client("sagemaker")


def training_results(all_jobs_info):
    """Return the describe_training_job response for each job name, in order."""
    descriptions = []
    for job_name in all_jobs_info:
        descriptions.append(sm_client.describe_training_job(TrainingJobName=job_name))
    return descriptions


# Print the current status of every submitted job
for description in training_results(all_jobs_info):
    name = description["TrainingJobName"]
    status = description["TrainingJobStatus"]
    print(f"{name} - {status}")

Before continuing on, wait for all of the model training jobs to complete. You can track them in either the SageMaker console, or the Experiments view in SageMaker Studio, or re-run the cell above.

# Register the Models
Now that the S3 model artifacts have been created, we will register them with the SageMaker Model Registry.

In [None]:
# Collect the S3 location of each job's model artifact, in job order
the_training_results = training_results(all_jobs_info)
the_model_artifacts = [
    description["ModelArtifacts"]["S3ModelArtifacts"]
    for description in the_training_results
]
the_model_artifacts

In [None]:
# Create a model package group whose name ends with the current Unix timestamp
model_package_group_name = f"hospital-spending-states-models{round(time.time())}"
model_package_group_input_dict = dict(
    ModelPackageGroupName=model_package_group_name,
    ModelPackageGroupDescription="Group of Models for Hospital Spending By State",
)
create_model_pacakge_group_response = sm_client.create_model_package_group(
    **model_package_group_input_dict
)
group_arn = create_model_pacakge_group_response["ModelPackageGroupArn"]
print(f"ModelPackageGroup Arn : {group_arn}")

In [None]:
def _model_package_request(index):
    """Build the create_model_package arguments for the model at ``index``."""
    container = {"Image": sklearn.image_uri, "ModelDataUrl": the_model_artifacts[index]}
    return {
        "ModelPackageGroupName": model_package_group_name,
        "ModelPackageDescription": f"State {index} Model",
        "ModelApprovalStatus": "PendingManualApproval",
        "InferenceSpecification": {
            "Containers": [container],
            "SupportedContentTypes": ["text/csv"],
            "SupportedResponseMIMETypes": ["text/csv"],
        },
    }


# Register one model package version per training job
all_model_arns = []
for job_index, _ in enumerate(all_jobs_info):
    create_model_package_input_dict = _model_package_request(job_index)
    create_mode_package_response = sm_client.create_model_package(
        **create_model_package_input_dict
    )
    model_package_arn = create_mode_package_response["ModelPackageArn"]
    all_model_arns.append(model_package_arn)
    print(f"ModelPackage Version ARN : {model_package_arn}")

Once the model package registration has completed, let's approve the first model for deployment.

In [None]:
# Switch the first registered model package to the "Approved" status
model_package_update_input_dict = dict(
    ModelPackageArn=all_model_arns[0],
    ModelApprovalStatus="Approved",
)
model_package_update_response = sm_client.update_model_package(
    **model_package_update_input_dict
)

Now we will deploy the first model to an endpoint for prediction.

In [None]:
# Wrap the first model artifact with the same script that trained it
model_settings = {
    "model_data": the_model_artifacts[0],
    "role": role,
    "entry_point": "train.py",
    "source_dir": "scripts",
    "framework_version": "0.23-1",
}
sklearn_model = SKLearnModel(**model_settings)

# Deploy the model to a single-instance endpoint
predictor = sklearn_model.deploy(initial_instance_count=1, instance_type="ml.c5.xlarge")

The model deployment will take about 5 minutes to complete.

## Query the endpoint with sample data

First, try calling the endpoint using the `predict` method

In [None]:
# Use the first row as a sample request, leaving out its first column
first_row = df_1.iloc[0]
the_payload = first_row[1:].tolist()
predictor.predict(data=the_payload)

Next, try calling the endpoint via http using the `invoke_endpoint` method

In [None]:
# Send the same payload as JSON through the runtime client
runtime = boto3.client("sagemaker-runtime")
request_body = json.dumps(the_payload)
response = runtime.invoke_endpoint(
    EndpointName=predictor.endpoint_name,
    ContentType="application/json",
    Body=request_body,
)
result = response["Body"]
# The response body is a JSON list; print its first element
predictions = json.loads(result.read())
final_prediction = predictions[0]
print(final_prediction)

## Delete the Endpoint

In [None]:
# Delete the endpoint that the deploy call created
predictor.delete_endpoint()