In [9]:
# install SageMaker python SDK
!pip install -U sagemaker

[33mDEPRECATION: celery 4.4.0 has a non-standard dependency specifier pytz>dev. pip 24.1 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of celery or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063[0m[33m
[0m

In [10]:
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [11]:
import sys
import os
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import  PipelineSession 

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
# role = sagemaker.get_execution_role()
role = os.environ["ROLE"]
pipeline_session = PipelineSession()
# default_bucket = sagemaker_session.default_bucket()
default_bucket = os.environ["BUCKET"]
model_package_group_name = f"AbaloneModelPackageGroupName"

print(role)
print(default_bucket)


arn:aws:iam::956763695025:role/service-role/AmazonSageMaker-ExecutionRole-20231107T213330
hillaryabalone


In [12]:
!mkdir -p data

In [13]:
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-example-files-prod-{region}").download_file("datasets/tabular/uci_abalone/abalone.csv", local_path)

base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)
print(input_data_uri)

s3://hillaryabalone/abalone/abalone-dataset.csv


In [14]:
# download a second dataset for batch transformation after model creation

local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file("dataset/abalone-dataset-batch", local_path)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)
print(batch_data_uri)

s3://hillaryabalone/abalone/abalone-dataset-batch


In [15]:
# Define Parameters to Parametrize Pipeline Execution
# ParameterString
# ParameterInteger
# ParameterFloat
# processing_instance_count - The instance count of the processing job.
# instance_type - The ml.* instance type of the training job.
# model_approval_status - The approval status to register with the trained model for CI/CD purposes (“PendingManualApproval” is the default).
# input_data - The S3 bucket URI location of the input data.
# batch_data - The S3 bucket URI location of the batch data.
# mse_threshold - The Mean Squared Error (MSE) threshold used to verify the accuracy of a model.

from sagemaker.workflow.parameters import (ParameterString, ParameterInteger, ParameterFloat)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
input_data = ParameterString(name="InputData", default_value=input_data_uri)
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)


In [16]:
# Define a Processing Step for Feature Engineering
# Fill in missing sex category data and encode it so that it is suitable for training.
# Scale and normalize all numerical fields, aside from sex and rings numerical data.
# Split the data into training, validation, and test datasets.
# The Processing step executes the script on the input data.
# The Training step uses the preprocessed training features and labels to train a model. 
# The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model.

In [17]:
!mkdir -p code

In [18]:
%%writefile code/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Since we get a headerless CSV file, we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]

label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}

def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z

if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)


Overwriting code/preprocessing.py


In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py"
)
step_process = ProcessingStep(name="AbaloneProcess", step_args=processor_args)

In [None]:
# Define a Training Step to Train a Model
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)

xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    step_args=train_args,
)