# Orchestrating Machine Learning Workflows with Amazon SageMaker Pipelines

## Using the Bring-Your-Own-Code Method

###### What is a workflow in Machine Learning?
A machine learning workflow is the sequence of tasks, run one after another, that takes an ML project from raw data to a deployed model. Its phases typically include:

- data collection
- data pre-processing
- building datasets
- model training and refinement
- evaluation
- deployment to production

The exact workflow differs from company to company, but it generally starts with defining the problem and then proceeds through data preparation, algorithm selection, model training, testing, and evaluation.

###### What is a pipeline in Machine Learning?
A machine learning pipeline is a way to codify and automate the workflow it takes to produce a model. It is the infrastructure for the entire ML workflow: a pipeline chains multiple steps, from data extraction, exploratory data analysis (EDA), preprocessing, and data augmentation to model training and deployment. After deployment, it also supports reproducibility, tracking, and monitoring.

Because each step is defined once and runs the same way every time, pipelines make models easier to manage and faster to redeploy. Pipelines are also iterative: steps are rerun as data, code, or hyperparameters change, to keep improving the model. Getting the most value from them requires accessible, scalable, and durable storage for the data and artifacts each step produces; this notebook uses Amazon S3.
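To make the idea concrete, here is a minimal, framework-free sketch of a pipeline as an ordered list of steps, where each step reads the outputs of earlier steps from a shared context. The step functions (`ingest`, `preprocess`, `train`, `evaluate`), the toy data, and the fixed-threshold "model" are hypothetical stand-ins, not SageMaker APIs.

```python
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]
Step = Callable[[Context], Context]


def ingest(ctx: Context) -> Context:
    # Hypothetical stand-in for data collection: (feature, label) pairs
    return {"rows": [(0.1, 0), (0.4, 0), (0.6, 1), (0.9, 1)]}


def preprocess(ctx: Context) -> Context:
    # Min-max scale the single feature into [0, 1]
    xs = [x for x, _ in ctx["rows"]]
    lo, hi = min(xs), max(xs)
    return {"scaled": [((x - lo) / (hi - lo), y) for x, y in ctx["rows"]]}


def train(ctx: Context) -> Context:
    # Toy "model": a fixed decision threshold on the scaled feature
    return {"threshold": 0.5}


def evaluate(ctx: Context) -> Context:
    t = ctx["threshold"]
    correct = sum(int(x >= t) == y for x, y in ctx["scaled"])
    return {"accuracy": correct / len(ctx["scaled"])}


def run_pipeline(steps: List[Step]) -> Context:
    ctx: Context = {}
    for step in steps:
        ctx.update(step(ctx))  # outputs of earlier steps become inputs of later ones
    return ctx


result = run_pipeline([ingest, preprocess, train, evaluate])
print(result["accuracy"])  # 1.0
```

A real pipeline adds what this sketch leaves out: each step runs on its own compute, and its inputs and outputs live in durable storage rather than in memory.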

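###### What is a SageMaker Pipeline?

An Amazon SageMaker pipeline is a series of interconnected steps (for example processing, training, model creation, model registration, and batch transform) defined by a JSON pipeline definition. SageMaker runs the steps as a directed acyclic graph (DAG), using the data dependencies between steps to decide their order. See the AWS documentation on defining a pipeline:
https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html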
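As a rough illustration of that DAG ordering, the sketch below uses Python's standard `graphlib` module (Python 3.9+) on the step names used later in this notebook. The dependency map is written by hand as an assumption about how the steps relate (training reads the preprocessing output, model creation and registration read the training artifact, batch transform uses the created model); it is not read from SageMaker.

```python
from graphlib import TopologicalSorter

# Hand-written dependency map: step -> set of steps whose outputs it consumes
dependencies = {
    "customer-churn-preprocessing": set(),
    "cutomer-churn-training-job": {"customer-churn-preprocessing"},
    "Customer-churn-CreateModel": {"cutomer-churn-training-job"},
    "Customer-churn-RegisterModel": {"cutomer-churn-training-job"},
    "Customer-churn-BatchTransform": {"Customer-churn-CreateModel"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies have all finished
    waves.append(ready)
    sorter.done(*ready)

for i, wave in enumerate(waves, start=1):
    print(i, wave)
```

Steps in the same wave have no dependency on each other, so they can run in parallel; in the execution output later in this notebook, CreateModel and RegisterModel indeed start at the same timestamp.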

###### Dataset

This notebook uses the Telco Customer Churn data from Kaggle:
https://www.kaggle.com/blastchar/telco-customer-churn
https://www.kaggle.com/bhartiprasad17/customer-churn-prediction

Data ingestion is the first step of this notebook: download the dataset and upload it to an S3 bucket. You can point **`input_path`** at your own dataset location if appropriate.

In [54]:
import boto3
import sagemaker
import pandas as pd
import os

# Session, role, and account details for the current notebook environment
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
account_id = boto3.client("sts").get_caller_identity().get("Account")

# S3 prefix for the raw data and the pipeline artifacts
input_path = f"s3://{default_bucket}/customer_churn/"

In [None]:
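# Read the CSV downloaded from Kaggle and upload it under input_path
# (pandas needs the s3fs package to write to s3:// paths)
df = pd.read_csv("WA_Fn-UseC_-Telco-Customer-Churn.csv")

# index=False keeps the 21 original columns that preprocessing.py expects
df.to_csv(os.path.join(input_path, "WA_Fn-UseC_-Telco-Customer-Churn.csv"), index=False)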

In [17]:
input_dataset_path = os.path.join(input_path, "WA_Fn-UseC_-Telco-Customer-Churn.csv")

Import the required SageMaker libraries

In [55]:
import os
import pytz
from datetime import datetime

import boto3
import sagemaker
import sagemaker.session

from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput,TransformInput,CreateModelInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.network import NetworkConfig
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    TransformStep,
    CreateModelStep
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model


sess = sagemaker.Session()

Next, define a helper that creates a SageMaker session for a given region and default bucket:

In [56]:
def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.
    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        `sagemaker.session.Session` instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )

## Define Parameters to Parametrize Pipeline Execution

In [66]:
# Parameters for pipeline execution; each can be overridden per run
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount", default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.t3.medium"
)
postprocessing_instance_type = ParameterString(
    name="PostProcessingInstanceType", default_value="ml.m5.2xlarge"
)
preprocess_output_data = ParameterString(
    name="PreprocessOutputDataUrl",
    default_value=os.path.join(input_path, "preprocess_output_data/"),
)
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.m5.12xlarge"
)
model_path = ParameterString(
    name="ModelPath",
    default_value=os.path.join(input_path, "model_path/"),
)
inference_instance_type = ParameterString(
    name="InferenceInstanceType", default_value="ml.m5.large"
)
inference_output_data = ParameterString(
    name="InferencePath",
    default_value=os.path.join(input_path, "inference_output_data/"),
)
# "Approved" skips manual review; use "PendingManualApproval" to require approval in the Model Registry
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved",
)
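At execution time, each parameter takes its default unless the run overrides it, for example with `pipeline.start(parameters={"TrainingInstanceType": "ml.m5.xlarge"})`. The plain-Python sketch below models that merge; `resolve_parameters` is a hypothetical helper, not part of the SageMaker SDK, the defaults are a subset copied from the cell above, and rejecting undeclared names or mismatched types is a choice made in this sketch.

```python
from typing import Dict, Optional, Union

Value = Union[str, int]

# Defaults copied from the ParameterString/ParameterInteger definitions above (subset)
DEFAULTS: Dict[str, Value] = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.t3.medium",
    "TrainingInstanceType": "ml.m5.12xlarge",
    "ModelApprovalStatus": "Approved",
}


def resolve_parameters(defaults: Dict[str, Value],
                       overrides: Optional[Dict[str, Value]] = None) -> Dict[str, Value]:
    """Hypothetical helper: merge per-run overrides onto declared defaults."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # This sketch treats overriding an undeclared parameter as an error
        raise KeyError(f"Undeclared parameters: {sorted(unknown)}")
    for name, value in overrides.items():
        if type(value) is not type(defaults[name]):
            raise TypeError(f"{name} expects {type(defaults[name]).__name__}")
    return {**defaults, **overrides}


print(resolve_parameters(DEFAULTS, {"TrainingInstanceType": "ml.m5.xlarge"}))
```

Keeping instance types and paths as parameters means a cheaper instance can be used for a test run without editing the pipeline definition.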

In [25]:
%%writefile preprocessing/preprocessing.py
import argparse
import os
import warnings

import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Silence library warnings in the processing job logs
warnings.simplefilter(action='ignore')

def change_format(df):
    df['TotalCharges'] = pd.to_numeric(df.TotalCharges, errors='coerce')
    
    return df

def missing_value(df):
    print("count of missing values (before treatment):", df.isnull().sum())
    # Blank TotalCharges strings became NaN in change_format; impute them with the column mean
    df['TotalCharges'] = df['TotalCharges'].fillna(df['TotalCharges'].mean())
    print("count of missing values (after treatment):", df.isnull().sum())
    print("missing values successfully replaced")
    return df

def data_manipulation(df):
    df = df.drop(['customerID'], axis = 1)
    
    return df

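def cat_encoder(df, variable_list):
    # One-hot encode the categorical columns as 0/1 integers, dropping the first level of each
    dummy = pd.get_dummies(df[variable_list], drop_first=True, dtype=int)
    df = pd.concat([df, dummy], axis=1)
    # Drop the original categorical columns that were just encoded
    df = df.drop(columns=variable_list)

    print("Encoded successfully")
    return df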

def scaling(X):  
    min_max=MinMaxScaler()
    X=pd.DataFrame(min_max.fit_transform(X),columns=X.columns)
    
    return X

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    input_data_path = os.path.join("/opt/ml/processing/input", 'WA_Fn-UseC_-Telco-Customer-Churn.csv')


    print("Reading input data from {}".format(input_data_path))
    df = pd.read_csv(input_data_path)
    
    columns = ['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents',
           'tenure', 'PhoneService', 'MultipleLines', 'InternetService',
           'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
           'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
           'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']

    df.columns = columns

    cat_var = ['gender', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines', 'InternetService',
           'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
           'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
           'PaymentMethod', 'Churn']
    
    df = data_manipulation(missing_value(change_format(df)))
    df = cat_encoder(df, cat_var)

    # Target: the Churn_Yes dummy; features: the 30 remaining encoded columns
    y = df['Churn_Yes']
    X = df.drop(columns=['Churn_Yes'])
    X = scaling(X)
    
    print("Saving the outputs")
    X_output_path = os.path.join("/opt/ml/processing/train", "X.csv")   
        
    print("Saving output to {}".format(X_output_path))
    pd.DataFrame(X).to_csv(X_output_path, header=False, index=False)
    
    y_output_path = os.path.join("/opt/ml/processing/test", "y.csv")   
        
    print("Saving output to {}".format(y_output_path))
    pd.DataFrame(y).to_csv(y_output_path, header=False, index=False)

Writing preprocessing/preprocessing.py
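Before baking the script into a container, it can help to sanity-check the encoding and scaling logic locally. A minimal sketch, assuming pandas is installed: it repeats the script's numeric coercion, mean imputation, and `get_dummies(..., drop_first=True)` encoding on a hypothetical three-row frame (values invented for illustration, not taken from the dataset). Min-max scaling is written out by hand here instead of using scikit-learn's `MinMaxScaler`, so the check needs only pandas.

```python
import pandas as pd

# Hypothetical three-row sample with a subset of the real columns
df = pd.DataFrame({
    "tenure": [1, 34, 2],
    "TotalCharges": ["29.85", " ", "108.15"],  # blank string, as in the raw CSV
    "Contract": ["Month-to-month", "One year", "Two year"],
    "Churn": ["No", "No", "Yes"],
})

# Same steps as the script: coerce to numeric, fill NaN with the mean, one-hot encode
df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")
df["TotalCharges"] = df["TotalCharges"].fillna(df["TotalCharges"].mean())
df = pd.get_dummies(df, columns=["Contract", "Churn"], drop_first=True, dtype=int)

# Split off the target, then min-max scale every feature column to [0, 1]
y = df.pop("Churn_Yes")
X = (df - df.min()) / (df.max() - df.min())
print(X.columns.tolist())
print(X)
```

If the column names printed here do not line up with what `train.py` expects, the mismatch is far cheaper to catch locally than inside a pipeline run.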


In [29]:
ecr_repository = "cutomer-churn-preprocessing-image"
tag = ":latest"
uri_suffix = "amazonaws.com"
    
preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
script_processor = ScriptProcessor(
    command=["python3"],
    image_uri=preprocessing_repository_uri,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    # tags=generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
    # network_config=NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
)

In [None]:
# Placeholder tag values (including env); replace them with your organization's tagging scheme
env = "dev"
generic_tags = [
    {'Key': 'EnvironmentName', 'Value': env},
    {'Key': 'ProjectName', 'Value': 'project_name'},
    {'Key': 'DepartmentName', 'Value': 'DepartmentName'},
    {'Key': 'UsecaseName', 'Value': 'project_name'},
    {'Key': 'ResourceName', 'Value': 'sagemaker_notebook'},
    {'Key': 'OwnerName', 'Value': 'mlops'},
    {'Key': 'BUName', 'Value': 'mobile'},
]

In [30]:
# Environment-specific locations used for the run shown below; replace them with your own,
# or use the next cell instead
input_data = "s3://dlk-cloud-tier-10-preprocessed-ml-dev/PREPAID_CHURN/WA_Fn-UseC_-Telco-Customer-Churn.csv"
X_output_data = "s3://dlk-cloud-tier-10-preprocessed-ml-dev/PREPAID_CHURN/X/"
y_output_path = "s3://dlk-cloud-tier-10-preprocessed-ml-dev/PREPAID_CHURN/y/"

In [None]:
# Equivalent locations under the default SageMaker bucket (input_path) defined earlier
input_data = os.path.join(input_path, "WA_Fn-UseC_-Telco-Customer-Churn.csv")
X_output_data = os.path.join(input_path, "X/")
y_output_path = os.path.join(input_path, "y/")

In [43]:
step_preprocess = ProcessingStep(
    name="customer-churn-preprocessing",
    processor=script_processor,
    code="preprocessing/preprocessing.py",
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=[
            ProcessingOutput(output_name="train", destination=X_output_data, source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=y_output_path),
        ]
)
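A processing job uploads whatever the script writes under each output's `source` directory to that output's S3 `destination`, keeping relative paths. The sketch below mimics that mapping for this step so you can predict where `X.csv` and `y.csv` land; `s3_key_for` is a hypothetical helper and `my-bucket` is a placeholder bucket name.

```python
import posixpath

# ProcessingOutput source directory in the container -> S3 destination prefix (placeholder bucket)
OUTPUTS = {
    "/opt/ml/processing/train": "s3://my-bucket/customer_churn/X/",
    "/opt/ml/processing/test": "s3://my-bucket/customer_churn/y/",
}


def s3_key_for(local_path: str) -> str:
    """Hypothetical helper: map a file written by the script to its S3 URI."""
    for source, destination in OUTPUTS.items():
        if local_path.startswith(source + "/"):
            relative = posixpath.relpath(local_path, source)
            return destination.rstrip("/") + "/" + relative
    raise ValueError(f"{local_path} is not under any ProcessingOutput source")


print(s3_key_for("/opt/ml/processing/train/X.csv"))
print(s3_key_for("/opt/ml/processing/test/y.csv"))
```

This is why the batch transform step later reads `X.csv` directly under the `X/` prefix.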

## Define a Training Step to Train a Model

In [86]:
%%writefile training/model/train.py
# Import the necessary libraries
import argparse
import os
import pickle

import pandas as pd
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    confusion_matrix,
    classification_report,
    roc_auc_score,
)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))
    training_data_directory = '/opt/ml/input/data/training/'
    training_data_directory2 = '/opt/ml/input/data/test/'
    train_features_data = os.path.join(training_data_directory, "X.csv")
    train_labels_data = os.path.join(training_data_directory2, "y.csv")
    print("Reading input data")
    print("Reading input data from {}".format(train_features_data))
    X = pd.read_csv(train_features_data, header = None)
    
    print("Reading input data from {}".format(train_labels_data))
    y = pd.read_csv(train_labels_data, header = None)
    
    columns = ['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges',
       'gender_Male', 'Partner_Yes', 'Dependents_Yes', 'PhoneService_Yes',
       'MultipleLines_No phone service', 'MultipleLines_Yes',
       'InternetService_Fiber optic', 'InternetService_No',
       'OnlineSecurity_No internet service', 'OnlineSecurity_Yes',
       'OnlineBackup_No internet service', 'OnlineBackup_Yes',
       'DeviceProtection_No internet service', 'DeviceProtection_Yes',
       'TechSupport_No internet service', 'TechSupport_Yes',
       'StreamingTV_No internet service', 'StreamingTV_Yes',
       'StreamingMovies_No internet service', 'StreamingMovies_Yes',
       'Contract_One year', 'Contract_Two year', 'PaperlessBilling_Yes',
       'PaymentMethod_Credit card (automatic)',
       'PaymentMethod_Electronic check', 'PaymentMethod_Mailed check']
    #df_train_labels = pd.read_csv(train_labels_path)

    #print("Loading validation dataframes...")
    #df_val_features = pd.read_csv(val_features_path)
    #df_val_labels = pd.read_csv(val_labels_path)
    
    X.columns = columns
    
    column = ['Churn']
    
    y.columns = column
    
    print("Successfully renamed the columns")

    print("Splitting the dataset")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=args.train_test_split_ratio, random_state=5
    )
    
    print("train the model")
    xgb = XGBClassifier()
    parameters = {
        'n_estimators': [100, 250, 500],
        'max_depth': [3, 5],
        'learning_rate' : [0.01, 0.05, 0.1],
        'gamma' : [0.0, 0.1, 0.2],
        'min_child_weight' : [1, 3]
    }
    
    cv = GridSearchCV(xgb, parameters, cv=3)
    
    print("fitting the model")
    cv.fit(X_train, y_train.values.ravel())

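    final_model = cv.best_estimator_

    y_pred = final_model.predict(X_test)

    print(confusion_matrix(y_test, y_pred))

    print(classification_report(y_test, y_pred))

    print(roc_auc_score(y_test, final_model.predict_proba(X_test)[:, 1]))

    # Emit "name:value" lines that the Estimator's metric_definitions regexes capture
    from sklearn.metrics import f1_score
    for split, X_split, y_split in (("train", X_train, y_train), ("test", X_test, y_test)):
        y_true = y_split.values.ravel()
        y_hat = final_model.predict(X_split)
        print(f"acc_{split}:{accuracy_score(y_true, y_hat):.4f}")
        print(f"prec_{split}:{precision_score(y_true, y_hat):.4f}")
        print(f"rec_{split}:{recall_score(y_true, y_hat):.4f}")
        print(f"f1_sc_{split}:{f1_score(y_true, y_hat):.4f}")

    # Everything written to /opt/ml/model is packaged into model.tar.gz under the Estimator's output_path
    OUTPUT_DIR = "/opt/ml/model/"

    print("Saving model....")
    path = os.path.join(OUTPUT_DIR, "temp_dict.pkl")
    print(f"saving to {path}")
    with open(path, 'wb') as p_file:
        pickle.dump(final_model, p_file)

    print('Training Job is completed.')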

Overwriting training/model/train.py


In [87]:
# Training image in ECR for this account and region
recommender_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/cutomer-churn-training-image:latest"

estimator = Estimator(
    image_uri=recommender_image_uri,
    role=role,
    sagemaker_session=sess,                  # Technical object
    output_path=model_path,
    base_job_name='cutomer-churn-training-job',
    input_mode='File',                       # Copy the dataset to the instance, then train
    instance_count=1,
    instance_type=training_instance_type,
    debugger_hook_config=False,
    disable_profiler=True,
    metric_definitions=[
        # A training job accepts at most 40 metric definitions.
        # \b (word boundary) keeps the rec_* patterns from also matching inside prec_* log lines.
        {'Name': 'acc_train', 'Regex': r'acc_train:([-+]?[0-9]*\.?[0-9]+)'},
        {'Name': 'prec_train', 'Regex': r'prec_train:([-+]?[0-9]*\.?[0-9]+)'},
        {'Name': 'rec_train', 'Regex': r'\brec_train:([-+]?[0-9]*\.?[0-9]+)'},
        {'Name': 'f1_sc_train', 'Regex': r'f1_sc_train:([-+]?[0-9]*\.?[0-9]+)'},
        {'Name': 'acc_test', 'Regex': r'acc_test:([-+]?[0-9]*\.?[0-9]+)'},
        {'Name': 'prec_test', 'Regex': r'prec_test:([-+]?[0-9]*\.?[0-9]+)'},
        {'Name': 'rec_test', 'Regex': r'\brec_test:([-+]?[0-9]*\.?[0-9]+)'},
        {'Name': 'f1_sc_test', 'Regex': r'f1_sc_test:([-+]?[0-9]*\.?[0-9]+)'},
    ],
    # tags=generic_tags + [{'Key': 'JobType', 'Value': 'Training'}],
    # subnets=subnets.split(':'),
    # security_group_ids=security_group_ids.split(':')
)
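SageMaker scans the training job's log output with each `Regex` in `metric_definitions` and records the captured group as the metric value, so a pattern that matches more lines than intended silently pollutes a metric. You can check the patterns locally with Python's `re` module, as sketched below; the log text is made up for illustration, and SageMaker's own regex engine may differ from `re` in edge cases, so treat this as an approximate check.

```python
import re

# Hypothetical log lines in the "name:value" format the training script prints
log = "prec_test:0.6712\nrec_test:0.5231\nacc_test:0.8023"

naive = r"rec_test:([-+]?[0-9]*\.?[0-9]+)"      # also matches inside "prec_test:"
bounded = r"\brec_test:([-+]?[0-9]*\.?[0-9]+)"  # \b requires a word boundary before "rec"

print(re.findall(naive, log))    # ['0.6712', '0.5231']
print(re.findall(bounded, log))  # ['0.5231']
```

The unbounded pattern picks up the precision value as a second recall datapoint, which is why the recall patterns start with `\b`.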


In [88]:
step_train = TrainingStep(
        name="cutomer-churn-training-job",
        estimator=estimator,
        inputs = {
            "training": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                content_type="text/csv",
            ),
           "test": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
                content_type="text/csv",
            )
        }
    )
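Each key in `inputs` is a channel name. In `File` mode, SageMaker downloads a channel's S3 data to `/opt/ml/input/data/<channel_name>/` inside the training container, which is why `train.py` reads `X.csv` from `.../training/` and `y.csv` from `.../test/`. A small sketch of that convention; `channel_dir` is a hypothetical helper, and the channel-to-file mapping is taken from `step_train` and `train.py` above.

```python
import posixpath


def channel_dir(channel_name: str) -> str:
    """Hypothetical helper: local directory SageMaker uses for a channel in File mode."""
    return posixpath.join("/opt/ml/input/data", channel_name) + "/"


# Channel names used by step_train and the file each one carries
expected_files = {"training": "X.csv", "test": "y.csv"}
for channel, filename in expected_files.items():
    print(posixpath.join(channel_dir(channel), filename))
```

Renaming a channel in `inputs` therefore also requires changing the directory the training script reads from.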

## Define a Create Model Step to Create a Model

In [89]:
from sagemaker.model import Model


model = Model(
    image_uri=recommender_image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

In [90]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)
step_create_model = CreateModelStep(
    name="Customer-churn-CreateModel",
    model=model,
    inputs=inputs,
)

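The pipeline uses a dedicated inference image rather than the training image, so the next cells redefine `step_create_model` with that image (and without the Elastic Inference accelerator set above).

In [119]:
image = f"{account_id}.dkr.ecr.{region}.amazonaws.com/customer-churn-inference-image:latest"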

In [120]:
step_create_model = CreateModelStep(
        name="Customer-churn-CreateModel",
        model=Model(image, 
                    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                    role = role,
                    sagemaker_session = sagemaker_session,
                    ),
        inputs=CreateModelInput(instance_type="ml.m5.large")
    )

## Define a Transform Step to Perform Batch Transformation

The batch transform below runs without a Flask app (it is not a separate inference job).

In [121]:
model_name = step_create_model.properties.ModelName

transformer = Transformer(model_name=model_name,
                          instance_count=1,
                          strategy='SingleRecord',
                          #max_payload=10, #Optional
                          assemble_with="Line",
                          instance_type="ml.m5.12xlarge",
                          output_path=f"s3://{default_bucket}/customer_churn/transform",
                          #tags = generic_tags + [{'Key': 'JobName', 'Value': 'Inference'}]
                )

In [122]:
inference_data = os.path.join(input_path,"WA_Fn-UseC_-Telco-Customer-Churn.csv")

In [123]:
step_transform = TransformStep(
        name="Customer-churn-BatchTransform",
        transformer=transformer,
        inputs=TransformInput(data=os.path.join(X_output_data, "X.csv"),  # scaled features written by the preprocessing step
                              #data = step_preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                              split_type="Line",
                              content_type="text/csv")
    )
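With `split_type="Line"`, `strategy="SingleRecord"`, and `assemble_with="Line"`, batch transform sends each CSV line to the model as a separate request and joins the responses line by line into an output object named after the input file with a `.out` suffix (here, `X.csv.out` under the transformer's `output_path`). A local simulation of that flow; `predict` is a hypothetical stand-in for the inference container, not the trained model.

```python
from typing import Callable, List, Tuple


def simulate_batch_transform(input_name: str, body: str,
                             predict: Callable[[str], str]) -> Tuple[str, str]:
    """Mimic Line splitting + SingleRecord requests + Line assembly for one input file."""
    records: List[str] = [line for line in body.splitlines() if line.strip()]
    responses = [predict(record) for record in records]  # one request per record
    return input_name + ".out", "\n".join(responses)


def predict(record: str) -> str:
    # Hypothetical model: flags churn when the first scaled feature exceeds 0.5
    first = float(record.split(",")[0])
    return "1" if first > 0.5 else "0"


name, output = simulate_batch_transform("X.csv", "0.1,0.3\n0.9,0.2\n0.7,0.8\n", predict)
print(name)    # X.csv.out
print(output)
```

`SingleRecord` keeps each request small and simple to handle in the container, at the cost of more requests than `MultiRecord` batching.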

## Define a Register Model Step to Create a Model Package

In [95]:
model_package_group_name = "CustomerChurnModelPackageGroupName"

In [96]:
step_register = RegisterModel(
    name="Customer-churn-RegisterModel",
    estimator=estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    #model_metrics=model_metrics,
)

In [124]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "CustomerChurnPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    # Declare every Parameter the steps reference; plain strings such as input_data do not belong here
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        model_path,
    ],
    steps=[step_preprocess,
           step_train, 
           #step_eval, 
           step_create_model,
           step_register,
           step_transform
          ],
)

In [125]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:pipeline/customerchurnpipeline',
 'ResponseMetadata': {'RequestId': '820d98db-0819-480f-ad2d-d68287dde2d4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '820d98db-0819-480f-ad2d-d68287dde2d4',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Tue, 01 Feb 2022 15:26:48 GMT'},
  'RetryAttempts': 0}}

In [126]:
execution = pipeline.start()

In [127]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:pipeline/customerchurnpipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:pipeline/customerchurnpipeline/execution/nhrihfdef870',
 'PipelineExecutionDisplayName': 'execution-1643729211142',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'customerchurnpipeline',
  'TrialName': 'nhrihfdef870'},
 'CreationTime': datetime.datetime(2022, 2, 1, 15, 26, 51, 48000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 2, 1, 15, 26, 51, 48000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '04c04902-1046-42a1-8316-a4b0b91cbc93',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '04c04902-1046-42a1-8316-a4b0b91cbc93',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '514',
   'date': 'Tue, 01 Feb 2022 15:26:50 GMT'},
  'RetryAttempts': 0}}
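`execution.describe()` returns a snapshot: `PipelineExecutionStatus` moves from `Executing` to a terminal value such as `Succeeded`, `Failed`, or `Stopped`. The SDK also offers `execution.wait()` to block until the run finishes. If you prefer your own polling loop, here is a generic sketch; `wait_for_pipeline` is hypothetical and is exercised with a fake `describe` function instead of a real execution.

```python
import time
from typing import Callable, Dict

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_pipeline(describe: Callable[[], Dict], poll_seconds: float = 30.0,
                      timeout_seconds: float = 3600.0) -> str:
    """Poll describe() until the execution reaches a terminal status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout_seconds} s")
        time.sleep(poll_seconds)


# Fake describe() that reports Executing twice, then Succeeded
_statuses = iter(["Executing", "Executing", "Succeeded"])


def fake_describe() -> Dict:
    return {"PipelineExecutionStatus": next(_statuses)}


final_status = wait_for_pipeline(fake_describe, poll_seconds=0.01)
print(final_status)  # Succeeded
```

Against a real run you would pass the bound method itself, `wait_for_pipeline(execution.describe)`.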

In [129]:
execution.list_steps()

[{'StepName': 'Customer-churn-BatchTransform',
  'StartTime': datetime.datetime(2022, 2, 1, 15, 42, 6, 260000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 2, 1, 15, 49, 46, 175000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:transform-job/pipelines-nhrihfdef870-customer-churn-batch-e19fncink4'}}},
 {'StepName': 'Customer-churn-CreateModel',
  'StartTime': datetime.datetime(2022, 2, 1, 15, 42, 4, 754000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 2, 1, 15, 42, 5, 640000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:model/pipelines-nhrihfdef870-customer-churn-creat-pvwcy8a6qn'}}},
 {'StepName': 'Customer-churn-RegisterModel',
  'StartTime': datetime.datetime(2022, 2, 1, 15, 42, 4, 754000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 2, 1, 15,

## Lineage
Review the lineage of the artifacts generated by the pipeline.

In [128]:
import time
from IPython.display import display
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'customer-churn-preprocessing', 'StartTime': datetime.datetime(2022, 2, 1, 15, 26, 52, 497000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 2, 1, 15, 34, 18, 30000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:processing-job/pipelines-nhrihfdef870-customer-churn-prepr-hwfgaenswf'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...3d72e7129923/input/code/preprocessing.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...URN/WA_Fn-UseC_-Telco-Customer-Churn.csv | Input | DataSet | ContributedTo | artifact |
| 2 | 12058...cutomer-churn-preprocessing-image:latest | Input | Image | ContributedTo | artifact |
| 3 | s3://...-10-preprocessed-ml-dev/PREPAID_CHURN/y/ | Output | DataSet | Produced | artifact |
| 4 | s3://...-10-preprocessed-ml-dev/PREPAID_CHURN/X/ | Output | DataSet | Produced | artifact |


{'StepName': 'cutomer-churn-training-job', 'StartTime': datetime.datetime(2022, 2, 1, 15, 34, 18, 658000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 2, 1, 15, 42, 3, 849000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:training-job/pipelines-nhrihfdef870-cutomer-churn-traini-rie0pyndij'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...-10-preprocessed-ml-dev/PREPAID_CHURN/y/ | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...-10-preprocessed-ml-dev/PREPAID_CHURN/X/ | Input | DataSet | ContributedTo | artifact |
| 2 | 12058....com/cutomer-churn-training-image:latest | Input | Image | ContributedTo | artifact |
| 3 | s3://...rn-traini-RIe0PYndij/output/model.tar.gz | Output | Model | Produced | artifact |


{'StepName': 'Customer-churn-RegisterModel', 'StartTime': datetime.datetime(2022, 2, 1, 15, 42, 4, 754000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 2, 1, 15, 42, 5, 844000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:model-package/customerchurnmodelpackagegroupname/4'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...rn-traini-RIe0PYndij/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 1 | 12058....com/cutomer-churn-training-image:latest | Input | Image | ContributedTo | artifact |
| 2 | customerchurnmodelpackagegroupname-4-Approved-... | Input | Approval | ContributedTo | action |
| 3 | CustomerChurnModelPackageGroupName-1643722245-... | Output | ModelGroup | AssociatedWith | context |


{'StepName': 'Customer-churn-CreateModel', 'StartTime': datetime.datetime(2022, 2, 1, 15, 42, 4, 754000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 2, 1, 15, 42, 5, 640000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:model/pipelines-nhrihfdef870-customer-churn-creat-pvwcy8a6qn'}}}


None

{'StepName': 'Customer-churn-BatchTransform', 'StartTime': datetime.datetime(2022, 2, 1, 15, 42, 6, 260000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 2, 1, 15, 49, 46, 175000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:120582440665:transform-job/pipelines-nhrihfdef870-customer-churn-batch-e19fncink4'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...rn-traini-RIe0PYndij/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 1 | 12058...om/customer-churn-inference-image:latest | Input | Image | ContributedTo | artifact |
| 2 | s3://...reprocessed-ml-dev/PREPAID_CHURN/X/X.csv | Input | DataSet | ContributedTo | artifact |
| 3 | s3://...-1-120582440665/customer_churn/transform | Output | DataSet | Produced | artifact |
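The `StartTime` and `EndTime` fields that `execution.list_steps()` returns make it easy to see where a run spends its time. A small sketch computing per-step durations; the two records are copied from the step output printed above (fields trimmed), and `tzinfo` is dropped because both timestamps of a step share the same time zone. `step_durations` is a hypothetical helper name.

```python
import datetime
from typing import Dict, List

steps: List[Dict] = [
    {"StepName": "customer-churn-preprocessing",
     "StartTime": datetime.datetime(2022, 2, 1, 15, 26, 52, 497000),
     "EndTime": datetime.datetime(2022, 2, 1, 15, 34, 18, 30000)},
    {"StepName": "Customer-churn-BatchTransform",
     "StartTime": datetime.datetime(2022, 2, 1, 15, 42, 6, 260000),
     "EndTime": datetime.datetime(2022, 2, 1, 15, 49, 46, 175000)},
]


def step_durations(step_records: List[Dict]) -> Dict[str, float]:
    """Seconds per step; steps that have not finished (no EndTime) are skipped."""
    return {s["StepName"]: (s["EndTime"] - s["StartTime"]).total_seconds()
            for s in step_records if "EndTime" in s}


for step_name, seconds in step_durations(steps).items():
    print(f"{step_name}: {seconds:.1f} s")
```

With a live execution, `step_durations(execution.list_steps())` works the same way, since each returned dictionary carries these fields.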


## Clean-Up

In [41]:
import boto3

client = boto3.client('sagemaker')

In [130]:
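import uuid

# Deletes the pipeline definition only; models, model packages, and S3 data it produced are kept
response = client.delete_pipeline(
    PipelineName='CustomerChurnPipeline',
    ClientRequestToken=str(uuid.uuid4()),  # unique idempotency token (32-128 characters)
)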