# imports 


In [2]:

import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo,
    ConditionLessThanOrEqualTo,
)
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

sagemaker.__version__

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


'2.215.0'

# ADD ML PIPELINE


In this step you automate your end-to-end ML workflow using Amazon SageMaker Pipelines and Amazon SageMaker Model Registry. You make feature engineering reusable, repeatable, and scalable using Amazon SageMaker Feature Store.

# set pipeline constants

In [3]:
# Names of the SageMaker resources for this project; every name is derived from `project`.
project = "bike-ride-sharing"

# Pipeline and model registry
pipeline_name = f"{project}-pipeline"
model_package_group_name = f"{project}-model-group"
pipeline_model_name = f"{project}-model-xgb"

# Hosting
endpoint_name = f"{project}-endpoint"
endpoint_config_name = f"{project}-endpoint-config"

# set instances for pipeline jobs

In [4]:
# Default compute for the pipeline jobs; these values seed the pipeline parameters.
# Processing jobs
process_instance_type = "ml.t3.medium"
# Training jobs
train_instance_type = "ml.m5.xlarge"
train_instance_count = 1

In [5]:
# Resolve the AWS region from a fresh boto3 session.
boto_session = boto3.Session()
region = boto_session.region_name
print(f"Region--{region}")

# Execution role used for every SageMaker job and for the pipeline itself.
sm_role = sagemaker.get_execution_role()
print(f"Sagemaker Role--{sm_role}")

# Regular (non-pipeline) SageMaker session and its default S3 bucket.
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
print(f"Default bucket--{default_bucket}")

Region--eu-north-1
Sagemaker Role--arn:aws:iam::058264393695:role/service-role/AmazonSageMaker-ExecutionRole-20240621T001438
Default bucket--sagemaker-eu-north-1-058264393695


# set s3 urls 

In [6]:
bucket_name = default_bucket
bucket_prefix = 'bits-webminar-june-24'

print(f"bucket_prefix for pipelines--> {bucket_prefix}")

# Every pipeline artifact lives under the same prefix of the default bucket.
s3_base_url = f"s3://{bucket_name}/{bucket_prefix}"

# Raw input dataset
input_s3_url = f"{s3_base_url}/bike-sharing-dataset.csv"
print(f"input_s3_url--> {input_s3_url}")

# Outputs of the preprocessing step
train_s3_url = f"{s3_base_url}/train"
print(f"train_s3_url--> {train_s3_url}")

validation_s3_url = f"{s3_base_url}/validation"
print(f"validation_s3_url--> {validation_s3_url}")

test_s3_url = f"{s3_base_url}/test"
print(f"test_s3_url--> {test_s3_url}")

baseline_s3_url = f"{s3_base_url}/baseline"
print(f"baseline_s3_url--> {baseline_s3_url}")

# Outputs of the evaluation step
evaluation_s3_url = f"{s3_base_url}/evaluation"
print(f"evaluation_s3_url--> {evaluation_s3_url}")

prediction_baseline_s3_url = f"{s3_base_url}/prediction_baseline"
print(f"prediction_baseline_s3_url--> {prediction_baseline_s3_url}")

# Output location handed to the training estimator
output_s3_url = f"{s3_base_url}/pipeline_output"
print(f"output_s3_url--> {output_s3_url}")

bucket_prefix for pipelines--> bits-webminar-june-24
input_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/bike-sharing-dataset.csv
train_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/train
validation_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/validation
test_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/test
baseline_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/baseline
evaluation_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/evaluation
prediction_baseline_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/prediction_baseline
output_s3_url--> s3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/pipeline_output


# create pipeline 
    
    
Setup pipeline parameters
SageMaker Pipelines supports parameterization, which allows you to specify input parameters at runtime without changing your pipeline code. You can use the parameter classes available under the sagemaker.workflow.parameters module. Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution.

> set up pipeline parameters
    
    


In [7]:
# --- Compute resources (overridable per execution) ---
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType", default_value=process_instance_type
)
train_instance_type_param = ParameterString(
    name="TrainingInstanceType", default_value=train_instance_type
)
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount", default_value=train_instance_count
)

# --- Model governance ---
# Approval status given to the model package when it is registered.
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
# Threshold the condition step compares the test-set RMSE against.
test_score_threshold_param = ParameterFloat(
    name="TestScoreThreshold", default_value=100.00
)

# --- Data ---
# S3 location of the raw input dataset.
input_s3_url_param = ParameterString(name="InputDataUrl", default_value=input_s3_url)

# experiment config

In [8]:
import sys
from pathlib import Path

# The parent of ../scripts goes on sys.path so that `scripts` can be imported as a package.
root_dir = Path('../scripts').resolve().parent
print(root_dir)
sys.path.append(str(root_dir))

import scripts.utils as utils

# Timestamp suffix for the run name (presumably current IST time, judging by the helper's name).
run_suffix = utils.get_current_ist().strftime("%d-%m-%Y-%H-%M-%S")
run_name = f"pipeline-processing-{run_suffix}"
print(run_name)

/root/bits-webminar-june-24/ml
pipeline-processing-27-06-2024-15-27-06


In [9]:
# Name of the SageMaker experiment that the run created below is recorded under.
experiment_name = "bike-ride-sharing"

In [10]:
from sagemaker.experiments.run import Run, load_run


# Open an experiment run, log the dataset split ratios, and keep the run's
# experiment config so later cells can inspect it.
with Run(
    experiment_name=experiment_name,
    run_name=run_name,
    run_display_name="pipeline-processing",
    sagemaker_session=sagemaker_session,
) as run:
    run.log_parameters(dict(train=0.7, validate=0.2, test=0.1))

    experiment_config = run.experiment_config
    # A short sleep (about 8 s) was left disabled here; it was meant to wait
    # until resource tags are propagated to the run.

In [11]:
# Display the experiment config captured inside the run context above.
experiment_config

{'ExperimentName': 'bike-ride-sharing',
 'TrialName': 'Default-Run-Group-bike-ride-sharing',
 'RunName': 'bike-ride-sharing-pipeline-processing-27-06-2024-15-27-06'}

# build the pipeline

Build the pipeline steps
You include the following steps in the pipeline:

* Data processing step: runs a SageMaker processing job for feature engineering and dataset split
* Training step: runs a SageMaker training job using XGBoost algorithm
* Evaluation step: evaluate the performance of the trained model
* Condition step: checks if the performance of the model meets the specified threshold
* Register step: registers a version of the model in the SageMaker model registry

In [12]:
# Pipeline session passed as `sagemaker_session` to the processors, estimator, model and pipeline below.
session = PipelineSession()

In [13]:
# scikit-learn processor used by the preprocessing step.
# Its instance type is a pipeline parameter, so it can be overridden per execution.
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",  # "0.23-1" was the earlier choice
    role=sm_role,
    instance_count=1,
    instance_type=process_instance_type_param,
    base_job_name=f"{pipeline_name}/preprocess",
    sagemaker_session=session,
)
    

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [14]:
# preprocessing step

# The raw dataset (InputDataUrl parameter) is made available under /opt/ml/processing/input.
processing_inputs = [
    ProcessingInput(
        source=input_s3_url_param,
        destination="/opt/ml/processing/input",
    ),
]

# One output per dataset written by the script: (output name, container path, S3 destination).
processing_outputs = [
    ProcessingOutput(output_name=output_name, source=container_path, destination=s3_destination)
    for output_name, container_path, s3_destination in (
        ("train_data", "/opt/ml/processing/output/train", train_s3_url),
        ("validation_data", "/opt/ml/processing/output/validation", validation_s3_url),
        ("test_data", "/opt/ml/processing/output/test", test_s3_url),
        ("baseline_data", "/opt/ml/processing/output/baseline", baseline_s3_url),
    )
]

# The result of run() is handed to the step as its arguments.
processor_args = sklearn_processor.run(
    code='../scripts/preprocessing.py',
    inputs=processing_inputs,
    outputs=processing_outputs,
)

# Define processing step
step_process = ProcessingStep(name=f"{pipeline_name}-preprocess-data", step_args=processor_args)



In [15]:
# training step 
# Container image for XGBoost 1.5-1 in the current region; used for training, evaluation and the model.
xgboost_image_uri = retrieve(framework="xgboost", region=region, version="1.5-1")

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [16]:
# Generic estimator running the XGBoost algorithm container.
# Instance type and count come from pipeline parameters.
estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_image_uri,
    role=sm_role,
    instance_type=train_instance_type_param,
    instance_count=train_instance_count_param,
    max_run=20 * 60,  # maximum allowed active runtime
    output_path=output_s3_url,  # S3 location for the training output
    base_job_name=f"{pipeline_name}/train",  # prefix for the training job name
    sagemaker_session=session,
)

# Hyperparameters for the XGBoost regression job.
estimator.set_hyperparameters(
    **{
        "objective": "reg:squarederror",
        "eval_metric": "rmse",  # metric reported on the validation data
        "num_round": 150,  # number of boosting rounds
        "early_stopping_rounds": 10,  # stop once the validation score stops improving
        "max_depth": 3,  # maximum depth of a tree
        "eta": 0.5,  # step size shrinkage
        "alpha": 2.5,  # L1 regularization term on weights
        "subsample": 0.8,  # row subsample ratio
        "colsample_bytree": 0.8,  # column subsample ratio per tree
        "min_child_weight": 3,  # minimum sum of instance weight needed in a child
        "verbosity": 1,
    }
)

In [17]:
# Map each training channel to the matching CSV output of the preprocessing step.
training_inputs = {
    channel: TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri,
        content_type="text/csv",
    )
    for channel, output_name in (
        ("train", "train_data"),
        ("validation", "validation_data"),
    )
}

In [18]:
# The result of fit() on the pipeline-session estimator is passed to the step as its arguments.
training_args = estimator.fit(training_inputs)

# Define training step
step_train = TrainingStep(step_args=training_args, name=f"{pipeline_name}-train")

In [17]:
# %%writefile ../scripts/evaluation.py

# NOTE(review): everything below is a commented-out draft of the evaluation script. The
# evaluation step runs the file at ../scripts/evaluation.py, which is not shown here.
# Issues visible in this draft, to confirm against the real script before reusing it:
#   * `predictor` is never defined in the draft (the loaded `model` is unused), and `X_test`
#     is an xgb.DMatrix while `.values` / `.index` are used on it as if it were a DataFrame.
#   * `test_rmse` is assigned the result of `mean_squared_error` with no square root taken,
#     so it presumably holds MSE rather than RMSE -- verify, since the pipeline compares
#     this value against TestScoreThreshold.

# import json
# import os
# import pathlib
# import pickle as pkl
# import tarfile
# import joblib
# import numpy as np
# import pandas as pd
# import xgboost as xgb
# import datetime as dt
# from sklearn.metrics import mean_squared_error

# if __name__ == "__main__":   
    
#     # All paths are local for the processing container
#     model_path = "/opt/ml/processing/model/model.tar.gz"
#     test_x_path = "/opt/ml/processing/test/test_x.csv"
#     test_y_path = "/opt/ml/processing/test/test_y.csv"
#     output_dir = "/opt/ml/processing/evaluation"
#     output_prediction_path = "/opt/ml/processing/output/"
        
#     # Read model tar file
#     with tarfile.open(model_path, "r:gz") as t:
#         t.extractall(path=".")
    
#     # Load model
#     model = xgb.Booster()
#     model.load_model("xgboost-model")
    
#     # Read test data
#     X_test = xgb.DMatrix(pd.read_csv(test_x_path).values)
    
#     y_test = pd.read_csv(test_y_path).to_numpy()

#     # Run predictions
#     predictions = np.array(predictor.predict(X_test.values), dtype=float).squeeze()

#     # Evaluate predictions
#     test_results = pd.concat([pd.Series(predictions, name="y_pred", index=X_test.index),X_test,],axis=1,)
#     test_results.head()
    
#     test_rmse = mean_squared_error(y_test, test_results["y_pred"])
#     report_dict = {"regression_metric":{"test_rmse":{"value":test_rmse}}}
#     print(f"Test-rmse: {test_rmse:.2f}")


#     # Save evaluation report
#     pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
#     with open(f"{output_dir}/evaluation.json", "w") as f:
#         f.write(json.dumps(report_dict))
    
#     # Save prediction baseline file - we need it later for the model quality monitoring
#     test_results.to_csv(os.path.join(output_prediction_path, 'prediction_baseline/prediction_baseline.csv'), index=False, header=True)


In [19]:
# Processor that runs the evaluation script with python3 inside the XGBoost container image.
script_processor = ScriptProcessor(
    command=["python3"],
    image_uri=xgboost_image_uri,
    role=sm_role,
    instance_count=1,
    instance_type=process_instance_type_param,
    base_job_name=f"{pipeline_name}/evaluate",
    sagemaker_session=session,
)

# Inputs: the trained model artifact and the test split from the preprocessing step.
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
        destination="/opt/ml/processing/test",
    ),
]

# Outputs: the evaluation report and the prediction baseline data.
eval_outputs = [
    ProcessingOutput(
        output_name="evaluation",
        source="/opt/ml/processing/evaluation",
        destination=evaluation_s3_url,
    ),
    ProcessingOutput(
        output_name="prediction_baseline_data",
        source="/opt/ml/processing/output/prediction_baseline",
        destination=prediction_baseline_s3_url,
    ),
]

eval_args = script_processor.run(
    code="../scripts/evaluation.py",
    inputs=eval_inputs,
    outputs=eval_outputs,
)

# Property file pointing at evaluation.json in the "evaluation" output;
# the condition step reads the metric from it through JsonGet.
evaluation_report = PropertyFile(
    name="ModelEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name=f"{pipeline_name}-evaluate-model",
    step_args=eval_args,
    property_files=[evaluation_report],
)

In [20]:
# register step
# Model built from the training step's artifact, served with the XGBoost container image.
model = Model(
    name="bits-webminar-june-24-xgboost-model",
    image_uri=xgboost_image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=sm_role,
    sagemaker_session=session,
)

# Attach the evaluation report (first output of the evaluation step) as model statistics.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
    )
)

# Register a model package version in the model package group with CSV request/response types.
register_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge", "ml.m5.large"],
)

step_register = ModelStep(name=f"{pipeline_name}-register", step_args=register_args)



In [21]:
# fail step
# Ends the execution with an error message that includes the TestScoreThreshold parameter value.
step_fail = FailStep(
    name=f"{pipeline_name}-fail",
    error_message=Join(
        on=" ",
        values=["Execution failed due to RMSE Score >", test_score_threshold_param],
    ),
)

In [22]:
# condition step
# RMSE is an error metric, so lower is better: the model is registered only when the
# test RMSE from the evaluation report is less than or equal to TestScoreThreshold.
# Otherwise the fail step runs, whose message reports "RMSE Score > threshold".
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metric.test_rmse.value",
    ),
    right=test_score_threshold_param,
)

step_cond = ConditionStep(
    name=f"{pipeline_name}-check-test-score",
    conditions=[cond_lte],
    if_steps=[step_register],  # RMSE <= threshold: register the model
    else_steps=[step_fail],  # RMSE > threshold: fail the execution
)

# Pipeline

In [23]:
# Assemble the pipeline. The register and fail steps are not listed directly;
# they are the branches of the condition step.
pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=session,
    steps=[step_process, step_train, step_eval, step_cond],
    parameters=[
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        input_s3_url_param,
    ],
)

In [24]:
# Create a new or update existing Pipeline
# The notebook's execution role (sm_role) is passed as the pipeline's role.
pipeline.upsert(role_arn=sm_role)



{'PipelineArn': 'arn:aws:sagemaker:eu-north-1:058264393695:pipeline/bike-ride-sharing-pipeline',
 'ResponseMetadata': {'RequestId': '97af5725-420e-4b49-8327-a146879bb29c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '97af5725-420e-4b49-8327-a146879bb29c',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '95',
   'date': 'Thu, 27 Jun 2024 09:58:30 GMT'},
  'RetryAttempts': 0}}

In [25]:
# Fetch the pipeline description and parse its JSON 'PipelineDefinition' for inspection.
pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
pipeline_definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.t3.medium'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'TestScoreThreshold', 'Type': 'Float', 'DefaultValue': 100.0},
  {'Name': 'InputDataUrl',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-eu-north-1-058264393695/bits-webminar-june-24/bike-sharing-dataset.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'bike-ride-sharing-pipeline-preprocess-data',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'

# execution of the pipeline 

In [26]:
# Start an execution, passing a value for every pipeline parameter explicitly.
execution = pipeline.start(
    parameters={
        "ProcessingInstanceType": process_instance_type,
        "TrainingInstanceType": train_instance_type,
        "TrainingInstanceCount": train_instance_count,
        "ModelApprovalStatus": "PendingManualApproval",
        "TestScoreThreshold": 100.0,
        "InputDataUrl": input_s3_url,
    }
)

In [27]:
# List the steps of the execution started above, with their current status.
execution.list_steps()

[{'StepName': 'bike-ride-sharing-pipeline-preprocess-data',
  'StartTime': datetime.datetime(2024, 6, 27, 9, 58, 36, 953000, tzinfo=tzlocal()),
  'StepStatus': 'Starting',
  'Metadata': {},
  'AttemptCount': 1}]

# delete a pipeline 

In [59]:
# Low-level SageMaker client used by delete_pipeline below.
sagemaker_client = boto3.client('sagemaker')

def delete_pipeline(pipeline_name, client=None):
    """Delete a SageMaker pipeline by name and print the outcome.

    This is best-effort: failures are printed rather than raised, so the
    notebook keeps running.

    Args:
        pipeline_name: Name of the pipeline to delete.
        client: Optional object exposing ``delete_pipeline(PipelineName=...)``
            and ``exceptions.ResourceNotFound``. Defaults to the notebook-level
            ``sagemaker_client``.

    Returns:
        None. The result is reported on stdout only.
    """
    sm_client = sagemaker_client if client is None else client
    try:
        sm_client.delete_pipeline(PipelineName=pipeline_name)
    except sm_client.exceptions.ResourceNotFound:
        print(f"Pipeline '{pipeline_name}' not found.")
    except Exception as e:
        # Deliberately broad: report any other failure without stopping the notebook.
        print(f"An error occurred: {e}")
    else:
        print(f"Pipeline '{pipeline_name}' deleted successfully.")

# Use a dedicated variable for the pipeline to remove. Reassigning `pipeline_name`
# here would overwrite the notebook-wide name set in the constants cell, so any
# earlier cell re-run afterwards would silently target the deleted pipeline's name.
stale_pipeline_name = 'bits-build-06-21-11-37-28-p-ljnvfgbcukgr'

delete_pipeline(stale_pipeline_name)


Pipeline 'bits-build-06-21-11-37-28-p-ljnvfgbcukgr' deleted successfully.
