# Dairy Generative Formulation
### Model training and deployment using SageMaker Pipelines

In [None]:
import json
import logging
import os
import time
from pprint import pprint

import boto3
import botocore
import botocore.config
import pandas as pd
import sagemaker
from botocore.exceptions import ClientError


# ========================== low-level service client of the boto3 session ==========================
bucket = "dairy-generative-formulation"

# Client configuration shared by both low-level clients: appends a custom
# user-agent suffix to every API call.
config = botocore.config.Config(user_agent_extra='bedissj-1699438736259')

# Control-plane client (jobs, pipelines, models) and runtime client (endpoint invocation).
sm = boto3.client('sagemaker', config=config)
sm_runtime = boto3.client('sagemaker-runtime', config=config)

# High-level SageMaker session built on the two clients above, with `bucket`
# as its default bucket.
sess = sagemaker.Session(
    sagemaker_client=sm,
    sagemaker_runtime_client=sm_runtime,
    default_bucket=bucket,
)

role = sagemaker.get_execution_role()
region = sess.boto_region_name

In [2]:
from sagemaker.parameter import IntegerParameter, CategoricalParameter, ContinuousParameter
from sagemaker.estimator import Model
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput, CreateModelInput

from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.steps import TuningStep, ProcessingStep, CreateModelStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline import Pipeline 

from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics

In [None]:
# S3 prefix of the raw data consumed by the processing step.
raw_data_s3_uri = f"s3://{bucket}/data/raw/"
print(raw_data_s3_uri)

### 1. Processing step

In [None]:
from sagemaker.workflow.parameters import ParameterString, ParameterFloat, ParameterInteger

# Parameters describing the data-processing job. NOTE: the cells that use them
# read `.default_value`, so overriding them when starting a pipeline execution
# does not change what the processing script receives.

# Directories inside the processing container.
input_data = ParameterString(name='input-data', default_value='/opt/ml/processing/input/data')
output_data = ParameterString(name='output-data', default_value='/opt/ml/processing/output')

# Hold-out sizes. Despite the "percentage" names the defaults are 0.1 / 0.2,
# presumably interpreted as fractions by ./src/processing.py -- TODO confirm.
validation_split_percentage = ParameterFloat(name='validation-split-percentage', default_value=0.1)
test_split_percentage = ParameterFloat(name='test-split-percentage', default_value=0.2)

# Feature Store settings forwarded to the processing script.
feature_group_name = ParameterString(
    name='feature-group-name',
    default_value='dairy-generative-formulation-feature-group',
)
feature_store_offline_prefix = ParameterString(name='feature-store-offline-prefix', default_value=None)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput


# ========================== Processing Inputs ==========================
# The raw-data prefix is copied into the container's input directory.
processing_inputs = [
    ProcessingInput(
        input_name='dairy-generative-formulation-raw-data',
        source=raw_data_s3_uri,
        destination=input_data.default_value,
        s3_data_distribution_type='ShardedByS3Key',
    ),
]


# ========================== Processing Outputs ==========================
# Container directories the processing script writes each artifact into.
output_data_train = f'{output_data.default_value}/train'
output_data_validation = f'{output_data.default_value}/validation'
output_data_test = f'{output_data.default_value}/test'
output_encoder = f'{output_data.default_value}/encoder'

# One output per directory, named 'dairy-generative-formulation-<split>' and
# uploaded when the job ends; later steps look outputs up by these names.
processing_outputs = [
    ProcessingOutput(
        source=local_dir,
        output_name=f'dairy-generative-formulation-{split_name}',
        s3_upload_mode='EndOfJob',
    )
    for split_name, local_dir in (
        ('train', output_data_train),
        ('validation', output_data_validation),
        ('test', output_data_test),
        ('encoder', output_encoder),
    )
]

In [None]:
# ========================== Processing Parameters ==========================
# scikit-learn container version; reused for the processing, training and
# inference images.
FRAMEWORK_VERSION = '1.0-1'
processing_instance_type, processing_instance_count = 'ml.t3.medium', 1

In [None]:
from sagemaker.sklearn import SKLearnProcessor

# ========================== Instantiate SKLearn Processor ==========================
# Processor that runs ./src/processing.py in the scikit-learn container.
sklearn_processor = SKLearnProcessor(
    framework_version=FRAMEWORK_VERSION,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    role=role,
    sagemaker_session=sess,
    # Export the notebook's region into the container (presumably for boto3
    # calls made by the script -- confirm).
    env=dict(AWS_DEFAULT_REGION=region),
)

In [None]:
from sagemaker.workflow.steps import ProcessingStep


# Command-line arguments for ./src/processing.py, built from the parameters'
# default values (execution-time overrides of these parameters do not reach
# the script).
# An option whose value is None (e.g. the unset feature-store offline prefix)
# is omitted instead of being sent as the literal string "None", so the script
# falls back to its own default.
# NOTE(review): assumes processing.py does not declare these options required.
processing_job_arguments = []
for arg_flag, arg_value in [
    ('--input-data', input_data.default_value),
    ('--output-data', output_data.default_value),
    ('--validation-split-percentage', validation_split_percentage.default_value),
    ('--test-split-percentage', test_split_percentage.default_value),
    ('--feature-store-offline-prefix', feature_store_offline_prefix.default_value),
    ('--feature-group-name', feature_group_name.default_value),
]:
    if arg_value is not None:
        processing_job_arguments.extend([arg_flag, str(arg_value)])

processing_step = ProcessingStep(
    name='DataProcessing',
    code='./src/processing.py',
    processor=sklearn_processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=processing_job_arguments,
)

In [None]:
# sklearn_processor.run(
#     code='./src/processing.py',
#     inputs=processing_inputs,
#     outputs=processing_outputs,
#     arguments=[
#         '--input-data', str(input_data.default_value),
#         '--output-data', str(output_data.default_value),
#         '--validation-split-percentage', str(validation_split_percentage.default_value),
#         '--test-split-percentage', str(test_split_percentage.default_value),
#         '--feature-store-offline-prefix', str(feature_store_offline_prefix.default_value),
#         '--feature-group-name', str(feature_group_name.default_value)
#     ],
#     wait=True,
#     logs=True,
# )

In [None]:
# from pprint import pprint

# pprint(sklearn_processor.latest_job.describe()['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'])

### 2. Hyperparameter tuning step

In [None]:
# ========================= Training resources =========================
training_instance_type = 'ml.m5.large'
training_instance_count = 1


# ========================== training inputs ==========================
# Objective metric for hyperparameter tuning.
objective = 'validation:rmse'

# Captures a signed decimal with an optional exponent ("0.42", "-0.13", "1.5e-05").
# R2 in particular is negative for a model worse than the mean predictor; the
# previous unsigned pattern `([0-9.]+)` silently failed to match such values
# and truncated exponent notation.
metric_value_regex = r'([-+]?[0-9.]+(?:[eE][-+]?[0-9]+)?)'

# Each regex scans the training logs for lines like "val_rmse: <value>".
metric_definitions = [
    {'Name': 'validation:rmse', 'Regex': 'val_rmse: ' + metric_value_regex},
    {'Name': 'validation:mse', 'Regex': 'val_mse: ' + metric_value_regex},
    {'Name': 'validation:mae', 'Regex': 'val_mae: ' + metric_value_regex},
    {'Name': 'validation:r2', 'Regex': 'val_r2: ' + metric_value_regex},
]

In [None]:
# =========================  Sensory attributes to loop on  =========================

# Root S3 prefix for trained model artifacts and evaluation reports.
models_s3_uri = f"s3://{bucket}/models"

# Target columns; the pipeline loop trains one model per attribute.
# NOTE(review): 'Flavor_intensity ' has a trailing space -- presumably it
# mirrors the raw CSV header exactly; confirm before changing it.
sensory_attributes = [
    'Flavor_intensity ',
    'sweetness',
    'Fruit_intensity',
    'Chalkiness',
    'Color_intensity',
    'thickness',
    'Coating',
    'Global Appreciation',
]

In [None]:
# =========================  Hyperparameter tuning job parameters  =========================
# Tuning budget: total training jobs, and how many may run at once.
max_jobs, max_parallel_jobs = 2, 2

In [None]:
# Seconds to wait between pipeline-status polls (avoids API throttling).
pipeline_poll_seconds = 30

for attribute in sensory_attributes:
    # Build and run one SageMaker pipeline per sensory attribute:
    #   DataProcessing -> ModelTraining (tuning) -> EvaluateMetrics
    #   -> ModelValidationConditions -> CreateModel + RegisterModel (if R2 passes).
    # Names bound here stay defined after the loop (holding the last
    # attribute's objects) and are inspected by later cells.

    # =========================  Per-attribute S3 locations  =========================
    # Training jobs write their artifacts under <models_s3_uri>/<attribute>.
    model_output_path = '{}/{}'.format(models_s3_uri.rstrip('/'), attribute)
    # Same location as a key prefix (without "s3://<bucket>/"):
    # get_top_model_s3_uri needs it, otherwise it looks for the best model at
    # the bucket root instead of under the estimator's output_path.
    model_output_prefix = model_output_path.replace('s3://{}/'.format(bucket), '', 1)
    # Where the evaluation step uploads its report. The file name is presumably
    # what ./src/evaluate_metrics.py writes -- confirm against that script.
    evaluation_s3_uri = '{}/evaluation/{}'.format(models_s3_uri.rstrip('/'), attribute)
    evaluation_report_name = 'evaluation_{}.json'.format(attribute)

    # =========================  Static hyperparameters =========================
    static_hyperparameters = {
        'random_state': 2024,
        'sensory_output': attribute
    }

    # ==========================  Hyperparameter ranges ==========================
    hyperparameter_ranges = {
        'n_estimators': IntegerParameter(min_value=10,
                                         max_value=200,
                                         scaling_type='Logarithmic'),

        'max_depth': IntegerParameter(min_value=3,
                                      max_value=10,
                                      scaling_type='Linear'),

        'criterion': CategoricalParameter(values=['squared_error', 'friedman_mse'])
    }

    # =========================  Instantiate estimator  =========================
    # Uses the notebook session (default bucket = `bucket`), like the processors.
    sklearn_estimator = SKLearn(
        entry_point='./src/training.py',
        framework_version=FRAMEWORK_VERSION,
        instance_type=training_instance_type,
        instance_count=training_instance_count,
        role=role,
        hyperparameters=static_hyperparameters,
        output_path=model_output_path,
        metric_definitions=metric_definitions,
        sagemaker_session=sess
    )

    # =========================  Instantiate hyperparameter tuner  =========================
    tuner = HyperparameterTuner(
            estimator=sklearn_estimator,
            hyperparameter_ranges=hyperparameter_ranges,
            objective_metric_name=objective,
            metric_definitions=metric_definitions,
            strategy='Bayesian',
            objective_type='Minimize',
            max_jobs=max_jobs,
            max_parallel_jobs=max_parallel_jobs,
            autotune=True
    )

    # ====================== Configure training/tuning inputs ======================
    # Train/validation channels come from the processing step's named outputs.
    tuning_inputs = {
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'dairy-generative-formulation-train'
            ].S3Output.S3Uri,
            content_type='text/csv',
            input_mode='File'
        ),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'dairy-generative-formulation-validation'
            ].S3Output.S3Uri,
            content_type='text/csv',
            input_mode='File'
        )
    }

    # ====================== Cache configuration  ======================
    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")  # PT1H represents `one hour`

    # ====================== Configure hyperparameter tuning step ======================
    tuning_step = TuningStep(
        name='ModelTraining',
        tuner=tuner,
        inputs=tuning_inputs,
        cache_config=cache_config
    )

    # Artifact of TrainingJobSummaries[0] of the tuning job, resolved at run time.
    best_model_s3_uri = tuning_step.get_top_model_s3_uri(
        top_k=0, s3_bucket=bucket, prefix=model_output_prefix
    )

    # =========================================================================================
    # ============================ Configure processing job inputs ============================
    # PropertyFile.path is relative to the 'metrics' output directory (not an
    # S3 URI); the condition step reads the report through it via JsonGet.
    evaluation_metrics = PropertyFile(
        name='EvaluationReport',
        output_name='metrics',
        path=evaluation_report_name
    )

    sensory_output_arg = ParameterString(
        name='sensory-target',
        default_value=attribute
    )

    # ============================ Instantiate evaluation processor ===========================
    evaluation_processor = SKLearnProcessor(
        framework_version=FRAMEWORK_VERSION,
        role=role,
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        sagemaker_session=sess,
        env={'AWS_DEFAULT_REGION': region}
    )

    # ====================  Configure processing job for model evaluation  ====================
    evaluation_step = ProcessingStep(
        name='EvaluateMetrics',
        code='./src/evaluate_metrics.py',
        processor=evaluation_processor,
        inputs=[
            ProcessingInput(
                source=best_model_s3_uri,
                destination='/opt/ml/processing/input/model'
            ),
            ProcessingInput(
                source=processing_step.properties.ProcessingOutputConfig.Outputs[
                    'dairy-generative-formulation-test'
                ].S3Output.S3Uri,
                destination='/opt/ml/processing/input/data'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name='metrics',
                s3_upload_mode='EndOfJob',
                source='/opt/ml/processing/output/metrics/',
                destination=evaluation_s3_uri
            )
        ],
        property_files=[evaluation_metrics],
        job_arguments=['--sensory-target', str(sensory_output_arg.default_value)]
    )

    # =========================================================================================
    # ======================== Configure model registration parameters ========================
    deploy_instance_type = ParameterString(
        name='deploy_instance_type',
        default_value='ml.m5.large'
    )

    deploy_instance_count = ParameterInteger(
        name="DeployInstanceCount",
        default_value=1
    )

    model_package_group_name = ParameterString(
        name='model_package_group_name',
        default_value='dairy-gen-form-{}-model-package-group'.format(attribute).replace(' ', '').replace('_', '-')
    )

    model_approval_status = ParameterString(
        name='model_approval_status',
        default_value='PendingManualApproval'
    )

    # ================== Configure model metrics source from evaluation step ==================
    # MetricsSource takes a single MIME type string, not a list.
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            content_type='application/json',
            s3_uri='{}/{}'.format(evaluation_s3_uri, evaluation_report_name)
        )
    )

    # =========================== Retrieve image uri for inference ============================
    # Image lookup happens now, so use the parameter's concrete default value.
    inference_image_uri = sagemaker.image_uris.retrieve(
        framework='sklearn',
        version=FRAMEWORK_VERSION,
        instance_type=deploy_instance_type.default_value,
        image_scope='inference',
        region=region,
    )

    # =========================================================================================
    # ======================== Create model for model deployment step =========================
    timestamp = int(time.time())
    model_name = f'{attribute}-{timestamp}'.replace(' ', '').replace('_', '-')

    model = Model(
        name=model_name,
        entry_point='./src/inference.py',
        image_uri=inference_image_uri,
        model_data=best_model_s3_uri,
        sagemaker_session=sess,
        role=role
    )

    model_inputs = CreateModelInput(
        instance_type=deploy_instance_type
    )

    # ================================= Create Model Step ==================================
    create_model_step = CreateModelStep(
        name='CreateModel',
        model=model,
        inputs=model_inputs
    )

    # ========================== Configure model registration step ============================
    # TODO: the package group, metrics and approval status are intentionally
    # left disabled here (as in the original); enable them to version models.
    register_step = RegisterModel(
        name='RegisterModel',
        estimator=sklearn_estimator,
        image_uri=inference_image_uri,
        inference_instances=[deploy_instance_type],
        transform_instances=[deploy_instance_type],
        model_data=best_model_s3_uri,
        # model_package_group_name=model_package_group_name,
        # model_metrics=model_metrics,
        # approval_status=model_approval_status,
        content_types=['text/csv'],
        response_types=['text/csv'],
    )

    # =========================================================================================
    # ================================ Configure Condition Step ===============================
    minimum_r2_value = ParameterFloat(
        name='MinimumR2',
        default_value=0.80
    )

    maximum_rmse_value = ParameterFloat(
        name='MaximumRMSE',
        default_value=0.20
    )

    # Thresholds come from the pipeline parameters (same defaults as before),
    # so they can be overridden per execution.
    minimum_r2_condition = ConditionGreaterThanOrEqualTo(
        left=JsonGet(
            step_name=evaluation_step.name,
            step=evaluation_step,
            property_file=evaluation_metrics,
            json_path="metrics.R2.value"
        ),
        right=minimum_r2_value
    )

    maximum_rmse_condition = ConditionLessThanOrEqualTo(
        left=JsonGet(
            step_name=evaluation_step.name,
            step=evaluation_step,
            property_file=evaluation_metrics,
            json_path="metrics.RMSE.value"
        ),
        right=maximum_rmse_value
    )

    # NOTE(review): only the R2 condition gates model creation/registration;
    # maximum_rmse_condition is built but not enforced -- confirm intent.
    condition_step = ConditionStep(
        name='ModelValidationConditions',
        conditions=[minimum_r2_condition],
        if_steps=[create_model_step, register_step],
        else_steps=[]
    )

    # =========================================================================================
    # =============================== Create & Execute Pipeline ===============================
    timestamp = int(time.time())
    pipeline_name = f'dairy-gen-form-{attribute}-pipeline-{timestamp}'.replace(' ', '').replace('_', '-')

    # Only Parameter objects may be declared here; the processing/training
    # instance types and counts are plain literals, not pipeline parameters.
    pipeline_parameters = [
        model_package_group_name,
        model_approval_status,
        deploy_instance_type,
        minimum_r2_value,
        maximum_rmse_value
    ]

    # Every step whose properties are referenced downstream must be part of
    # the pipeline definition.
    pipeline = Pipeline(
        name=pipeline_name,
        sagemaker_session=sess,
        parameters=pipeline_parameters,
        steps=[
            processing_step,
            tuning_step,
            evaluation_step,
            condition_step
        ]
    )

    response = pipeline.create(role_arn=role)
    execution = pipeline.start()

    # =============================== Pipeline description ===============================
    print('Executing training pipeline for sensory property: ', attribute)

    # Poll the execution status, sleeping between calls instead of spinning
    # on the SageMaker API.
    pipeline_execution = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
    pipeline_execution_status = pipeline_execution[0]["PipelineExecutionStatus"]

    while pipeline_execution_status in ("Executing", "Stopping"):
        time.sleep(pipeline_poll_seconds)
        pipeline_execution = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
        pipeline_execution_status = pipeline_execution[0]["PipelineExecutionStatus"]

    pprint(pipeline_execution)

    # Step records of this execution.
    execution_steps = execution.list_steps()

    # Retrieve tuning job artifacts
    training_job_arn = None
    for execution_step in execution_steps:
        if execution_step["StepName"] == "ModelTraining":
            training_job_arn = execution_step.get('Metadata', {}).get('TuningJob', {}).get('Arn')
            pprint(execution_step)
            break
    print('Tuning job ARN: {}'.format(training_job_arn))

    training_job_name = training_job_arn.split('/')[-1] if training_job_arn else None
    print('Tuning job Name: {}'.format(training_job_name))

    # Focus on model performance
    processing_job_name = None
    for execution_step in reversed(execution_steps):
        if execution_step["StepName"] == "EvaluateMetrics":
            processing_job_arn = execution_step.get('Metadata', {}).get('ProcessingJob', {}).get('Arn')
            if processing_job_arn:
                processing_job_name = processing_job_arn.split('/')[-1]

    if processing_job_name is None:
        # The evaluation step never started (e.g. an earlier step failed).
        print('No evaluation job found for {} (status: {}); skipping metrics report.'.format(
            attribute, pipeline_execution_status))
        continue

    describe_evaluation_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

    evaluation_metrics_s3_uri = describe_evaluation_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
    print('Evaluation output: {}'.format(evaluation_metrics_s3_uri))

    # Same per-attribute report name that the PropertyFile and ModelMetrics use.
    evaluation_json = sagemaker.s3.S3Downloader.read_file('{}/{}'.format(
        evaluation_metrics_s3_uri, evaluation_report_name
    ))

    pprint(json.loads(evaluation_json))


In [None]:
# Executions of the pipeline created in the last loop iteration.
pipeline.list_executions()

In [None]:
# Model metrics configuration left over from the last loop iteration.
model_metrics

In [None]:
# Evaluation PropertyFile from the last loop iteration.
evaluation_metrics

In [None]:
# Parameters declared on the last pipeline.
for par in pipeline_parameters:
    print(par)

In [None]:
# NOTE(review): in the SageMaker SDK `best_estimator` is, as far as I know, a
# method of HyperparameterTuner, so this likely shows a bound method rather than
# an estimator; calling it would also need a tuning job launched through this
# tuner object (here tuning ran inside the pipeline) -- verify intent.
tuner.best_estimator

In [None]:
# Pipeline execution overview
# Pipeline execution overview: poll the first summary returned for
# `pipeline_name` (newest first under the API's default sort) until it leaves
# the running states, then print the summaries.
pipeline_execution = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = pipeline_execution[0]["PipelineExecutionStatus"]

while pipeline_execution_status in ("Executing", "Stopping"):
    # Back off between polls instead of busy-looping on the API (throttling).
    time.sleep(30)
    pipeline_execution = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
    pipeline_execution_status = pipeline_execution[0]["PipelineExecutionStatus"]

pprint(pipeline_execution)

In [None]:
# Raw execution summaries captured by the polling cell above.
pipeline_execution

In [None]:
# Rebuild the registration step for the last `attribute` processed by the
# training loop (reuses that iteration's tuning_step / evaluation_step).
deploy_instance_type = ParameterString(
    name='deploy_instance_type',
    default_value='ml.m5.large'
)

deploy_instance_count = ParameterInteger(
    name="DeployInstanceCount",
    default_value=1
)

# Model package group names allow only alphanumerics and hyphens (max 63
# chars). Use the same sanitised short form as the training loop; the long
# 'dairy-generative-formulation-...' form with spaces/underscores is invalid.
model_package_group_name = ParameterString(
    name='model_package_group_name',
    default_value='dairy-gen-form-{}-model-package-group'.format(attribute).replace(' ', '').replace('_', '-')
)

model_approval_status = ParameterString(
    name='model_approval_status',
    default_value='PendingManualApproval'
)

# ================== Configure model metrics source from evaluation step ==================
# MetricsSource takes a single MIME type string, not a list.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type='application/json',
        s3_uri=evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'] + '/evaluation_{}.json'.format(attribute)
    )
)

# =========================== Retrieve image uri for inference ============================
# Image lookup happens now, so pass the parameter's concrete default value.
inference_image_uri = sagemaker.image_uris.retrieve(
    framework='sklearn',
    version=FRAMEWORK_VERSION,
    instance_type=deploy_instance_type.default_value,
    image_scope='inference',
    region=region,
)

# The tuner's training jobs write under <models_s3_uri>/<attribute>; the best
# model's key must include that prefix (without "s3://<bucket>/").
best_model_s3_uri = tuning_step.get_top_model_s3_uri(
    top_k=0,
    s3_bucket=bucket,
    prefix='{}/{}'.format(models_s3_uri.rstrip('/'), attribute).replace('s3://{}/'.format(bucket), '', 1),
)

# ========================== Configure model registration step ============================
register_step = RegisterModel(
    name='RegisterModel',
    # `tuner.best_estimator` is a method, not an estimator; use the estimator
    # the tuner was built from.
    estimator=sklearn_estimator,
    image_uri=inference_image_uri,
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],
    model_data=best_model_s3_uri,
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
    content_types=['text/csv'],
    response_types=['text/csv'],
)


In [None]:
RegisterModel?

In [None]:
# Display the ParameterString holding the model package group name.
model_package_group_name

In [None]:
tuning_step?