# Main pipeline for Bank CD purchase decision model

This pipeline will process input data, train the model and publish the SageMaker inference endpoints.
The various stages in the pipeline and their relation to the corresponding py snippets are indicated in the diagram below.
![](TCB%20-%20Sagemaker%20ML%20pipeline.png)

# Importing required Packages

In [1]:
import boto3
import sagemaker
import sagemaker.session


# Session-wide AWS / SageMaker handles shared by the later cells.
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
# Execution role of this notebook; reused for the processor and for the pipeline upsert.
role = sagemaker.get_execution_role()
# Bucket that receives the uploaded input dataset.
default_bucket = sagemaker_session.default_bucket()
# Plain literal: the original f-string carried no placeholders.
model_package_group_name = "BankCDPackageGroupName"

# Data Location in S3 bucket

In [2]:
from pathlib import Path

# Local copy of the cleaned bank dataset. Nothing in this cell downloads it,
# so it has to be placed here before the cell is run.
local_path = "data/bank_clean.csv"
Path(local_path).parent.mkdir(parents=True, exist_ok=True)  # pure-Python equivalent of `mkdir -p data`
if not Path(local_path).is_file():
    raise FileNotFoundError(
        f"{local_path} not found - place the cleaned bank dataset there before running this cell."
    )

# Not used by the visible cells; kept so any cell relying on `s3` keeps working.
s3 = boto3.resource("s3")

# Upload the dataset under the BankCD prefix of the default bucket and show
# the resulting S3 URI, which becomes the default pipeline input.
base_uri = f"s3://{default_bucket}/BankCD"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-256555058276/BankCD/bank_clean.csv


# Pipeline Parameters/Variables

In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Pipeline parameter: instance count for the processing job.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Pipeline parameter: approval status string.
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

# Pipeline parameter: S3 URI of the input dataset, defaulting to the uploaded file.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

# The "BatchData" parameter is disabled: batch_data_uri is not defined in the visible cells.

# SKLearn Processor for Data Processing

In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor


# scikit-learn framework version passed to the processor.
framework_version = "0.23-1"

# Processor used by the processing step; its instance count is a pipeline parameter.
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    # Replaces the leftover "sklearn-abalone-process" template prefix so the
    # job name matches the BankCD naming used by the rest of this notebook.
    base_job_name="sklearn-bankcd-process",
    role=role,
)

# Data Processing Step

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

# Input: the dataset referenced by the InputData parameter, mounted in the container.
process_inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]

# Outputs named "train" and "test", collected from these container paths.
process_outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
]

# Processing step that runs BankCdProcess/processing.py on the SKLearn processor.
step_process = ProcessingStep(
    name="BankCDProcess",
    processor=sklearn_processor,
    inputs=process_inputs,
    outputs=process_outputs,
    code="BankCdProcess/processing.py",
)

# MLOPS Pipeline

In [7]:
from sagemaker.workflow.pipeline import Pipeline


# Plain literal: the original f-string carried no placeholders.
pipeline_name = "BankCDPipeline"

# Pipeline made of the parameters declared earlier and, for now, only the processing step.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
    ],
    # TODO: extend to [step_process, step_train, step_eval, step_cond] once those steps are defined.
    steps=[step_process],
)

# Run a pipeline

In [8]:
import json

# Parse the generated pipeline definition (a JSON string) into a dict so it can be inspected.
json.loads(pipeline.definition())

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-256555058276/BankCD/bank_clean.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'BankCDProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/processing.py']},
    'RoleArn': 'arn

In [9]:
# Submit the pipeline definition to SageMaker under the execution role; the response carries the pipeline ARN.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:256555058276:pipeline/bankcdpipeline',
 'ResponseMetadata': {'RequestId': '314a9c05-27a1-4892-8507-5dc269f77810',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '314a9c05-27a1-4892-8507-5dc269f77810',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Fri, 12 Aug 2022 12:15:26 GMT'},
  'RetryAttempts': 0}}

In [10]:
# Start a pipeline execution; no overrides are passed, so the parameter defaults apply.
execution = pipeline.start()

# To examine a pipeline execution

In [11]:
# Show the execution's metadata, including its current PipelineExecutionStatus.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:256555058276:pipeline/bankcdpipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:256555058276:pipeline/bankcdpipeline/execution/p3b9oche2071',
 'PipelineExecutionDisplayName': 'execution-1660306528433',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'bankcdpipeline',
  'TrialName': 'p3b9oche2071'},
 'CreationTime': datetime.datetime(2022, 8, 12, 12, 15, 28, 326000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 8, 12, 12, 15, 28, 326000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '53641620-64c8-4e03-96f4-dab983af6349',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '53641620-64c8-4e03-96f4-dab983af6349',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '483',
   'date': 'Fri, 12 Aug 2022 12:15:29 GMT'},
  'RetryAttempts': 0}}

# Wait for the execution to finish

In [12]:
# Block until the execution finishes. NOTE: this raises a WaiterError when the
# execution ends in the "Failed" state, which stops a Run All at this cell.
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

# List the execution steps and their status.

In [13]:
# List each step with its status; failed steps include a FailureReason and the processing job ARN.
execution.list_steps()

[{'StepName': 'BankCDProcess',
  'StartTime': datetime.datetime(2022, 8, 12, 12, 15, 29, 372000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 8, 12, 12, 19, 55, 170000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'AttemptCount': 0,
  'FailureReason': 'ClientError: AlgorithmError: See job logs for more information',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:256555058276:processing-job/pipelines-p3b9oche2071-bankcdprocess-hefsf3nzqi'}}}]

# Stop and Delete a Pipeline Execution

In [None]:
# Stop the pipeline execution.
# NOTE(review): presumably only meaningful while the execution is still running - confirm before running after a finished/failed run.
execution.stop()