# Deploy a machine learning model on SageMaker

The objective of this workshop is to understand the steps needed to deploy a machine learning model on SageMaker.
To do that, we first need to understand how SageMaker works and what it requires to run properly.

Along the way, we will see how to preprocess the data and how to configure the SageMaker SDK functions correctly.

Objectives:
- Understand the problem
- Understand how SageMaker works
- Preprocess the data in a manner that works with SageMaker
- Understand how to use hyperparameter optimisation with SageMaker to get the best model
- Understand the steps to deploy a model with SageMaker

In [93]:
import sagemaker

sess = sagemaker.Session()
bucket = "s3://sagemaker-data-workshop"

# Define IAM role
import boto3
from sagemaker import get_execution_role

role = get_execution_role()

In [94]:
import pandas as pd
import numpy as np
import io
import os
import sys
from sagemaker.inputs import TrainingInput
from sagemaker.serializers import CSVSerializer
from sagemaker.tuner import (
    IntegerParameter,
    ContinuousParameter,
    HyperparameterTuner,
)

from sagemaker.predictor import Predictor


# Plan of action
![Plan of actions](Workflow-diagram.jpg)


## Preprocessing the data

Important points to keep in mind:
- The built-in XGBoost algorithm expects CSV input without a header row, so we write the preprocessed NumPy arrays to CSV with `header=False`
- The target needs to be the first column (the SageMaker default for CSV training data)
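
A minimal sketch of that layout, for illustration only. The helper `to_sagemaker_csv` and the two feature columns are assumptions made up for this example; they are not part of the SageMaker SDK or of the dataset.

```python
import io

import numpy as np
import pandas as pd


def to_sagemaker_csv(features, target):
    """Return CSV text with the target in column 0 and no header or index.

    Hypothetical helper, written only to illustrate the expected layout.
    """
    target = np.asarray(target, dtype=float).reshape(-1, 1)
    features = np.asarray(features, dtype=float)
    data = np.concatenate((target, features), axis=1)  # target first
    buffer = io.StringIO()
    pd.DataFrame(data).to_csv(buffer, header=False, index=False)
    return buffer.getvalue()


# Two made-up records with two features each
csv_text = to_sagemaker_csv([[0.455, 0.365], [0.35, 0.265]], [15, 7])
print(csv_text)
```

Each line of the result starts with the label, followed by the features, which is the same layout the preprocessing cell below writes to `train.csv`, `validation.csv` and `test.csv`.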

The dataset is available from the UCI Machine Learning Repository. The aim of this task is to determine the age of an abalone (a kind of shellfish) from its physical measurements. At its core, it is a regression problem. The dataset contains the following columns: sex (categorical), length (continuous), diameter (continuous), height (continuous), whole_weight (continuous), shucked_weight (continuous), viscera_weight (continuous), shell_weight (continuous) and rings (integer). Our goal is to predict the variable rings, which is a good approximation of age (age in years is rings + 1.5).



In [106]:
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    'sex', # M, F, and I (infant)
    'length', # Longest shell measurement
    'diameter', # perpendicular to length
    'height', # with meat in shell
    'whole_weight', # whole abalone
    'shucked_weight', # weight of meat
    'viscera_weight', # gut weight (after bleeding)
    'shell_weight'] # after being dried

label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}

label_column_dtype = {'rings': "float64"} # +1.5 gives the age in years

def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z


if __name__ == "__main__":


    df = pd.read_csv(
        bucket+"/abalone.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
        )
    
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
                                  )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
                                      )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
                                  )
    
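    # Sequential 70/15/15 split in file order (rows are not shuffled).
    # .copy() avoids modifying views of df when the label column is popped below.
    n_train, n_val = int(0.7 * len(df)), int(0.85 * len(df))
    train = df.iloc[:n_train].copy()
    validation = df.iloc[n_train:n_val].copy()
    test = df.iloc[n_val:].copy()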
    
    y = train.pop("rings")
    X_pre = preprocess.fit_transform(train)
    y_pre = y.to_numpy().reshape(len(y), 1)

    train = np.concatenate((y_pre, X_pre), axis=1)
    
    y = validation.pop("rings")
    X_pre = preprocess.transform(validation)
    y_pre = y.to_numpy().reshape(len(y), 1)

    validation = np.concatenate((y_pre, X_pre), axis=1)
    

    y = test.pop("rings")
    X_pre = preprocess.transform(test)
    y_pre = y.to_numpy().reshape(len(y), 1)

    test = np.concatenate((y_pre, X_pre), axis=1)
    

    
    pd.DataFrame(train).to_csv("train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        "validation.csv", header=False, index=False
                                   )
    pd.DataFrame(test).to_csv("test.csv", header=False, index=False)
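
The split in the cell above takes the first 70% of the rows for training, the next 15% for validation and the remainder for testing, in file order. If the file order were not random, shuffling before splitting would be safer. The following is a sketch on a toy frame; the function name `split_70_15_15` and the seed are assumptions chosen for illustration.

```python
import numpy as np
import pandas as pd


def split_70_15_15(frame, seed=0):
    """Shuffle the rows, then cut them at 70% and 85% of the length."""
    shuffled = frame.sample(frac=1.0, random_state=seed).reset_index(drop=True)
    n = len(shuffled)
    i, j = int(0.7 * n), int(0.85 * n)
    return shuffled.iloc[:i], shuffled.iloc[i:j], shuffled.iloc[j:]


toy = pd.DataFrame({"x": np.arange(20)})
train_part, val_part, test_part = split_70_15_15(toy)
print(len(train_part), len(val_part), len(test_part))
```

Whichever split is used, the preprocessing pipeline should still be fitted on the training part only and then applied to the other two, as the cell above does.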

In [4]:
boto3.Session().resource("s3").Bucket("sagemaker-data-workshop").Object( #Upload train data to s3
    ("train/train.csv")
).upload_file("train.csv")

boto3.Session().resource("s3").Bucket("sagemaker-data-workshop").Object( #Upload validation data to s3
    ("validation/validation.csv")
).upload_file("validation.csv")

boto3.Session().resource("s3").Bucket("sagemaker-data-workshop").Object( #Upload test data to s3
    ("test/test.csv")
).upload_file("test.csv")

## Hyperparameter tuning with XGBoost

In [16]:
# Each step in SageMaker has its own input class: TrainingInput for training, TransformInput for batch transform,
# ProcessingInput for processing, etc.

s3_input_train = TrainingInput( 
    s3_data="{}/train/train.csv".format(bucket), content_type="csv"
)
s3_input_validation = TrainingInput(
    s3_data="{}/validation/validation.csv".format(bucket), content_type="csv"
)


Next, we:
- Retrieve the container image that runs on the EC2 instance that trains the model
- Specify the fixed hyperparameters of the model and the ranges of the hyperparameters that we are going to tune

In [44]:
container = sagemaker.image_uris.retrieve("xgboost",
                                          boto3.Session().region_name,
                                          "latest")

In [6]:
sess = sagemaker.Session()

xgb = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path="{}/output".format(bucket),
    sagemaker_session=sess,
)

xgb.set_hyperparameters(
    eval_metric="rmse",
    objective="reg:linear",
    num_round=50, #Number of boosting rounds (one tree per round)
)

hyperparameter_ranges = {
    "alpha": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"), #Parameter for L1 regularization
    "lambda": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"), #Parameter for L2 regularization
    "max_depth": IntegerParameter(1, 10),
}

See the [EC2 instance types](https://aws.amazon.com/ec2/instance-types/) page for the available machines, their specs and their characteristics.

In [7]:
objective_metric_name = "validation:rmse"
tuner = HyperparameterTuner(
    xgb,                     
    objective_metric_name,   
    hyperparameter_ranges,
    max_jobs=3,
    max_parallel_jobs=3,
    strategy="Random",         #Random hyperparameter search. Another option is Bayesian
                               #(grid search was not supported when this notebook was written)
    objective_type="Minimize", #We want to minimize RMSE. For a metric such as precision, we would use "Maximize"
    base_tuning_job_name="T1"  #Base name for the tuning job
)

tuner.fit({"train": s3_input_train, "validation": s3_input_validation})

...............................................!
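
To make the `"Random"` strategy and the `"Logarithmic"` scaling more concrete, here is a rough local sketch of what drawing candidates from the ranges above could look like. It is an illustration of log-uniform sampling under stated assumptions, not SageMaker's internal implementation; the function names and the seed are made up for this example.

```python
import numpy as np


def sample_log_uniform(rng, low, high):
    """Draw a value whose logarithm is uniform between log(low) and log(high)."""
    return float(np.exp(rng.uniform(np.log(low), np.log(high))))


def sample_candidates(n, seed=0):
    """Draw n random hyperparameter sets from the ranges used in this notebook."""
    rng = np.random.default_rng(seed)
    return [
        {
            "alpha": sample_log_uniform(rng, 0.01, 10),
            "lambda": sample_log_uniform(rng, 0.01, 10),
            "max_depth": int(rng.integers(1, 11)),  # upper bound is exclusive
        }
        for _ in range(n)
    ]


candidates = sample_candidates(3)
for candidate in candidates:
    print(candidate)
```

With logarithmic scaling, values between 0.01 and 0.1 are as likely to be drawn as values between 1 and 10, which suits regularization parameters that span several orders of magnitude.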


## Tuning analytics

In [13]:
sage_client = boto3.client('sagemaker')
#sage_client.list_hyper_parameter_tuning_jobs()

In [7]:
tuning_job_name="T1-220210-1716"
#tuner.latest_tuning_job.job_name

In [8]:
tuning_job_result = sage_client.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuning_job_name
)

status = tuning_job_result["HyperParameterTuningJobStatus"]
if status != "Completed":
    print("Reminder: the tuning job has not been completed.")

job_count = tuning_job_result["TrainingJobStatusCounters"]["Completed"]
print("%d training jobs have completed" % job_count)

objective = tuning_job_result["HyperParameterTuningJobConfig"]["HyperParameterTuningJobObjective"]
is_minimize = objective["Type"] != "Maximize"
objective_name = objective["MetricName"]

3 training jobs have completed


In [9]:
import pandas as pd

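# Use a separate name so the HyperparameterTuner object `tuner` is not overwritten
tuner_analytics = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name)

full_df = tuner_analytics.dataframe()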

if len(full_df) > 0:
    df = full_df[full_df["FinalObjectiveValue"] > -float("inf")]
    if len(df) > 0:
        df = df.sort_values("FinalObjectiveValue", ascending=is_minimize)
        print("Number of training jobs with valid objective: %d" % len(df))
        print({"lowest": min(df["FinalObjectiveValue"]), "highest": max(df["FinalObjectiveValue"])})
        pd.set_option("display.max_colwidth", None)  # Don't truncate TrainingJobName
    else:
        print("No training jobs have reported valid results yet.")

df

Number of training jobs with valid objective: 3
{'lowest': 2.062730073928833, 'highest': 2.125580072402954}


| | alpha | lambda | max_depth | TrainingJobName | TrainingJobStatus | FinalObjectiveValue | TrainingStartTime | TrainingEndTime | TrainingElapsedTimeSeconds |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 4.242154 | 4.787595 | 4.0 | T1-220210-1716-002-4cd9773c | Completed | 2.06273 | 2022-02-10 17:19:03+00:00 | 2022-02-10 17:19:54+00:00 | 51.0 |
| 0 | 0.200011 | 0.536478 | 2.0 | T1-220210-1716-003-4dfda1a3 | Completed | 2.1008 | 2022-02-10 17:18:59+00:00 | 2022-02-10 17:19:40+00:00 | 41.0 |
| 2 | 7.431121 | 0.015235 | 8.0 | T1-220210-1716-001-79b9b93d | Completed | 2.12558 | 2022-02-10 17:18:58+00:00 | 2022-02-10 17:19:52+00:00 | 54.0 |


In [10]:
if tuning_job_result.get("BestTrainingJob", None):
    print("Best model found so far:")
    print(tuning_job_result["BestTrainingJob"])

Best model found so far:
{'TrainingJobName': 'T1-220210-1716-002-4cd9773c', 'TrainingJobArn': 'arn:aws:sagemaker:eu-west-1:775923162736:training-job/t1-220210-1716-002-4cd9773c', 'CreationTime': datetime.datetime(2022, 2, 10, 17, 16, 40, tzinfo=tzlocal()), 'TrainingStartTime': datetime.datetime(2022, 2, 10, 17, 19, 3, tzinfo=tzlocal()), 'TrainingEndTime': datetime.datetime(2022, 2, 10, 17, 19, 54, tzinfo=tzlocal()), 'TrainingJobStatus': 'Completed', 'TunedHyperParameters': {'alpha': '4.242153590034549', 'lambda': '4.787594964023144', 'max_depth': '4'}, 'FinalHyperParameterTuningJobObjectiveMetric': {'MetricName': 'validation:rmse', 'Value': 2.062730073928833}, 'ObjectiveStatus': 'Succeeded'}


In [11]:
tuning_job_result["BestTrainingJob"]["TunedHyperParameters"]

{'alpha': '4.242153590034549', 'lambda': '4.787594964023144', 'max_depth': '4'}

In [12]:
best_params=tuning_job_result["BestTrainingJob"]["TunedHyperParameters"]
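
Note that the tuned values come back as strings, and that one of the keys is `lambda`, which is a reserved word in Python and therefore cannot be written as a keyword argument. A small sketch of how dictionary unpacking gets around this; `collect_hyperparameters` is a hypothetical stand-in for any function that accepts hyperparameters as keyword arguments, and `tuned_params` repeats the values printed above.

```python
# Values copied from the tuning result shown above; they arrive as strings.
tuned_params = {
    "alpha": "4.242153590034549",
    "lambda": "4.787594964023144",
    "max_depth": "4",
}


def collect_hyperparameters(**kwargs):
    """Hypothetical stand-in for a function taking hyperparameters as keywords."""
    return dict(kwargs)


# Writing lambda="4.78" as a keyword argument would be a SyntaxError,
# so the dictionary is unpacked with ** instead.
hyperparameters = collect_hyperparameters(
    eval_metric="rmse", num_round=50, **tuned_params
)
print(sorted(hyperparameters))
```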

### Building an xgboost model with best parameters and deploying it

In [20]:
sess = sagemaker.Session()

xgb = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path="{}/output".format(bucket),
    sagemaker_session=sess,
)

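xgb.set_hyperparameters(
    eval_metric="rmse",
    objective="reg:linear",
    num_round=50,
    
    # Use the best hyperparameters found by the tuning job (alpha, lambda, max_depth).
    # The XGBoost hyperparameter is named `lambda` (not `reg_lambda`), and `lambda` is a
    # reserved word in Python, so the dict is unpacked instead of using keyword arguments.
    **best_params
)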

s3_input_train = TrainingInput(
    s3_data="s3://sagemaker-data-workshop/train/", content_type="csv"
)


In [22]:
xgb.fit({"train": s3_input_train})

2022-02-14 15:29:04 Starting - Starting the training job...
2022-02-14 15:29:06 Starting - Launching requested ML instancesProfilerReport-1644852544: InProgress
..................
2022-02-14 15:32:25 Starting - Preparing the instances for training.........
2022-02-14 15:33:46 Downloading - Downloading input data...
2022-02-14 15:34:34 Training - Training image download completed. Training in progress.
2022-02-14 15:34:34 Uploading - Uploading generated training model
2022-02-14 15:34:34 Completed - Training job completed
Arguments: train
[2022-02-14:15:34:21:INFO] Running standalone xgboost training.
[2022-02-14:15:34:21:INFO] Path /opt/ml/input/data/validation does not exist!
[2022-02-14:15:34:21:INFO] File size need to be processed in the node: 0.43mb. Available memory size in the node: 7853.53mb
[2022-02-14:15:34:21:INFO] Determined delimiter of CSV input is ','
[15:34:21] S3DistributionType set as FullyReplicated
[15:34:21]


Next, we host the model behind an endpoint so that we can make real-time predictions with a simple HTTP POST request. But first, we need to set up a serializer (and decode the response) so that our test data, held as NumPy arrays, can be passed to the model behind the endpoint.

We then use a simple function to:

- Loop over our test dataset
- Split it into mini-batches of rows
- Convert those mini-batches to CSV string payloads
- Retrieve mini-batch predictions by invoking the XGBoost endpoint
- Collect predictions and convert the CSV output our model provides into a NumPy array
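
The steps listed above can be tried locally without an endpoint. The sketch below is an assumption-laden illustration: `fake_invoke` is a made-up stand-in for the endpoint call (it returns the sum of each row as its "prediction"), and `to_csv_payload` and `predict_in_batches` are hypothetical helper names, not SageMaker SDK functions.

```python
import numpy as np


def to_csv_payload(batch):
    """Serialize a 2-D array to CSV text, one record per line."""
    return "\n".join(",".join(str(v) for v in row) for row in batch)


def predict_in_batches(data, invoke, rows=500):
    """Split data into mini-batches, invoke the model, and collect predictions."""
    n_batches = max(1, int(np.ceil(len(data) / rows)))
    outputs = []
    for batch in np.array_split(data, n_batches):
        response = invoke(to_csv_payload(batch))  # CSV text of predictions
        outputs.extend(
            float(v) for v in response.replace("\n", ",").split(",") if v
        )
    return np.array(outputs)


def fake_invoke(payload):
    """Stand-in for the endpoint: returns the row sum as the prediction."""
    return ",".join(
        str(sum(float(v) for v in line.split(","))) for line in payload.split("\n")
    )


preds = predict_in_batches(np.ones((7, 3)), fake_invoke, rows=3)
print(preds)
```

Swapping `fake_invoke` for a call to a real predictor is what turns this into the real-time inference loop used later in this notebook.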


In [6]:
xgb_model = sagemaker.Model(
    image_uri=container,
    model_data="s3://sagemaker-data-workshop/output/xgboost-2022-02-14-15-29-04-011/output/model.tar.gz",
    role=role,
)

In [7]:
xgb_model.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name="xgb-endpoint")

----!

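Serialization is the process of converting a data object (a combination of code and data represented within a region of data storage) into a series of bytes that saves the state of the object in an easily transmittable form. Here, `CSVSerializer` converts the NumPy arrays we pass to the predictor into CSV text for the request body.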

In [37]:
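# Note: test.csv was written without a header row, so pandas treats the first record
# as column names here; pass header=None to keep every row.
test=pd.read_csv("s3://sagemaker-data-workshop/test/test.csv")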
predictor=Predictor(endpoint_name="xgb-endpoint",
                    sagemaker_session=sess,
                    serializer=CSVSerializer()  
)


In [39]:
def predict(data, rows=500):
    # Split the data into mini-batches of roughly `rows` rows each
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ""
    for array in split_array:
        # Invoke the endpoint and decode the CSV response bytes into text
        predictions = ",".join([predictions, predictor.predict(array).decode("utf-8")])

    # Parse the comma-separated predictions into a NumPy array
    return np.fromstring(predictions[1:], sep=",")


predictions = predict(test.to_numpy()[:, 1:]) #remove the target


In [41]:
len(predictions)

626

In [42]:
test.shape

(626, 11)

In [43]:
predictor.delete_endpoint()

### Batch transform

Use batch transform when you need to do the following:

- Preprocess datasets to remove noise or bias that interferes with training or inference from your dataset.
- Get inferences from large datasets.
- Run inference when you don't need a persistent endpoint.
- Associate input records with inferences to assist the interpretation of results.


In [68]:
transformer=xgb_model.transformer(instance_count=1,
    instance_type="ml.m5.xlarge",
    strategy="MultiRecord", # Send as many records per request as fit in the payload limit
    assemble_with="Line", # Write each output record on its own line
    accept="text/csv",
    output_path="s3://sagemaker-data-workshop/predictions",
                                )

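#### Strategy
Specifies the number of records to include in a mini-batch for an HTTP inference request. A record is a single unit of input data that inference can be made on. For example, a single line in a CSV file is a record. In the Python SDK, the API properties BatchStrategy, SplitType and MaxPayloadInMB mentioned below correspond to the `strategy`, `split_type` and `max_payload` arguments.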

To enable the batch strategy, you must set the SplitType property to Line, RecordIO, or TFRecord.

To use only one record when making an HTTP invocation request to a container, set BatchStrategy to SingleRecord and SplitType to Line.

To fit as many records in a mini-batch as can fit within the MaxPayloadInMB limit, set BatchStrategy to MultiRecord and SplitType to Line.
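
As a rough mental model of the difference between the two strategies, the sketch below packs CSV lines into mini-batches under a byte limit. It only illustrates the idea; it is not the service's actual batching algorithm, and `pack_records` and the byte limits are assumptions made up for this example.

```python
def pack_records(lines, max_payload_bytes):
    """Greedily group lines so that each group stays within the byte limit."""
    batches, current, size = [], [], 0
    for line in lines:
        line_size = len(line.encode("utf-8")) + 1  # +1 for the newline
        if current and size + line_size > max_payload_bytes:
            batches.append(current)
            current, size = [line], line_size
        else:
            current.append(line)
            size += line_size
    if current:
        batches.append(current)
    return batches


records = ["1,2,3"] * 5  # each record takes 6 bytes including the newline

multi = pack_records(records, max_payload_bytes=12)   # behaves like MultiRecord
single = pack_records(records, max_payload_bytes=6)   # behaves like SingleRecord
print([len(b) for b in multi], [len(b) for b in single])
```

Fewer, larger requests usually mean less HTTP overhead, which is why `MultiRecord` is a common choice for CSV input when the container accepts several records per request.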



In [69]:
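transformer.transform("s3://sagemaker-data-workshop/test/test.csv",
                      content_type="text/csv",
                      join_source="Input",      # Append each prediction to its input record
                      split_type="Line",        # Treat each line of the file as one record
                      input_filter="$[1:]",     # Drop the target (column 0) before inference
                      output_filter="$[0,-1]")  # Keep only the target and the prediction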


........................
Arguments: serve
[2022-02-15 11:00:11 +0000] [1] [INFO] Starting gunicorn 19.9.0
[2022-02-15 11:00:11 +0000] [1] [INFO] Listening at: http://0.0.0.0:8080 (1)
[2022-02-15 11:00:11 +0000] [1] [INFO] Using worker: gevent
[2022-02-15 11:00:11 +0000] [21] [INFO] Booting worker with pid: 21
[2022-02-15 11:00:11 +0000] [22] [INFO] Booting worker with pid: 22
[2022-02-15 11:00:11 +0000] [23] [INFO] Booting worker with pid: 23
[2022-02-15 11:00:11 +0000] [24] [INFO] Booting worker with pid: 24
  monkey.patch_all(subprocess=True)
  monkey.patch_all(subprocess=True)
[2022-02-15:11:00:11:INFO] Model loaded successfully for worker : 22
[2022-02-15:11:00:11:INFO] Model loaded successfully for worker : 21
  monkey.patch_all(subprocess=True)
[2022-02-15:11:00:11:INFO] Model loaded successfully for worker : 23
  monkey.patch_all(subprocess=True)
[2022-02-15:11

In [71]:
pd.read_csv("s3://sagemaker-data-workshop/predictions/test.csv.out",names=["Target","Prediction"])

| | Target | Prediction |
|---|---|---|
| 0 | 4.0 | 4.269047 |
| 1 | 12.0 | 11.722960 |
| 2 | 12.0 | 8.294746 |
| 3 | 11.0 | 12.225395 |
| 4 | 8.0 | 10.048048 |
| ... | ... | ... |
| 622 | 16.0 | 13.366733 |
| 623 | 7.0 | 8.585545 |
| 624 | 8.0 | 8.632757 |
| 625 | 17.0 | 12.202889 |
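
To see where the two columns come from, and how they could be scored, here is a small local sketch. It mimics the `input_filter`, `join_source` and `output_filter` settings of the transform call on a single record and computes the RMSE of (target, prediction) pairs. The function names are hypothetical, the feature values `0.1` and `0.2` are made up, and the pairs are only the first five rows displayed above, so the resulting number is not the RMSE of the whole test set.

```python
import math


def apply_filters(record, prediction):
    """Mimic input_filter="$[1:]", join_source="Input", output_filter="$[0,-1]"."""
    model_input = record[1:]          # input_filter: drop the target in column 0
    joined = record + [prediction]    # join_source: append prediction to the input
    output = [joined[0], joined[-1]]  # output_filter: keep target and prediction
    return model_input, output


def rmse(pairs):
    """Root mean squared error over (target, prediction) pairs."""
    return math.sqrt(sum((t - p) ** 2 for t, p in pairs) / len(pairs))


# One record: target first, then two made-up feature values
model_input, output = apply_filters([4.0, 0.1, 0.2], 4.269047)

# First five (target, prediction) rows of the table above
pairs = [
    (4.0, 4.269047),
    (12.0, 11.722960),
    (12.0, 8.294746),
    (11.0, 12.225395),
    (8.0, 10.048048),
]
print(output, rmse(pairs))
```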
