In [2]:
%pip install -U sagemaker

[0mNote: you may need to restart the kernel to use updated packages.


In [3]:
import sagemaker
import pandas as pd
from sagemaker.pipeline import PipelineModel
from sagemaker.sklearn.processing import SKLearnProcessor, ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.inputs import TrainingInput
from sagemaker.sklearn import SKLearn
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.workflow.properties import PropertyFile
import os

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [4]:
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
bucket = sagemaker_session.default_bucket()

model_package_group_name = "PipelineModelPackageGroup3"
prefix = "pipeline-model-example"
data_prefix = f'data/'
pipeline_name = "serial-inference-pipeline"  # SageMaker Pipeline name

In [5]:
# Need to configure defaultvalue for input data
input_data_uri = ParameterString(name="InputData", default_value=f"s3://{bucket}/data/processed_balanced_diabetes2.csv")
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.large")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")
model_name = ParameterString(name="ModelName", default_value="my-model3")
region = sagemaker_session.boto_region_name

## Read in Data and do simple Preprocessing 

In [6]:
!mkdir -p code

In [7]:
df = pd.read_csv("diabetes_balanced.csv")

In [8]:
# Variables Income and Education are catered for US Demographics

'''
1 Less than $10,000
2 Less than $15,000 ($10,000 to less than $15,000)
3 Less than $20,000 ($15,000 to less than $20,000)
4 Less than $25,000 ($20,000 to less than $25,000)
5 Less than $35,000 ($25,000 to less than $35,000)
6 Less than $50,000 ($35,000 to less than $50,000)
7 Less than $75,000 ($50,000 to less than $75,000)
8 $75,000 or more '''

'''
1 Never attended school or only kindergarten 
2 Grades 1 through 8 (Elementary) 
3 Grades 9 through 11 (Some high school) 
4 Grade 12 or GED (High school graduate) 
5 College 1 year to 3 years (Some college or technical school) 
6 College 4 years or more (College graduate)
''';

In [9]:
''' 
Set it to Singapore Standards (Changed from yearly income to monthly income)
1,2 : 1 (Below $1000)
3: 2 ($1000 - $2499)
4: 3 ($2500 - $3499)
5: 4 ($3500 - $4999)
6: 5 ($5000 - $6999)
7: 6 ($7000 - $10000)
8: 7 ($10000 & Over)
''';

In [10]:
import numpy as np
### Change the labels to accomodate Singapore standards
US_to_SG_income = {1:1, 2:1, 3:2, 4:3, 5:4, 6:5, 7:6, 8:7}
df['Income'] = df['Income'].replace(US_to_SG_income)
df['Diabetes_binary'] = np.int64(df['Diabetes_binary'] * 100)

In [11]:
df.to_csv('processed_balanced_diabetes2.csv', index=False)
processed_dataset_file = 'processed_balanced_diabetes2.csv' 

In [12]:
raw_s3 = sagemaker_session.upload_data(path="./{}".format(processed_dataset_file), key_prefix=data_prefix)
print(raw_s3)

s3://sagemaker-us-east-1-763709885713/data/processed_balanced_diabetes2.csv


## Preprocess Step

In [13]:
%%writefile code/preprocess.py

"""Feature engineers the abalone dataset."""
import argparse
import logging
import os
import pathlib
import requests
import tempfile

import boto3
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = ['HighBP', 'HighChol', 'CholCheck', 'BMI', 'Smoker',
       'Stroke', 'HeartDiseaseorAttack', 'PhysActivity', 'Fruits', 'Veggies',
       'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost', 'GenHlth',
       'MentHlth', 'PhysHlth', 'DiffWalk', 'Sex', 'Age', 'Education',
       'Income'
]
label_column = "Diabetes_binary"

feature_columns_dtype = {
    'HighBP': np.float64, 'HighChol': np.float64, 'CholCheck': np.float64, 'BMI': np.float64, 'Smoker': np.float64,
       'Stroke': np.float64, 'HeartDiseaseorAttack': np.float64, 'PhysActivity': np.float64, 'Fruits': np.float64, 'Veggies': np.float64,
       'HvyAlcoholConsump': np.float64, 'AnyHealthcare': np.float64, 'NoDocbcCost': np.float64, 'GenHlth': np.float64,
       'MentHlth': np.float64, 'PhysHlth': np.float64, 'DiffWalk': np.float64, 'Sex': np.float64, 'Age': np.float64, 'Education': np.float64,
       'Income': np.float64
}
label_column_dtype = {"Diabetes_binary": np.int64}


def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()
    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)
    input_data = args.input_data
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])
    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/processed_balanced_diabetes2.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)
    logger.debug("Reading downloaded data.")
    df = pd.read_csv(fn,header=0,dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype))
    os.unlink(fn)

    logger.debug("Defining transformers.")
    numeric_features = list(feature_columns_names)
    numeric_transformer = Pipeline(steps=[("scaler", StandardScaler()),])


    preprocess = ColumnTransformer(transformers=[("num", numeric_transformer, numeric_features)])
    logger.info("Applying transforms.")
    y = df.pop("Diabetes_binary")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.info("Writing out datasets to %s.", base_dir)
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting code/preprocess.py


In [14]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"balanced-diabetes-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

processor_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
    arguments=["--input-data", input_data_uri]
)

step_process = ProcessingStep(
    name="PreprocessDiabetesData",
    step_args=processor_args
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


## Training

In [15]:
image_uri = sagemaker.image_uris.retrieve(
    region=region,
    framework="xgboost",
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)



xgb_train = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=f"s3://{bucket}/model/",
    base_job_name=f"sklearn-diabetes-train",
    sagemaker_session=pipeline_session,
    role=role,
)


xgb_train.set_hyperparameters(
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)


train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)


step_train = TrainingStep(
    name="TrainDiabetesModel",
    step_args=train_args
)



## Evaluation

In [16]:
%%writefile code/evaluate.py

"""Evaluation script for measuring mean squared error."""
import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import accuracy_score, mean_squared_error

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    logger.debug("Starting evaluation.")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    model = pickle.load(open("xgboost-model", "rb"))

    logger.debug("Reading test data.")
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    logger.debug("Reading test data.")
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)
    
    #predicted_classes = (predictions > 0.5).astype(int)
    
    #logger.debug("Calculating classification metrics.")
    logger.debug("Calculating mean squared error.")
    '''accuracy = accuracy_score(y_test, predicted_classes)'''
    mse = mean_squared_error(y_test, predictions)
    rmse = np.sqrt(mse)
    # Use probabilities for ROC AUC

    '''report_dict = {
        "classification_metrics": {
            "accuracy": {"value": accuracy}
        }
    }'''
    report_dict = {
        "regression_metrics": {
            "rmse": {"value": rmse},
        },
    }
    

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    '''logger.info("Writing out evaluation report with accuracy: %f", accuracy)'''
    logger.info("Writing out evaluation report with accuracy: %f", rmse)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluate.py


In [17]:
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name=f"sklearn-diabetes-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

evaluation_report = PropertyFile(
    name="DiabetesEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model",),
        ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, destination="/opt/ml/processing/test")],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=f"s3://{bucket}/evaluation_report")],
    code="code/evaluate.py")

step_eval = ProcessingStep(
    name="EvaluateDiabetesModel",
    step_args=eval_args,
    property_files=[evaluation_report])

## Register Model

In [18]:
evaluation_s3_uri = "{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

step_register = RegisterModel( # model registry part, make it available in model registry
    name='RegisterModel3',
    estimator=xgb_train,
    model_data = step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)





In [19]:
pipeline = Pipeline(
    name="MyPipeline3",
    parameters=[input_data_uri, processing_instance_type, processing_instance_count, training_instance_type, model_approval_status, model_name],
    steps=[step_process, step_train, step_eval, step_register],
    sagemaker_session=pipeline_session
)

In [20]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()



In [21]:
execution.wait()

In [106]:
client = sagemaker_session.sagemaker_client

In [108]:
response = client.list_model_packages(
    ModelPackageGroupName="my-model3",
    ModelApprovalStatus="Approved"
)

In [110]:
model_package_arn = response['ModelPackageSummaryList'][0]['ModelPackageArn']

In [111]:
print(model_package_arn)

arn:aws:sagemaker:us-east-1:763709885713:model-package/my-model3/1


In [112]:
from sagemaker import ModelPackage
model = ModelPackage(
    role=role, model_package_arn=model_package_arn, sagemaker_session=sagemaker_session
)

In [113]:
predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    endpoint_name="my-endpoint2"
)

INFO:sagemaker:Creating model with name: my-model3-2024-06-20-07-38-39-787
INFO:sagemaker:Creating endpoint-config with name my-endpoint2
INFO:sagemaker:Creating endpoint with name my-endpoint2


------!

In [130]:
import boto3
smr = boto3.client('sagemaker-runtime')

resp = smr.invoke_endpoint(EndpointName='my-endpoint2', Body=b'0.0, 0.0, 1.0, 30.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 5.0, 5.0', 
                           ContentType='text/csv')

print(resp.keys())

dict_keys(['ResponseMetadata', 'ContentType', 'InvokedProductionVariant', 'Body'])


In [132]:
endpoint_name = "my-endpoint2"

In [133]:
# AutoScaling client
asg = boto3.client('application-autoscaling')

# Resource type is variant and the unique identifier is the resource ID.
resource_id=f"endpoint/{endpoint_name}/variant/AllTraffic"

# scaling configuration
response = asg.register_scalable_target(
    ServiceNamespace='sagemaker', #
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount', 
    MinCapacity=1,
    MaxCapacity=4
)

#Target Scaling
response = asg.put_scaling_policy(
    PolicyName=f'Request-ScalingPolicy-{endpoint_name}',
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 10.0, # Threshold
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance',
        },
        'ScaleInCooldown': 300, # duration until scale in
        'ScaleOutCooldown': 60 # duration between scale out
    }
)

In [135]:
import time

In [141]:
request_duration = 240
end_time = time.time() + request_duration
print(f"test will run for {request_duration} seconds")
while time.time() < end_time:
    resp = smr.invoke_endpoint(EndpointName=endpoint_name, Body=b'0.0, 0.0, 1.0, 30.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 5.0, 5.0', 
                           ContentType='text/csv')

test will run for 240 seconds


In [142]:
sm_client = boto3.client(service_name='sagemaker')

response = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = response['EndpointStatus']
print("Status: " + status)


while status=='Updating':
    time.sleep(1)
    response = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = response['EndpointStatus']
    instance_count = response['ProductionVariants'][0]['CurrentInstanceCount']
    print(f"Status: {status}")
    print(f"Current Instance count: {instance_count}")

Status: InService


In [33]:
from sagemaker.predictor import Predictor

act_predictor = Predictor(endpoint_name="my-endpoint2")

In [31]:
import pandas as pd

In [34]:
'''df = pd.read_csv('processed_balanced_diabetes.csv')
label_data = df['Diabetes_binary']
feature_data = df.drop('Diabetes_binary', axis=1, inplace=False)

payload = feature_data.iloc[:].to_csv(header=False, index=False)
p = act_predictor.predict(payload, initial_args={"ContentType": "text/csv"})
print(p.decode("utf-8"))'''

0.7748411893844604,0.725610613822937,0.6558834314346313,0.7399713397026062,0.6969309449195862,0.8483266234397888,0.6385347843170166,0.725610613822937,0.7572422623634338,0.7399713397026062,0.7394870519638062,0.7104500532150269,0.777940034866333,0.7364181280136108,0.7031608819961548,0.6969309449195862,0.7572422623634338,0.6385347843170166,0.7572422623634338,0.7135424017906189,0.7049344182014465,0.7399713397026062,0.7572422623634338,0.7049344182014465,0.7701552510261536,0.7194156646728516,0.7301725745201111,0.7080613970756531,0.6925895810127258,0.6969309449195862,0.8405607342720032,0.7804040312767029,0.7135424017906189,0.7080613970756531,0.7203447222709656,0.7699853777885437,0.777940034866333,0.725610613822937,0.725610613822937,0.6385347843170166,0.7434324026107788,0.777940034866333,0.6385347843170166,0.725610613822937,0.7572422623634338,0.7889652252197266,0.6268411874771118,0.6680100560188293,0.777940034866333,0.8298271298408508,0.7080613970756531,0.8423826098442078,0.7203447222709656,0.

In [35]:
'''import boto3
import json

runtime = boto3.client('runtime.sagemaker')

def lambda_handler(event, context):
    data = json.loads(event['body'])
    input_features = data['features']

    payload = json.dumps([input_features])

    response = runtime.invoke_endpoint(
        EndpointName='my-endpoint',    # Replace with your endpoint name
        ContentType='application/json',
        Body=payload
    )

    result = json.loads(response['Body'].read().decode())
    return {
        'statusCode': 200,
        'body': json.dumps(result),
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        }
    }'''