# Machine Learning Pipelines

## Machine Learning Orchestration with Amazon SageMaker Pipelines

In [286]:
import os
import sagemaker
import logging
import boto3
import time
import pandas as pd
import json
import botocore
from botocore.exceptions import ClientError


# ========================== Low-level boto3 service clients ==========================
# `user_agent_extra` is appended to the user agent of every request these clients send.
config = botocore.config.Config(user_agent_extra='bedissj-1699438736259')

sm, sm_runtime = (
    boto3.client(service_name=service, config=config)
    for service in ('sagemaker', 'sagemaker-runtime')
)

# High-level SageMaker session wrapping the two clients above.
sess = sagemaker.Session(
    sagemaker_client=sm,
    sagemaker_runtime_client=sm_runtime,
)

# Account/session-level values reused by the rest of the notebook.
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name


## 1. Configure Data Processing Step

In [287]:
raw_data_s3_uri = 's3://{}/data/transformed_querying/month1'.format(bucket)  # raw month-1 data read by the processing step


In [288]:
from sagemaker.workflow.parameters import ParameterFloat, ParameterInteger, ParameterString


# ========================== Processing parameters ==========================
# Container-local paths for the processing job's input and output directories.
input_data = ParameterString(name='input-data', default_value='/opt/ml/processing/input/data')
output_data = ParameterString(name='output-data', default_value='/opt/ml/processing/output')

# Share of the data held out for validation and test (given as fractions, e.g. 0.1).
validation_split_percentage = ParameterFloat(name='validation-split-percentage', default_value=0.1)
test_split_percentage = ParameterFloat(name='test-split-percentage', default_value=0.2)

# Feature Store settings forwarded to the processing script.
feature_group_name = ParameterString(name='feature-group-name', default_value='bank-churn-mon1-feature-group')
# NOTE(review): no default value; wherever `str(default_value)` is forwarded this becomes the
# literal string 'None' -- confirm the processing script handles that.
feature_store_offline_prefix = ParameterString(name='feature-store-offline-prefix', default_value=None)


In [289]:
from sagemaker.processing import ProcessingInput, ProcessingOutput


# ========================== Processing Inputs ==========================
# Raw data is split across instances by S3 key and mounted at the `input-data` path.
processing_inputs = [
    ProcessingInput(
        input_name='bank-churn-mon1-raw-data',
        source=raw_data_s3_uri,
        destination=input_data.default_value,
        s3_data_distribution_type='ShardedByS3Key',
    )
]


# ========================== Processing Outputs ==========================
# One container-local directory per dataset split, uploaded to S3 when the job ends.
output_data_train, output_data_validation, output_data_test = (
    '{}/{}'.format(output_data.default_value, split)
    for split in ('train', 'validation', 'test')
)

processing_outputs = [
    ProcessingOutput(
        output_name='bank-churn-mon1-{}'.format(split),
        source=local_dir,
        s3_upload_mode='EndOfJob',
    )
    for split, local_dir in (
        ('train', output_data_train),
        ('validation', output_data_validation),
        ('test', output_data_test),
    )
]


In [290]:
from sagemaker.sklearn.processing import SKLearnProcessor


# ========================== Processing resources ==========================
FRAMEWORK_VERSION = '1.0-1'  # scikit-learn framework version of the SageMaker container
processing_instance_type = 'ml.t3.medium'
processing_instance_count = 1


# ========================== Instantiate SKLearn Processor ==========================
# AWS_DEFAULT_REGION is exported into the container (presumably so SDK calls made by the
# script resolve the same region as this notebook).
sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=FRAMEWORK_VERSION,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    env={'AWS_DEFAULT_REGION': region},
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [291]:
from sagemaker.workflow.steps import ProcessingStep


# Data-processing step: runs ./src/processing.py on the raw data and produces the
# train / validation / test outputs configured in `processing_outputs`.
#
# Container paths are passed as plain strings because the ProcessingInput destination and
# ProcessingOutput sources above are built from the parameters' *default* values; passing
# the pipeline variables here could desynchronise them if overridden.
# The tunable values are passed as pipeline variables (previously their `.default_value`
# was hard-coded, so overriding them in `pipeline.start(parameters=...)` had no effect).
processing_step = ProcessingStep(
    name='DataProcessing',
    code='./src/processing.py',
    processor=sklearn_processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        '--input-data', str(input_data.default_value),
        '--output-data', str(output_data.default_value),
        # Float parameters are rendered to strings at execution time for the command line.
        '--validation-split-percentage', validation_split_percentage.to_string(),
        '--test-split-percentage', test_split_percentage.to_string(),
        # NOTE(review): this parameter has no default, so referencing it as a pipeline
        # variable would likely make it mandatory at execution start. The previous
        # behaviour (forwarding the default, i.e. the string 'None') is kept -- confirm
        # processing.py expects that.
        '--feature-store-offline-prefix', str(feature_store_offline_prefix.default_value),
        '--feature-group-name', feature_group_name,
    ]
)


In [292]:
from pprint import pprint

pprint(list(processing_step.arguments))  # top-level keys of the request generated for this step



['ProcessingResources',
 'AppSpecification',
 'RoleArn',
 'ProcessingInputs',
 'ProcessingOutputConfig',
 'Environment']


In [293]:
step_processing_inputs = processing_step.arguments['ProcessingInputs']  # raw data + uploaded script
pprint(step_processing_inputs)



[{'AppManaged': False,
  'InputName': 'bank-churn-mon1-raw-data',
  'S3Input': {'LocalPath': '/opt/ml/processing/input/data',
              'S3CompressionType': 'None',
              'S3DataDistributionType': 'ShardedByS3Key',
              'S3DataType': 'S3Prefix',
              'S3InputMode': 'File',
              'S3Uri': 's3://sagemaker-eu-west-3-668303144976/data/transformed_querying/month1'}},
 {'AppManaged': False,
  'InputName': 'code',
  'S3Input': {'LocalPath': '/opt/ml/processing/input/code',
              'S3CompressionType': 'None',
              'S3DataDistributionType': 'FullyReplicated',
              'S3DataType': 'S3Prefix',
              'S3InputMode': 'File',
              'S3Uri': 's3://sagemaker-eu-west-3-668303144976/DataProcessing-fadeac1e1c3f09d53b332e0d3a5ccde0/input/code/processing.py'}}]


In [294]:
step_output_config = processing_step.arguments['ProcessingOutputConfig']  # one entry per split
pprint(step_output_config)



{'Outputs': [{'AppManaged': False,
              'OutputName': 'bank-churn-mon1-train',
              'S3Output': {'LocalPath': '/opt/ml/processing/output/train',
                           'S3UploadMode': 'EndOfJob',
                           'S3Uri': 's3://sagemaker-eu-west-3-668303144976/DataProcessing-fadeac1e1c3f09d53b332e0d3a5ccde0/output/bank-churn-mon1-train'}},
             {'AppManaged': False,
              'OutputName': 'bank-churn-mon1-validation',
              'S3Output': {'LocalPath': '/opt/ml/processing/output/validation',
                           'S3UploadMode': 'EndOfJob',
                           'S3Uri': 's3://sagemaker-eu-west-3-668303144976/DataProcessing-fadeac1e1c3f09d53b332e0d3a5ccde0/output/bank-churn-mon1-validation'}},
             {'AppManaged': False,
              'OutputName': 'bank-churn-mon1-test',
              'S3Output': {'LocalPath': '/opt/ml/processing/output/test',
                           'S3UploadMode': 'EndOfJob',
                     

In [295]:
step_resources = processing_step.arguments['ProcessingResources']  # instance type/count/volume
pprint(step_resources)



{'ClusterConfig': {'InstanceCount': 1,
                   'InstanceType': 'ml.t3.medium',
                   'VolumeSizeInGB': 30}}


## 2. Configure Training Step

In [296]:
# ========================= Training resources =========================
training_instance_type = 'ml.m5.large'
training_instance_count = 1


# ========================== Tuning objective & metrics ==========================
# Each metric is matched in the training job's log output with the regex built below;
# `objective` is one of these metric names and is what the tuner optimises.
objective = 'validation:accuracy'
_metric_log_keys = [
    ('validation:precision', 'val_precision'),
    ('validation:recall', 'val_recall'),
    ('validation:f1Score', 'val_f1score'),
    ('validation:ROCAUC', 'val_roc_auc'),
    ('validation:accuracy', 'val_accuracy'),
]
metric_definitions = [
    {'Name': metric_name, 'Regex': f'{log_key}: ([0-9.]+)'}
    for metric_name, log_key in _metric_log_keys
]


In [297]:
from sagemaker.parameter import IntegerParameter, CategoricalParameter, ContinuousParameter


# =========================  Static hyperparameters =========================
# Random seed exposed as a pipeline parameter and passed unchanged to every training job.
random_state = ParameterInteger(name='random-state', default_value=2024)
static_hyperparameters = dict(random_state=random_state)

# ==========================  Hyperparameter ranges ==========================
# Search space explored by the tuner.
hyperparameter_ranges = dict(
    n_estimators=IntegerParameter(10, 200, scaling_type='Logarithmic'),
    max_depth=IntegerParameter(3, 10, scaling_type='Linear'),
    criterion=CategoricalParameter(['gini', 'entropy']),
)


In [298]:
from sagemaker.sklearn.estimator import SKLearn


# =========================  Instantiate estimator  =========================
# Runs ./src/training.py in the scikit-learn container with the static hyperparameters;
# the script is expected to log the metrics matched by `metric_definitions`.
sklearn_estimator = SKLearn(
    entry_point='./src/training.py',
    role=role,
    framework_version=FRAMEWORK_VERSION,
    instance_count=training_instance_count,
    instance_type=training_instance_type,
    hyperparameters=static_hyperparameters,
    metric_definitions=metric_definitions,
)


In [299]:
from sagemaker.tuner import HyperparameterTuner


# =========================  Instantiate hyperparameter tuner  =========================
# Bayesian search that maximises `objective` over `hyperparameter_ranges`:
# 10 training jobs in total, at most 2 running at the same time.
tuner = HyperparameterTuner(
    estimator=sklearn_estimator,
    objective_metric_name=objective,
    objective_type='Maximize',
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    strategy='Bayesian',
    max_jobs=10,
    max_parallel_jobs=2,
    # autotune=True
)


In [300]:
from sagemaker.inputs import TrainingInput


# ====================== Configure training/tuning inputs ======================
# Each channel reads the CSV split written by the processing step's matching output.
tuning_inputs = {
    channel: TrainingInput(
        s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
            f'bank-churn-mon1-{channel}'
        ].S3Output.S3Uri,
        content_type='text/csv',
    )
    for channel in ('train', 'validation')
}

In [301]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(expire_after="PT1H", enable_caching=True)  # step results reusable for one hour (ISO 8601 "PT1H")

In [302]:
from sagemaker.workflow.steps import TuningStep, TrainingStep

# Hyperparameter tuning over the processed train/validation channels, cached via `cache_config`.
tuning_step = TuningStep(
    name='ModelTraining',
    tuner=tuner,
    inputs=tuning_inputs,
    cache_config=cache_config,
)

# Alternative without tuning (a single training job):
# tuning_step = TrainingStep(name='SimpleTraining', estimator=sklearn_estimator,
#                            inputs=tuning_inputs, cache_config=cache_config)


In [303]:
vars(tuning_step.properties).keys()  # attributes of the tuning step's properties object

dict_keys(['_step', 'step_name', 'path', '_shape_names', 'HyperParameterTuningJobName', 'HyperParameterTuningJobArn', 'HyperParameterTuningJobConfig', 'TrainingJobDefinition', 'TrainingJobDefinitions', 'HyperParameterTuningJobStatus', 'CreationTime', 'HyperParameterTuningEndTime', 'LastModifiedTime', 'TrainingJobStatusCounters', 'ObjectiveStatusCounters', 'BestTrainingJob', 'OverallBestTrainingJob', 'WarmStartConfig', 'Autotune', 'FailureReason', 'TuningJobCompletionDetails', 'ConsumedResources', 'TrainingJobSummaries', 'NextToken'])

In [304]:
vars(tuning_step.properties.TrainingJobDefinition).keys()  # attributes of the TrainingJobDefinition property

dict_keys(['_step', 'step_name', 'path', '_shape_names', 'DefinitionName', 'TuningObjective', 'HyperParameterRanges', 'StaticHyperParameters', 'AlgorithmSpecification', 'RoleArn', 'InputDataConfig', 'VpcConfig', 'OutputDataConfig', 'ResourceConfig', 'HyperParameterTuningResourceConfig', 'StoppingCondition', 'EnableNetworkIsolation', 'EnableInterContainerTrafficEncryption', 'EnableManagedSpotTraining', 'CheckpointConfig', 'RetryStrategy', 'Environment'])

In [305]:
# NOTE: the tuning step's properties listed above contain no `ModelArtifacts` attribute;
# the best model's S3 URI is obtained with `tuning_step.get_top_model_s3_uri(...)` instead.
# tuning_step.properties.properties.ModelArtifacts.S3ModelArtifacts

## 3. Configure Model Evaluation Step

In [306]:
# Processor for the evaluation job; same framework version, instances and env as the data-processing one.
evaluation_processor = SKLearnProcessor(
    role=role,
    framework_version=FRAMEWORK_VERSION,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    env={'AWS_DEFAULT_REGION': region},
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [307]:
from sagemaker.workflow.properties import PropertyFile


# Exposes evaluation.json from the evaluation step's 'metrics' output so that later
# steps (the accuracy condition) can read values from it with JsonGet.
evaluation_metrics = PropertyFile(
    name='EvaluationReport',
    path='evaluation.json',
    output_name='metrics',
)


In [308]:
# top_k=0 selects the best-ranked training job of the tuning step.
best_model_s3_uri = tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=bucket)

# Held-out test split produced by the data-processing step.
test_data_s3_uri = processing_step.properties.ProcessingOutputConfig.Outputs[
    'bank-churn-mon1-test'
].S3Output.S3Uri

# Scores the best model on the test split and writes the report to the 'metrics' output.
evaluation_step = ProcessingStep(
    name='EvaluateMetrics',
    code='./src/evaluate_metrics.py',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(source=best_model_s3_uri, destination='/opt/ml/processing/input/model'),
        ProcessingInput(source=test_data_s3_uri, destination='/opt/ml/processing/input/data'),
    ],
    outputs=[
        ProcessingOutput(
            output_name='metrics',
            source='/opt/ml/processing/output/metrics/',
            s3_upload_mode='EndOfJob',
        )
    ],
    property_files=[evaluation_metrics],
)


In [309]:
evaluation_output_config = evaluation_step.arguments['ProcessingOutputConfig']
evaluation_output_config['Outputs'][0]['S3Output']['S3Uri']  # S3 prefix of the 'metrics' output



's3://sagemaker-eu-west-3-668303144976/EvaluateMetrics-558cc2049391644b997c852ef17938f9/output/metrics'

## 4. Configure Model Registration Step

In [310]:
# Model Package Group that registered model versions are added to.
model_package_group_name = 'churn-month1-pipeline'

# Registration / deployment settings exposed as pipeline parameters.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.large")
deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [311]:
# Inference image for the scikit-learn model.
# The image URI is resolved when the pipeline is *defined*, not when it runs, so the
# parameter's default value is passed explicitly instead of the pipeline variable itself
# (which the SDK can only evaluate by implicitly substituting that same default).
# Overriding DeployInstanceType at execution time therefore does not change this image.
churn_inference_image_uri = sagemaker.image_uris.retrieve(
    framework='sklearn',
    version=FRAMEWORK_VERSION,
    instance_type=deploy_instance_type.default_value,
    image_scope="inference",
    region=region,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [312]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics


# The evaluation report is written under the evaluation step's 'metrics' output prefix.
evaluation_report_s3_uri = '{}/evaluation.json'.format(
    evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type='application/json',
        s3_uri=evaluation_report_s3_uri,
    )
)

model_metrics.model_statistics



<sagemaker.model_metrics.MetricsSource at 0x7fdd18629de0>

In [313]:
from sagemaker.workflow.step_collections import RegisterModel


# Registers the best tuned model in `model_package_group_name` with the configured
# approval status, attaching the evaluation metrics.
register_step = RegisterModel(
    name='RegisterModel',
    estimator=sklearn_estimator,
    image_uri=churn_inference_image_uri,
    model_data=tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
    content_types=['text/csv'],
    response_types=['text/csv'],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

## 5. Configure Model Creation Step (for Deployment)

In [314]:
import time
from sagemaker.model import Model


# Timestamp suffix keeps the model name distinct across runs of this cell.
timestamp = int(time.time())
model_name = f'churn-month1-{timestamp}'

# SageMaker Model wrapping the best tuned model artifact and the inference image.
model = Model(
    name=model_name,
    role=role,
    image_uri=churn_inference_image_uri,
    model_data=tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
    sagemaker_session=sess,
)

In [315]:
from sagemaker.inputs import CreateModelInput

model_inputs = CreateModelInput(instance_type=deploy_instance_type)  # instance type for the created model

In [316]:
from sagemaker.workflow.steps import CreateModelStep


# Pipeline step that creates the SageMaker Model defined above.
create_model_step = CreateModelStep(name='CreateModel', model=model, inputs=model_inputs)

## 6. Configure Model Accuracy Condition Step

In [335]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet


# Accuracy the evaluated model must reach before it is registered/created.
minimum_accuracy_value = ParameterFloat(name='MinimumAccuracy', default_value=0.70)

# Accuracy value read from the evaluation report produced by `evaluation_step`.
evaluated_accuracy = JsonGet(
    step_name=evaluation_step.name,
    step=evaluation_step,
    property_file=evaluation_metrics,
    json_path="metrics.accuracy.value",
)

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=evaluated_accuracy,
    right=minimum_accuracy_value,
)

In [336]:
from sagemaker.workflow.condition_step import ConditionStep


# Register and create the model only when the accuracy condition holds; otherwise do nothing.
condition_step = ConditionStep(
    name='MinimumAccuracyCondition',
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_model_step],
    else_steps=[],
)


## 7. Create the Pipeline

In [337]:
# Timestamp suffix gives each run of this cell a distinct pipeline name.
timestamp = int(time.time())
pipeline_name = 'churn-month1-pipeline-{}'.format(timestamp)

# Only pipeline Parameter objects belong in this list. The instance type/count settings
# (plain str/int values) and the CreateModelInput object were previously listed here too;
# they are not pipeline parameters and do not appear in the generated definition
# (see the `pipeline.definition()` output), so listing them was misleading.
# The order below matches the original so the generated definition is unchanged.
pipeline_parameters = [
    input_data,
    output_data,
    random_state,
    validation_split_percentage,
    test_split_percentage,
    feature_group_name,
    feature_store_offline_prefix,
    model_approval_status,
    deploy_instance_count,
    deploy_instance_type,
    minimum_accuracy_value,
]

In [338]:
from sagemaker.workflow.pipeline import Pipeline 

# register_step and create_model_step run inside condition_step's if-branch,
# so only the top-level steps are listed here.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[processing_step, tuning_step, evaluation_step, condition_step],
    sagemaker_session=sess,
)

In [339]:
# `upsert` creates the pipeline, or updates its definition if a pipeline with this name
# already exists, so re-running this cell does not fail the way a second `create` would.
# Both the create and the update responses carry the pipeline ARN.
response = pipeline.upsert(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)



arn:aws:sagemaker:eu-west-3:668303144976:pipeline/churn-month1-pipeline-1727369546


In [340]:
import json
from pprint import pprint

definition_json = pipeline.definition()  # JSON pipeline definition generated by the SDK
definition = json.loads(definition_json)
pprint(definition)



{'Metadata': {},
 'Parameters': [{'DefaultValue': '/opt/ml/processing/input/data',
                 'Name': 'input-data',
                 'Type': 'String'},
                {'DefaultValue': '/opt/ml/processing/output',
                 'Name': 'output-data',
                 'Type': 'String'},
                {'DefaultValue': 2024,
                 'Name': 'random-state',
                 'Type': 'Integer'},
                {'DefaultValue': 0.1,
                 'Name': 'validation-split-percentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.2,
                 'Name': 'test-split-percentage',
                 'Type': 'Float'},
                {'DefaultValue': 'bank-churn-mon1-feature-group',
                 'Name': 'feature-group-name',
                 'Type': 'String'},
                {'Name': 'feature-store-offline-prefix', 'Type': 'String'},
                {'DefaultValue': 'PendingManualApproval',
                 'Name': 'ModelApprovalStatus',
    

In [None]:
# Calling `create` again fails because a pipeline with this name was already created above.
# `upsert` pushes the current definition instead (creating the pipeline if it is missing).
pipeline.upsert(role_arn=role)

In [44]:
# NOTE(review): unused, commented-out draft (presumably for a later inference/processing step);
# it mirrors the `sklearn_processor` configuration. Enable or delete before sharing.
# inference_processor = SKLearnProcessor(
#     framework_version=FRAMEWORK_VERSION,
#     role=role,
#     instance_type=processing_instance_type,
#     instance_count = processing_instance_count,
#     env={'AWS_DEFAULT_REGION': region}
# )