## SageMaker Pipelines

Amazon SageMaker Pipelines is a purpose-built continuous integration and continuous delivery (CI/CD) service for machine learning (ML). Its key features include:

- SageMaker Integration: SageMaker Pipelines is a fully managed service, so it creates and manages resources for you.
- SageMaker Python SDK Integration: You can create pipelines programmatically through a high-level Python interface that you might already be familiar with.
- SageMaker Studio Integration: Studio offers an environment to manage the end-to-end SageMaker Pipelines experience.
- Data Lineage Tracking: Lets you analyze where data came from, where it was used as an input, and which outputs were generated from it.

To learn more about SageMaker Pipelines, see:

* Documentation: https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html
* SDK reference: https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html


In [1]:
%matplotlib inline
import pandas as pd
import numpy as np
import sagemaker
import json
import boto3
from sagemaker import get_execution_role

sm_client = boto3.client('sagemaker')

In [2]:
# Retrieve the default bucket
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
print(region)
print(bucket)
role = get_execution_role()

eu-west-1
sagemaker-eu-west-1-707684582322


This step assumes that you have already created a SageMaker project.

A SageMaker project is an AWS Service Catalog provisioned product that enables you to easily create an end-to-end ML solution. Each SageMaker project has a unique name and ID that are passed to all SageMaker and AWS resources created in the project. By using the name and ID, you can view all entities associated with your project.

In [3]:
project_name = "Replace with your sagemaker project name here"
# project_name = "mlops-cicd-demo"

project_id = sm_client.describe_project(ProjectName=project_name)['ProjectId']

model_package_group_name = project_name + '-' + project_id
print("Model package group name: %s" % model_package_group_name)

Model package group name: mlops-cicd-demo-p-1llgzzxekxpq


### Pipeline input parameters

You can introduce variables into your pipeline definition using parameters. Parameters that you define can be referenced throughout your pipeline definition.

In [None]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)

input_raw_data = ParameterString(
    name="InputRawData",
    default_value='s3://{}/sagemaker/xgboostcontainer/raw-data'.format(bucket)
)

input_train_data = ParameterString(
    name="InputDataTrain",
    default_value='s3://{}/sagemaker/xgboostcontainer/processed/train'.format(bucket)
)

input_test_data = ParameterString(
    name="InputDataTest",
    default_value='s3://{}/sagemaker/xgboostcontainer/processed/test'.format(bucket)
)
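
To make the default-versus-override behavior concrete, here is a minimal plain-Python sketch of how declared defaults and execution-time overrides might be merged. It does not use the SageMaker SDK: `Param` and `resolve_parameters` are hypothetical names invented for this illustration, and rejecting undeclared names is a design choice of the sketch, not a statement about the service.

```python
# Plain-Python illustration only; this is NOT the SageMaker SDK.
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Param:
    """Hypothetical stand-in for ParameterString / ParameterInteger."""
    name: str
    default_value: Any


def resolve_parameters(declared: List[Param], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return one value per declared parameter, preferring overrides over defaults."""
    known = {p.name for p in declared}
    unknown = set(overrides) - known
    if unknown:
        # Design choice of this sketch: fail fast on names that were never declared.
        raise KeyError(f"Undeclared parameters: {sorted(unknown)}")
    return {p.name: overrides.get(p.name, p.default_value) for p in declared}


declared = [
    Param("TrainingInstanceType", "ml.m5.xlarge"),
    Param("TrainingInstanceCount", 1),
]
# Override only the instance count; the instance type keeps its default.
resolved = resolve_parameters(declared, {"TrainingInstanceCount": 2})
print(resolved)
```

The same idea applies when the pipeline is started later in this notebook: any parameter not passed at start time falls back to the `default_value` declared here.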

### Preprocessing Step

Amazon SageMaker Processing allows you to run steps for data pre- or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker.

A processing step requires a processor, a Python script that defines the processing code, outputs for processing, and job arguments.

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
role = get_execution_role()
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type=training_instance_type,
                                     instance_count=training_instance_count)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name="BostonHousingDropColumns",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_raw_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name='xgboost_train_data',
                         source='/opt/ml/processing/output/train',
                         destination=input_train_data),
        ProcessingOutput(output_name='xgboost_test_data',
                         source='/opt/ml/processing/output/test',
                         destination=input_test_data),
    ],
    code="preprocessing.py"
)
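
The step above refers to a `preprocessing.py` script that is not shown in this notebook. As a rough idea of what such a script could look like, here is a standard-library sketch that reads a CSV from the input directory, drops some columns, and writes train and test splits to the two output directories. The input file name (`housing.csv`), the label column (`MEDV`), the dropped column (`B`), and the 80/20 split are all assumptions made for illustration. The label is written as the first column and without a header row, which is the CSV layout the built-in XGBoost algorithm expects.

```python
# preprocessing.py (hypothetical sketch; file and column names are assumptions)
import csv
import os
import random

# Container paths taken from the ProcessingInput/ProcessingOutput definitions above.
INPUT_DIR = "/opt/ml/processing/input"
TRAIN_DIR = "/opt/ml/processing/output/train"
TEST_DIR = "/opt/ml/processing/output/test"


def preprocess(input_csv, train_dir, test_dir, label,
               drop_columns=(), test_fraction=0.2, seed=42):
    """Drop columns, put the label first, and write header-less train/test CSVs."""
    with open(input_csv, newline="") as f:
        reader = csv.DictReader(f)
        features = [c for c in reader.fieldnames
                    if c != label and c not in drop_columns]
        rows = [[row[label]] + [row[c] for c in features] for row in reader]

    # Shuffle deterministically, then hold out the first slice as the test set.
    random.Random(seed).shuffle(rows)
    n_test = int(len(rows) * test_fraction)
    splits = {
        os.path.join(test_dir, "test.csv"): rows[:n_test],
        os.path.join(train_dir, "train.csv"): rows[n_test:],
    }
    for path, split_rows in splits.items():
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(split_rows)
    return len(rows) - n_test, n_test


if __name__ == "__main__":
    raw_csv = os.path.join(INPUT_DIR, "housing.csv")  # hypothetical file name
    if os.path.exists(raw_csv):  # do nothing when the input is not mounted
        preprocess(raw_csv, TRAIN_DIR, TEST_DIR,
                   label="MEDV", drop_columns=("B",))  # hypothetical columns
```

Keeping the logic in a function with explicit paths makes it possible to test the script locally against a small CSV before running it as a processing job.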

### Training Step 

You use a training step to create a training job to train a model.

In [None]:
from sagemaker.image_uris import retrieve 
from sagemaker.session import Session

# Look up the URI of the SageMaker-managed XGBoost container image for this region.
# Set `version` to the XGBoost release you want to use.
container = retrieve(framework='xgboost',
                     region=region,
                     version='1.0-1')
print(container)

In [None]:
# initialize hyperparameters
hyperparameters = {
        "max_depth":"10",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.7",
        "objective":"reg:squarederror",
        "num_round":"200"}

In [None]:
# construct a SageMaker estimator that calls the xgboost-container
estimator = sagemaker.estimator.Estimator(image_uri=container, 
                                          hyperparameters=hyperparameters,
                                          role=role,
                                          instance_count=1, 
                                          instance_type='ml.m5.2xlarge')



A training step requires an estimator and the training and validation data inputs.

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="TrainingXgBoost",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["xgboost_train_data"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["xgboost_test_data"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

### The Register step that will add a new version to the Model Registry

You use a RegisterModel step to register a model to a model group.

A RegisterModel step requires an estimator, model data output from training, and a model package group name to associate the model package with.

With this step, you can create a model group that tracks all of the models that you train to solve a particular problem. You can then register each model you train and the model registry adds it to the model group as a new model version.

In [None]:
from sagemaker.workflow.step_collections import RegisterModel

# NOTE: model_approval_status is not available as arg in service dsl currently
step_register = RegisterModel(
    name="RegisterXgBoostModel",
    estimator=estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name
)

### Now, we can create the pipeline

This pipeline definition encodes a pipeline using a directed acyclic graph (DAG). This DAG gives information on the requirements for and relationships between each step of your pipeline. 

The structure of a pipeline's DAG is determined by the data dependencies between steps (defined within each step previously). These data dependencies are created when the properties of a step's output are passed as the input to another step. 
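
As a small illustration of how data dependencies imply an execution order, the sketch below models the three steps of this notebook as a dependency mapping and sorts it topologically with the standard library (`graphlib`, Python 3.9+). The mapping is written by hand for illustration and uses the step names from above as plain labels; it is not how the SageMaker service computes the graph.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "BostonHousingDropColumns": set(),                 # reads only raw S3 data
    "TrainingXgBoost": {"BostonHousingDropColumns"},   # consumes the processing outputs
    "RegisterXgBoostModel": {"TrainingXgBoost"},       # consumes the model artifacts
}

# static_order() yields every step after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because each step here depends on exactly one predecessor, the order is preprocessing, then training, then registration. If the mapping contained a cycle, `TopologicalSorter` would raise `graphlib.CycleError`, which is why the graph has to be acyclic.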

In [None]:
from botocore.exceptions import ClientError
from sagemaker.workflow.pipeline import Pipeline

# NOTE:
# Condition steps have issues in the service, so we go straight to step_register.
pipeline_name = "Replace with your pipeline name"
# pipeline_name = "XgBoost-Pipelines-2"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        training_instance_type,
        training_instance_count,
        input_raw_data,
        input_train_data,
        input_test_data
    ],
    steps=[step_process, step_train, step_register],
    sagemaker_session=sagemaker_session,
)

try:
    response = pipeline.create(role_arn=role)
except ClientError as e:
    error = e.response["Error"]
    if error["Code"] == "ValidationError" and "Pipeline names must be unique within" in error["Message"]:
        print(error["Message"])
        response = pipeline.describe()
    else:
        raise

In [None]:
pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

### And then, run it

After you’ve created a pipeline definition using the SageMaker Python SDK, you can submit it to SageMaker to start your execution.

In [None]:
import time

# Override the integer parameter declared earlier; other parameters keep their defaults.
start_response = pipeline.start(parameters={
    "TrainingInstanceCount": 1
})

pipeline_execution_arn = start_response.arn
print(pipeline_execution_arn)

# Poll every 15 seconds until the execution leaves an in-progress state.
while True:
    resp = sm_client.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)
    status = resp['PipelineExecutionStatus']
    if status in ('Executing', 'Stopping'):
        print('Running...')
    else:
        print(status, pipeline_execution_arn)
        break
    time.sleep(15)
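
The loop above polls indefinitely. If you prefer a bounded wait, one option is a small helper like the sketch below. `wait_for_execution` is a hypothetical name, and the one-hour default timeout is an arbitrary assumption. The helper takes a `describe` callable instead of a client so that it can be exercised without AWS access; the demo at the bottom feeds it fake responses.

```python
import time
from typing import Callable, Dict

# Statuses that mean the execution has not finished yet.
IN_PROGRESS = {"Executing", "Stopping"}


def wait_for_execution(describe: Callable[[], Dict],
                       poll_seconds: float = 15,
                       timeout_seconds: float = 3600) -> str:
    """Poll `describe` until the status is final, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe()["PipelineExecutionStatus"]
        if status not in IN_PROGRESS:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Execution still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)


# Demo with fake responses instead of a real boto3 client.
fake_statuses = iter(["Executing", "Executing", "Succeeded"])
final_status = wait_for_execution(
    lambda: {"PipelineExecutionStatus": next(fake_statuses)},
    poll_seconds=0,
)
print(final_status)
```

In this notebook, the callable would be `lambda: sm_client.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)`.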

### Finally, approve the model to kick off the deployment process

In [None]:
# List the model packages in the group and sort them newest first
packages = sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)['ModelPackageSummaryList']
packages = sorted(packages, key=lambda x: x['CreationTime'], reverse=True)
packages

You can approve the model either manually or programmatically with the SDK:

In [None]:
# latest_model_package_arn = packages[0]['ModelPackageArn']
# model_package_update_response = sm_client.update_model_package(
#    ModelPackageArn=latest_model_package_arn,
#    ModelApprovalStatus="Approved",
# )
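
Before approving, it can be useful to pick the newest version that is still waiting for approval instead of blindly taking the first entry. The sketch below does that on a list shaped like the `ModelPackageSummaryList` returned by `list_model_packages`. The helper name `latest_pending_package_arn` and the example ARNs are made up for illustration, and the sketch assumes that newly registered versions start in the `PendingManualApproval` state.

```python
from datetime import datetime
from typing import Dict, List, Optional


def latest_pending_package_arn(packages: List[Dict]) -> Optional[str]:
    """Return the ARN of the newest package awaiting approval, or None."""
    pending = [p for p in packages
               if p.get("ModelApprovalStatus") == "PendingManualApproval"]
    if not pending:
        return None
    return max(pending, key=lambda p: p["CreationTime"])["ModelPackageArn"]


# Fabricated example data; real summaries come from list_model_packages.
example_packages = [
    {"ModelPackageArn": "arn:example:model-package/demo/1",
     "ModelApprovalStatus": "Approved",
     "CreationTime": datetime(2021, 1, 1)},
    {"ModelPackageArn": "arn:example:model-package/demo/2",
     "ModelApprovalStatus": "PendingManualApproval",
     "CreationTime": datetime(2021, 1, 2)},
    {"ModelPackageArn": "arn:example:model-package/demo/3",
     "ModelApprovalStatus": "PendingManualApproval",
     "CreationTime": datetime(2021, 1, 3)},
]
candidate_arn = latest_pending_package_arn(example_packages)
print(candidate_arn)
```

The returned ARN could then be passed as `ModelPackageArn` to `update_model_package`, as in the commented-out cell above.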

## Done! :) Let's open the CodePipeline console and get some popcorn to watch