In [45]:
!pip install -U sagemaker

[0m

In [46]:
import os

import boto3
import sagemaker

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    Processor,
    ScriptProcessor,
)

from sagemaker import Model
from sagemaker.xgboost import XGBoostPredictor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, CacheConfig, TuningStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep

from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.tuner import (
    ContinuousParameter,
    HyperparameterTuner,
    WarmStartConfig,
    WarmStartTypes,
)

In [47]:
import sys  
import boto3  
import sagemaker  
from sagemaker.workflow.pipeline_context import PipelineSession  
import pandas as pd  

sagemaker_session = sagemaker.session.Session()             # Initialize a SageMaker session
region = sagemaker_session.boto_region_name                 # Get the AWS region
role = sagemaker.get_execution_role()                       # Get the execution role for SageMaker
pipeline_session = PipelineSession()                        # Initialize a pipeline session
default_bucket = sagemaker_session.default_bucket()         # Get the default S3 bucket for SageMaker
model_package_group_name = f"AutoMpgPackageGroupName"       # Define the model package group name

print(default_bucket)

sagemaker-us-east-1-211343875790


In [48]:
!mkdir -p data

In [49]:
local_path = "data/auto-mpg-dataset.csv"  # Local path to save the downloaded dataset

# Create an S3 resource object
s3 = boto3.resource("s3")

# Download the dataset from your S3 bucket with bucket name and file name
s3.Bucket(sagemaker_session.default_bucket()).download_file(
    "auto-mpg.csv", local_path  
)

# Define the base URI for the uploaded dataset in your bucket
base_uri = f"s3://{default_bucket}/auto-mpg"

# Upload the local dataset to the defined S3 URI
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,  
    desired_s3_uri=base_uri,  
)

# Print the S3 URI where the dataset was uploaded
print(input_data_uri)  

s3://sagemaker-us-east-1-211343875790/auto-mpg/auto-mpg-dataset.csv


In [50]:
df = pd.read_csv(local_path)

In [51]:
df.head()

Unnamed: 0,mpg,cylinders,displacement,horsepower,weight,acceleration,model year,origin,car name
0,18.0,8,307.0,130,3504,12.0,70,1,chevrolet chevelle malibu
1,15.0,8,350.0,165,3693,11.5,70,1,buick skylark 320
2,18.0,8,318.0,150,3436,11.0,70,1,plymouth satellite
3,16.0,8,304.0,150,3433,12.0,70,1,amc rebel sst
4,17.0,8,302.0,140,3449,10.5,70,1,ford torino


In [52]:
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge") 
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval" )
base_job_prefix = "ab3-autompg-example"
input_data = ParameterString(name="InputData", default_value=input_data_uri,)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=46.0)
# Cache Pipeline steps to reduce execution time on subsequent executions
cache_config = CacheConfig(enable_caching=True, expire_after="30d")

In [53]:
!mkdir -p code

In [54]:
# %%writefile code/preprocess.py
# import argparse
# import logging
# import os
# import pathlib
# import boto3
# import numpy as np
# import pandas as pd
# from sklearn.compose import ColumnTransformer
# from sklearn.impute import SimpleImputer
# from sklearn.pipeline import Pipeline
# from sklearn.preprocessing import StandardScaler, OneHotEncoder

# logger = logging.getLogger()
# logger.setLevel(logging.INFO)
# logger.addHandler(logging.StreamHandler())

# # Define column names and data types for the Auto MPG dataset
# feature_columns_names = [
#     "mpg", "cylinders", "displacement", "horsepower", "weight", "acceleration", "model_year", "origin"
# ]
# label_column = "mpg"

# feature_columns_dtype = {
#     "cylinders": np.float64,  # Change to float64 to handle NA values before converting to int
#     "displacement": np.float64,
#     "horsepower": np.float64,
#     "weight": np.float64,
#     "acceleration": np.float64,
#     "model_year": np.float64,  # Change to float64 to handle NA values before converting to int
#     "origin": str,
# }
# label_column_dtype = {"mpg": np.float64}

# def merge_two_dicts(x, y):
#     """Merges two dicts, returning a new copy."""
#     z = x.copy()
#     z.update(y)
#     return z

# if __name__ == "__main__":
#     logger.debug("Starting preprocessing.")
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--input-data", type=str, required=True)
#     args = parser.parse_args()

#     base_dir = "/opt/ml/processing"
#     pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)  # Create directories for input and output
#     input_data = args.input_data
#     bucket = input_data.split("/")[2]
#     key = "/".join(input_data.split("/")[3:])

#     logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
#     fn = f"{base_dir}/data/auto-mpg-dataset.csv"
#     s3 = boto3.resource("s3")
#     s3.Bucket(bucket).download_file(key, fn)  # Download the dataset from S3

#     logger.debug("Reading downloaded data.")
#     df = pd.read_csv(fn, header=None, names=feature_columns_names)  # Read the CSV file into a DataFrame
#     os.unlink(fn)  # Delete the local file after reading

#     # Ensure 'horsepower' is handled correctly
#     logger.debug("Handling missing values in 'horsepower' column.")
#     df['horsepower'] = pd.to_numeric(df['horsepower'], errors='coerce')  # Convert 'horsepower' to numeric, coerce errors to NaN

#     # Convert non-numeric values to NaN
#     for col in feature_columns_dtype.keys():
#         df[col] = pd.to_numeric(df[col], errors='coerce')

#     # Handle missing values before converting data types
#     df = df.fillna(df.median())  # Fill NaN values with the median

#     # Convert columns to the appropriate data types
#     for col, dtype in feature_columns_dtype.items():
#         df[col] = df[col].astype(dtype)

#     logger.debug("Defining transformers.")
#     features = list(feature_columns_names)
#     features.remove(label_column)  # Remove the label column from the features list

#     numeric_features = [name for name in features if df.dtypes[name] != 'object']  # Identify numeric features
#     categorical_features = [name for name in features if df.dtypes[name] == 'object']  # Identify categorical features

#     numeric_transformer = Pipeline(
#         steps=[
#             ("imputer", SimpleImputer(strategy="median")),  # Impute missing values with median
#             ("scaler", StandardScaler()),  # Standardize the numeric features
#         ]
#     )

#     categorical_transformer = Pipeline(
#         steps=[
#             ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),  # Impute missing values with 'missing'
#             ("onehot", OneHotEncoder(handle_unknown="ignore")),  # One-hot encode categorical features
#         ]
#     )

#     preprocess = ColumnTransformer(
#         transformers=[
#             ("num", numeric_transformer, numeric_features),  # Apply numeric transformations
#             ("cat", categorical_transformer, categorical_features),  # Apply categorical transformations
#         ]
#     )

#     logger.info("Applying transforms.")
#     y = df.pop(label_column)  # Separate the label from the features
#     X_pre = preprocess.fit_transform(df)  # Fit and transform the features
#     y_pre = y.to_numpy().reshape(len(y), 1)  # Reshape the label array

#     X = np.concatenate((y_pre, X_pre), axis=1)  # Concatenate the label and features

#     logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
#     np.random.shuffle(X)  # Shuffle the dataset
#     train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])  # Split the dataset

#     # Further split the test set into test_set_1 and test_set_2
#     test_set_1, test_set_2 = np.split(test, [int(0.5 * len(test))])

#     logger.info("Writing out datasets to %s.", base_dir)
#     pathlib.Path(f"{base_dir}/train").mkdir(parents=True, exist_ok=True)
#     pathlib.Path(f"{base_dir}/validation").mkdir(parents=True, exist_ok=True)
#     pathlib.Path(f"{base_dir}/test_set_1").mkdir(parents=True, exist_ok=True)
#     pathlib.Path(f"{base_dir}/test_set_2").mkdir(parents=True, exist_ok=True)

#     pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)  # Save training set
#     pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)  # Save validation set
#     pd.DataFrame(test_set_1).to_csv(f"{base_dir}/test_set_1/test_set_1.csv", header=False, index=False)  # Save test_set_1
#     pd.DataFrame(test_set_2).to_csv(f"{base_dir}/test_set_2/test_set_2.csv", header=False, index=False)  # Save test_set_2

In [56]:
%%writefile code/preprocess.py
import argparse
import logging
import os
import pathlib
import boto3
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Define column names and data types for the Auto MPG dataset
feature_columns_names = [
    "mpg", "cylinders", "displacement", "horsepower", "weight", "acceleration", "model_year", "origin"
]
label_column = "mpg"

feature_columns_dtype = {
    "cylinders": np.float64,  # Change to float64 to handle NA values before converting to int
    "displacement": np.float64,
    "horsepower": np.float64,
    "weight": np.float64,
    "acceleration": np.float64,
    "model_year": np.float64,  # Change to float64 to handle NA values before converting to int
    "origin": str,
}
label_column_dtype = {"mpg": np.float64}

def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z

if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)  # Create directories for input and output
    input_data = args.input_data
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/auto-mpg-dataset.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)  # Download the dataset from S3

    logger.debug("Reading downloaded data.")
    df = pd.read_csv(fn, header=None, names=feature_columns_names)  # Read the CSV file into a DataFrame
    os.unlink(fn)  # Delete the local file after reading

    # Ensure 'horsepower' is handled correctly
    logger.debug("Handling missing values in 'horsepower' column.")
    df['horsepower'] = pd.to_numeric(df['horsepower'], errors='coerce')  # Convert 'horsepower' to numeric, coerce errors to NaN

    # Convert non-numeric values to NaN
    for col in feature_columns_dtype.keys():
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Handle missing values before converting data types
    df = df.fillna(df.median())  # Fill NaN values with the median

    # Convert columns to the appropriate data types
    for col, dtype in feature_columns_dtype.items():
        df[col] = df[col].astype(dtype)

    logger.debug("Defining transformers.")
    features = list(feature_columns_names)
    features.remove(label_column)  # Remove the label column from the features list

    numeric_features = [name for name in features if df.dtypes[name] != 'object']  # Identify numeric features
    categorical_features = [name for name in features if df.dtypes[name] == 'object']  # Identify categorical features

    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),  # Impute missing values with median
            ("scaler", StandardScaler()),  # Standardize the numeric features
        ]
    )

    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),  # Impute missing values with 'missing'
            ("onehot", OneHotEncoder(handle_unknown="ignore")),  # One-hot encode categorical features
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),  # Apply numeric transformations
            ("cat", categorical_transformer, categorical_features),  # Apply categorical transformations
        ]
    )

    logger.info("Applying transforms.")
    y = df.pop(label_column)  # Separate the label from the features
    X_pre = preprocess.fit_transform(df)  # Fit and transform the features
    y_pre = y.to_numpy().reshape(len(y), 1)  # Reshape the label array

    X = np.concatenate((y_pre, X_pre), axis=1)  # Concatenate the label and features

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    np.random.shuffle(X)  # Shuffle the dataset
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])  # Split the dataset

    # Further split the test set into test_set_1 and test_set_2
    test_set_1, test_set_2 = np.split(test, [int(0.5 * len(test))])

    logger.info("Writing out datasets to %s.", base_dir)
    pathlib.Path(f"{base_dir}/train").mkdir(parents=True, exist_ok=True)
    pathlib.Path(f"{base_dir}/validation").mkdir(parents=True, exist_ok=True)
    pathlib.Path(f"{base_dir}/test_set_1").mkdir(parents=True, exist_ok=True)
    pathlib.Path(f"{base_dir}/test_set_2").mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)  # Save training set
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)  # Save validation set
    pd.DataFrame(test_set_1).to_csv(f"{base_dir}/test_set_1/test_set_1.csv", header=False, index=False)  # Save test_set_1

    # Remove the label column from test_set_2
    test_set_2_features = test_set_2[:, 1:]
    pd.DataFrame(test_set_2_features).to_csv(f"{base_dir}/test_set_2/test_set_2.csv", header=False, index=False)  # Save test_set_2 without the label column

Overwriting code/preprocess.py


In [57]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables

# Initialize the SKLearnProcessor for processing the Auto MPG dataset
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",  # Version of the scikit-learn framework
    instance_type="ml.m5.xlarge",  # Type of instance to use
    instance_count=processing_instance_count,  # Number of instances to use
    base_job_name=f"{base_job_prefix}/sklearn-autompg-preprocess",  # Base name for the processing job
    sagemaker_session=pipeline_session,  # SageMaker session
    role=role,  # IAM role for the processing job
)

# Define the run arguments for the processing job
processor_run_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",  # Name of the output
            source="/opt/ml/processing/train",  # Source directory in the container
            destination=Join(  # Destination S3 URI
                on="/",
                values=[
                    "s3:/",
                    default_bucket,
                    base_job_prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,  # Use pipeline execution ID for dynamic output path
                    "PreprocessAutoMpgData",  # Folder name for train data
                ],
            ),
        ),
        ProcessingOutput(
            output_name="validation",  # Name of the output
            source="/opt/ml/processing/validation",  # Source directory in the container
            destination=Join(  # Destination S3 URI
                on="/",
                values=[
                    "s3:/",
                    default_bucket,
                    base_job_prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,  # Use pipeline execution ID for dynamic output path
                    "PreprocessAutoMpgData",  # Folder name for validation data
                ],
            ),
        ),
        ProcessingOutput(
            output_name="test_set_1",  # Name of the output
            source="/opt/ml/processing/test_set_1",  # Source directory in the container
            destination=Join(  # Destination S3 URI
                on="/",
                values=[
                    "s3:/",
                    default_bucket,
                    base_job_prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,  # Use pipeline execution ID for dynamic output path
                    "PreprocessAutoMpgData",  # Folder name for test set 1
                ],
            ),
        ),
        ProcessingOutput(
            output_name="test_set_2",  # Name of the output
            source="/opt/ml/processing/test_set_2",  # Source directory in the container
            destination=Join(  # Destination S3 URI
                on="/",
                values=[
                    "s3:/",
                    default_bucket,
                    base_job_prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,  # Use pipeline execution ID for dynamic output path
                    "PreprocessAutoMpgData",  # Folder name for test set 2
                ],
            ),
        ),
    ],
    code="code/preprocess.py",  # Path to the preprocessing script
    arguments=["--input-data", input_data],  # Arguments to pass to... the script
)

# Define the processing step for the pipeline
step_process = ProcessingStep(
    name="PreprocessAutoMpgData",  
    step_args=processor_run_args,  # Arguments for the processing step
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
Defaulting to only available Python version: py3
Defaulting to only available Python version: py3


In [58]:
from sagemaker.estimator import Estimator  
from sagemaker.inputs import TrainingInput  
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter  
from sagemaker.workflow.steps import TuningStep  
from sagemaker.workflow.functions import Join  
from sagemaker.image_uris import retrieve  

# Define the output path for the model artifacts from the Hyperparameter Tuning Job
model_path = f"s3://{default_bucket}/{base_job_prefix}/AutoMpgTrain"

# Retrieve the image URI for the XGBoost algorithm
image_uri = retrieve(
    framework="xgboost",  # Specify the framework as XGBoost
    region=region,  
    version="1.0-1",  # Specify the version of XGBoost
    py_version="py3",  # Specify the Python version
    instance_type="ml.m5.xlarge",  
)

# Initialize the XGBoost estimator
xgb_train = Estimator(
    image_uri=image_uri,  # The URI of the container image for XGBoost
    instance_type=instance_type,  # The type of EC2 instance to use for training
    instance_count=1,  
    output_path=model_path,  # The S3 location where the model artifacts will be saved
    base_job_name=f"{base_job_prefix}/autompg-train",  # The base name for the training job
    sagemaker_session=pipeline_session,  # The SageMaker session to use
    role=role,  # The IAM role for the training job
)

# Set the hyperparameters for the XGBoost model
xgb_train.set_hyperparameters(
    eval_metric="rmse",  # The evaluation metric to use
    objective="reg:squarederror",  # The objective metric for the training job
    num_round=50,  # The number of boosting rounds
    max_depth=5,  # The maximum depth of a tree
    eta=0.2,  # The step size shrinkage
    gamma=4,  # The minimum loss reduction required to make a further partition
    min_child_weight=6,  # The minimum sum of instance weight needed in a child
    subsample=0.7,  # The subsample ratio of the training instances
    silent=0,  # The logging mode
)

# Define the objective metric for the Hyperparameter Tuning Job
objective_metric_name = "validation:rmse"

# Define the hyperparameter ranges to tune
hyperparameter_ranges = {
    "alpha": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"),  # The range for the alpha parameter
    "lambda": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"),  # The range for the lambda parameter
}

# Initialize the Hyperparameter Tuner
tuner_log = HyperparameterTuner(
    xgb_train,  # The XGBoost estimator to tune
    objective_metric_name,  # The objective metric to optimize
    hyperparameter_ranges,  # The ranges of hyperparameters to tune
    max_jobs=3,  # The maximum number of jobs to run in total
    max_parallel_jobs=3,  # The maximum number of jobs to run in parallel
    strategy="Bayesian",  # The search strategy for hyperparameter tuning
    objective_type="Minimize",  # The type of objective metric (minimize or maximize)
)

# Define the inputs for the Hyperparameter Tuning Job
hpo_args = tuner_log.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,  # S3 URI for training data
            content_type="text/csv",  # Content type of the training data
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,  # S3 URI for validation data
            content_type="text/csv",  # Content type of the validation data
        ),
    }
)

# Defining the Tuning Step for the pipeline
step_tuning = TuningStep(
    name="HPTuning",  
    step_args=hpo_args,  # The arguments for the tuning step
    cache_config=cache_config,  # The cache configuration to reduce execution time on subsequent executions
)

In [59]:
# Creating SageMaker Model
model_prefix = f"{base_job_prefix}/AutoMpgTrain"  # Define the prefix for the model path in S3

# Initialize the Model object for the best model
best_model = Model(
    image_uri=image_uri,                          # URI of the container image for the model
    model_data=step_tuning.get_top_model_s3_uri(
        top_k=0,                                  # Get the top model from the hyperparameter tuning job
        s3_bucket=default_bucket,                 # The S3 bucket where the model artifacts are stored
        prefix=model_prefix                       # The prefix for the model path in S3
    ),
    predictor_cls=XGBoostPredictor,               # The predictor class to use for deployment
    sagemaker_session=pipeline_session,           # The SageMaker session to use
    role=role,                                    # The IAM role for the model
)

# Define the model creation step in the pipeline
step_create_model = ModelStep(
    name="CreateBestModel",  
    step_args=best_model.create(instance_type="ml.m5.xlarge"),  # Arguments for creating the model and specifying the instance type to use
)

In [60]:
%%writefile code/evaluate.py

import json
import pathlib
import tarfile
import pickle
import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import mean_squared_error

if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"  # Path to the tarred model file
    with tarfile.open(model_path) as tar:  # Extract the tarred model file
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))  # Load the XGBoost model from the extracted file

    test_path = "/opt/ml/processing/test_set_1/test_set_1.csv"  # Path to the test dataset
    df = pd.read_csv(test_path, header=None)  # Read the test dataset into a DataFrame
    y_test = df.iloc[:, 0].to_numpy()  # Extract the labels (assuming the label is the first column)
    df.drop(df.columns[0], axis=1, inplace=True)  # Drop the label column from the test data
    dmatrix_test = xgboost.DMatrix(df.values)  # Create DMatrix for XGBoost prediction

    predictions = model.predict(dmatrix_test)  # Make predictions using the loaded model
    mse = mean_squared_error(y_test, predictions)  # Calculate mean squared error
    std = np.std(y_test - predictions)  # Calculate standard deviation of errors

    report_dict = {  # Create a dictionary to store the evaluation metrics
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"                     # Define the output directory for the evaluation report
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)      # Create the output directory if it doesn't exist
    evaluation_path = f"{output_dir}/evaluation.json"                # Path to the evaluation report JSON file
    with open(evaluation_path, "w") as f:                            # Write the evaluation metrics to the JSON file
        f.write(json.dumps(report_dict))

Overwriting code/evaluate.py


In [61]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.execution_variables import ExecutionVariables

script_eval = ScriptProcessor(
    image_uri=image_uri,  # Docker image URI for the processing job
    command=["python3"],  # Command to run
    instance_type="ml.m5.xlarge",  
    instance_count=1,  
    base_job_name=f"{base_job_prefix}/script-autompg-eval",  # Base name for the processing job
    sagemaker_session=pipeline_session,  
    role=role,  
)

evaluation_report = PropertyFile(
    name="AutoMPGEvaluationReport",  # Name of the property file
    output_name="evaluation",  # Output name
    path="evaluation.json",  # Path to the evaluation JSON file
)

processor_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_tuning.get_top_model_s3_uri(
                top_k=0, s3_bucket=default_bucket, prefix=model_prefix     # Source S3 URI for the model artifact
            ), 
            destination="/opt/ml/processing/model",  # Destination directory in the container
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test_set_1"].S3Output.S3Uri,  # Source S3 URI for the test data
            destination="/opt/ml/processing/test_set_1",  # Destination directory in the container
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),  # Output directory in the container
    ],
    code="code/evaluate.py",  # Path to the evaluation script
)

step_eval = ProcessingStep(
    name="EvaluateTopModel",  
    step_args=processor_args,  # Arguments for the processing step
    property_files=[evaluation_report],  # Property file for the evaluation report
    cache_config=cache_config,  # Cache configuration
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]     # S3 URI for the evaluation JSON file
        ),  
        content_type="application/json",  # Content type of the evaluation file
    )
)

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


In [62]:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
from sagemaker.workflow.functions import Join

# Define the Transformer
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,  # Name of the model created in the previous step
    instance_type="ml.m5.xlarge",  # Type of instance to use for the transform job
    instance_count=1,  # Number of instances to use for the transform job
    output_path=f"s3://{default_bucket}/AutoMPGTransform",  # S3 path where the transform output will be stored
)

# Define the TransformInput
batch_data = Join(
    on="/",
    values=[
        f"s3://{default_bucket}",
        "ab3-autompg-example",
        ExecutionVariables.PIPELINE_EXECUTION_ID,
        "PreprocessAutoMpgData",
        "test_set_2.csv"
    ]
)

# Define the TransformStep
step_transform = TransformStep(
    name="AutoMPGTransform",  # Name of the transform step
    transformer=transformer,  # Transformer object defined above
    inputs=TransformInput(data=batch_data, content_type="text/csv"),  # Input data for the transform job
)

In [63]:
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

# Define the model registration arguments
register_args = best_model.register(
    content_types=["text/csv"],  # Define the content types for the model input
    response_types=["text/csv"],  # Define the response types for the model output
    inference_instances=["ml.t2.medium", "ml.m5.large"],  # Define the instance types for inference
    transform_instances=["ml.m5.large"],  # Define the instance types for batch transformation
    model_package_group_name=model_package_group_name,  # Define the model package group name
    approval_status=model_approval_status,  # Define the approval status of the model
)

# Define the model registration step
step_register = ModelStep(
    name="RegisterBestAutoMPGModel",  # Name of the model registration step
    step_args=register_args,  # Arguments for registering the model
)

In [64]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

# Define the fail step
step_fail = FailStep(
    name="AutoMPGMSEFail",  # Name of the fail step
    error_message=Join(
        on=" ",  # Join elements with a space
        values=["Execution failed due to MSE >", mse_threshold]  # Error message parts
    ),
)

In [65]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Create a condition that checks if the MSE is less than or equal to the threshold
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,  # Name of the evaluation step
        property_file=evaluation_report,  # Property file containing the evaluation metrics
        json_path="regression_metrics.mse.value",  # JSON path to the MSE value in the evaluation report
    ),
    right=mse_threshold,  # The threshold value for the condition
)

# Define the condition step
step_cond = ConditionStep(
    name="AutoMPGMSECond",  # Name of the condition step
    conditions=[cond_lte],  # List of conditions to check
    if_steps=[step_register, step_create_model, step_transform],  # Steps to execute if the condition is met
    else_steps=[step_fail],  # Steps to execute if the condition is not met (e.g., logging or notifications)
)

In [66]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

# Defining the pipeline

pipeline_name = "AutoMPGPipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_tuning, step_eval, step_cond],
)

In [67]:
import json

definition = json.loads(pipeline.definition())
definition

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building wo

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-211343875790/auto-mpg/auto-mpg-dataset.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 46.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessAutoMpgData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-

In [68]:
pipeline.upsert(role_arn=role)

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building wo

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:211343875790:pipeline/AutoMPGPipeline',
 'ResponseMetadata': {'RequestId': '558b5bfa-da37-4dd4-918d-998703c97d94',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '558b5bfa-da37-4dd4-918d-998703c97d94',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Thu, 20 Jun 2024 02:01:05 GMT'},
  'RetryAttempts': 0}}

In [69]:
execution = pipeline.start()

In [70]:
execution.wait()

In [71]:
from pprint import pprint

evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


{'regression_metrics': {'mse': {'standard_deviation': 0.23584178988203086,
                                'value': 0.06323755908057743}}}
