Set Sagemaker dependencies

In [1]:
import boto3
import sagemaker


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket() # the default S3 bucket where you will store everything that cames from the pipeline
model_package_group_name = f"BankNoteAuthentication"

Copy the data from the local path (the data from the side bar) to the  default S3 bucket.

In [2]:
local_path = "data/data.csv" # local path where you have the data

s3 = boto3.resource("s3")
base_uri = f"s3://{default_bucket}/banknote"

# This line copies your data in the local path to a default S3 bucket
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
) 

Specify parameters that will be useful when setting the pipeline.  

In [3]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)


processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)

training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")


model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)


Create a preprocessing script and store it in a folder called 'banknote'. 

Then create a ```SKLearnProcessor``` instance and specify some dependencies to run the Preprocessing Step. 

Then pass the code and the Processor to a ```ProcessingStep```.

In [4]:
!mkdir -p banknote # create a folder called 'banknote'

In [5]:
%%writefile banknote/preprocessing.py 

import pandas as pd
import numpy as np


if __name__ == "__main__":
    print("preprocessing")
    
    base_dir = "/opt/ml/processing" # this path will make more sense once you build the ProcessingStep
    # read data
    df = pd.read_csv(f"{base_dir}/input/data.csv", sep=",", 
                     error_bad_lines=False, engine='python') # just to avoid an error




    # split data into dependent and independent variables
    X = df.drop('Target', axis=1)
    y = df['Target']




    # create the processed dataset
    dataset = np.column_stack((y, X))

    np.random.shuffle(dataset)
    
    # Split into train validation and test datasets
    train, validation, test = np.split(dataset, [int(.6*len(dataset)), int(.7*len(dataset))])

    # Save the data
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)


Overwriting banknote/preprocessing.py


Create a SKLearn processor instance:

In [6]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-process",
    role=role,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


Pass the processor and the code into a ```ProcessingStep```. 

Also specify the inputs and outputs paths. 

NOTE: '/opt/ml/processing' is just a default path in the processing container. 

In [7]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="bankNote",
    processor=sklearn_processor, # SKLearnProcessor
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ], # copies the data from the S3 bucket to the container
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ], # You store the output in s3 (example: sagemaker-us-east-<>/<base_job_name>/output/train/train.csv)
    code="banknote/preprocessing.py", 
)

# Training Step

Import a build-in image of XGBoost and use it as the estimator for the ```TrainingStep``` class. 

Set the hyperparameters. When building the ```TrainingStep``` we pass the estimator and the inputs ('train.csv' and 'test.csv')

In [8]:
from sagemaker.estimator import Estimator

# Set the model path in the S3 bucket
model_path = f"s3://{default_bucket}/bankNote"

# Retrieve the XGBoost image from the SageMaker repository
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

# Set the XGBoost image as an estimator
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)


# Set the hyperparameters 
xgb_train.set_hyperparameters(
    objective="binary:logistic", 
    num_round=50, 
    max_depth=5, 
    subsample=1, 
    silent=0,
    eval_metric="logloss",
    eta=0.3, 
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


Now that we have our estimator, we pass it to the ```TrainingStep``` class. 

also specify the data inputs ('train.csv' and 'validation.csv')

In [9]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="bankNote2",
    estimator=xgb_train,
    # Data inputs in this step are the outputs in the previous step ('train.csv' and 'validation.csv') 
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",    
        ),
        
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

# Evaluation Step

In this step first we develop an evaluation script. 

Then we create a Processor intance with ```ScriptProcessor``` and a property file with ```PropertyFile```. 

Then we develop the ```ProcessingStep```

In [10]:
%%writefile banknote/evaluation.py

import json
import pathlib
import pickle
import tarfile


import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score

if __name__ == "__main__":
    # import the model
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    
    model = pickle.load(open("xgboost-model", "rb"))

    # import test set
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    

    # Split into dependent and independent variables
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    
    # make predictions
    predictions = model.predict(X_test)
    predictions = np.where(predictions > 0.5, 1, 0 )
 
    # compute accuracy
    acc = accuracy_score(y_test, predictions)
    
    # Create a json file with the metrics results
    report_dict = {
        "metrics": {
            "accuracy": {
                "value": acc
            },
        },
    }
    
    # Set the output directory
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Save the json file with the metrics result
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting banknote/evaluation.py


Set the processor instance. 

In [11]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri, # The XGBoost image
    command=["python3"], # We run python3 inside the container
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-eval",
    role=role
)

Before defining the ```ProcessingStep```  define a ```PropertyFile```. Property files are used to store information from the output of a processing step.

In [12]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# The path parameter is the name of the JSON file that the property file is saved to.
# output_name must match the output_name of the ProcessingOutput that you define in your processing step.

Define the ```ProcessingStep```. 

Pass the ```ScriptProcessor```, the inputs ('model.tar.gz' and 'test.csv'), the ```PropertyFile``` and the evaluation.py script.


In [13]:
step_eval = ProcessingStep(
    name="bankNoteEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts, # S3 path of model.tar.gz
            destination="/opt/ml/processing/model", # destination in the processing container
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, # S3 path to test.csv
            destination="/opt/ml/processing/test", # destination in the processing container
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ], # the output is the 'evaluation.json' file
    code="banknote/evaluation.py",
    property_files=[evaluation_report],
)

# Create Model Step

Pass the XGBoost image and the 'model.tar.gz' file to it. 

Then create an input with the ```CreateModelInput``` class. 

Finally pass both of them to the ```CreateModelStep```.

In [14]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri, # the XGBoost image
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, # The S3 location of the 'model.tar.gz' file
    sagemaker_session=sagemaker_session,
    role=role,
)

In [15]:
from sagemaker.inputs import CreateModelInput


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

In [16]:
from sagemaker.workflow.steps import CreateModelStep


step_create_model = CreateModelStep(
    name="banknoteCreateModel",
    model=model,
    inputs=inputs,
)

# Register Model Step

Register the model if it passes a condition step (defined below). 

For the Register Model Step, first set the model metrics with the ```ModelMetrics``` class, passing the 'evaluation.json' file to it. 

Then create the Register Model Step with the ```RegisterModel``` class. 

Then pass the estimator, the 'model.tar.gz' file and the model metrics object to it. 

In [17]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
step_register = RegisterModel(
    name="banknoteRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, # The S3 uri to the 'model.tar.gz' file
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

# Condition step

For the Condition Step, create a condition with the ```ConditionGreaterThanOrEqualTo``` class, defining a threshold and the accuracy value. 

Then pass the Condition object to the ```ConditionStep```.

In [18]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)


condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report, # we pass the property file object
        json_path="metrics.accuracy.value",
    ),
    right=0.8, # the threshold
)

step_cond = ConditionStep(
    name="banknoteAccCond",
    conditions=[condition],
    if_steps=[step_register, step_create_model], # if the condition is true we execute the Register Model and Create Model steps
    else_steps=[],
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# Create the Pipeline

Create a pipeline with the ```Pipeline``` class.  

Pass the parameters that we use in the steps (the ones defined at the beginning of the notebook), and the steps of the pipeline. 

The create model step and register model step are triggered if the condition from the condition step is true

In [19]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"banknotePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond]
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does. 

In [20]:
pipeline.upsert(role_arn=role);

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


Start the pipeline

In [21]:
execution = pipeline.start()

In [22]:
execution.wait() # wait untill the process ends

Once the pipeline process is completed, check the output metrics from the evaluation step (the 'evaluation.json' file).  

In [23]:
from pprint import pprint
import json

evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

{'metrics': {'accuracy': {'value': 0.9927184466019418}}}


In [24]:
sagemaker.s3.S3Downloader

sagemaker.s3.S3Downloader

In [25]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'bankNote', 'StartTime': datetime.datetime(2023, 6, 15, 20, 9, 38, 853000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 6, 15, 20, 14, 3, 516000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:017785611837:processing-job/pipelines-j2ssuq5qr7g8-bankNote-fKVyqeFBwW'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...9bf38b946778/input/code/preprocessing.py,Input,DataSet,ContributedTo,artifact
1,s3://...us-east-1-017785611837/banknote/data.csv,Input,DataSet,ContributedTo,artifact
2,68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...peline/j2ssuq5qr7g8/bankNote/output/test,Output,DataSet,Produced,artifact
4,s3://.../j2ssuq5qr7g8/bankNote/output/validation,Output,DataSet,Produced,artifact
5,s3://...eline/j2ssuq5qr7g8/bankNote/output/train,Output,DataSet,Produced,artifact


{'StepName': 'bankNote2', 'StartTime': datetime.datetime(2023, 6, 15, 20, 14, 3, 933000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 6, 15, 20, 16, 37, 213000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:017785611837:training-job/pipelines-j2ssuq5qr7g8-bankNote2-EGzqe0dlxu'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://.../j2ssuq5qr7g8/bankNote/output/validation,Input,DataSet,ContributedTo,artifact
1,s3://...eline/j2ssuq5qr7g8/bankNote/output/train,Input,DataSet,ContributedTo,artifact
2,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...bankNote2-EGzqe0dlxu/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'bankNoteEval', 'StartTime': datetime.datetime(2023, 6, 15, 20, 16, 37, 893000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 6, 15, 20, 21, 4, 561000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:017785611837:processing-job/pipelines-j2ssuq5qr7g8-bankNoteEval-52HinNL5Yr'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...0ef0107d5f26ccd/input/code/evaluation.py,Input,DataSet,ContributedTo,artifact
1,s3://...peline/j2ssuq5qr7g8/bankNote/output/test,Input,DataSet,ContributedTo,artifact
2,s3://...bankNote2-EGzqe0dlxu/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...10937660ef0107d5f26ccd/output/evaluation,Output,DataSet,Produced,artifact


{'StepName': 'banknoteAccCond', 'StartTime': datetime.datetime(2023, 6, 15, 20, 21, 4, 953000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 6, 15, 20, 21, 5, 550000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Condition': {'Outcome': 'True'}}}


None

{'StepName': 'banknoteRegisterModel-RegisterModel', 'StartTime': datetime.datetime(2023, 6, 15, 20, 21, 6, 529000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 6, 15, 20, 21, 7, 888000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:017785611837:model-package/banknoteauthentication/2'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...bankNote2-EGzqe0dlxu/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,banknoteauthentication-2-PendingManualApproval...,Input,Approval,ContributedTo,action
3,BankNoteAuthentication-1686859586-aws-model-pa...,Output,ModelGroup,AssociatedWith,context


{'StepName': 'banknoteCreateModel', 'StartTime': datetime.datetime(2023, 6, 15, 20, 21, 6, 529000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 6, 15, 20, 21, 8, 292000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:017785611837:model/pipelines-j2ssuq5qr7g8-banknotecreatemodel-coqkxwpw3s'}}}


None