#### Notes:
- Work in us-east-2
- All bucket objects must be in us-east-2
- Work on 10k rows of training data and 5k rows of test data

Framework:

https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb

In [1]:
import sys
!{sys.executable} -m pip install "sagemaker==2.91.1"




In [2]:
import boto3
import sagemaker
import boto3.session
from sagemaker.session import Session
from sagemaker.xgboost.estimator import XGBoost
import os

In [3]:
# Credentials are read from the environment instead of being hard-coded in the
# notebook, so secrets never end up in a saved or shared copy. When the
# variables are unset, None is passed and boto3 falls back to its default
# credential chain.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET = os.environ.get("AWS_SECRET_ACCESS_KEY")

# Region used for every AWS call made through the sessions below.
region_name = "us-east-2"

boto_session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET,
    region_name=region_name,
)

# SageMaker session bound to the explicit boto3 session above.
sagemaker_session = Session(boto_session=boto_session)

In [4]:
# Name of the model package group that registered models are added to.
model_package_group_name = "AmazonModelPackageGroupName"

# IAM role ARN passed to the processors, estimator, model and pipeline.
role = "arn:aws:iam::013747046745:role/sagemaker-role-amazon"

In [5]:
local_path = "s3_data/train_10k.csv"

# download_file does not create missing parent directories, so make sure the
# local staging folder exists before downloading into it.
os.makedirs(os.path.dirname(local_path), exist_ok=True)

default_bucket = "sm-amazon-nk"
# Build the S3 resource from the explicitly configured session so the download
# uses the same credentials and region as the rest of the notebook.
s3 = boto_session.resource("s3")
# The source object is named "train_10.csv" in the bucket (the "k" is missing
# from its name); it is saved locally under the intended name train_10k.csv.
s3.Bucket(default_bucket).download_file("train_10.csv", local_path)

# NOTE(review): with a bare bucket URI as destination the returned URI contains
# a double slash ("s3://sm-amazon-nk//train_10k.csv") - confirm this is the
# intended object location or upload under a key prefix instead.
base_uri = f"s3://{default_bucket}"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
    sagemaker_session=sagemaker_session,
)
print(input_data_uri)

s3://sm-amazon-nk//train_10k.csv


In [6]:
# Keep batch_data_uri the same as input_data_uri for now.
batch_data_uri = input_data_uri
print(batch_data_uri)

s3://sm-amazon-nk//train_10k.csv


#### Define Parameters to Parameterize the Pipeline

In [7]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Number of instances used by the processing job.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Instance type of the processing job.
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)

# ml.* instance type of the training job.
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

# Approval status of the registered model; kept manual by default for CI/CD
# purposes.
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

# S3 URI of the input data.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

# S3 URI of the batch data.
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)

# Accuracy threshold.
acc_threshold = ParameterFloat(name="AccThreshold", default_value=0.75)

#### Define Processing Step for Feature Engineering

In [8]:
!mkdir -p amazon

In [9]:
%%writefile amazon/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer



# Column names assigned to the raw CSV: the feature columns and the label.
feature_columns_names = ["text"]
label_column = "label"

# dtypes handed to pandas.read_csv for the feature and label columns.
feature_columns_dtype = dict(text=str)
label_column_dtype = {label_column: np.int_}


def merge_two_dicts(x, y):
    """Return a copy of ``x`` updated with the entries of ``y``.

    Neither argument is modified; on duplicate keys the value from ``y`` wins.
    """
    merged = x.copy()
    merged.update(y)
    return merged


if __name__ == "__main__":
    # Entry point of the processing job: load the raw CSV, turn the text
    # column into TF-IDF features and write train/validation/test splits.
    base_dir = "/opt/ml/processing"

    # The ProcessingInput of this job is mounted at <base_dir>/input, so the
    # raw file has to be read from there rather than from <base_dir> itself.
    df = pd.read_csv(
        f"{base_dir}/input/train_10k.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )

    df.dropna(axis=0, how="any", inplace=True)

    # ColumnTransformer takes (name, transformer, columns) triples. The column
    # is given as a scalar string so that TfidfVectorizer receives the 1-D
    # sequence of documents it expects. sparse_threshold=0 forces a dense
    # result so it can be concatenated with the label column below.
    preprocess = ColumnTransformer(
        [
            (
                "tfidf",
                TfidfVectorizer(
                    strip_accents="ascii",
                    lowercase=True,
                    analyzer="word",
                    stop_words="english",
                    token_pattern=r"(?u)\b\w\w+\b",
                    max_df=0.95,
                    min_df=5,
                ),
                "text",
            )
        ],
        sparse_threshold=0,
    )

    y = df.pop("label")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), -1)
    # The label is placed in the first column; the evaluation script reads the
    # target from column 0 of test.csv.
    X = np.concatenate((y_pre, X_pre), axis=1)

    # Shuffle the rows in place, then split 70% / 15% / 15%.
    np.random.shuffle(X)
    train, validation, test = np.split(
        X, [int(0.7 * len(X)), int(0.85 * len(X))]
    )

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
    

Overwriting amazon/preprocessing.py


In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor


# Framework version passed to the SKLearnProcessor below.
framework_version = "0.23-1"

# Processor whose instance type and count come from the pipeline parameters.
sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    base_job_name="sklearn-amazon-process",
)

##### Construct a Processing Step

In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# NOTE(review): base_dir is not referenced by the step defined below - confirm
# it is still needed.
base_dir = "s3_data"

# Processing step: runs amazon/preprocessing.py on the input data and exposes
# one output per data split.
step_process = ProcessingStep(
    name="AmazonProcess",
    processor=sklearn_processor,
    code="amazon/preprocessing.py",
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name=split, source=f"/opt/ml/processing/{split}")
        for split in ("train", "validation", "test")
    ],
)


#### Define Training Step to Train a Model

In [12]:
from sagemaker.estimator import Estimator


# S3 location used as the estimator's output path.
model_path = f"s3://{default_bucket}/AmazonTrain"

# XGBoost container image for the notebook's region.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.0-1",
    py_version="py3",
    region=region_name,
    instance_type="ml.m5.xlarge",
)

# Estimator for the training step; its instance type is a pipeline parameter.
xgb_train = Estimator(
    role=role,
    image_uri=image_uri,
    instance_count=1,
    instance_type=instance_type,
    output_path=model_path,
)

# Binary classification objective plus tree-booster settings.
xgb_train.set_hyperparameters(
    objective="binary:logistic",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

In [13]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Training step: one CSV input channel per split, each read from the matching
# output of the processing step.
step_train = TrainingStep(
    name="AmazonTrain",
    estimator=xgb_train,
    inputs={
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                channel
            ].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)

#### Define a Model Evaluation Step to Evaluate the Trained Model


In [14]:
%%writefile amazon/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix



if __name__ == "__main__":
    # Entry point of the evaluation job: load the trained model, score the
    # test split and write the accuracy to evaluation.json.
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # NOTE: unpickling is only acceptable here because the archive comes from
    # this pipeline's own training step; never unpickle untrusted files.
    # The context manager makes sure the file handle is closed.
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    # Column 0 holds the label; it is read back as float, so cast it to int
    # before comparing it with the predicted class labels.
    y_test = df.iloc[:, 0].to_numpy().astype(int)
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    # With the "binary:logistic" objective the booster returns probabilities,
    # which accuracy_score rejects; threshold at 0.5 to obtain class labels.
    probabilities = model.predict(X_test)
    predictions = (probabilities > 0.5).astype(int)

    acc = float(accuracy_score(y_test, predictions))
    report_dict = {
        "Prediction_metrics": {
            "accuracy": {"value": acc},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting amazon/evaluation.py


##### Script Processor

In [15]:
from sagemaker.processing import ScriptProcessor


# Processor for the evaluation script; it reuses the training image and runs
# the script with python3.
script_eval = ScriptProcessor(
    role=role,
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
    base_job_name="script-amazon-eval",
)

In [16]:
from sagemaker.workflow.properties import PropertyFile


# Property file pointing at evaluation.json inside the "evaluation" output.
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Evaluation step: receives the trained model artifacts and the test split,
# runs amazon/evaluation.py and publishes the evaluation output.
step_eval = ProcessingStep(
    name="AmazonEval",
    processor=script_eval,
    code="amazon/evaluation.py",
    property_files=[evaluation_report],
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
        ),
    ],
)

#### Define a Create Model Step to Create a Model

In [17]:
from sagemaker.model import Model


# Model built from the training step's artifacts and the training image.
model = Model(
    role=role,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
)

In [18]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


# Instance and accelerator types supplied to the create-model step.
inputs = CreateModelInput(
    accelerator_type="ml.eia1.medium",
    instance_type="ml.m5.large",
)

# Step that creates the model defined above.
step_create_model = CreateModelStep(
    name="AmazonCreateModel",
    inputs=inputs,
    model=model,
)

In [19]:
from sagemaker.transformer import Transformer


# Transformer bound to the model name produced by the create-model step;
# results are written under the AmazonTransform prefix of the bucket.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{default_bucket}/AmazonTransform",
)

In [20]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


# Batch transform step over the BatchData pipeline parameter.
# NOTE(review): batch_data defaults to the raw input CSV, while the model is
# trained on the preprocessed features - confirm the transform input format.
step_transform = TransformStep(
    name="AmazonTransform",
    inputs=TransformInput(data=batch_data),
    transformer=transformer,
)

#### Define a Register Model Step to Create a Model Package


In [21]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


# Metrics attached to the model package: evaluation.json under the first
# output of the evaluation step.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{step_eval.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}/evaluation.json",
        content_type="application/json",
    )
)

# Registers the trained model in the model package group with the approval
# status taken from the pipeline parameter.
step_register = RegisterModel(
    name="AmazonRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

The class RegisterModel has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


#### Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

In [22]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

# Fail step whose error message joins a fixed text with the accuracy threshold.
step_fail = FailStep(
    name="AmazonAccFail",
    error_message=Join(
        on=" ",
        values=["Execution failed due to ACC <", acc_threshold],
    ),
)

#### Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State

In [23]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo

# Accuracy is a higher-is-better metric: the model is registered, created and
# used for batch transform only when accuracy >= threshold, otherwise the
# pipeline fails ("ACC < threshold"). The JSON path must match the keys that
# amazon/evaluation.py writes: Prediction_metrics -> accuracy -> value.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="Prediction_metrics.accuracy.value",
    ),
    right=acc_threshold,
)

step_cond = ConditionStep(
    name="AmazonAccCond",
    conditions=[cond_gte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)


#### Define a Pipeline of Parameters, Steps, and Conditions


In [24]:
from sagemaker.workflow.pipeline import Pipeline


# Pipeline made of the parameters declared earlier and the four top-level
# steps; the conditional step carries the register/create/transform/fail steps.
pipeline_name = "AmazonPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    parameters=[
        processing_instance_type,
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        acc_threshold,
    ],
)

#### Examining the pipeline definition

In [25]:
import json


# Parse the pipeline's JSON definition into a dict and display it.
definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sm-amazon-nk//train_10k.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sm-amazon-nk//train_10k.csv'},
  {'Name': 'AccThreshold', 'Type': 'Float', 'DefaultValue': 0.75}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AmazonProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingIn

#### Submit the pipeline to SageMaker and start execution

In [26]:
# Submit the pipeline definition to SageMaker using the notebook's role.
pipeline.upsert(role_arn=role)


No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-2:013747046745:pipeline/amazonpipeline',
 'ResponseMetadata': {'RequestId': 'e1142daf-8c89-47d9-810a-3908f79f9229',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e1142daf-8c89-47d9-810a-3908f79f9229',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Mon, 30 May 2022 01:38:39 GMT'},
  'RetryAttempts': 0}}

In [27]:
# Start an execution with the default parameter values.
execution = pipeline.start()

In [28]:
# Show the execution's ARN, status and timestamps.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:013747046745:pipeline/amazonpipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-2:013747046745:pipeline/amazonpipeline/execution/yq9qzgo9f5et',
 'PipelineExecutionDisplayName': 'execution-1653874720965',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2022, 5, 29, 18, 38, 40, 850000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 5, 29, 18, 38, 40, 850000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '729fb781-b4e0-47e9-9321-5af7b68a100f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '729fb781-b4e0-47e9-9321-5af7b68a100f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '391',
   'date': 'Mon, 30 May 2022 01:38:40 GMT'},
  'RetryAttempts': 0}}

In [29]:
# Wait for the execution to finish; a WaiterError is raised when it ends in
# the "Failed" state.
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
# List the steps of the execution.
execution.list_steps()

#### Examining the Evaluation

In [None]:
from pprint import pprint


# Read evaluation.json from the first output of the evaluation step. The
# configured session is passed explicitly so the read uses the same
# credentials and region as the rest of the notebook.
evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    ),
    sagemaker_session=sagemaker_session,
)
pprint(json.loads(evaluation_json))

#### Lineage

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


# Reuse the session configured at the top of the notebook instead of creating
# a fresh default Session, so lineage queries run with the same credentials
# and region as the pipeline itself.
viz = LineageTableVisualizer(sagemaker_session)
# Walk the execution's steps in reverse of the order list_steps() returns
# and show the lineage table for each.
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    # Pause between consecutive lineage lookups.
    time.sleep(5)

#### Parametrized Executions


In [None]:
# Start a new execution that overrides two of the pipeline parameters.
execution = pipeline.start(
    parameters={
        "ProcessingInstanceType": "ml.c5.xlarge",
        "ModelApprovalStatus": "Approved",
    }
)

In [None]:
# Wait for the parametrized execution to finish.
execution.wait()

In [None]:
# List the steps of the parametrized execution.
execution.list_steps()

In [None]:
# Override the accuracy threshold for this run. The pipeline declares
# "AccThreshold"; "MseThreshold" is not one of its parameters.
execution = pipeline.start(parameters=dict(AccThreshold=0.8))