In [1]:
import boto3
import sagemaker
import sagemaker.session


# AWS context shared by the rest of the notebook.
region = boto3.Session().region_name  # region of the default boto3 session
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()  # role passed to the processor and to pipeline.upsert
default_bucket = sagemaker_session.default_bucket()  # bucket the input CSV is uploaded to

# Plain literal (no placeholders, so no f-string prefix).
# NOTE(review): not referenced by any cell shown here — presumably intended
# for a later model-registration step; confirm or remove.
model_package_group_name = "CardioPreprocessing"

In [2]:
# Display the bucket name returned by sagemaker_session.default_bucket().
default_bucket

'sagemaker-ca-central-1-049049517134'

In [3]:
# Where the raw CSV lives next to the notebook, and the S3 prefix it is staged under.
local_path = "../Resources/cardio_train.csv"
base_uri = f"s3://{default_bucket}/cardio"

# NOTE(review): this resource handle is not used by any cell shown here.
s3 = boto3.resource("s3")

# Upload the file; the call returns the S3 URI of the uploaded object,
# which becomes the default value of the pipeline's InputData parameter.
input_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)

In [4]:
# Print the S3 URI returned by the upload so it can be checked by eye.
print(input_data_uri)

s3://sagemaker-ca-central-1-049049517134/cardio/cardio_train.csv


In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Pipeline parameters. Each one appears in the pipeline definition under
# "Parameters" with the default given here.

# Instance type used by the processing job.
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.medium")

# Number of instances used by the processing job.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# S3 location of the raw CSV; defaults to the file uploaded earlier in the notebook.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

In [6]:
# Create the local directory for the processing script; -p makes this a no-op if it already exists.
!mkdir -p cardio

In [7]:
%%writefile cardio/preprocessing.py

# Import dependencies
import os

import pandas as pd
import numpy as np

# Column order of the written CSV (the file itself is written without a header row).
OUTPUT_COLUMNS = [
    "id", "Age", "gender", "height", "weight", "ap_hi", "ap_lo",
    "cholesterol", "gluc", "smoke", "alco", "active", "cardio",
]

# The raw 'age' column is divided by this to obtain whole years.
DAYS_PER_YEAR = 365

# Rows whose readings fall outside these inclusive ranges are discarded.
# NOTE(review): the original comments described ap_hi/ap_lo as heart rates and
# quoted 150/65 for ap_lo while the code used 140/60. The code's thresholds are
# kept here; confirm the intended limits (the columns are presumably
# systolic/diastolic blood pressure).
AP_HI_MIN, AP_HI_MAX = 100, 220
AP_LO_MIN, AP_LO_MAX = 60, 140

# Unit conversions: 1 cm = 0.0328084 ft, 1 kg = 2.20462 lb.
CM_TO_FEET = 0.0328084
KG_TO_LB = 2.20462


def preprocess(raw_df):
    """Clean the raw cardio data and return a new DataFrame.

    Steps:
      1. Replace 'age' with 'Age': age / 365, rounded to the nearest whole
         number and stored as an integer.
      2. Keep only OUTPUT_COLUMNS, in that order.
      3. Drop rows whose 'ap_hi' is outside [AP_HI_MIN, AP_HI_MAX] or whose
         'ap_lo' is outside [AP_LO_MIN, AP_LO_MAX].
      4. Convert 'height' from cm to feet (2 decimals) and 'weight' from kg
         to lb (1 decimal).

    Parameters:
        raw_df: DataFrame containing 'age' plus every column of
            OUTPUT_COLUMNS except 'Age'. It is not modified.

    Returns:
        A new DataFrame with columns OUTPUT_COLUMNS.

    Raises:
        KeyError: if an expected column is missing.
        ValueError: if 'age' contains missing values (they cannot be
            converted to integers).
    """
    age_years = (raw_df["age"] / DAYS_PER_YEAR).round().astype(int)
    cleaned = raw_df.drop(columns="age").assign(Age=age_years)[OUTPUT_COLUMNS]

    # Written as "not an outlier" rather than "within range" so that rows with
    # missing readings are kept, exactly as with the original chained drops.
    is_outlier = (
        (cleaned["ap_hi"] > AP_HI_MAX)
        | (cleaned["ap_hi"] < AP_HI_MIN)
        | (cleaned["ap_lo"] > AP_LO_MAX)
        | (cleaned["ap_lo"] < AP_LO_MIN)
    )
    cleaned = cleaned.loc[~is_outlier]

    return cleaned.assign(
        height=(cleaned["height"] * CM_TO_FEET).round(2),
        weight=(cleaned["weight"] * KG_TO_LB).round(1),
    )


if __name__ == "__main__":

    base_dir = "/opt/ml/processing"

    # Read the semicolon-delimited input file into a Pandas DataFrame.
    file_path = f"{base_dir}/input/cardio_train.csv"
    cardio_df = pd.read_csv(file_path, delimiter=";")

    # Make sure the output directory exists before writing; this is a no-op
    # when the directory is already present.
    output_dir = f"{base_dir}/output/preprocessed"
    os.makedirs(output_dir, exist_ok=True)

    # No header row and no index column in the written file.
    preprocess(cardio_df).to_csv(f"{output_dir}/cardio_train_output.csv", header=False, index=False)

Overwriting cardio/preprocessing.py


In [8]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Version of the scikit-learn processing container to run the script in.
framework_version = "0.23-1"

# Processor that runs the preprocessing script; instance type and count come
# from the pipeline parameters rather than fixed values.
sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name="sklearn-cardio-process",
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

In [9]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

# The raw CSV is made available to the script under /opt/ml/processing/input.
cardio_raw_input = ProcessingInput(source=input_data, destination="/opt/ml/processing/input")

# The script writes its result under /opt/ml/processing/output/preprocessed;
# that directory is declared as the step's "cardio_train_output" output.
cardio_preprocessed_output = ProcessingOutput(
    output_name="cardio_train_output",
    source="/opt/ml/processing/output/preprocessed",
)

# Pipeline step that runs cardio/preprocessing.py on the processor.
step_process = ProcessingStep(
    name="CardioProcess",
    processor=sklearn_processor,
    code="cardio/preprocessing.py",
    inputs=[cardio_raw_input],
    outputs=[cardio_preprocessed_output],
)

In [10]:
from sagemaker.workflow.pipeline import Pipeline


# Plain literal (no placeholders, so no f-string prefix).
pipeline_name = "CardioPipeline"

# Single-step pipeline. Every parameter object referenced by the processor or
# the step is listed here; each appears under "Parameters" in the definition.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        input_data,
    ],
    steps=[step_process],
)

In [11]:
import json
# pipeline.definition() is parsed with json.loads so the notebook displays it
# as a nested dict for inspection.
json.loads(pipeline.definition())

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.t3.medium'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ca-central-1-049049517134/cardio/cardio_train.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'CardioProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocessi

In [12]:
# Submit the pipeline definition to SageMaker under the execution role;
# the response contains the pipeline's ARN.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ca-central-1:049049517134:pipeline/cardiopipeline',
 'ResponseMetadata': {'RequestId': '4ad2e405-d601-431d-ab6f-2e45be4dd891',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4ad2e405-d601-431d-ab6f-2e45be4dd891',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '85',
   'date': 'Sat, 04 Sep 2021 03:42:35 GMT'},
  'RetryAttempts': 0}}

In [13]:
# Start an execution (no parameter overrides are passed) and keep the handle
# so its status and steps can be inspected below.
execution = pipeline.start()

In [14]:
# Show the execution's metadata: ARNs, current status, timestamps, and creator.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ca-central-1:049049517134:pipeline/cardiopipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ca-central-1:049049517134:pipeline/cardiopipeline/execution/bh1r1oz3dv7z',
 'PipelineExecutionDisplayName': 'execution-1630726955725',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'cardiopipeline',
  'TrialName': 'bh1r1oz3dv7z'},
 'CreationTime': datetime.datetime(2021, 9, 4, 3, 42, 35, 662000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 9, 4, 3, 42, 35, 662000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ca-central-1:049049517134:user-profile/d-vfui3c0gpdwg/test-studio',
  'UserProfileName': 'test-studio',
  'DomainId': 'd-vfui3c0gpdwg'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ca-central-1:049049517134:user-profile/d-vfui3c0gpdwg/test-studio',
  'UserProfileName': 'test-studio',
  'DomainId': 'd-vfui3c0gpdwg'},
 'ResponseMetadata': {'RequestId': 'c2

In [15]:
# List the execution's steps with their status and the backing processing job ARN.
execution.list_steps()


[{'StepName': 'CardioProcess',
  'StartTime': datetime.datetime(2021, 9, 4, 3, 42, 36, 146000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ca-central-1:049049517134:processing-job/pipelines-bh1r1oz3dv7z-cardioprocess-yr7lsuwsyo'}}}]