# 33.3.6 Mini-Project - Churn Prediction Part 2 - AWS Pipelines

## Platform/package configurations

In [1]:
import boto3
import sagemaker
import datetime as dt
import pandas as pd

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


### Configuration Variables

In [43]:
# Names and locations shared by the cells below.
# bucket: S3 bucket that receives the data and scripts and is handed to the pipeline.
bucket = "mini-project-churn-prediction"
# datadir / datafile: folder and file name of the raw dataset (local and in S3).
datadir, datafile = "data", "storedata_total.csv"
# NOTE(review): model_name is not referenced by any visible cell, and SageMaker
# model names may not accept underscores — confirm before passing it to the API.
model_name = "end-to-end_churn_model"


In [44]:
# Static names first: none of these needs an AWS call.
sklearn_processor_version = "0.23-1"
model_pkg_group = "churn-model-pkg-group"
pipeline_name = "churn-model-pipeline"

# Values resolved from the notebook's AWS environment.
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sess = sagemaker.Session()

# NOTE(review): despite its name, this looks up the scikit-learn framework image;
# it is later passed to get_pipeline as custom_image_uri — confirm that is intended.
clarify_image = sagemaker.image_uris.retrieve(
    framework='sklearn',
    version=sklearn_processor_version,
    region=region,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


## Prepare Baseline Dataset

In [51]:
import os

# Make sure the local data folder exists (no error if it is already there).
os.makedirs(datadir, exist_ok=True)

# If necessary, convert the Excel file stored in S3 to a local CSV.
# The path must be an f-string: the literal "{datadir}/{datafile}" never exists,
# which forced the download and conversion on every run.
local_csv_path = f"{datadir}/{datafile}"
if not os.path.exists(local_csv_path):
    # Swap only the file extension; a blanket str.replace("csv", "xlsx") would
    # also rewrite "csv" appearing anywhere else in the bucket or folder name.
    excel_name = os.path.splitext(datafile)[0] + ".xlsx"
    excel_s3_uri = f"s3://{bucket}/{datadir}/{excel_name}"
    # pandas relies on the s3fs package to open s3:// paths.
    excel_df = pd.read_excel(excel_s3_uri)
    excel_df.to_csv(local_csv_path, header=True, index=False)

  for idx, row in parser.parse():


In [52]:
def preprocess_data(file_path):
    """Load the customer CSV and turn it into a numeric feature table.

    Steps, in order:
      1. Parse ``firstorder`` and ``lastorder`` as datetimes; unparseable
         values become NaT.
      2. Drop every row that contains a null in any column.
      3. Parse ``created`` (strictly: an unparseable value raises).
      4. Add ``first_last_days_diff`` (lastorder - firstorder, in days) and
         ``created_first_days_diff`` (created - firstorder, in days).
      5. Drop ``custid``, ``created``, ``firstorder`` and ``lastorder``.
      6. One-hot encode ``favday`` and ``city``.

    Parameters
    ----------
    file_path : anything accepted by ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The remaining original columns, the two day-difference columns and
        the ``favday_*`` / ``city_*`` indicator columns.
    """
    raw = pd.read_csv(file_path)

    # Order dates are parsed leniently so bad values can be removed by dropna.
    for date_col in ("firstorder", "lastorder"):
        raw[date_col] = pd.to_datetime(raw[date_col], errors='coerce')

    complete = raw.dropna()

    # "created" is parsed only after null rows are gone, and without coercion.
    created = pd.to_datetime(complete['created'])
    first_order = complete['firstorder']

    features = complete.assign(
        first_last_days_diff=(complete['lastorder'] - first_order).dt.days,
        created=created,
        created_first_days_diff=(created - first_order).dt.days,
    )

    # Identifier and raw date columns are not used as model features.
    features = features.drop(columns=['custid', 'created', 'firstorder', 'lastorder'])

    encoded_cols = ['favday', 'city']
    return pd.get_dummies(features, prefix=encoded_cols, columns=encoded_cols)

In [None]:
!pip install s3fs --upgrade

In [53]:
datafile_path = f"{datadir}/{datafile}"
# Features only: the "retained" label column is removed from the baseline.
baseline_data = preprocess_data(datafile_path).drop(columns=["retained"])
# A fixed seed makes a restart-and-run-all regenerate the same baseline rows;
# without it every run uploads a different baseline.
baseline_sample = baseline_data.sample(frac=0.0002, random_state=42)

In [54]:
# Write the baseline rows as a headerless CSV without the index column.
baseline_sample.to_csv("data/baseline.csv", index=False, header=False)

## Generate Batch Dataset

In [55]:
# Features only: the "retained" label column is removed from the batch input.
batch_data = preprocess_data(datafile_path).drop(columns=["retained"])
# A fixed seed makes a restart-and-run-all regenerate the same batch rows;
# without it every run uploads a different batch file.
batch_sample = batch_data.sample(frac=0.2, random_state=42)

In [56]:
# Write the batch rows as a headerless CSV without the index column.
batch_sample.to_csv("data/batch.csv", index=False, header=False)

## Copy Baseline Data, Batch Data, and Scripts to S3 Bucket

In [72]:
# NOTE: s3cli is a boto3 *resource* object; the cell below reuses it.
s3cli = boto3.resource('s3')

# (local file, S3 object key) pairs, uploaded in this order.
for local_path, s3_key in (
    ("data/storedata_total.csv", "data/storedata_total.csv"),
    ("data/batch.csv", "data/batch/batch.csv"),
    ("data/baseline.csv", "input/baseline/baseline.csv"),
):
    s3cli.Bucket(bucket).upload_file(local_path, s3_key)

In [58]:
# Each pipeline script keeps its file name under input/code/ in the bucket.
for script_name in ("preprocess.py", "evaluate.py", "generate_config.py"):
    s3cli.Bucket(bucket).upload_file(
        f"pipelines/customerchurn/{script_name}",
        f"input/code/{script_name}",
    )

## Create Pipeline Instance

In [73]:
from pipelines.customerchurn.pipeline import get_pipeline

# Collect the arguments first so they can be inspected before the pipeline is built.
pipeline_args = dict(
    region=region,
    role=role,
    default_bucket=bucket,
    model_package_group_name=model_pkg_group,
    pipeline_name=pipeline_name,
    custom_image_uri=clarify_image,
    sklearn_processor_version=sklearn_processor_version,
)
pipeline = get_pipeline(**pipeline_args)

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.0.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [74]:
# Display the pipeline definition; the cell output below is one long JSON string.
pipeline.definition()



'{"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}, {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "InputData", "Type": "String", "DefaultValue": "s3://mini-project-churn-prediction/data/storedata_total.csv"}, {"Name": "BatchData", "Type": "String", "DefaultValue": "s3://mini-project-churn-prediction/data/batch/batch.csv"}], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "ChurnModelProcess", "Type": "Processing", "Arguments": {"ProcessingResources": {"ClusterConfig": {"InstanceType": {"Get": "Parameters.ProcessingInstanceType"}, "InstanceCount": {"Get": "Parameters.ProcessingInstanceCount"}, "VolumeSizeInGB": 30}}, "AppSpecification": {"ImageUri": "246618743249.dkr.ecr

## Submit the pipeline to SageMaker and start an execution

In [75]:
# Submit the pipeline under the notebook's execution role; the response shown
# below includes the PipelineArn.
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-west-2:605134454307:pipeline/churn-model-pipeline',
 'ResponseMetadata': {'RequestId': '6c860c6c-6fb0-432c-87dc-5886ca44d87f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6c860c6c-6fb0-432c-87dc-5886ca44d87f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Sat, 19 Oct 2024 21:14:43 GMT'},
  'RetryAttempts': 0}}

In [76]:
# Start a run and keep the returned handle; later cells call describe() and
# list_steps() on it.
execution = pipeline.start()

In [77]:
# Show the execution's current details (the output includes PipelineExecutionStatus).
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:605134454307:pipeline/churn-model-pipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:605134454307:pipeline/churn-model-pipeline/execution/5tdj1yqt4ldh',
 'PipelineExecutionDisplayName': 'execution-1729372486699',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'churn-model-pipeline',
  'TrialName': '5tdj1yqt4ldh'},
 'CreationTime': datetime.datetime(2024, 10, 19, 21, 14, 46, 654000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 10, 19, 21, 14, 46, 654000, tzinfo=tzlocal()),
 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::605134454307:assumed-role/SageMaker-UCSD_ML_Student2/SageMaker',
   'PrincipalId': 'AROAYZZGS7YR3XA25LPLB:SageMaker'}},
 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::605134454307:assumed-role/SageMaker-UCSD_ML_Student2/SageMaker',
   'PrincipalId': 'AROAYZZGS7YR3XA25LPLB:SageMaker'}},
 'ResponseMetadata': {'RequestId': 'c2f1c7a6-8d94-4c7d

In [106]:
# One line per pipeline step: name padded to 36 characters, then its status.
for step_info in execution.list_steps():
    step_name, step_status = step_info['StepName'], step_info['StepStatus']
    print(f"Step: {step_name: <36} Status: {step_status}")

Step: ClarifyProcessingStep                Status: Succeeded
Step: ChurnModelConfigFile                 Status: Succeeded
Step: ChurnTransform                       Status: Succeeded
Step: RegisterChurnModel-RegisterModel     Status: Succeeded
Step: ChurnCreateModel                     Status: Succeeded
Step: CheckAUCScoreChurnEvaluation         Status: Succeeded
Step: ChurnEvalBestModel                   Status: Succeeded
Step: ChurnHyperParameterTuning            Status: Succeeded
Step: ChurnModelProcess                    Status: Succeeded
