# Develop, Train, Optimize and Deploy Scikit-Learn Random Forest


* Doc https://sagemaker.readthedocs.io/en/stable/using_sklearn.html
* SDK https://sagemaker.readthedocs.io/en/stable/sagemaker.sklearn.html
* boto3 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#client

In this notebook we show how to use Amazon SageMaker to develop, train, tune and deploy a Scikit-Learn based ML model (Random Forest). More info on Scikit-Learn can be found here https://scikit-learn.org/stable/index.html. 

In [33]:
import datetime
import time
import tarfile

import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
import sagemaker
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_california_housing


sm_boto3 = boto3.client("sagemaker")

sess = sagemaker.Session()

region = sess.boto_session.region_name

bucket = sess.default_bucket()  # this could also be a hard-coded bucket name

print("Using bucket " + bucket)

Using bucket sagemaker-us-east-1-630508781792


## Prepare data
We load a dataset from sklearn, split it and send it to S3

In [34]:
# Load the uploaded dataset
data_df = pd.read_csv('training.csv') # update with your own dataset name
data_df = data_df.drop('ACCT_NBR',axis=1) # if you do not have this, remove it
# Define target and features
target_col = 'IS_WARN' # update if needed
X = data_df.drop(target_col, axis=1)
y = data_df[target_col]

# Store feature names (ensure they are valid strings for command line args)
feature_names = list(X.columns)
features_string = " ".join(feature_names) # Create space-separated string for script arg

print(f"Target column: {target_col}")
print(f"Features ({len(feature_names)}): {features_string}")

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42, stratify=y # Use stratify for classification
)

# Create dataframes for saving
train_df = X_train.copy()
train_df[target_col] = y_train

test_df = X_test.copy()
test_df[target_col] = y_test

# Define filenames
train_file = "combined_data_train.csv"
test_file = "combined_data_test.csv"

# Save to CSV
train_df.to_csv(train_file, index=False)
test_df.to_csv(test_file, index=False)

# Upload to S3
prefix = "sagemaker/sklearn-combined-data"
trainpath = sess.upload_data(
    path=train_file, bucket=bucket, key_prefix=prefix
)

testpath = sess.upload_data(
    path=test_file, bucket=bucket, key_prefix=prefix
)

print(f"Train data uploaded to: {trainpath}")
print(f"Test data uploaded to: {testpath}")

Target column: IS_WARN
Features (23): CANCEL_NO_CONTACT IS_DIGITAL AUM_AMT YEARLYINCOMELEVEL TX_COUNT TX_SUM TX_MEAN TX_STD TX_MAX TX_MIN AVG_PB_BAL MAX_PB_BAL MIN_PB_BAL MODE_TX_TIME UNIQUE_CHANNELS UNIQUE_TRN_CODES SUM_MB_CHECK SUM_EB_CHECK FLAG_SAME_IP FLAG_SAME_UUID IS_WEEKEND ACCOUNT_AGE AGE
Train data uploaded to: s3://sagemaker-us-east-1-630508781792/sagemaker/sklearn-combined-data/combined_data_train.csv
Test data uploaded to: s3://sagemaker-us-east-1-630508781792/sagemaker/sklearn-combined-data/combined_data_test.csv


## Writing a *Script Mode* script
The below script contains both training and inference functionality and can run both in SageMaker Training hardware or locally (desktop, SageMaker notebook, on prem, etc). Detailed guidance here https://sagemaker.readthedocs.io/en/stable/using_sklearn.html#preparing-the-scikit-learn-training-script

In [35]:
%%writefile script.py

import argparse
import joblib
import os
import json

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier # Changed to Classifier
from sklearn.metrics import f1_score # Import f1_score

# inference functions ---------------
def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf

if __name__ == "__main__":
    print("extracting arguments")
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--n-estimators", type=int, default=100)
    parser.add_argument("--min-samples-leaf", type=int, default=3)
    # Add class_weight hyperparameter
    parser.add_argument("--class-weight", type=str, default=None) # Can be 'balanced' or None
    # Add other hyperparameters like max_depth if needed
    # parser.add_argument("--max-depth", type=int, default=None)

    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    # Update default filenames to match the saved files
    parser.add_argument("--train-file", type=str, default="combined_data_train.csv")
    parser.add_argument("--test-file", type=str, default="combined_data_test.csv")
    parser.add_argument(
        "--features", type=str, required=True
    )  # Make features required
    parser.add_argument(
        "--target", type=str, required=True
    )  # Make target required

    args, _ = parser.parse_known_args()

    print(f"Received features: {args.features}")
    print(f"Received target: {args.target}")
    print(f"Received class_weight: {args.class_weight}")
    feature_columns = args.features.split() # Split the space-separated string
    target_column = args.target

    print("reading data")
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))

    print("building training and testing datasets")
    X_train = train_df[feature_columns]
    X_test = test_df[feature_columns]
    y_train = train_df[target_column]
    y_test = test_df[target_column]

    # Handle class_weight argument potentially being the string 'None'
    class_weight_param = args.class_weight if args.class_weight != 'None' else None
    weights = {0: 1, 1: 100}
    # train
    print("training model")
    # Use RandomForestClassifier
    model = RandomForestClassifier(
        n_estimators=args.n_estimators,
        min_samples_leaf=args.min_samples_leaf,
        random_state=42, # Add random state for reproducibility
        n_jobs=-1,
        class_weight=weights # Use the processed class_weight
    )

    model.fit(X_train, y_train)

    # Evaluate model
    print("validating model")
    y_pred = model.predict(X_test)
    # Calculate F1 score (use average='binary' if it's binary classification, or 'weighted'/'macro' for multi-class)
    f1 = f1_score(y_test, y_pred, average='binary') # Assuming binary classification for IS_WARN
    print(f"F1-score: {f1:.4f}") # Print F1 score for metric tracking

    # persist model
    path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(model, path)
    print("model persisted at " + path)

Overwriting script.py


## SageMaker Training

### Launching a training job with the Python SDK

In [37]:
# We use the Estimator from the SageMaker Python SDK
from sagemaker.sklearn.estimator import SKLearn

# Use a supported Scikit-learn version, e.g., 1.0-1 or 1.2-1
FRAMEWORK_VERSION = "1.2-1" 

# Define the metric name and regex to capture F1 score
metric_definitions = [{'Name': 'f1-score', 'Regex': 'F1-score: ([0-9\.]+)'}]

sklearn_estimator = SKLearn(
    entry_point="script.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.large", # Choose an appropriate instance type
    framework_version=FRAMEWORK_VERSION,
    base_job_name="sklearn-clf-combined",
    metric_definitions=metric_definitions,
    hyperparameters={
        "n-estimators": 100,
        "min-samples-leaf": 3,
        # Pass features and target dynamically
        "features": features_string,
        "target": target_col,
        # Add class_weight to handle imbalance
        "class-weight": "balanced",
        # Pass train/test filenames if they differ from script defaults
        # "train-file": train_file,
        # "test-file": test_file
    },
    py_version="py3" # Ensure python version is specified
)

In [38]:
# launch training job, with asynchronous call
sklearn_estimator.fit({"train": trainpath, "test": testpath}, wait=True)

2025-04-23 10:57:41 Starting - Starting the training job...
2025-04-23 10:58:03 Starting - Preparing the instances for training...
2025-04-23 10:58:28 Downloading - Downloading input data...
2025-04-23 10:59:13 Downloading - Downloading the training image......
2025-04-23 11:00:09 Training - Training image download completed. Training in progress..[34m2025-04-23 11:00:13,565 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2025-04-23 11:00:13,569 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2025-04-23 11:00:13,571 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2025-04-23 11:00:13,591 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2025-04-23 11:00:13,847 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2025-04-23 11:00:13,850 sagemaker-training-toolkit INFO     No

In [None]:
# first compress the code and send to S3

source = "source.tar.gz"
project = "scikitlearn-train-from-boto3"

tar = tarfile.open(source, "w:gz")
tar.add("script.py")
tar.close()

s3 = boto3.client("s3")
s3.upload_file(source, bucket, project + "/" + source)

When using `boto3` to launch a training job we must explicitly point to a docker image.

In [None]:
from sagemaker import image_uris


training_image = image_uris.retrieve(
    framework="sklearn",
    region=region,
    version=FRAMEWORK_VERSION,
    py_version="py3",
    instance_type="ml.c5.xlarge",
)
print(training_image)

In [None]:
# launch training job

response = sm_boto3.create_training_job(
    TrainingJobName="sklearn-boto3-" + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
    HyperParameters={
        "n_estimators": "100", # Example hyperparameter
        "min_samples_leaf": "3", # Example hyperparameter
        # "max-depth": "10", # Example hyperparameter
        "class-weight": "balanced", # Add class weight for imbalance
        "sagemaker_program": "script.py",
        # Use the dynamic features_string variable
        "features": features_string,
        "target": target_col,
        "sagemaker_submit_directory": "s3://" + bucket + "/" + project + "/" + source,
        # Add train/test file names if they differ from script defaults
        # "train-file": train_file,
        # "test-file": test_file,
    },
    AlgorithmSpecification={
        "TrainingImage": training_image,
        "TrainingInputMode": "File",
        # Update metric definitions for F1 score if using boto3 for tuning/evaluation
        "MetricDefinitions": [
            {'Name': 'f1-score', 'Regex': 'F1-score: ([0-9\.]+)'}
        ],
    },
    RoleArn=get_execution_role(),
    InputDataConfig=[
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": trainpath,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
        },
        {
            "ChannelName": "test",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": testpath,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
        },
    ],
    OutputDataConfig={"S3OutputPath": "s3://" + bucket + "/sagemaker-sklearn-artifact/"},
    ResourceConfig={"InstanceType": "ml.m5.large", "InstanceCount": 1, "VolumeSizeInGB": 10}, # Match instance type
    StoppingCondition={"MaxRuntimeInSeconds": 86400},
    EnableNetworkIsolation=False,
)

print(response)

### Launching a tuning job with the Python SDK

In [39]:
# we use the Hyperparameter Tuner
from sagemaker.tuner import IntegerParameter, HyperparameterTuner, CategoricalParameter

# Define exploration boundaries
hyperparameter_ranges = {
    "n-estimators": IntegerParameter(200, 1000),
    "min-samples-leaf": IntegerParameter(2, 10),
    "max_depth": IntegerParameter(10, 80)
}

# Define the objective metric based on the metric definitions
objective_metric_name = "f1-score"
objective_type = "Maximize" # Maximize F1 score
metric_definitions = [{'Name': 'f1-score', 'Regex': 'F1-score: ([0-9\.]+)'}]

# create Optimizer
# Note: The base estimator 'sklearn_estimator' already has class-weight='balanced'.
# If 'class-weight' is NOT included in hyperparameter_ranges, all tuning jobs will use 'balanced'.
# If 'class-weight' IS included in hyperparameter_ranges, the tuner will explore both 'balanced' and None.
tuner = HyperparameterTuner(
    estimator=sklearn_estimator, # Estimator already configured with class_weight='balanced'
    hyperparameter_ranges=hyperparameter_ranges,
    base_tuning_job_name="sklearn-clf-tuner",
    objective_type=objective_type,
    objective_metric_name=objective_metric_name,
    metric_definitions=metric_definitions,
    max_jobs=50, # Increase max_jobs for better tuning
    max_parallel_jobs=10, # Increase parallel jobs if budget allows
    strategy='Bayesian', # Or 'Random'
)

In [40]:
tuner.fit({"train": trainpath, "test": testpath}, wait=True)

.........................................................................................................!


In [41]:
# get tuner results in a df
results = tuner.analytics().dataframe()
while results.empty:
    time.sleep(1)
    results = tuner.analytics().dataframe()
# Sort by F1 score descending
results = results.sort_values('FinalObjectiveValue', ascending=False)
results.head()

Unnamed: 0,max_depth,min-samples-leaf,n-estimators,TrainingJobName,TrainingJobStatus,FinalObjectiveValue,TrainingStartTime,TrainingEndTime,TrainingElapsedTimeSeconds
9,39.0,7.0,201.0,sklearn-clf-tuner-250423-1105-041-7a63201a,Completed,0.4674,2025-04-23 11:12:08+00:00,2025-04-23 11:13:02+00:00,54.0
11,67.0,7.0,202.0,sklearn-clf-tuner-250423-1105-039-712cd60b,Completed,0.4649,2025-04-23 11:11:35+00:00,2025-04-23 11:12:24+00:00,49.0
29,52.0,7.0,207.0,sklearn-clf-tuner-250423-1105-021-88dd51ff,Completed,0.4649,2025-04-23 11:09:45+00:00,2025-04-23 11:10:29+00:00,44.0
10,33.0,7.0,206.0,sklearn-clf-tuner-250423-1105-040-064d69be,Completed,0.4624,2025-04-23 11:11:43+00:00,2025-04-23 11:12:32+00:00,49.0
19,32.0,7.0,215.0,sklearn-clf-tuner-250423-1105-031-51c6971e,Completed,0.4599,2025-04-23 11:10:43+00:00,2025-04-23 11:11:38+00:00,55.0


## Deploy to a real-time endpoint

### Deploy with Python SDK

An `Estimator` could be deployed directly after training, with an `Estimator.deploy()` but here we showcase the more extensive process of creating a model from s3 artifacts, that could be used to deploy a model that was trained in a different session or even out of SageMaker.

In [42]:
# Deploy the best model found by the tuner
# sklearn_estimator.latest_training_job.wait(logs="None") # This applies to the base estimator, not the tuner
tuner.best_estimator().deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large" # Use an appropriate instance type for inference
)

# Get the predictor from the tuner's deployment
predictor = tuner.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    # You might need to specify the serializer/deserializer if defaults aren't right
    # serializer=sagemaker.serializers.CSVSerializer(),
    # deserializer=sagemaker.deserializers.JSONDeserializer()
)
print(f"Endpoint Name: {predictor.endpoint_name}")


2025-04-23 11:13:04 Starting - Found matching resource for reuse
2025-04-23 11:13:04 Downloading - Downloading the training image
2025-04-23 11:13:04 Training - Training image download completed. Training in progress.
2025-04-23 11:13:04 Uploading - Uploading generated training model
2025-04-23 11:13:04 Completed - Resource retained for reuse


---------!
2025-04-23 11:13:04 Starting - Found matching resource for reuse
2025-04-23 11:13:04 Downloading - Downloading the training image
2025-04-23 11:13:04 Training - Training image download completed. Training in progress.
2025-04-23 11:13:04 Uploading - Uploading generated training model
2025-04-23 11:13:04 Completed - Resource released due to keep alive period expiry


---------!Endpoint Name: sklearn-clf-tuner-250423-1105-041-7a63201a


### Invoke with the Python SDK

In [None]:
# Use the test_df and feature_names defined in cell 55792196
# Ensure the predictor object is correctly assigned (from tuner.deploy or model.deploy)

# Select only the feature columns from the test set
test_features = test_df[feature_names]

# the SKLearnPredictor does the serialization from pandas for us by default
predictions = predictor.predict(test_features)
print(predictions)

In [None]:
runtime = boto3.client("sagemaker-runtime")

#### Option 1: `csv` serialization

In [None]:
# csv serialization
# Use the test_df and feature_names defined in cell 55792196
# Ensure the predictor object is correctly assigned
response = runtime.invoke_endpoint(
    EndpointName=predictor.endpoint_name, # Use endpoint_name attribute
    Body=test_df[feature_names].to_csv(header=False, index=False).encode("utf-8"),
    ContentType="text/csv",
)

print(response["Body"].read().decode()) # Decode the response

#### Option 2: `npy` serialization

In [None]:
# npy serialization
from io import BytesIO

# Use the test_df and feature_names defined in cell 55792196
# Ensure the predictor object is correctly assigned

# Serialise numpy ndarray as bytes
buffer = BytesIO()
np.save(buffer, test_df[feature_names].values)
buffer.seek(0) # Reset buffer position

response = runtime.invoke_endpoint(
    EndpointName=predictor.endpoint_name, # Use endpoint_name attribute
    Body=buffer.getvalue(), 
    ContentType="application/x-npy"
)

print(response["Body"].read().decode()) # Decode the response

## Don't forget to delete the endpoint !

In [None]:
# Ensure the predictor object is correctly assigned
print(f"Deleting endpoint: {predictor.endpoint_name}")
predictor.delete_endpoint()