# 6) SageMaker pipeline
In this notebook, we will create a SageMaker pipeline to automate training a model and updating the deployed endpoint when the new model performs better on a given test set.  
Although slightly deprecated, this documentation helped a lot:
https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html  
  
All the scripts used by the different steps are available in the pipeline folder.

In [2]:
import sagemaker
import boto3
from sagemaker.tuner import CategoricalParameter, ContinuousParameter, HyperparameterTuner
from sagemaker.pytorch import PyTorch
from sagemaker import get_execution_role
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.pytorch import PyTorchModel
from sagemaker.predictor import Predictor
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.inputs import TrainingInput
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline import Pipeline
from PIL import Image
import io
import base64
import json
import pprint
import time


session = sagemaker.Session()

bucket = session.default_bucket()
print("Default Bucket: {}".format(bucket))

region = session.boto_region_name
print("AWS Region: {}".format(region))

role = get_execution_role()
print("RoleArn: {}".format(role))

prefix = "capstone-inventory-project"
model_package_group_name = "Capstone_pipeline"

Default Bucket: sagemaker-us-east-1-646714458109
AWS Region: us-east-1
RoleArn: arn:aws:iam::646714458109:role/service-role/AmazonSageMaker-ExecutionRole-20211122T183493


### 0) Define Pipeline Parameters

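- training_instance_type: the ml.* instance type of the jobs. In this notebook it is used by the evaluation and endpoint-update processing steps; the training estimator sets its own instance type.
- input_data: the Amazon S3 location of the input data.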

In [3]:
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.g4dn.xlarge"
)

input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/{}/data".format(bucket, prefix)
)

# We also define where our model artefacts and evaluation results will be stored.
model_path = "s3://{}/{}/pipeline_model".format(bucket, prefix)
evaluation_file_path = "s3://{}/{}/current_accuracy.json".format(bucket, prefix)
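Both model_path and evaluation_file_path are plain S3 URIs built from bucket and prefix. boto3 calls such as `get_object` or `put_object` take the bucket and the key separately, so a script that reads or overwrites current_accuracy.json needs to split such a URI. A minimal sketch; `split_s3_uri` is a hypothetical helper, not part of the SageMaker SDK:

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("Not an S3 URI: {!r}".format(uri))
    # urlparse keeps the leading '/' in the path, but S3 keys do not start with one.
    # Note: keys containing '?' or '#' would be cut by urlparse; use a string split for those.
    return parsed.netloc, parsed.path.lstrip("/")


print(split_s3_uri(
    "s3://sagemaker-us-east-1-646714458109/capstone-inventory-project/current_accuracy.json"))
# ('sagemaker-us-east-1-646714458109', 'capstone-inventory-project/current_accuracy.json')
```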

### 1) Training step
First, let's create a training step.  
This step trains a ResNet34 for 5 epochs on the data in the specified folder.  
Note that this folder also contains the inference script, which is saved alongside the model artefacts. This way, our inference script is used by default whenever a model is created from these artefacts.
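According to the SageMaker Python SDK's PyTorch documentation, the serving containers for PyTorch 1.2 and later look for the inference script at `code/inference.py` inside `model.tar.gz`. Assuming `1_train_pipeline.py` (not shown here) copies the inference script into a `code/` directory under the model directory (`SM_MODEL_DIR`, i.e. `/opt/ml/model`), SageMaker archives that directory into `model.tar.gz` at the end of training. The sketch below reproduces that layout locally with placeholder files; `add_inference_code` and `archive_model_dir` are hypothetical helpers:

```python
import os
import shutil
import tarfile
import tempfile


def add_inference_code(model_dir: str, inference_script: str) -> str:
    """Copy the inference script to <model_dir>/code/inference.py and return that path."""
    code_dir = os.path.join(model_dir, "code")
    os.makedirs(code_dir, exist_ok=True)
    return shutil.copy(inference_script, os.path.join(code_dir, "inference.py"))


def archive_model_dir(model_dir: str, tar_path: str) -> list:
    """Pack model_dir into a .tar.gz (as SageMaker does) and return the member names."""
    with tarfile.open(tar_path, "w:gz") as tar:
        for name in sorted(os.listdir(model_dir)):
            tar.add(os.path.join(model_dir, name), arcname=name)  # directories are added recursively
    with tarfile.open(tar_path, "r:gz") as tar:
        return sorted(tar.getnames())


with tempfile.TemporaryDirectory() as tmp:
    model_dir = os.path.join(tmp, "model")  # stands in for SM_MODEL_DIR
    os.makedirs(model_dir)
    # Placeholder weights; the real script would write them with torch.save(...).
    open(os.path.join(model_dir, "model.pth"), "wb").close()
    script = os.path.join(tmp, "inference.py")
    with open(script, "w") as f:
        f.write("# model_fn / input_fn / predict_fn / output_fn would live here\n")
    add_inference_code(model_dir, script)
    members = archive_model_dir(model_dir, os.path.join(tmp, "model.tar.gz"))

print(members)  # ['code', 'code/inference.py', 'model.pth']
```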

In [4]:
estimator = PyTorch(
    entry_point="pipeline/1_train_pipeline.py",
    role=role,
    py_version='py36',
    framework_version="1.8",
    instance_count=1,
    instance_type="ml.g4dn.xlarge",        
    output_path = model_path,  # The training jobs output (mainly model artefacts) will go there.
    hyperparameters={                                        
        "batch-size": 64,
        "lr": 0.01}
)

In [5]:
step_train = TrainingStep(
    name="Training",
    estimator=estimator,
    inputs=input_data
)

### 2) Evaluation step
Next, a processing step was created to 1) load the model, 2) evaluate its accuracy on the test set, and 3) save the result as a JSON file.
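`2_evaluation.py` itself is not shown in this notebook. The sketch below illustrates steps 2) and 3) under stated assumptions: predictions and labels are already available as lists (loading the model is omitted), and `accuracy` and `write_reports` are hypothetical helper names. Because both PropertyFile objects defined in the next cells use `output_name="evaluation"`, the script has to write `evaluation.json` and a copy of `current_accuracy.json` into `/opt/ml/processing/evaluation` for the later JsonGet calls to find them. Falling back to 0.1 when no current_accuracy.json is present is also an assumption:

```python
import json
import os
import tempfile


def accuracy(predictions, labels) -> float:
    """Fraction of predictions that match the labels."""
    if len(predictions) != len(labels) or not labels:
        raise ValueError("predictions and labels must be non-empty and of equal length")
    return sum(p == y for p, y in zip(predictions, labels)) / len(labels)


def write_reports(acc, accuracy_dir, evaluation_dir, default=0.1):
    """Write evaluation.json and a copy of current_accuracy.json into evaluation_dir.

    In the processing job, accuracy_dir would be /opt/ml/processing/accuracy and
    evaluation_dir /opt/ml/processing/evaluation (the "evaluation" output).
    """
    current_file = os.path.join(accuracy_dir, "current_accuracy.json")
    if os.path.exists(current_file):
        with open(current_file) as f:
            current = json.load(f)
    else:
        current = {"accuracy": default}  # assumed fallback when no file exists yet
    os.makedirs(evaluation_dir, exist_ok=True)
    with open(os.path.join(evaluation_dir, "evaluation.json"), "w") as f:
        json.dump({"accuracy": acc}, f)
    with open(os.path.join(evaluation_dir, "current_accuracy.json"), "w") as f:
        json.dump(current, f)
    return {"new": acc, "deployed": current["accuracy"]}


with tempfile.TemporaryDirectory() as tmp:
    acc = accuracy(predictions=[1, 2, 3, 3], labels=[1, 2, 3, 4])  # toy data
    result = write_reports(acc, accuracy_dir=os.path.join(tmp, "accuracy"),
                           evaluation_dir=os.path.join(tmp, "evaluation"))
print(result)  # {'new': 0.75, 'deployed': 0.1}
```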

In [6]:
image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.8",
    py_version="py36",
    instance_type="ml.m5.xlarge",
    image_scope="training"
)

In [7]:
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=training_instance_type,
    instance_count=1,
    base_job_name="pipeline-eval",
    role=role
    #env= {"MODEL_DATA": step_train.properties.ModelArtifacts.S3ModelArtifacts}
)

In [8]:
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)
last_report = PropertyFile(
    name="LastReport",
    output_name="evaluation",
    path="current_accuracy.json"
)

step_eval = ProcessingStep(
    name="Evaluation",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source="s3://{}/{}/data/test/".format(bucket, prefix),
            destination="/opt/ml/processing/test"
        ),
        ProcessingInput(
            source=evaluation_file_path,
            destination="/opt/ml/processing/accuracy"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="pipeline/2_evaluation.py",
    property_files=[evaluation_report, last_report]
)

### 3) Update endpoint

If the newly trained model performs better than the model currently deployed (see step 4), the model endpoint is updated.  
Updating an endpoint, rather than creating a new one, keeps its name (which is useful when other services, such as Lambda functions, rely on it) and ensures continuity of service.  
The following processing step performs two main actions:
- It points the endpoint to the new model artefacts (by registering a new model and endpoint configuration).
- It updates the current_accuracy.json file with the results obtained in the evaluation step.
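`3_update_endpoint.py` is not shown either. A minimal sketch of the first action, using the boto3 SageMaker client methods `create_model`, `create_endpoint_config` and `update_endpoint`. The naming scheme, the `ml.m5.large` default, the `AllTraffic` variant name and the accelerator type are assumptions. `image_uri` must be an inference image (for example retrieved with `image_scope="inference"`), not the training image used by the processors here, and an Elastic Inference accelerator may require an EI-compatible image. Inside the job, `model_data` could be read from the `MODEL_DATA` environment variable set on `script_update`. Passing the client in as a parameter keeps the function testable without AWS credentials:

```python
import time


def update_endpoint(client, endpoint_name, model_data, image_uri, role_arn,
                    instance_type="ml.m5.large", accelerator_type=None,
                    capture_s3_uri=None):
    """Point an existing endpoint to new model artefacts, keeping its name.

    `client` is expected to behave like boto3.client("sagemaker").
    Returns the name of the new endpoint configuration.
    """
    suffix = time.strftime("%Y-%m-%d-%H-%M-%S")
    model_name = "capstone-model-" + suffix  # hypothetical naming scheme
    config_name = "capstone-config-" + suffix

    # 1) Register a model that serves the new artefacts.
    client.create_model(
        ModelName=model_name,
        PrimaryContainer={"Image": image_uri, "ModelDataUrl": model_data},
        ExecutionRoleArn=role_arn,
    )

    # 2) Create a new endpoint configuration that uses this model.
    variant = {
        "VariantName": "AllTraffic",
        "ModelName": model_name,
        "InitialInstanceCount": 1,
        "InstanceType": instance_type,
    }
    if accelerator_type:  # optional Elastic Inference accelerator, e.g. "ml.eia2.medium"
        variant["AcceleratorType"] = accelerator_type
    config = {"EndpointConfigName": config_name, "ProductionVariants": [variant]}
    if capture_s3_uri:  # optional capture of requests and responses to S3
        config["DataCaptureConfig"] = {
            "EnableCapture": True,
            "InitialSamplingPercentage": 100,
            "DestinationS3Uri": capture_s3_uri,
            "CaptureOptions": [{"CaptureMode": "Input"}, {"CaptureMode": "Output"}],
        }
    client.create_endpoint_config(**config)

    # 3) Switch the existing endpoint to the new configuration (same endpoint name).
    client.update_endpoint(EndpointName=endpoint_name, EndpointConfigName=config_name)
    return config_name


class RecordingClient:
    """Stand-in for boto3.client("sagemaker") that only records the calls made."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        return lambda **kwargs: self.calls.append((name, kwargs))


client = RecordingClient()
new_config = update_endpoint(
    client,
    endpoint_name="pytorch-inference-2022-01-22-14-40-51-906",
    model_data="s3://example-bucket/pipeline_model/model.tar.gz",  # hypothetical
    image_uri="<pytorch-inference-image-uri>",  # hypothetical placeholder
    role_arn="arn:aws:iam::123456789012:role/example-role",  # hypothetical
    accelerator_type="ml.eia2.medium",
    capture_s3_uri="s3://example-bucket/data-capture",  # hypothetical
)
print([name for name, _ in client.calls])
# ['create_model', 'create_endpoint_config', 'update_endpoint']
```

The second action, writing the new accuracy back to current_accuracy.json, would be an S3 `put_object` call on the bucket and key of `evaluation_file_path`.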


In [9]:
script_update = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=training_instance_type,
    instance_count=1,
    base_job_name="update-endpoint",
    role=role,
    env= {"MODEL_DATA": step_train.properties.ModelArtifacts.S3ModelArtifacts}
)

In [10]:
step_update = ProcessingStep(
    name="Update-endpoint",
    processor=script_update,
    inputs=[
        ProcessingInput(
            source="{}/evaluation.json".format(step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]),
            destination="/opt/ml/processing/accuracy"
        )
    ],
    code="pipeline/3_update_endpoint.py"
)

### 4) Condition step
In this step, we compare the accuracy obtained in step 2 to the S3-hosted "current_accuracy.json" file, which contains the accuracy of the deployed model.  
If the trained model's accuracy is greater than or equal to that of the deployed model, step 3 starts. Otherwise, the placeholder "Too-bad" step runs and the endpoint is left unchanged.
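The condition can be reproduced locally to sanity-check the JSON files produced by the evaluation step. In this sketch, `json_get` is a hypothetical stand-in for JsonGet that only understands dotted keys (the real JsonGet takes a JsonPath expression), and the returned names are the step names used in this notebook. Because the condition is `>=`, a tie also triggers the update:

```python
import json
import os
import tempfile


def json_get(path, json_path):
    """Hypothetical stand-in for JsonGet: follow a dotted key path such as 'accuracy'."""
    with open(path) as f:
        value = json.load(f)
    for key in json_path.split("."):
        value = value[key]
    return value


def choose_branch(evaluation_dir):
    """Return the name of the step the ConditionStep would run."""
    new = json_get(os.path.join(evaluation_dir, "evaluation.json"), "accuracy")
    deployed = json_get(os.path.join(evaluation_dir, "current_accuracy.json"), "accuracy")
    # Same comparison as ConditionGreaterThanOrEqualTo(left=new, right=deployed)
    return "Update-endpoint" if new >= deployed else "Too-bad"


decisions = []
with tempfile.TemporaryDirectory() as tmp:
    # First pair: values reported for the first run; second pair: hypothetical.
    for new, deployed in [(0.32, 0.1), (0.25, 0.32)]:
        for name, value in [("evaluation.json", new), ("current_accuracy.json", deployed)]:
            with open(os.path.join(tmp, name), "w") as f:
                json.dump({"accuracy": value}, f)
        decisions.append(choose_branch(tmp))
print(decisions)  # ['Update-endpoint', 'Too-bad']
```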

In [11]:
# Placeholder step executed when the condition below is false
script_else = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="else",
    role=role,
)

step_fail = ProcessingStep(
    name="Too-bad",
    processor=script_else,
    code="pipeline/3_else.py"
)

In [12]:
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name="Evaluation",
        property_file=evaluation_report,
        json_path="accuracy"
    ),
    right=JsonGet(
        step_name="Evaluation",
        property_file=last_report,
        json_path="accuracy"
    )
)

In [13]:
step_cond = ConditionStep(
    name="Is-the-new_model-better-than-the-one-deployed",
    conditions=[cond_gte],
    if_steps=[step_update],  # step_register
    else_steps=[step_fail]
)

### 5) Pipeline creation
Finally, all steps are combined into a pipeline (it is important to pass a role that has permissions for all the actions the pipeline will perform).
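The `steps` list below only contains three steps: `step_update` and `step_fail` are reached through the branches of the condition step. SageMaker Pipelines orders steps from their data dependencies (such as `step_train.properties...` or `JsonGet`) and explicit `depends_on`, not from the list order. The sketch below mimics that ordering with a plain topological sort (Kahn's algorithm); it is an illustration only, not how the service is implemented, and `execution_order` is a hypothetical helper:

```python
from collections import deque


def execution_order(depends_on):
    """Return one valid execution order, given a dict step -> set of upstream steps."""
    indegree = {step: len(ups) for step, ups in depends_on.items()}
    downstream = {step: [] for step in depends_on}
    for step, ups in depends_on.items():
        for up in ups:
            downstream[up].append(step)
    ready = deque(sorted(s for s, d in indegree.items() if d == 0))
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for nxt in downstream[step]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(depends_on):
        raise ValueError("cycle detected between steps")
    return order


# Upstream steps inferred from the properties each step reads in this notebook.
steps = {
    "Training": set(),
    "Evaluation": {"Training"},  # reads step_train.properties.ModelArtifacts
    "Is-the-new_model-better-than-the-one-deployed": {"Evaluation"},  # JsonGet on its reports
}
print(execution_order(steps))
# ['Training', 'Evaluation', 'Is-the-new_model-better-than-the-one-deployed']
```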

In [14]:
pipeline_name = "CapstonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        training_instance_type,
        input_data
    ],
    steps=[step_train, step_eval, step_cond]
)

In [15]:
json.loads(pipeline.definition())

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.g4dn.xlarge'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-646714458109/capstone-inventory-project/data'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Training',
   'Type': 'Training',
   'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File',
     'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.8-gpu-py36',
     'EnableSageMakerMetricsTimeSeries': True},
    'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-646714458109/capstone-inventory-project/pipeline_model'},
    'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
    'ResourceConfig': {'InstanceCount': 1,
     'InstanceType': 'ml.g4dn.xlarge',
     'VolumeSizeInGB': 30},
 

In [16]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:646714458109:pipeline/capstonepipeline',
 'ResponseMetadata': {'RequestId': '0b068c3c-20db-427a-b172-c3cd325e2b00',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0b068c3c-20db-427a-b172-c3cd325e2b00',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Sat, 22 Jan 2022 15:33:12 GMT'},
  'RetryAttempts': 0}}

In [70]:
#pipeline.delete()

In [35]:
#execution = pipeline.start()

## Time to test our pipeline!   
Let's first re-deploy the model we trained in "4) Model training resnet34.ipynb" on an ml.m5.large instance.

In [22]:
# Create and upload model
pytorch_model = PyTorchModel(model_data="s3://sagemaker-us-east-1-646714458109/capstone-inventory-project/main_training/pytorch-training-2022-01-17-10-08-10-837/output/model.tar.gz", 
                             role=role, 
                             entry_point="scripts/inference.py",
                             py_version='py3',
                             framework_version='1.5')

In [23]:
predictor = pytorch_model.deploy(initial_instance_count=1,
                                 instance_type='ml.m5.large'
                                )  

-------!

In [24]:
predictor.endpoint_name

'pytorch-inference-2022-01-22-14-40-51-906'

Here is a picture of the endpoint configuration settings before running the pipeline.

![Endpoint configuration before the pipeline run](images/endpoint_start.png)

The pipeline was then executed for the first time.

![Pipeline execution started](images/pipeline_start.png)

The "current_accuracy.json" file was created with an initial value of 0.1. Since the trained model reached 32% accuracy, the update-endpoint step was triggered.

![Successful pipeline execution](images/pipeline_success.png)

As we can see in the following picture, our previous endpoint had its settings updated.  
In addition to the new model artefacts, an Elastic Inference accelerator and a data capture configuration were added.

![Endpoint configuration after the update](images/endpoint_updated.png)
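`update_endpoint` returns as soon as the request is accepted; the endpoint then shows the `Updating` status before returning to `InService`. If a later step (or a test) needs the new model to be live, one option is to poll `describe_endpoint`. A minimal sketch with a hypothetical `wait_for_update` helper and a scripted fake client (the fake responses are illustrative, not recorded output); boto3 also provides a built-in `endpoint_in_service` waiter that can be used instead:

```python
import time


def wait_for_update(client, endpoint_name, expected_config, poll_seconds=30,
                    timeout_seconds=1800, sleep=time.sleep):
    """Poll describe_endpoint until the endpoint is InService with expected_config.

    `client` is expected to behave like boto3.client("sagemaker"); `sleep` is
    injectable so the function can be exercised without actually waiting.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        desc = client.describe_endpoint(EndpointName=endpoint_name)
        status = desc["EndpointStatus"]
        if status == "Failed":
            raise RuntimeError(desc.get("FailureReason", "endpoint is in Failed status"))
        if status == "InService" and desc["EndpointConfigName"] == expected_config:
            return desc
        if time.monotonic() > deadline:
            raise TimeoutError(
                "{} is {} after {} s".format(endpoint_name, status, timeout_seconds))
        sleep(poll_seconds)


class ScriptedClient:
    """Fake client returning pre-scripted describe_endpoint responses in order."""

    def __init__(self, responses):
        self._responses = iter(responses)

    def describe_endpoint(self, EndpointName):
        return next(self._responses)


fake = ScriptedClient([
    {"EndpointStatus": "Updating", "EndpointConfigName": "old-config"},
    {"EndpointStatus": "InService", "EndpointConfigName": "new-config"},
])
desc = wait_for_update(fake, "pytorch-inference-2022-01-22-14-40-51-906",
                       expected_config="new-config", sleep=lambda _: None)
print(desc["EndpointConfigName"])  # new-config
```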

Prior to executing the pipeline a second time, I manually deleted a few pictures from the training set.  
This time, the trained model scored below 32% accuracy, so the condition was false and the endpoint was left unchanged.

![Pipeline execution taking the else branch](images/pipeline_fail.png)

As a last sanity check (mainly to ensure that the endpoint is using the provided inference script), let's call our previously created Lambda function.

In [42]:
client = boto3.client('lambda')
response = client.invoke(
    FunctionName='inference_capstone',
    Payload=json.dumps({"url": "https://sagemaker-us-east-1-646714458109.s3.amazonaws.com/capstone-inventory-project/data/train/1/00014.jpg"}),
)

In [43]:
json.loads(response['Payload'].read().decode())["body"]

[[0.024132639169692993,
  0.014283359050750732,
  -0.0014040172100067139,
  -0.3216896057128906,
  -0.8034760355949402]]

Everything is working fine!!
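The body returned by the Lambda function is a list with five raw scores for the single input image, presumably the logits of the model's final layer (the notebook does not say). A minimal sketch to turn them into probabilities with a softmax and pick the most likely class; mapping index i to the class folder named i + 1 is an assumption based on folder names such as `train/1/`:

```python
import math


def softmax(scores):
    """Numerically stable softmax over a list of raw scores."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


# Body returned by the Lambda call above (one image, five scores).
body = [[0.024132639169692993, 0.014283359050750732, -0.0014040172100067139,
         -0.3216896057128906, -0.8034760355949402]]
probs = softmax(body[0])
predicted_index = max(range(len(probs)), key=probs.__getitem__)
# Assumption: index i corresponds to the class folder named str(i + 1).
print("predicted class:", predicted_index + 1,
      "probability:", round(probs[predicted_index], 3))
```

With these scores the first index wins, which under that mapping matches the `train/1/` folder of the requested image.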