# **Orchestrating Machine Learning Workflow with Amazon SageMaker Pipelines**
## Inference Job Only: Bring Your Own Code with a Previously Trained Model
### Use Case: Customer Churn Prediction
------------------------------------------------------------------------------------------------------------------------

# Table of Contents
* [1. Data](#1.-Data)
	* [1.1 Import dataset](#1.1-Import-dataset)
	* [1.2 Data description](#1.2-Data-description)
* [2. Sagemaker Pipeline](#2.-Sagemaker-Pipeline)
	* [2.1 Architecture](#2.1-Architecture)
	* [2.2 Install predefined Sagemaker libraries](#2.2-Install-predefined-Sagemaker-libraries)
	* [2.3 Define helper functions](#2.3-Import-other-define-functions)
	* [2.4 Convert your model and upload it to s3](#2.4-Convert-your-model-and-upload-it-to-s3)
	* [2.5 Define Preprocessing Stage](#2.5-Define-Preprocessing-Stage)
	* [2.6 Define Inference stage](#2.6-Define-Inference-stage)
	* [2.7 Define Postprocess steps](#2.7-Define-Postprocess-steps)
	* [2.8 Define required parameters for get pipeline](#2.8-Define-required-parameters-for-get-pipeline)
	* [2.9 Other details of the pipeline](#2.9-Other-details-of-the-pipeline)

# 1. Data

## 1.1 Import dataset

In [85]:
import pandas as pd
df = pd.read_csv("data/data_inference.csv")

In [86]:
df.head()

| Unnamed: 0 | customerID | gender | SeniorCitizen | Partner | Dependents | tenure | PhoneService | MultipleLines | InternetService | OnlineSecurity | OnlineBackup | DeviceProtection | TechSupport | StreamingTV | StreamingMovies | Contract | PaperlessBilling | PaymentMethod | MonthlyCharges | TotalCharges |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 2674-MIAHT | Female | 0 | No | No | 4 | Yes | Yes | Fiber optic | No | Yes | No | No | No | No | Month-to-month | Yes | Mailed check | 80.3 | 324.2 |
| 1 | 4086-WITJG | Male | 0 | Yes | Yes | 71 | Yes | No | No | No internet service | No internet service | No internet service | No internet service | No internet service | No internet service | Two year | No | Credit card (automatic) | 19.7 | 1301.1 |
| 2 | 7096-ZNBZI | Female | 0 | Yes | No | 72 | Yes | Yes | No | No internet service | No internet service | No internet service | No internet service | No internet service | No internet service | Two year | No | Credit card (automatic) | 26.45 | 1914.5 |
| 3 | 9885-AIBVB | Male | 0 | Yes | No | 29 | Yes | Yes | Fiber optic | No | Yes | Yes | No | No | No | Month-to-month | Yes | Electronic check | 85.8 | 2440.25 |
| 4 | 8647-SDTWQ | Male | 0 | Yes | Yes | 57 | Yes | Yes | Fiber optic | No | No | No | No | No | No | Month-to-month | Yes | Electronic check | 74.3 | 4018.35 |


## 1.2 Data description

Each row represents a customer, and each column contains one of the customer's attributes, described in the list below. The raw dataset contains 7,043 rows (customers) and 21 columns; the “Churn” column is the target.

- **`customerID:`** Customer ID 

- **`gender:`** Whether the customer is male or female 

- **`SeniorCitizen:`** Whether the customer is a senior citizen or not (1, 0) 

- **`Partner:`** Whether the customer has a partner or not (Yes, No) 

- **`Dependents:`** Whether the customer has dependents or not (Yes, No) 

- **`tenure:`** Number of months the customer has stayed with the company 

- **`PhoneService:`** Whether the customer has a phone service or not (Yes, No) 

- **`MultipleLines:`** Whether the customer has multiple lines or not (Yes, No, No phone service) 

- **`InternetService:`** Customer’s internet service provider (DSL, Fiber optic, No) 

- **`OnlineSecurity:`** Whether the customer has online security or not (Yes, No, No internet service) 

- **`OnlineBackup:`** Whether the customer has online backup or not (Yes, No, No internet service)

- **`DeviceProtection:`** Whether the customer has device protection or not (Yes, No, No internet service)

- **`TechSupport:`** Whether the customer has tech support or not (Yes, No, No internet service)

- **`StreamingTV:`** Whether the customer has streaming TV or not (Yes, No, No internet service)

- **`StreamingMovies:`** Whether the customer has streaming movies or not (Yes, No, No internet service)

- **`Contract:`** The contract term of the customer (Month-to-month, One year, Two year)

- **`PaperlessBilling:`** Whether the customer has paperless billing or not (Yes, No)

- **`PaymentMethod:`** The customer’s payment method (Electronic check, Mailed check, Bank transfer (automatic), Credit card (automatic))

- **`MonthlyCharges:`** The amount charged to the customer monthly

- **`TotalCharges:`** The total amount charged to the customer

- **`Churn:`** Whether the customer churned or not (Yes, No). This is the target (dependent) variable, so it is not present in the inference file.
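Before sending a file to the pipeline, it can help to confirm that its header has the expected feature columns and no target column. The sketch below is a minimal check using only the standard library. `EXPECTED_FEATURES` is copied from the `df.head()` header above, and ignoring `Unnamed: 0` is an assumption: pandas gives that name to an unnamed first column, which usually means the CSV was saved together with its index. `check_inference_header` is a hypothetical helper name.

```python
import csv
import io

# Feature columns as they appear in the inference file header (see df.head() above)
EXPECTED_FEATURES = [
    "customerID", "gender", "SeniorCitizen", "Partner", "Dependents", "tenure",
    "PhoneService", "MultipleLines", "InternetService", "OnlineSecurity",
    "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV",
    "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod",
    "MonthlyCharges", "TotalCharges",
]
# Assumption: a leftover index column written by an earlier to_csv call
IGNORED = {"Unnamed: 0"}


def check_inference_header(csv_text):
    """Return (missing, unexpected) column lists for an inference CSV.

    A "Churn" column shows up as unexpected, because inference data should not contain the target.
    """
    header = next(csv.reader(io.StringIO(csv_text)))
    present = [col for col in header if col not in IGNORED]
    missing = [col for col in EXPECTED_FEATURES if col not in present]
    unexpected = [col for col in present if col not in EXPECTED_FEATURES]
    return missing, unexpected


sample = "Unnamed: 0," + ",".join(EXPECTED_FEATURES) + "\n0,2674-MIAHT\n"
print(check_inference_header(sample))  # → ([], [])
```

In the notebook you could pass the file contents directly, for example `check_inference_header(open("data/data_inference.csv").read())`.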

# 2. Sagemaker Pipeline

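SageMaker Pipelines supports the activities listed below. Because the model was trained previously, this inference notebook uses only pipelines, processing steps (for preprocessing and post-processing), create model steps, and transform steps.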

- Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.
- Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
- Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
- Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
- Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.
- Transform job steps - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from a dataset, get inferences from large datasets, and run inference when a persistent endpoint is not needed.
- Post-processing - (Optional) A processing step that post-processes the final predictions, for example by filtering them. Section 2.7 adds this step to the pipeline.
- Parameterized pipeline executions - Enable variation in pipeline executions according to specified parameters.

### 2.1 Architecture

This **inference** pipeline contains preprocessing, create-model, batch inference (transform), and post-processing steps.

![architecture](images/final_pipeline.PNG)
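SageMaker Pipelines orders steps by their data dependencies: a step that references another step's properties (for example, the transform input uses `step_preprocess.properties...S3Uri`) runs after that step. The sketch below models only this ordering idea with Python's `graphlib` (Python 3.9+); it is an illustration, not SageMaker's scheduler. The step names and dependency sets are read off the `get_pipeline` code in section 2.7.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps whose properties it references (see section 2.7)
dependencies = {
    "preprocessing": set(),
    "createmodel": set(),
    # TransformStep uses the model name and the preprocessed S3 output
    "batchtransform": {"createmodel", "preprocessing"},
    # Post-processing reads the transform output and the preprocessed data
    "postprocessing": {"batchtransform", "preprocessing"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    stages.append(ready)
    sorter.done(*ready)

print(stages)
# → [['createmodel', 'preprocessing'], ['batchtransform'], ['postprocessing']]
```

Steps in the same stage do not depend on each other, so preprocessing and model creation can run concurrently before the batch transform starts.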

Let's begin with the code.

![workflow](images/pic10.PNG)

## 2.2 Install predefined Sagemaker libraries


First, import the SageMaker Python SDK classes and the other libraries used in this notebook.

In [1]:
import os
import pytz
from datetime import datetime

import boto3
import sagemaker
import sagemaker.session

from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput,TransformInput,CreateModelInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.network import NetworkConfig
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    TransformStep,
    CreateModelStep
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model


sess = sagemaker.Session()

![workflow](images/pic11.PNG)

<a id="2.3-Import-other-define-functions"></a>
## 2.3 Define helper functions

Next, define a helper function that starts a session. It creates a <b>boto3 session</b> for the given <b>AWS region</b>, the <b>SageMaker clients</b>, and a SageMaker session that uses the <b>default S3 bucket</b>.

In [2]:
def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.
    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        `sagemaker.session.Session` instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )

## 2.4 Convert your model and upload it to s3

Next, package your previously trained model as <b>model.tar.gz</b>. SageMaker expects model artifacts in this compressed format and extracts them into `/opt/ml/model` in the inference container, where the <b>Flask app</b> can load them.

In [64]:
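import tarfile
import os.path


def make_tarfile(output_filename, source_dir):
    """Compress source_dir (a file or a folder) into a gzip-compressed tar archive."""
    # arcname keeps only the base name, so the pickle sits at the root of the archive
    with tarfile.open(output_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=os.path.basename(source_dir))


make_tarfile("model.tar.gz", "temp_dict.pkl")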

To check that the pickle file was compressed into model.tar.gz correctly, extract the archive with the code below.

In [63]:
import tarfile


def extract(tar_file, path):
    """Extract the contents of tar_file into the directory path."""
    # Check the file type first: tarfile.open raises ReadError on a non-tar file
    if not tarfile.is_tarfile(tar_file):
        print("The file you entered is not a tar file")
        return
    with tarfile.open(tar_file) as opened_tar:
        opened_tar.extractall(path)


extract('model.tar.gz', 'model')
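Instead of extracting the archive, you can also list its members. Because SageMaker extracts `model.tar.gz` into `/opt/ml/model`, the pickle should appear at the root of the archive (`temp_dict.pkl`, not a nested path). The sketch below builds a throwaway archive in a temporary directory so that it runs anywhere; in the notebook you would call `archive_members("model.tar.gz")` directly. `archive_members` is a hypothetical helper name, and the pickled dictionary is a stand-in for the real model.

```python
import os
import pickle
import tarfile
import tempfile


def archive_members(tar_path):
    """Return the member names of a (possibly gzip-compressed) tar archive."""
    with tarfile.open(tar_path, "r:*") as tar:
        return tar.getnames()


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for the real pickled model
    pkl_path = os.path.join(tmp, "temp_dict.pkl")
    with open(pkl_path, "wb") as f:
        pickle.dump({"example": 1}, f)

    tar_path = os.path.join(tmp, "model.tar.gz")
    with tarfile.open(tar_path, "w:gz") as tar:
        # Same arcname choice as make_tarfile: keep only the file name
        tar.add(pkl_path, arcname=os.path.basename(pkl_path))

    members = archive_members(tar_path)

print(members)  # → ['temp_dict.pkl']
```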

Then, upload the model.tar.gz file to the default S3 bucket.

In [None]:
default_bucket = sess.default_bucket()
account_id = boto3.client("sts").get_caller_identity().get("Account")
region = boto3.session.Session().region_name

In [None]:
import boto3

s3 = boto3.resource('s3')
# Upload model.tar.gz to the key that model_with_pipeline() points to
with open('model.tar.gz', 'rb') as data:
    s3.Bucket(default_bucket).put_object(Key='customer_churn/model/model.tar.gz', Body=data)

In [65]:
# Return the S3 URI of the latest uploaded model
def model_with_pipeline():
    model_path = f"s3://{default_bucket}/customer_churn/model/model.tar.gz"
    return model_path
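The key passed to `put_object` and the URI returned by `model_with_pipeline` must point to the same object; otherwise the create-model step cannot find the artifact. One way to keep them in sync is to derive both from a single constant and check the round trip, as sketched below. `MODEL_KEY`, `s3_uri`, and `split_s3_uri` are hypothetical names, and `my-bucket` stands in for `default_bucket`.

```python
MODEL_KEY = "customer_churn/model/model.tar.gz"  # same key as the upload cell above


def s3_uri(bucket, key):
    """Build an s3:// URI from a bucket name and an object key."""
    return f"s3://{bucket}/{key}"


def split_s3_uri(uri):
    """Split an s3:// URI into (bucket, key)."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key


uri = s3_uri("my-bucket", MODEL_KEY)
print(uri)  # → s3://my-bucket/customer_churn/model/model.tar.gz
assert split_s3_uri(uri) == ("my-bucket", MODEL_KEY)
```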

![workflow](images/pic12.PNG)

## 2.5 Define Preprocessing Stage

In [None]:
input_path = f"s3://{default_bucket}/customer_churn/inference_input/data_inference.csv"  # same path that get_pipeline reads

In [None]:
# Writing directly to an s3:// path requires the s3fs package
df.to_csv(input_path, index = False)

The preprocessing script is shown below (the `pygmentize` output is truncated):

In [69]:
!pygmentize customer_churn_inference_preprocessing/preprocessing_without.py

import boto3
import pandas as pd
import argparse
import os
import json

print("import your necessary libraries in here") 
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split


print("enter your own functions in the bellow space")
def change_format(df):
    df['Total

The following function defines a pipeline that runs only the preprocessing step. You can add more inputs or outputs as required; refer to the guideline document for more details.

In [51]:
def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="cutomer-churn-prediction-inference-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="cutomer-churn-prediction-inference",  # Choose any name
):
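    """Gets a SageMaker ML Pipeline instance working with the CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        subnets: colon-separated subnet IDs for the pipeline jobs.
        security_group_ids: colon-separated security group IDs for the pipeline jobs.
        role: IAM role to create and run steps and pipeline.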
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    
    
    # Version outputs by date (Asia/Colombo time); evaluated when get_pipeline runs, not per execution
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data = f"s3://{default_bucket}/customer_churn/inference_input/data_inference.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/inference/{date_folder}/output1/"
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_inference_preprocessing/preprocessing_without.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
        ]
    )
    
    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
        ],
        steps=[
            step_preprocess,
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline
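The `ProcessingStep` above copies the input CSV from S3 into `/opt/ml/processing/input` before the script starts and uploads whatever the script writes to `/opt/ml/processing/output1` when it finishes. The actual preprocessing script is truncated above, so the following is only a hypothetical skeleton of that contract, not the author's script. It assumes the later transform step's `compression_type='Gzip'` and `split_type='Line'` settings, so it writes a headerless, gzip-compressed CSV with one record per line (with line splitting, a header line would be sent to the model as a record). `transform_row`, `preprocess`, the output file name, and dropping `Unnamed: 0` are all illustrative assumptions.

```python
import csv
import glob
import gzip
import os
import tempfile

INPUT_DIR = "/opt/ml/processing/input"     # ProcessingInput destination above
OUTPUT_DIR = "/opt/ml/processing/output1"  # ProcessingOutput source above


def transform_row(row):
    """Placeholder for the real feature engineering (hypothetical)."""
    return row


def preprocess(input_dir=INPUT_DIR, output_dir=OUTPUT_DIR, drop=("Unnamed: 0",)):
    """Write every input CSV as a headerless, gzip-compressed CSV in output_dir."""
    os.makedirs(output_dir, exist_ok=True)
    out_path = os.path.join(output_dir, "data_inference.csv.gz")
    with gzip.open(out_path, "wt", newline="") as out:
        writer = csv.writer(out)
        for path in sorted(glob.glob(os.path.join(input_dir, "*.csv"))):
            with open(path, newline="") as f:
                reader = csv.DictReader(f)
                keep = [c for c in reader.fieldnames if c not in drop]
                for row in reader:
                    row = transform_row(row)
                    # No header line: every line is one record for batch transform
                    writer.writerow([row[c] for c in keep])
    return out_path


# Local smoke test with a temporary directory instead of the container paths
with tempfile.TemporaryDirectory() as tmp:
    in_dir = os.path.join(tmp, "input")
    os.makedirs(in_dir)
    with open(os.path.join(in_dir, "data_inference.csv"), "w", newline="") as f:
        f.write("Unnamed: 0,customerID,tenure\n0,2674-MIAHT,4\n1,4086-WITJG,71\n")
    out_file = preprocess(in_dir, os.path.join(tmp, "output1"))
    with gzip.open(out_file, "rt") as g:
        lines = g.read().splitlines()

print(lines)  # → ['2674-MIAHT,4', '4086-WITJG,71']
```

Inside the processing container, the script would call `preprocess()` with the default paths instead of running the smoke test.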


![workflow](images/pic13.PNG)

## 2.6 Define Inference stage

The inference stage uses the following Flask-based script (the `pygmentize` output is truncated):

In [70]:
!pygmentize customer_churn_inference/model/predictor.py

# This is the file that implements a flask server to do inferences. It's the file that you will modify to
# implement the scoring for your own algorithm.

from __future__ import print_function

import os
import json
import pickle
import io
import sys
import signal
import traceback

import flask
import boto3
import pandas as pd
import numpy as np
#import statsmodels.api as sm

#Temporarly remove this path and g
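A bring-your-own-container inference image answers `GET /ping` health checks and `POST /invocations` requests, which is what the Flask app in `predictor.py` provides. Because the transformer uses `strategy='SingleRecord'` with `split_type='Line'`, each request carries one CSV line. The sketch below shows only the request-handling logic, without Flask, under stated assumptions: the model is any object with a `predict` method that accepts a list of rows, and `handle_invocation` and `ThresholdModel` are hypothetical names, not taken from the truncated script.

```python
import csv
import io


class ThresholdModel:
    """Hypothetical stand-in for the unpickled model: flags short-tenure customers."""

    def predict(self, rows):
        return [1 if float(row[1]) < 12 else 0 for row in rows]


def handle_invocation(body, model):
    """Parse a text/csv request body and return one prediction per line."""
    rows = [row for row in csv.reader(io.StringIO(body)) if row]
    predictions = model.predict(rows)
    # One prediction per line; assemble_with="Line" joins the per-record responses line by line
    return "\n".join(str(p) for p in predictions) + "\n"


print(handle_invocation("2674-MIAHT,4\n", ThresholdModel()))  # prints 1
```

In a Flask `/invocations` route, the body would come from `flask.request.data.decode("utf-8")` and the result would be returned with the `text/csv` mimetype.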

Run the code below to define a pipeline that contains the **preprocessing and inference** steps.

In [None]:
def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="cutomer-churn-prediction-inference-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="cutomer-churn-prediction-inference",  # Choose any name
):
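    """Gets a SageMaker ML Pipeline instance working with the CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        subnets: colon-separated subnet IDs for the pipeline jobs.
        security_group_ids: colon-separated security group IDs for the pipeline jobs.
        role: IAM role to create and run steps and pipeline.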
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    
    
    # Version outputs by date (Asia/Colombo time); evaluated when get_pipeline runs, not per execution
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data = f"s3://{default_bucket}/customer_churn/inference_input/data_inference.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/inference/{date_folder}/output1/"
    
    postprocessed_output1= f"s3://{default_bucket}/customer_churn/postprocess/{date_folder}/output1/"   
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    
    

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_inference_preprocessing/preprocessing_without.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
        ]
    )
    
    
    ############## Inference Job ##############################################
    
    ecr_repository = "customer-churn-prediction-inference-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    create_model_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
    
    image = create_model_repository_uri
    
    #Batch Transform job
    
    #model_name = step_create_model.properties.ModelName
    job_name = "cutomerchurn-model"
    instance_type = "ml.m5.12xlarge"
    step_create_model = CreateModelStep(
        name=f"{base_job_prefix}-createmodel",
        model=Model(image, 
                    model_data=model_with_pipeline(),
                    role = role,
                    sagemaker_session = sagemaker_session,
                    name = job_name,
                    vpc_config = {'Subnets':subnets.split(':'),
                                    'SecurityGroupIds': security_group_ids.split(':')}),
        inputs=CreateModelInput(instance_type=instance_type,
                               #accelerator_type="ml.eia1.medium"
                               )
    )
    
    model_name = step_create_model.properties.ModelName
    content_type ="text/csv"
    
    transformer = Transformer(model_name=model_name,
                              instance_count=1,
                              strategy='SingleRecord',
                              #max_payload=15, 
                              assemble_with="Line",
                              instance_type=instance_type,
                              output_path=f"s3://{default_bucket}/customer_churn/inference/{date_folder}/predictions/",
                              #tags = generic_tags + [{'Key': 'JobName', 'Value': 'Inference'}]
                )
    
    step_transform = TransformStep(
        name=f"{base_job_prefix}-batchtransform",
        transformer=transformer,
        inputs=TransformInput(data = step_preprocess.properties.ProcessingOutputConfig.Outputs["output1"].S3Output.S3Uri,
                             split_type="Line",
                               compression_type = 'Gzip', 
                              content_type=content_type)
    )

    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            #model_path,
            #model_approval_status
        ],
        steps=[
            step_preprocess,
            step_create_model,
            step_transform,
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline
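Both image URIs in this function are built with the same `format` call. A small helper can remove that repetition; `ecr_image_uri` is a hypothetical name, and the default `amazonaws.com` suffix matches the code above (some AWS partitions, such as the China Regions, use a different suffix).

```python
def ecr_image_uri(account_id, region, repository, tag="latest", uri_suffix="amazonaws.com"):
    """Build an ECR image URI: <account>.dkr.ecr.<region>.<suffix>/<repository>:<tag>."""
    return f"{account_id}.dkr.ecr.{region}.{uri_suffix}/{repository}:{tag}"


# Placeholder account ID and region, for illustration only
uri = ecr_image_uri("123456789012", "ap-south-1", "customer-churn-prediction-inference-image")
print(uri)
# → 123456789012.dkr.ecr.ap-south-1.amazonaws.com/customer-churn-prediction-inference-image:latest
```

Inside `get_pipeline`, `ecr_image_uri(account_id, region, "customer-churn-prediction-preprocessing-image")` would produce the same string as the existing `preprocessing_repository_uri`.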


![workflow](images/pic14.PNG)

## 2.7 Define Postprocess steps 

Run the code below to define a pipeline that contains the **preprocessing, inference, and post-processing** steps.

In [76]:
def get_pipeline(
    region,
    subnets,
    security_group_ids,
    role=None,
    default_bucket=None,
    model_package_group_name="CustomerChurnModelPackageGroup",  # Choose any name
    pipeline_name="cutomer-churn-prediction-inference-pipeline",  # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
    base_job_prefix="cutomer-churn-prediction-inference",  # Choose any name
):
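    """Gets a SageMaker ML Pipeline instance working with the CustomerChurn data.
    Args:
        region: AWS region to create and run the pipeline.
        subnets: colon-separated subnet IDs for the pipeline jobs.
        security_group_ids: colon-separated security group IDs for the pipeline jobs.
        role: IAM role to create and run steps and pipeline.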
        default_bucket: the bucket to use for storing the artifacts
    Returns:
        an instance of a pipeline
    """
    
    
    # Version outputs by date (Asia/Colombo time); evaluated when get_pipeline runs, not per execution
    srilanka_tz = pytz.timezone('Asia/Colombo')
    s3 = boto3.client('s3')
    date_folder = datetime.now(srilanka_tz).strftime("%Y-%m-%d")
    
    #working with input data path
    input_data = f"s3://{default_bucket}/customer_churn/inference_input/data_inference.csv"
    
    #working with output data path   
    preprocessed_output1 = f"s3://{default_bucket}/customer_churn/inference/{date_folder}/output1/"
    
    postprocessed_output1= f"s3://{default_bucket}/customer_churn/postprocess/{date_folder}/output1/"   
    
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    account_id = boto3.client("sts").get_caller_identity().get("Account")
    region = boto3.session.Session().region_name

    # Parameters for pipeline execution
    
    

    ####### --------------------- PREPROCESSING --------------------------------------------------------------------

    ecr_repository = "customer-churn-prediction-preprocessing-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    preprocessing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
        
    script_processor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_preprocess = ProcessingStep(
        name= f"{base_job_prefix}-preprocessing",
        processor= script_processor, 
        code= "customer_churn_inference_preprocessing/preprocessing_without.py",
        inputs= [ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
               ],
        outputs= [
            ProcessingOutput(output_name="output1", destination=preprocessed_output1, source="/opt/ml/processing/output1"),
        ]
    )
    
    
    ############## Inference Job ##############################################
    
    ecr_repository = "customer-churn-prediction-inference-image"
    tag = ":latest"
    uri_suffix = "amazonaws.com"
    
    create_model_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
        account_id, region, uri_suffix, ecr_repository + tag
    )
    
    image = create_model_repository_uri
    
    #Batch Transform job
    
    #model_name = step_create_model.properties.ModelName
    job_name = "cutomerchurn-model"
    instance_type = "ml.m5.12xlarge"
    step_create_model = CreateModelStep(
        name=f"{base_job_prefix}-createmodel",
        model=Model(image, 
                    model_data=model_with_pipeline(),
                    role = role,
                    sagemaker_session = sagemaker_session,
                    name = job_name,
                    vpc_config = {'Subnets':subnets.split(':'),
                                    'SecurityGroupIds': security_group_ids.split(':')}),
        inputs=CreateModelInput(instance_type=instance_type,
                               #accelerator_type="ml.eia1.medium"
                               )
    )
    
    model_name = step_create_model.properties.ModelName
    content_type ="text/csv"
    
    transformer = Transformer(model_name=model_name,
                              instance_count=1,
                              strategy='SingleRecord',
                              #max_payload=15, 
                              assemble_with="Line",
                              instance_type=instance_type,
                              output_path=f"s3://{default_bucket}/customer_churn/inference/{date_folder}/predictions/",
                              #tags = generic_tags + [{'Key': 'JobName', 'Value': 'Inference'}]
                )
    
    step_transform = TransformStep(
        name=f"{base_job_prefix}-batchtransform",
        transformer=transformer,
        inputs=TransformInput(data = step_preprocess.properties.ProcessingOutputConfig.Outputs["output1"].S3Output.S3Uri,
                             split_type="Line",
                               compression_type = 'Gzip', 
                              content_type=content_type)
    )
    
    ####### --------------------- POST-PROCESSING (reuses the preprocessing image) ---------------------------------------

        
    script_postprocessor = ScriptProcessor(
         command = ["python3"],
         image_uri = preprocessing_repository_uri,
         role = role,
         instance_count = 1,
         instance_type = "ml.m5.large",
         #tags = generic_tags + [{'Key': 'JobType', 'Value': 'Preprocessing'}],
         network_config = NetworkConfig(subnets=subnets.split(':'), security_group_ids=security_group_ids.split(':'))
    )
    
    step_postpreprocess = ProcessingStep(
        name= f"{base_job_prefix}-postpreprocessing",
        processor= script_postprocessor, 
        code= "customer_churn_inference_postprocessing/postprocessing_without.py",
        inputs= [ProcessingInput(source=step_transform.properties.TransformOutput.S3OutputPath, destination="/opt/ml/processing/input",s3_data_type='S3Prefix'),
                ProcessingInput(source=step_preprocess.properties.ProcessingOutputConfig.Outputs["output1"].S3Output.S3Uri, destination="/opt/ml/processing/input1"),

               ],
        outputs= [
            ProcessingOutput(output_name="post_output1", destination=postprocessed_output1, source="/opt/ml/processing/post_output1"),
        ]
    )    
    # ========================================= PIPELINE ORCHESTRATION ================================================
    
    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            #model_path,
            #model_approval_status
        ],
        steps=[
            step_preprocess,
            step_create_model,
            step_transform,
            step_postpreprocess
              ],
        sagemaker_session=sagemaker_session,
    )
    return pipeline
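`date_folder` is formatted when `get_pipeline` runs, so the date is baked into the pipeline definition as a literal S3 path. Starting the same `pipeline_def` again on a later day writes to the same folder unless you call `get_pipeline` and `upsert` again; SageMaker also offers execution-time values such as `ExecutionVariables.START_DATETIME` for this purpose. The sketch below reproduces the folder naming with a fixed UTC+05:30 offset (Sri Lanka standard time, which `Asia/Colombo` uses) so that it runs without a time-zone database; `date_folder_for` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

# Asia/Colombo is UTC+05:30 with no daylight saving time
COLOMBO = timezone(timedelta(hours=5, minutes=30))


def date_folder_for(moment):
    """Format a timezone-aware datetime as the YYYY-MM-DD folder used in the S3 paths."""
    return moment.astimezone(COLOMBO).strftime("%Y-%m-%d")


# 20:00 UTC is already the next day in Colombo
print(date_folder_for(datetime(2023, 3, 1, 20, 0, tzinfo=timezone.utc)))  # → 2023-03-02
```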


![workflow](images/pic15.PNG)

The post-processing script is shown below (the `pygmentize` output is truncated):

In [77]:
!pygmentize customer_churn_inference_postprocessing/postprocessing_without.py

#import only the necessary libraries

import glob 
import pyarrow.parquet as pq
import pickle
import datetime
import argparse
import os
import boto3
import json

print("import your necessary libraries in here") 
import pandas as pd 
import numpy as np


if __name__ == "__main__":
    
    input_data_path = os.path.j
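The post-processing step receives the batch transform output under `/opt/ml/processing/input` and the preprocessed data under `/opt/ml/processing/input1`. With `assemble_with='Line'`, the transform output holds one prediction line per input record. Because the script above is truncated, the following is only a hypothetical sketch of pairing predictions with their input rows in order; `join_predictions` is an illustrative name, and it assumes the two files list records in the same order (the `join_source` option of `TransformInput` is an alternative that lets batch transform attach the input to each prediction).

```python
def join_predictions(input_lines, prediction_lines):
    """Pair each preprocessed input record with its prediction, in order."""
    inputs = [line.rstrip() for line in input_lines if line.strip()]
    predictions = [line.strip() for line in prediction_lines if line.strip()]
    if len(inputs) != len(predictions):
        raise ValueError(
            f"{len(inputs)} input records but {len(predictions)} predictions"
        )
    # Append the prediction as the last CSV field of each record
    return [f"{record},{pred}" for record, pred in zip(inputs, predictions)]


joined = join_predictions(["2674-MIAHT,4", "4086-WITJG,71"], ["1", "0"])
print(joined)  # → ['2674-MIAHT,4,1', '4086-WITJG,71,0']
```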

## 2.8 Define required parameters for get pipeline

Define the subnets, security groups, and IAM role to pass to the `get_pipeline` function.

In [78]:
subnets = 'Enter your subnet here'  # separate multiple subnet IDs with ':'
sg = 'Enter your security group here'  # separate multiple security group IDs with ':'
role = 'Enter your IAM role ARN here'
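`get_pipeline` splits the subnet and security group strings on `:`, so several IDs are passed as one colon-separated string. A quick check like the one below can catch placeholders that were never replaced. The `subnet-` and `sg-` prefixes follow the usual VPC ID format, and `split_ids` is a hypothetical helper.

```python
def split_ids(value, prefix):
    """Split a colon-separated ID string and check that every ID has the expected prefix."""
    ids = [part.strip() for part in value.split(":") if part.strip()]
    bad = [i for i in ids if not i.startswith(prefix)]
    if not ids or bad:
        raise ValueError(f"expected {prefix}* IDs, got {value!r}")
    return ids


print(split_ids("subnet-0abc:subnet-0def", "subnet-"))  # → ['subnet-0abc', 'subnet-0def']

try:
    split_ids("Enter your subnet here", "subnet-")
except ValueError as err:
    print(err)  # the placeholder was not replaced
```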

In [79]:
pipeline_def = get_pipeline(region,
                            subnets,
                            sg,
                            role,
                            default_bucket,
                           )

![workflow](images/pic17.PNG)

In [None]:
pipeline_def.upsert(role_arn=role)

In [81]:
execution = pipeline_def.start()

## 2.9 Other details of the pipeline

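#### View the execution details

`execution.describe()` returns the execution metadata, including the execution ARN and status; `execution.list_steps()` returns the status of each step.
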
In [None]:
execution.describe()

In [None]:
execution.list_steps()
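`execution.list_steps()` returns step summaries from the `ListPipelineExecutionSteps` API, each with keys such as `StepName`, `StepStatus`, and, for failed steps, `FailureReason`. A small helper can pull out the failures once the execution has finished (for example after `execution.wait()`). The sample list below is hand-written for illustration, not real output from this pipeline, and `failed_steps` is a hypothetical name.

```python
def failed_steps(steps):
    """Return (step name, failure reason) for every failed step summary."""
    return [
        (step["StepName"], step.get("FailureReason", "no reason given"))
        for step in steps
        if step.get("StepStatus") == "Failed"
    ]


# Hand-written sample shaped like list_steps() output
sample_steps = [
    {"StepName": "cutomer-churn-prediction-inference-batchtransform",
     "StepStatus": "Failed", "FailureReason": "example reason"},
    {"StepName": "cutomer-churn-prediction-inference-preprocessing",
     "StepStatus": "Succeeded"},
]
print(failed_steps(sample_steps))
# → [('cutomer-churn-prediction-inference-batchtransform', 'example reason')]
```

In the notebook, `failed_steps(execution.list_steps())` would list any failed steps of the current execution.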

## Lineage
Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

Check whether the pipeline is up and running in **`SageMaker Studio`**.

![workflow](images/sagemakerstudio.PNG)