# Train an ML model using SageMaker
Churn prediction provides insight and opportunities to understand why players are leaving and what sort of changes can be made to the game to try and retain such players. For instance, a machine learning (ML) driven churn prediction system could trigger specific personalized offers or targeted promotions in-game to perhaps retain players at the right time before they decide to leave the game. In this notebook, we'll train a classification model using XGBoost using the dataset prepared in the [previous](02-preprocess.ipynb) notebook. The following diagram depicts the training process within the context of MLOps:

![training notebook](img/sagemaker-mlops-model-train-diagram.jpg)

In [2]:
%pip install scikit-learn s3fs==0.4.2 sagemaker xgboost mlflow==2.13.2 sagemaker-mlflow==0.1.0

Note: you may need to restart the kernel to use updated packages.


Import relevant libraries

In [3]:
import os
import boto3
import sagemaker
import mlflow
from time import gmtime, strftime, sleep
import xgboost as xgb
import tarfile
import json
from botocore.exceptions import ClientError

from sagemaker.inputs import TrainingInput



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# Define Helper Functions

In [4]:
def download_from_s3(s3_client, local_file_path, bucket_name, s3_file_path):
    try:
        # Download the file
        s3_client.download_file(bucket_name, s3_file_path, local_file_path)
        print(f"File downloaded successfully to {local_file_path}")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            print(f"An error occurred: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

def upload_to_s3(s3_client, local_file_path, bucket_name, s3_file_path=None):
    # If S3 file path is not specified, use the basename of the local file
    if s3_file_path is None:
        s3_file_path = os.path.basename(local_file_path)

    try:
        # Upload the file
        s3_client.upload_file(local_file_path, bucket_name, s3_file_path)
        print(f"File {local_file_path} uploaded successfully to {bucket_name}/{s3_file_path}")
        return True
    except ClientError as e:
        print(f"ClientError: {e}")
        return False
    except FileNotFoundError:
        print(f"The file {local_file_path} was not found")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False
        
def write_params(s3_client, step_name, params, notebook_param_s3_bucket_prefix):
    local_file_path = f"{step_name}.json"
    with open(local_file_path, "w") as f:
        f.write(json.dumps(params))
    base_local_file_path = os.path.basename(local_file_path)
    bucket_name = notebook_param_s3_bucket_prefix.split("/")[2] # Format: s3://<bucket_name>/..
    s3_file_path = os.path.join("/".join(notebook_param_s3_bucket_prefix.split("/")[3:]), base_local_file_path)
    upload_to_s3(s3_client, local_file_path, bucket_name, s3_file_path)
    
def read_params(s3_client, notebook_param_s3_bucket_prefix, step_name):
    local_file_path = f"{step_name}.json"
    base_local_file_path = os.path.basename(local_file_path)
    bucket_name = notebook_param_s3_bucket_prefix.split("/")[2] # Format: s3://<bucket_name>/..
    s3_file_path = os.path.join("/".join(notebook_param_s3_bucket_prefix.split("/")[3:]),  base_local_file_path)
    downloaded = download_from_s3(s3_client, local_file_path, bucket_name, s3_file_path)
    with open(local_file_path, "r") as f:
        data = f.read()
        params = json.loads(data)
    return params


In [5]:
# helper function to load XGBoost model into xgboost.Booster
def load_model(model_data_s3_uri):
    model_file = "./xgboost-model.tar.gz"
    bucket, key = model_data_s3_uri.replace("s3://", "").split("/", 1)
    boto3.client("s3").download_file(bucket, key, model_file)
    
    with tarfile.open(model_file, "r:gz") as t:
        t.extractall(path=".")
    
    # Load model
    model = xgb.Booster()
    model.load_model("xgboost-model")

    return model

In [6]:
def get_xgb_estimator(
    session,
    instance_type,
    output_s3_url,
    base_job_name,
):
    # Instantiate an XGBoost estimator object
    estimator = sagemaker.estimator.Estimator(
        image_uri=XGBOOST_IMAGE_URI,
        role=sagemaker.get_execution_role(), 
        instance_type=instance_type,
        instance_count=1,
        output_path=output_s3_url,
        sagemaker_session=session,
        base_job_name=base_job_name
    )
    
    # Define algorithm hyperparameters
    estimator.set_hyperparameters(
        num_round=100, # the number of rounds to run the training
        max_depth=3, # maximum depth of a tree
        eta=0.5, # step size shrinkage used in updates to prevent overfitting
        alpha=2.5, # L1 regularization term on weights
        objective="binary:logistic",
        eval_metric="auc", # evaluation metrics for validation data
        subsample=0.8, # subsample ratio of the training instance
        colsample_bytree=0.8, # subsample ratio of columns when constructing each tree
        min_child_weight=3, # minimum sum of instance weight (hessian) needed in a child
        early_stopping_rounds=10, # the model trains until the validation score stops improving
        verbosity=1, # verbosity of printing messages
    )

    return estimator

# Initialize Variables
Similar to the previous notebook, the following variables are defined in this cell specifically used throughout this notebook. In addition to the hardcoded values, these variables can be passed into the notebook as parameters when the notebook is scheduled to run remotely, such as a SageMaker Pipeline job, or a CICD pipeline through SageMaker Project. We'll dive into how to pass parameters into this notebook in the next lab. Please refer to [this](https://docs.aws.amazon.com/sagemaker/latest/dg/notebook-auto-run-troubleshoot-override.html) documentation for more information notebook parameterization.

Similar to `02-preprocess.ipynb` notebook, the following variables can be obtained via SageMaker Studio launcher. Instructions and screenshots are provided in the notebook to guide you through it if you need additional assistance. 

In [7]:
region = "us-east-1"
os.environ["AWS_DEFAULT_REGION"] = region
boto_session = boto3.Session(region_name=region)
sess = sagemaker.Session(boto_session=boto_session)
bucket_name = sess.default_bucket()
bucket_prefix = "player-churn/xgboost"
notebook_param_s3_bucket_prefix=f"s3://{bucket_name}/{bucket_prefix}/params"
experiment_name = "player-churn-model-experiment"
mlflow_tracking_server_arn = "arn:aws:sagemaker:us-east-1:858451092442:mlflow-tracking-server/mlflow-d-muwzxeno4zza" # Provide a valid mlflow tracking server ARN. You can find the value in the output from 00-start-here.ipynb
run_id = None
train_instance_type = "ml.m5.xlarge"

In [8]:
assert len(mlflow_tracking_server_arn) > 0

In [9]:
# Define the output S3 location for storing the model artifacts.
output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"

In [10]:
XGBOOST_IMAGE_URI = sagemaker.image_uris.retrieve(
            "xgboost",
            region=boto3.Session().region_name,
            version="1.7-1"
)

Retrieves step variables from previous notebooks.

In [11]:
preprocess_step_name = "02-preprocess"
s3_client = boto3.client("s3", region_name=region)
preprocess_step_params = read_params(s3_client, notebook_param_s3_bucket_prefix, preprocess_step_name)

File downloaded successfully to 02-preprocess.json


In [12]:
# use sagemaker.Session() in the estimator to a training job immediately
estimator = get_xgb_estimator(
    session=sagemaker.Session(),
    instance_type=train_instance_type,
    output_s3_url=output_s3_url,
    base_job_name=f"player-churn-xgboost-train",
)

In [13]:
# Set up the training inputs using the outputs from preprocess function
training_inputs = {
    "train": TrainingInput(
        preprocess_step_params['train_data'],
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        preprocess_step_params['validation_data'],
        content_type="text/csv",
    ),
}

The following cell integrates MLFlow tracking server with this model training job.

In [14]:
# Run the training job
suffix = strftime('%d-%H-%M-%S', gmtime())
mlflow.set_tracking_uri(mlflow_tracking_server_arn)
mlflow.set_experiment(experiment_name)

with mlflow.start_run(
    run_name=f"training-{strftime('%d-%H-%M-%S', gmtime())}",
    description="training in the notebook with a training job") as run:
    mlflow.log_params(estimator.hyperparameters())
    
    estimator.fit(training_inputs)

    mlflow.log_param("training job name", estimator.latest_training_job.name)
    mlflow.log_metrics({i['metric_name'].replace(':', '_'):i['value'] for i in estimator.training_job_analytics.dataframe().iloc})
    mlflow.xgboost.log_model(load_model(estimator.model_data), artifact_path="xgboost")

2025-01-16 11:28:45 Starting - Starting the training job...
2025-01-16 11:29:10 Starting - Preparing the instances for training...
2025-01-16 11:29:32 Downloading - Downloading input data...
2025-01-16 11:29:57 Downloading - Downloading the training image...
2025-01-16 11:30:43 Training - Training image download completed. Training in progress..[34m[2025-01-16 11:30:48.636 ip-10-2-209-156.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2025-01-16 11:30:48.661 ip-10-2-209-156.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2025-01-16:11:30:49:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2025-01-16:11:30:49:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2025-01-16:11:30:49:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2025-01-16:11:30:49:INFO] No 



# Store Parameters
In the following cell, we'll store the relevant parameters to S3 bucket so that they could be passed to other steps in the subsequent steps. 

In [15]:
params = {}
params['model_s3_path'] = estimator.model_data

In [16]:
step_name = "03-train"
write_params(s3_client, step_name, params, notebook_param_s3_bucket_prefix)

File 03-train.json uploaded successfully to sagemaker-us-east-1-858451092442/player-churn/xgboost/params/03-train.json


In [17]:
mlflow.end_run()