# Amazon SageMaker Pipelines: Deploying End-to-End Machine Learning Pipelines in the Cloud


The purpose of this notebook is to deploy an end-to-end machine learning pipeline with Amazon SageMaker Pipelines. We will work with the [Adult Census Income](https://www.kaggle.com/uciml/adult-census-income) dataset and use 'income', a binary variable that indicates whether a person earns more than 50K per year, as the target variable. For the training step, we will use the built-in XGBoost container image.

To replicate the results in your SageMaker Studio, first clone the repository (see the README.md for instructions).

Once you have this notebook and the data in SageMaker Studio, it's time to start!


First, we import the required libraries and set up the SageMaker session, the execution role and the default S3 bucket:

In [2]:
import boto3
import sagemaker


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket() # default S3 bucket that stores everything the pipeline produces
model_package_group_name = "AdultModelPackageGroupName"

In the next cell we copy the data from the local path (the 'data' folder shown in the sidebar) to the default S3 bucket.

In [3]:
local_path = "data/adult.csv" # local path of the dataset

base_uri = f"s3://{default_bucket}/adult"

# Upload the local CSV to the default S3 bucket under the 'adult' prefix
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
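For a single file, ```S3Uploader.upload``` should place the object under the given prefix using the file's base name, so ```input_data_uri``` is expected to end in 'adult/adult.csv'. The stdlib-only sketch below reproduces that URI layout with a hypothetical helper ```expected_upload_uri```; it does not call AWS, and the bucket name is a placeholder.

```python
import posixpath


def expected_upload_uri(desired_s3_uri: str, local_path: str) -> str:
    """Hypothetical helper: predict the S3 URI of a single uploaded file.

    Assumes the uploader keeps the local file name as the last key segment.
    """
    return f"{desired_s3_uri.rstrip('/')}/{posixpath.basename(local_path)}"


def split_s3_uri(uri: str) -> tuple:
    """Split 's3://bucket/key' into (bucket, key)."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key


uri = expected_upload_uri("s3://my-example-bucket/adult", "data/adult.csv")
print(uri)                # s3://my-example-bucket/adult/adult.csv
print(split_s3_uri(uri))  # ('my-example-bucket', 'adult/adult.csv')
```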

Now we specify some parameters that will be useful when defining the pipeline. These are pipeline parameters, so their values can be overridden each time the pipeline is started. Since we will handle a fair number of variables, I think it is good practice to use the Find bar (Ctrl+F) to see where a variable is used or where it comes from.

In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)


processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)

training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")


model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
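Each parameter object is a placeholder that resolves when an execution starts: to a value passed through ```pipeline.start(parameters={...})``` if one is given, and otherwise to its ```default_value```. The sketch below mimics that rule with plain dictionaries and a hypothetical ```resolve_parameters``` helper; it illustrates the behavior and is not the SDK's implementation. Rejecting undeclared names is an extra check added in this sketch.

```python
from typing import Optional


def resolve_parameters(defaults: dict, overrides: Optional[dict] = None) -> dict:
    """Hypothetical helper: apply per-run overrides on top of parameter defaults."""
    overrides = overrides or {}
    # Extra safety check in this sketch: refuse names that were never declared.
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"unknown pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Defaults copied from the ParameterInteger/ParameterString objects above (InputData omitted).
defaults = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}

# Override one parameter for a single run; the rest keep their defaults.
resolved = resolve_parameters(defaults, {"TrainingInstanceType": "ml.m5.2xlarge"})
print(resolved["TrainingInstanceType"])    # ml.m5.2xlarge
print(resolved["ProcessingInstanceType"])  # ml.m5.xlarge
```

In the real pipeline, the same override would be passed as ```pipeline.start(parameters={"TrainingInstanceType": "ml.m5.2xlarge"})```.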


# Preprocessing Step

Now it's time for the first step. We will create a preprocessing script and store it in a folder called 'adult'. Then we will create an ```SKLearnProcessor``` instance that defines the environment the script runs in (scikit-learn version, instance type and count, IAM role). Finally, we will pass the code and the processor to a ```ProcessingStep```.

In [5]:
!mkdir -p adult # create a folder called 'adult'

In [6]:
%%writefile adult/preprocessing.py 

import pandas as pd
import numpy as np


if __name__ == "__main__":
    
    base_dir = "/opt/ml/processing" # this path will make more sense once you build the ProcessingStep
    # read data
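    df = pd.read_csv(f"{base_dir}/input/adult.csv", sep=",",
                     error_bad_lines=False, engine='python') # skip malformed lines instead of failing
    # note: error_bad_lines was removed in pandas 2.0; with newer pandas use on_bad_lines='skip'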

    # target variable to binary (0: <=50K, 1: >50K)
    df['income'] = df['income'].replace(['<=50K', '>50K'], [0, 1])

    # drop variables not used as features (a census sampling weight and a numeric duplicate of 'education')
    df = df.drop('fnlwgt', axis=1)
    df = df.drop('education.num', axis=1)

    # drop rows with missing data (missing values are encoded as '?')
    df = df.loc[(df['workclass'] != '?') & (df['occupation'] != '?') & (df['native.country'] != '?')]

    # split data into the target (dependent) and feature (independent) variables
    X = df.drop('income', axis=1)
    y = df['income']

    # split the features into continuous and categorical variables
    X_continuous = X[['age', 'capital.gain', 'capital.loss', 'hours.per.week']]

    X_categorical = X[['workclass', 'education', 'marital.status', 'occupation', 'relationship', 'race',
                    'sex', 'native.country']]


    # One-hot encoding (dtype=int keeps 0/1 integers; pandas>=2.0 would otherwise produce booleans)
    X_encoded = pd.get_dummies(X_categorical, dtype=int)

    # Concatenate the continuous and encoded sets
    X = pd.concat([X_continuous, X_encoded], axis=1)

    y = y.to_numpy().reshape(y.shape[0], 1)
    X = X.to_numpy()

    # create the processed dataset with the label in the first column
    dataset = np.concatenate((y, X), axis=1)

    np.random.shuffle(dataset)

    # Split into train (60%), validation (10%) and test (30%) datasets
    train, validation, test = np.split(dataset, [int(.6*len(dataset)), int(.7*len(dataset))])

    # Save the data
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)


Overwriting adult/preprocessing.py
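To make the script's transformations concrete, here is a pure-Python sketch on four made-up rows (not taken from the real dataset, and using only a few columns). It drops rows containing '?', maps the target to 0/1, one-hot encodes one categorical column and puts the label first. It ends with the arithmetic behind ```np.split(dataset, [int(.6*len(dataset)), int(.7*len(dataset))])```, which yields a 60% / 10% / 30% split.

```python
# Made-up rows standing in for adult.csv (only a few columns, values invented for illustration).
rows = [
    {"age": 39, "workclass": "Private", "occupation": "Sales", "native.country": "United-States", "income": "<=50K"},
    {"age": 50, "workclass": "?", "occupation": "Sales", "native.country": "Mexico", "income": ">50K"},
    {"age": 45, "workclass": "State-gov", "occupation": "Exec-managerial", "native.country": "United-States", "income": ">50K"},
    {"age": 28, "workclass": "Private", "occupation": "?", "native.country": "India", "income": "<=50K"},
]

# 1. Drop rows where any of the three columns holds the '?' placeholder.
clean = [r for r in rows if "?" not in (r["workclass"], r["occupation"], r["native.country"])]

# 2. Map the target to 0/1, as the replace() call in the script does.
label_map = {"<=50K": 0, ">50K": 1}

# 3. One-hot encode 'workclass' (pd.get_dummies does this for every categorical column).
categories = sorted({r["workclass"] for r in clean})


def encode(row):
    one_hot = [1 if row["workclass"] == c else 0 for c in categories]
    return [label_map[row["income"]], row["age"], *one_hot]  # label first, then features


dataset = [encode(r) for r in clean]
print(categories)  # ['Private', 'State-gov']
print(dataset)     # [[0, 39, 1, 0], [1, 45, 0, 1]]


def split_sizes(n):
    """Row counts produced by np.split(dataset, [int(.6*n), int(.7*n)])."""
    first, second = int(0.6 * n), int(0.7 * n)
    return first, second - first, n - second


print(split_sizes(1000))  # (600, 100, 300)
```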


Now we can create an ```SKLearnProcessor``` instance:

In [7]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-adult-process",
    role=role,
)

Finally, we pass the processor and the code to a ```ProcessingStep```. We also specify the input and output paths.
NOTE: '/opt/ml/processing' is just the conventional base path inside the processing container. You can take a quick look [here](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) if you want more information.

In [8]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="adultProcess",
    processor=sklearn_processor, # SKLearnProcessor
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ], # copies the data from the S3 bucket to the container
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ], # each output directory is uploaded to S3 (example: sagemaker-us-east-<>/<base_job_name>/output/train/train.csv)
    code="adult/preprocessing.py",
)
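The input/output contract can be reproduced locally with the same directory layout. In this stdlib-only sketch, a temporary directory stands in for '/opt/ml/processing' and the hypothetical ```process``` function plays the role of 'preprocessing.py': it reads from 'input/' and writes one file into each directory that a ```ProcessingOutput``` would upload. The tiny input file and the split sizes are arbitrary.

```python
import csv
import tempfile
from pathlib import Path


def process(base_dir: Path) -> None:
    """Hypothetical stand-in for preprocessing.py: read input/, write one CSV per output channel."""
    with open(base_dir / "input" / "adult.csv", newline="") as f:
        rows = list(csv.reader(f))[1:]  # skip the header row
    splits = {"train": rows[:3], "validation": rows[3:4], "test": rows[4:]}
    for name, part in splits.items():
        out_dir = base_dir / name  # mirrors ProcessingOutput(source=f"/opt/ml/processing/{name}")
        out_dir.mkdir(parents=True, exist_ok=True)  # created explicitly because we run locally
        with open(out_dir / f"{name}.csv", "w", newline="") as f:
            csv.writer(f).writerows(part)


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)  # plays the role of /opt/ml/processing
    (base / "input").mkdir()
    # ProcessingInput would copy the S3 object here; we write a tiny file instead.
    (base / "input" / "adult.csv").write_text("age,income\n" + "".join(f"{20 + i},0\n" for i in range(5)))
    process(base)
    produced = sorted(p.relative_to(base).as_posix() for p in base.rglob("*.csv"))
    train_rows = (base / "train" / "train.csv").read_text().splitlines()

print(produced)
```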

# Training Step

In this step we retrieve the built-in XGBoost image and use it as the estimator for the ```TrainingStep``` class. We also need to set the hyperparameters. When building the ```TrainingStep```, we pass the estimator and the inputs ('train.csv' and 'validation.csv').



NOTE: You can also perform [automatic model tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html) with SageMaker.

In [9]:
from sagemaker.estimator import Estimator

# Set the model path in the S3 bucket
model_path = f"s3://{default_bucket}/adultTrain"

# Retrieve the URI of the built-in XGBoost image for this region
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

# Create a generic Estimator that runs the XGBoost image
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)


# Set the hyperparameters 
xgb_train.set_hyperparameters(
    objective="binary:logistic", 
    num_round=50, 
    max_depth=5, 
    subsample=1, 
    silent=0,
    eval_metric="logloss",
    eta=0.3, 
)
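With ```objective="binary:logistic"```, XGBoost turns the boosted score into a probability for the positive class (income >50K) with the logistic function, and ```eval_metric="logloss"``` reports the mean binary cross-entropy on the validation channel. The stdlib sketch below computes both for three hand-picked scores; the numbers are illustrative and unrelated to this training run, and the clipping constant is a choice of the sketch.

```python
import math


def sigmoid(margin: float) -> float:
    """Logistic function: maps a raw boosted score (margin) to a probability."""
    return 1.0 / (1.0 + math.exp(-margin))


def logloss(y_true, p_pred, eps=1e-15):
    """Mean binary cross-entropy over the examples."""
    total = 0.0
    for y, p in zip(y_true, p_pred):
        p = min(max(p, eps), 1 - eps)  # clip to avoid log(0)
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(y_true)


probs = [sigmoid(m) for m in (2.0, -1.0, 0.0)]
print(round(logloss([1, 0, 1], probs), 4))
```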

Now that we have our estimator, we can pass it to the ```TrainingStep``` class. We also specify the data inputs ('train.csv' and 'validation.csv').

In [10]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="adultTrain",
    estimator=xgb_train,
    # The data inputs of this step are outputs of the previous step ('train.csv' and 'validation.csv')
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",    
        ),
        
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
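The built-in XGBoost algorithm expects CSV training data with the target in the first column and no header row, which is why 'preprocessing.py' puts ```y``` first and writes with ```header=False```. A hypothetical stdlib checker, ```check_xgboost_csv```, can catch a wrong layout before a training job is launched; the 0/1 label check is specific to this binary target.

```python
import csv
import io


def check_xgboost_csv(text: str) -> int:
    """Hypothetical helper: check a label-first, header-free, numeric CSV and return its row count."""
    width = None
    n_rows = 0
    for i, row in enumerate(csv.reader(io.StringIO(text))):
        if not row:
            continue  # ignore blank lines
        try:
            values = [float(v) for v in row]
        except ValueError:
            raise ValueError(f"row {i} is not numeric (header left in?): {row}") from None
        if values[0] not in (0.0, 1.0):
            raise ValueError(f"row {i}: first column should be the 0/1 label, got {values[0]}")
        if width is None:
            width = len(values)
        elif len(values) != width:
            raise ValueError(f"row {i} has {len(values)} columns, expected {width}")
        n_rows += 1
    return n_rows


print(check_xgboost_csv("0,39,1,0\n1,45,0,1\n"))  # 2
```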

# Evaluation Step

In this step we first write an evaluation script. Then we create a processor instance with ```ScriptProcessor``` and a property file with ```PropertyFile```. Finally, we define the ```ProcessingStep```.

In [11]:
%%writefile adult/evaluation.py

import json
import pathlib
import pickle
import tarfile


import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import accuracy_score

if __name__ == "__main__":
    # extract the model artifact produced by the training step
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # load the trained Booster, saved by the built-in XGBoost container as 'xgboost-model'
    with open("xgboost-model", "rb") as f:
        model = pickle.load(f)

    # import test set
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    

    # Split into dependent and independent variables
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    
    # make predictions: probabilities are converted to 0/1 labels with a 0.5 threshold
    predictions = model.predict(X_test)
    predictions = np.where(predictions > 0.5, 1, 0)

    # compute accuracy
    acc = accuracy_score(y_test, predictions)

    # Build the metrics report that will be saved as JSON
    report_dict = {
        "metrics": {
            "accuracy": {
                "value": acc
            },
        },
    }
    
    # Set the output directory
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Save the json file with the metrics result
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting adult/evaluation.py
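Stripped of file I/O, the core of 'evaluation.py' is thresholding predicted probabilities at 0.5 and computing accuracy. The stdlib sketch below reproduces that logic and the report layout on made-up predictions, so the JSON structure that the ```PropertyFile``` will read can be checked locally.

```python
import json


def evaluate(y_true, probabilities, threshold=0.5):
    """Mirror evaluation.py: label = 1 if p > threshold else 0, then accuracy."""
    predictions = [1 if p > threshold else 0 for p in probabilities]
    correct = sum(int(t == p) for t, p in zip(y_true, predictions))
    acc = correct / len(y_true)
    return {"metrics": {"accuracy": {"value": acc}}}


report = evaluate([1, 0, 1, 0], [0.9, 0.2, 0.4, 0.6])
print(json.dumps(report))  # {"metrics": {"accuracy": {"value": 0.5}}}
```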


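The next step is to create the processor instance. We reuse the XGBoost image, which already provides the libraries that 'evaluation.py' imports (xgboost, pandas and scikit-learn).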

In [12]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri, # The XGBoost image
    command=["python3"], # We run python3 inside the container
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-adult-eval",
    role=role
)

Before defining the ```ProcessingStep``` we are going to define a ```PropertyFile```. You use property files to store information from the output of a processing step. This is particularly useful when analyzing the results of a processing step to decide how a conditional step should be executed.



In [13]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# The path parameter is the name of the JSON file that the property file is saved to.
# output_name must match the output_name of the ProcessingOutput that you define in your processing step.
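At run time, the condition step reads this property file and extracts the value at a JSON path. For a simple dotted path such as "metrics.accuracy.value", the lookup amounts to walking nested keys. The hypothetical ```json_get``` below illustrates only that simple case; it is not the JsonPath implementation SageMaker uses.

```python
import json


def json_get(document: str, path: str):
    """Hypothetical helper: follow a dotted path through nested JSON objects."""
    node = json.loads(document)
    for key in path.split("."):
        node = node[key]  # a KeyError here means the path does not match the file
    return node


sample_report = '{"metrics": {"accuracy": {"value": 0.87}}}'
print(json_get(sample_report, "metrics.accuracy.value"))  # 0.87
```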

Now we can define the ```ProcessingStep```. We pass the ```ScriptProcessor```, the inputs ('model.tar.gz' and 'test.csv'), the ```PropertyFile``` and the evaluation.py script.


In [14]:
step_eval = ProcessingStep(
    name="adultEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts, # S3 path of model.tar.gz
            destination="/opt/ml/processing/model", # destination in the processing container
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, # S3 path to test.csv
            destination="/opt/ml/processing/test", # destination in the processing container
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ], # the output is the 'evaluation.json' file
    code="adult/evaluation.py",
    property_files=[evaluation_report],
)
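The training step packages its output as 'model.tar.gz', and 'evaluation.py' unpacks it to load the file named 'xgboost-model'. This stdlib sketch reproduces that round trip locally, using a pickled placeholder dictionary instead of a real XGBoost Booster (an assumption for illustration), which can help test the extraction code without SageMaker.

```python
import pickle
import tarfile
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    # Build a stand-in artifact: a pickled dict instead of a trained Booster.
    (base / "xgboost-model").write_bytes(pickle.dumps({"placeholder": "booster"}))
    with tarfile.open(base / "model.tar.gz", "w:gz") as tar:
        tar.add(base / "xgboost-model", arcname="xgboost-model")

    # What evaluation.py does: extract the archive, then unpickle the model file.
    extract_dir = base / "extracted"
    with tarfile.open(base / "model.tar.gz") as tar:
        tar.extractall(path=extract_dir)
    with open(extract_dir / "xgboost-model", "rb") as f:
        loaded = pickle.load(f)

print(loaded)  # {'placeholder': 'booster'}
```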

# Create Model Step

If the model passes the condition step (defined below), we create a SageMaker Model, a resource that can be deployed to an endpoint. First, we create a model with the ```Model``` class, passing it the XGBoost image and the 'model.tar.gz' file. Then we create an input with the ```CreateModelInput``` class. Finally, we pass both of them to the ```CreateModelStep```.

In [15]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri, # the XGBoost image
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, # The S3 location of the 'model.tar.gz' file
    sagemaker_session=sagemaker_session,
    role=role,
)

In [16]:
from sagemaker.inputs import CreateModelInput


# No accelerator_type: Amazon Elastic Inference accelerators (e.g. ml.eia1.medium) have been retired
inputs = CreateModelInput(
    instance_type="ml.m5.large",
)

In [17]:
from sagemaker.workflow.steps import CreateModelStep


step_create_model = CreateModelStep(
    name="adultCreateModel",
    model=model,
    inputs=inputs,
)

# Register Model Step

We register the model if it passes the condition step (defined below). For the Register Model Step, we first set the model metrics with the ```ModelMetrics``` class, passing it the S3 location of the 'evaluation.json' file. Then we create the Register Model Step with the ```RegisterModel``` class, passing it the estimator, the 'model.tar.gz' file and the model metrics object.

In [18]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
step_register = RegisterModel(
    name="adultRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, # The S3 uri to the 'model.tar.gz' file
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

# Condition step

For the Condition Step, we create a condition with the ```ConditionGreaterThanOrEqualTo``` class that compares the accuracy stored in 'evaluation.json' (read with ```JsonGet```) against a threshold of 0.8. Then we pass the condition object to the ```ConditionStep```.

In [19]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet # current location of JsonGet


condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report, # we pass the property file object
        json_path="metrics.accuracy.value",
    ),
    right=0.8, # the threshold
)

step_cond = ConditionStep(
    name="adultAccCond",
    conditions=[condition],
    if_steps=[step_register, step_create_model], # if the condition is true we execute the Register Model and Create Model steps
    else_steps=[],
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
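Conceptually, once the evaluation step has finished, the condition step checks whether accuracy >= 0.8 and then runs either ```if_steps``` or ```else_steps```. The plain-Python sketch below mimics that decision with a hypothetical ```choose_branch``` function; the step labels come from the ```name``` arguments used above, and the accuracy values are examples.

```python
def choose_branch(accuracy, threshold=0.8,
                  if_steps=("adultRegisterModel", "adultCreateModel"), else_steps=()):
    """Return the names of the steps that would run after the condition step."""
    return list(if_steps) if accuracy >= threshold else list(else_steps)


print(choose_branch(0.87))  # ['adultRegisterModel', 'adultCreateModel']
print(choose_branch(0.75))  # []
```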


# Create the Pipeline

We create the pipeline with the ```Pipeline``` class. We pass the parameters used in the steps (the ones defined at the beginning of the notebook) and the steps of the pipeline. The Create Model and Register Model steps are already referenced by the condition step, which runs them only if its condition is true, so we don't list them separately.

In [20]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "adultPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond]
)
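SageMaker works out the execution order from the data dependencies between steps: ```step_train``` consumes ```step_process``` outputs, ```step_eval``` consumes outputs of both, and the condition step reads the evaluation report. The sketch below encodes those dependencies by hand and orders them with ```graphlib.TopologicalSorter``` from the standard library (Python 3.9+); it only illustrates the idea and does not describe how the service schedules steps internally.

```python
from graphlib import TopologicalSorter

# step -> steps it depends on (derived from the property references in this notebook)
dependencies = {
    "adultProcess": set(),
    "adultTrain": {"adultProcess"},
    "adultEval": {"adultTrain", "adultProcess"},
    "adultAccCond": {"adultEval"},
    "adultRegisterModel": {"adultAccCond", "adultTrain"},
    "adultCreateModel": {"adultAccCond", "adultTrain"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```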

Once we create a pipeline we can now [run it](https://docs.aws.amazon.com/sagemaker/latest/dg/run-pipeline.html). First we submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does. 

In [21]:
pipeline.upsert(role_arn=role);

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


Now that we have defined the pipeline, we can just start it:

In [22]:
execution = pipeline.start()

In [23]:
execution.wait() # block until the pipeline execution finishes

Once the pipeline process is completed, we can check the output metrics from the evaluation step (the 'evaluation.json' file).  

In [24]:
from pprint import pprint
import json

evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

{'metrics': {'accuracy': {'value': 0.8700408884959664}}}


You can also see all the steps in the pipeline, together with their inputs and outputs, with the following code. I don't run it here to avoid sharing my account information.

In [None]:
import time
from IPython.display import display
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

If you want to check the jobs that were created, go to Amazon SageMaker and look at them in the sidebar. There should be a training job in the Training section, two processing jobs (preprocessing and evaluation) in the Processing section and a model in the Inference section.