In [5]:
%%sh
pip install pycaret
pip install python-dotenv
pip install ydata-profiling
pip install shap
pip -q install --upgrade stepfunctions
pip install s3fs







[0m

In [16]:
import boto3
import sagemaker
import time
import random
import uuid
import logging
import stepfunctions
import io
import random
import os

from sagemaker.amazon.amazon_estimator import get_image_uri
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep, TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath


import os
from sagemaker import get_execution_role
from dotenv import load_dotenv
from load_data import load_data
from split_data import split_data
import importlib
from save_model_to_s3 import save_model_to_s3
from deploy_model_endpoint import deploy_model
from finalize_and_save_model import finalize_and_save_model
from delete_sagemaker_endpoint import delete_sagemaker_endpoint
# from ydata_profiling import ProfileReport
import boto3

In [17]:
# Variables Setup Stage
load_dotenv(".env")
role = get_execution_role()

# Env variables
data_location_s3 = os.getenv("data_location_s3")
algorithm_choice = os.getenv("algorithm_choice")
target = os.getenv("target")
endpoint_name = os.getenv("endpoint_name")
model_name = os.getenv("model_name")
data_location = "s3://{}".format(data_location_s3)
instance_type = os.getenv("instance_type")
model_instance_count = int(os.getenv("model_instance_count"))
image_uri = os.getenv("ecr_repo_uri")
tuning_metric = os.getenv("tuning_metric")

print(
    data_location_s3,
    algorithm_choice,
    target,
    endpoint_name,
    model_name,
    data_location,
    instance_type,
    image_uri,
    tuning_metric,
)

streaming-data-platform-ml-data/ethan_data.csv classification y classification-proba-endpoint banking-classification s3://streaming-data-platform-ml-data/ethan_data.csv ml.m4.xlarge 135544376709.dkr.ecr.eu-west-1.amazonaws.com/mlops-classification-repo:latest AUC


In [18]:
stepfunctions.set_stream_logger(level=logging.INFO)
bucket = "streaming-data-platform-ml-data"
prefix = "step_function"
bucket_path = 's3://{}/{}/'.format(bucket, prefix)
region = boto3.Session().region_name
session = sagemaker.Session()

In [19]:
# Load data from S3
df = load_data(data_location)
df.head()

Unnamed: 0,age,job,education,default,balance,housing,loan,y
0,32,7,2,1,-238,1,0,0
1,34,4,2,0,-478,1,1,0
2,32,3,2,0,266,1,0,0
3,36,7,2,1,13,0,1,0
4,23,11,2,0,486,0,0,0


In [20]:
train_and_val_data, test_data = split_data(df, shuffle=True)

In [21]:
train_data, validation_data = split_data(train_and_val_data, shuffle=True)

In [22]:
FILE_TRAIN = "train.csv"
FILE_TEST = "test.csv"
FILE_VALIDATION = "validation.csv"

In [23]:
train_s3_file = os.path.join(prefix, FILE_TRAIN)
test_s3_file = os.path.join(prefix, FILE_TEST)
validation_s3_file = os.path.join(prefix, FILE_VALIDATION)
print(train_s3_file, test_s3_file, validation_s3_file)

step_function/train.csv step_function/test.csv step_function/validation.csv


In [24]:
from io import StringIO

# Upload the three files to Amazon S3

csv_buffer = StringIO()
train_data.to_csv(csv_buffer, index=False)
s3_resource = boto3.resource("s3")
s3_resource.Object(bucket, train_s3_file).put(Body=csv_buffer.getvalue())

csv_buffer = StringIO()
test_data.to_csv(csv_buffer, index=False, header=False)
s3_resource = boto3.resource("s3")
s3_resource.Object(bucket, test_s3_file).put(Body=csv_buffer.getvalue())

csv_buffer = StringIO()
validation_data.to_csv(csv_buffer, index=False)
s3_resource = boto3.resource("s3")
s3_resource.Object(bucket, validation_s3_file).put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'EHPANX0CW8S0T18K',
  'HostId': 'YeLZfYBAmJoLOEj+S7SEG28SswDKuB0xR6pBj9X5RhrvhV2XJbDSvkJCXdsVDgqeT/FvcAFg1jA=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'YeLZfYBAmJoLOEj+S7SEG28SswDKuB0xR6pBj9X5RhrvhV2XJbDSvkJCXdsVDgqeT/FvcAFg1jA=',
   'x-amz-request-id': 'EHPANX0CW8S0T18K',
   'date': 'Tue, 20 Aug 2024 14:23:43 GMT',
   'x-amz-server-side-encryption': 'aws:kms',
   'x-amz-server-side-encryption-aws-kms-key-id': 'arn:aws:kms:eu-west-1:135544376709:key/30fbba55-c738-4f17-a525-65cb9e22bd2c',
   'etag': '"71cac7b13e988b0006f4f0dc29d787de"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"71cac7b13e988b0006f4f0dc29d787de"',
 'ServerSideEncryption': 'aws:kms',
 'SSEKMSKeyId': 'arn:aws:kms:eu-west-1:135544376709:key/30fbba55-c738-4f17-a525-65cb9e22bd2c'}

In [25]:
train_s3_file = 's3://{}/{}'.format(bucket, train_s3_file)
validation_s3_file = 's3://{}/{}'.format(bucket, validation_s3_file)
test_s3_file = 's3://{}/{}'.format(bucket, test_s3_file)
output_s3  = 's3://{}/{}/{}/'.format(bucket, prefix, 'output')
print(train_s3_file, validation_s3_file, test_s3_file, output_s3)

s3://streaming-data-platform-ml-data/step_function/train.csv s3://streaming-data-platform-ml-data/step_function/validation.csv s3://streaming-data-platform-ml-data/step_function/test.csv s3://streaming-data-platform-ml-data/step_function/output/


In [91]:
# SageMaker expects unique names for each job, model and endpoint.
# If these names are not unique the execution will fail. Pass these
# dynamically for each execution using placeholders.
execution_input = ExecutionInput(
    schema={"JobName": str, "ModelName": str, "EndpointName": str}
)

In [41]:
pycaret_estimator = sagemaker.estimator.Estimator(
    image_uri="135544376709.dkr.ecr.eu-west-1.amazonaws.com/mlops-classification-repo:latest",
    role=role,
#     instance_count=1,
    train_instance_type="ml.m5.large",
#     container_entry_point="train.py",
    train_instance_count = 1, 
#     train_instance_type = 'ml.m4.4xlarge',
    output_path = output_s3,
    sagemaker_session = session
)

pycaret_estimator.set_hyperparameters(algorithm_choice=algorithm_choice, target=target)

In [55]:
sagemaker_execution_role = role
pycaret_estimator = sagemaker.estimator.Estimator(
    get_image_uri(region, 'xgboost', repo_version='0.90-2'),
    sagemaker_execution_role, 
    train_instance_count = 1, 
    train_instance_type = 'ml.m4.4xlarge',
    output_path = output_s3,
    sagemaker_session = session
)

pycaret_estimator.set_hyperparameters(
    objective = 'reg:linear',
    num_round = 50,
    max_depth = 5,
    eta = 0.2,
    gamma = 4,
    min_child_weight = 6,
    subsample = 0.7,
    silent = 0
)

pycaret_estimator = Estimator(
    image_uri="135544376709.dkr.ecr.eu-west-1.amazonaws.com/mlops-classification-repo:latest",
    source_dir="pycaret_image_files",
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    entry_point="train.py",
    git_config={
        "repo": "https://github.com/konradbachusz/AWS-MLOps-module",
        "branch": "aws-ml-model-retraining",
    },
)

pycaret_estimator.set_hyperparameters(algorithm_choice=algorithm_choice, target=target)

In [56]:
execution_input = ExecutionInput(schema={
    'JobName': str, 
    'ModelName': str,
    'EndpointName': str
})

In [57]:
print(train_s3_file, validation_s3_file)

s3://streaming-data-platform-ml-data/step_function/train.csv s3://streaming-data-platform-ml-data/step_function/validation.csv


In [58]:
training_step = steps.TrainingStep(
    'Train Step', 
    estimator=pycaret_estimator,
    data={
        'train': sagemaker.inputs.TrainingInput(train_s3_file, content_type='csv'),
        'validation': sagemaker.inputs.TrainingInput(validation_s3_file, content_type='csv')
    },
    job_name=execution_input['JobName']
)

In [59]:
model_step = steps.ModelStep(
    'Save model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']  
)

In [60]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.large'
)

In [61]:
endpoint_step = steps.EndpointStep(
    "Create Endpoint",
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName']
)

In [62]:
workflow_definition = steps.Chain([
    training_step,
    model_step,
    endpoint_config_step,
    endpoint_step
])

In [63]:
from time import strftime, gmtime
timestamp = strftime('%d-%H-%M-%S', gmtime())

workflow = Workflow(
    name='{}-{}'.format('MyTrainTransformDeploy_v1', timestamp),
    definition=workflow_definition,
    role=role,
    execution_input=execution_input
)

In [64]:
workflow.render_graph()

In [65]:
workflow.create()

[32m[INFO] Workflow created successfully on AWS Step Functions.[0m


'arn:aws:states:eu-west-1:135544376709:stateMachine:MyTrainTransformDeploy_v1-20-14-43-44'

In [66]:
execution = workflow.execute(
    inputs={
        'JobName': 'regression-{}'.format(uuid.uuid1().hex), # Each Sagemaker Job requires a unique name
        'ModelName': 'regression-{}'.format(uuid.uuid1().hex), # Each Model requires a unique name,
        'EndpointName': 'regression-{}'.format(uuid.uuid1().hex) # Each Endpoint requires a unique name,
    }
)

[32m[INFO] Workflow execution started successfully on AWS Step Functions.[0m


In [67]:
execution.render_progress()

In [28]:
print(role)

arn:aws:iam::135544376709:role/banking-classification-sagemaker-role


In [29]:
training_step = steps.TrainingStep(
    "Train Step",
    estimator=pycaret_estimator,
    data={"train": train_s3_file},
    job_name=execution_input["JobName"],
)

NameError: name 'execution_input' is not defined

In [15]:
model_step = steps.ModelStep(
    'Save model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']  
)

In [16]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.large'
)

In [17]:
endpoint_step = steps.EndpointStep(
    "Create Endpoint",
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName']
)

In [18]:
workflow_definition = steps.Chain([
    training_step,
    model_step,
    endpoint_config_step,
    endpoint_step
])

In [19]:
from time import strftime, gmtime
timestamp = strftime('%d-%H-%M-%S', gmtime())

workflow = Workflow(
    name='{}-{}'.format('MyTrainTransformDeploy_v1', timestamp),
    definition=workflow_definition,
    role=role,
    execution_input=execution_input
)

In [20]:
workflow.render_graph()

In [21]:
workflow.create()

[32m[INFO] Workflow created successfully on AWS Step Functions.[0m


'arn:aws:states:eu-west-1:135544376709:stateMachine:MyTrainTransformDeploy_v1-12-14-56-05'

In [22]:
execution = workflow.execute(
    inputs={
        'JobName': 'regression-{}'.format(uuid.uuid1().hex), # Each Sagemaker Job requires a unique name
        'ModelName': 'regression-{}'.format(uuid.uuid1().hex), # Each Model requires a unique name,
        'EndpointName': 'regression-{}'.format(uuid.uuid1().hex) # Each Endpoint requires a unique name,
    }
)

[32m[INFO] Workflow execution started successfully on AWS Step Functions.[0m


In [23]:
execution.render_progress()

In [24]:
workflow.list_executions(html=True)

Name,Status,Started,End Time
7355f414-31ab-4f55-8073-c06e198e3b9e,RUNNING,"Apr 12, 2024 02:56:06.473 PM",-


In [25]:
Workflow.list_workflows(html=True)

Name,Creation Date
MyTrainTransformDeploy_v1-12-14-56-05,"Apr 12, 2024 02:56:06.328 PM"
MyTrainTransformDeploy_v1-22-13-48-04,"Mar 22, 2024 01:48:05.353 PM"
MyTrainTransformDeploy_v1-22-13-54-31,"Mar 22, 2024 01:54:31.808 PM"
MyTrainTransformDeploy_v1-22-14-26-20,"Mar 22, 2024 02:26:20.486 PM"
MyTrainTransformDeploy_v1-22-14-27-31,"Mar 22, 2024 02:27:31.957 PM"
MyTrainTransformDeploy_v1-22-14-42-29,"Mar 22, 2024 02:42:29.500 PM"
MyTrainTransformDeploy_v1-22-14-48-10,"Mar 22, 2024 02:48:10.993 PM"
