# Preprocessing Data for ML using SageMaker
Machine learning projects typically require pre- and post-processing steps to build datasets that match the model's training requirements. Data scientists often develop preprocessing scripts in a notebook to prepare the data for training. In this workshop, we work with a synthetic video game dataset to train a classifier that predicts the probability of player churn. The preprocessing in this notebook focuses on shaping the data to train an [XGBoost classifier](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html).


![preprocessing notebook](img/sagemaker-mlops-notebook-preprocessing.jpg)

In [26]:
%pip install scikit-learn "pandas>=2.0.0" s3fs==0.4.2 sagemaker xgboost mlflow==2.13.2 sagemaker-mlflow==0.1.0

Note: you may need to restart the kernel to use updated packages.


Import the packages used in this notebook.

In [27]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [42]:
import boto3
from sagemaker.session import Session
from sagemaker.feature_store.feature_store import FeatureStore
from sagemaker.feature_store.feature_group import FeatureGroup
import numpy as np
import sagemaker
import mlflow
import pandas as pd
from time import gmtime, strftime
import os
import json
from botocore.exceptions import ClientError

## Helper Functions

In [43]:
def download_from_s3(s3_client, local_file_path, bucket_name, s3_file_path):
    try:
        # Download the file
        s3_client.download_file(bucket_name, s3_file_path, local_file_path)
        print(f"File downloaded successfully to {local_file_path}")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            print(f"An error occurred: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

def upload_to_s3(s3_client, local_file_path, bucket_name, s3_file_path=None):
    # If S3 file path is not specified, use the basename of the local file
    if s3_file_path is None:
        s3_file_path = os.path.basename(local_file_path)

    try:
        # Upload the file
        s3_client.upload_file(local_file_path, bucket_name, s3_file_path)
        print(f"File {local_file_path} uploaded successfully to {bucket_name}/{s3_file_path}")
        return True
    except ClientError as e:
        print(f"ClientError: {e}")
        return False
    except FileNotFoundError:
        print(f"The file {local_file_path} was not found")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False
        
def write_params(s3_client, step_name, params, notebook_param_s3_bucket_prefix):
    local_file_path = f"{step_name}.json"
    with open(local_file_path, "w") as f:
        f.write(json.dumps(params))
    base_local_file_path = os.path.basename(local_file_path)
    bucket_name = notebook_param_s3_bucket_prefix.split("/")[2] # Format: s3://<bucket_name>/..
    s3_file_path = os.path.join("/".join(notebook_param_s3_bucket_prefix.split("/")[3:]), base_local_file_path)
    upload_to_s3(s3_client, local_file_path, bucket_name, s3_file_path)
    
def read_params(s3_client, notebook_param_s3_bucket_prefix, step_name):
    local_file_path = f"{step_name}.json"
    base_local_file_path = os.path.basename(local_file_path)
    bucket_name = notebook_param_s3_bucket_prefix.split("/")[2] # Format: s3://<bucket_name>/..
    s3_file_path = os.path.join("/".join(notebook_param_s3_bucket_prefix.split("/")[3:]),  base_local_file_path)
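    downloaded = download_from_s3(s3_client, local_file_path, bucket_name, s3_file_path)
    if not downloaded:
        # Fail loudly instead of reading a missing or stale local file
        raise FileNotFoundError(f"Could not download s3://{bucket_name}/{s3_file_path}")
    with open(local_file_path, "r") as f:
        params = json.load(f)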
    return params
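
The helpers above derive the bucket name and key prefix by splitting the `s3://` URI on `/`. As a minimal sketch, the same parsing could be done with the standard library's `urllib.parse`. The function name `split_s3_uri` and the bucket name `example-bucket` are hypothetical and not part of this notebook.

```python
import posixpath
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split an s3://bucket/prefix URI into (bucket, key_prefix)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not a valid S3 URI: {s3_uri}")
    # The path component starts with "/", which is not part of the S3 key
    return parsed.netloc, parsed.path.lstrip("/")


bucket, prefix = split_s3_uri("s3://example-bucket/player-churn/xgboost/params")
# posixpath.join always uses "/" as the separator, which is what S3 keys expect
key = posixpath.join(prefix, "02-preprocess.json")
print(bucket, key)
```

Compared with index-based splitting, this variant rejects a malformed URI with a clear error rather than silently producing a wrong bucket name.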


Define constants for the preprocessing job.

In [44]:
target_col = "player_churn"

# Initialize Variables / Parameters
The variables defined in this section are used throughout the notebook. In addition to the hardcoded values, these variables can be passed into the notebook as parameters when it is scheduled to run remotely, for example as a SageMaker Pipeline job. We'll dive into how to pass parameters into this notebook in the next lab. The same variables can also be overridden with different values when the notebook is scheduled by a CI/CD pipeline through a SageMaker Project. Please refer to [this documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/notebook-auto-run-troubleshoot-override.html) for more information on notebook parameterization.

Specifically, there are 2 variables that we need to obtain in order to run this notebook successfully in SageMaker Studio. 

* feature_group_name
* mlflow_tracking_server_arn


The following sections describe how to obtain these variables through the SageMaker Studio Launcher.

### Feature Store Group
If you have completed the feature-store ingestion lab (lab1), a SageMaker feature group should already exist. To obtain its name, navigate to the SageMaker Studio launcher, select the `Data` dropdown in the left pane, select `Feature Store`, and find the appropriate feature group name in the right pane, as depicted in the following diagram:

![studio feature group console](img/sagemaker-studio-feature-group-console.jpg)

### MLflow Tracking Server
Machine learning is an iterative process that requires experimenting with various combinations of data, algorithms, and parameters, while observing their impact on model accuracy. The iterative nature of ML experimentation results in numerous model training runs and versions, making it challenging to track the best performing models and their configurations. The complexity of managing and comparing iterative training runs increases with generative artificial intelligence (generative AI), where experimentation involves not only fine-tuning models but also exploring creative and diverse outputs. Researchers must adjust hyperparameters, select suitable model architectures, and curate diverse datasets to optimize both the quality and creativity of the generated content. Evaluating generative AI models requires both quantitative and qualitative metrics, adding another layer of complexity to the experimentation process.

Throughout the workshop, you'll integrate MLflow with Amazon SageMaker to track, organize, view, analyze, and compare iterative ML experimentation to gain comparative insights and register and deploy your best performing models.

Please refer to this documentation to learn more about the SageMaker MLflow integration.


If you completed `00-start-here.ipynb` before this notebook, an MLflow tracking server should already be provisioned for you. To get the ARN of the tracking server, go to the SageMaker Studio Launcher, select `MLFlow` from the Applications list in the top left corner of the launcher, select the appropriate tracking server, and find the ARN under the `Configuration` section, as depicted in the following diagram:

![mlflow tracking](img/sagemaker-studio-launcher-mlflow-console.jpg)
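
Because the ARN is copied by hand from the console, a quick shape check can catch copy-paste mistakes early. The following is a minimal sketch: the regular expression is inferred from the ARN format used in this notebook and is an assumption, not an official specification, and the function name and account ID are placeholders.

```python
import re

# Assumed pattern, inferred from the tracking server ARN shown in this notebook
TRACKING_SERVER_ARN_PATTERN = re.compile(
    r"^arn:aws[a-z-]*:sagemaker:[a-z0-9-]+:\d{12}:mlflow-tracking-server/[A-Za-z0-9-]+$"
)


def looks_like_tracking_server_arn(value):
    """Return True when the value has the expected tracking server ARN shape."""
    return bool(TRACKING_SERVER_ARN_PATTERN.match(value))


# Placeholder account ID and server name, for illustration only
example_arn = "arn:aws:sagemaker:us-east-1:111122223333:mlflow-tracking-server/example-server"
print(looks_like_tracking_server_arn(example_arn))
print(looks_like_tracking_server_arn("FG-player-churn-feature-engineering"))
```

A check like this only validates the format. It does not confirm that the tracking server exists or is reachable.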

In [45]:
region = "us-east-1"
os.environ["AWS_DEFAULT_REGION"] = region
boto_session = boto3.Session(region_name=region)

# SageMaker session
sess = sagemaker.Session(boto_session=boto_session)
bucket_name = sess.default_bucket()
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
bucket_prefix = "player-churn/xgboost"
notebook_param_s3_bucket_prefix=f"s3://{bucket_name}/{bucket_prefix}/params"
experiment_name = "player-churn-model-experiment"
feature_group_name = "FG-player-churn-feature-engineering-312d5f19" # Replace the feature group name created in the previous lab
mlflow_tracking_server_arn = "arn:aws:sagemaker:us-east-1:141310854322:mlflow-tracking-server/mlflow-d-ms17jq3aatnw" # Provide a valid mlflow tracking server ARN. You can find the value in the output from 00-start-here.ipynb
run_id = None

In [46]:
assert len(feature_group_name) > 0
assert len(mlflow_tracking_server_arn) > 0

The following cell connects this preprocessing job to the MLflow tracking server and starts a run.

In [47]:
suffix = strftime('%d-%H-%M-%S', gmtime())
mlflow.set_tracking_uri(mlflow_tracking_server_arn)
experiment = mlflow.set_experiment(experiment_name=experiment_name)
run = mlflow.start_run(run_id=run_id) if run_id else mlflow.start_run(run_name=f"processing-{suffix}", nested=True)
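
The run name above uses a `strftime` suffix of the form day-hour-minute-second. Because that format omits the year and month, run names can repeat from one month to the next. As a sketch of one alternative, a helper could include the full UTC date in the suffix. The name `make_run_name` is hypothetical and not part of this notebook.

```python
from time import gmtime, strftime


def make_run_name(prefix, when=None):
    """Build a run name with a full UTC timestamp suffix."""
    when = gmtime() if when is None else when
    return f"{prefix}-{strftime('%Y-%m-%d-%H-%M-%S', when)}"


# gmtime(0) is the Unix epoch, used here only to get a reproducible value
print(make_run_name("processing", gmtime(0)))  # processing-1970-01-01-00-00-00
```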

In [48]:
# Create FeatureStore session object
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

feature_store = FeatureStore(sagemaker_session=feature_store_session)
dataset_feature_group = FeatureGroup(feature_group_name)

query_output_s3_path = f's3://{bucket_name}/data/query_results/'
# Create dataset builder to retrieve the most recent version of each record
builder = feature_store.create_dataset(
    base=dataset_feature_group,
    # included_feature_names=included_feature_names,
    output_path=query_output_s3_path,
).with_number_of_recent_records_by_record_identifier(1)

player_churn_fs_df, query = builder.to_dataframe()
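
The dataset builder above is asked for the single most recent record per record identifier. The selection itself happens on the service side. The sketch below only illustrates the same idea locally with pandas on a made-up three-row table, so the data and the two player IDs are assumptions for demonstration.

```python
import pandas as pd

# Made-up records: player p1 has two versions, player p2 has one
records = pd.DataFrame(
    {
        "player_id": ["p1", "p1", "p2"],
        "session_count": [1, 2, 5],
        "event_time": [
            "2024-11-19T10:00:00.000Z",
            "2024-11-20T10:00:00.000Z",
            "2024-11-20T09:00:00.000Z",
        ],
    }
)

# ISO-8601 timestamps in the same format and timezone sort correctly as strings
latest = (
    records.sort_values("event_time")
    .groupby("player_id")
    .tail(1)  # keep the newest row of each player
    .sort_values("player_id")
    .reset_index(drop=True)
)
print(latest)
```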

In [50]:
player_churn_fs_df

Unnamed: 0,player_id,player_lifetime,session_count,player_churn,begin_session_time_of_day_mean_last_day_1,end_session_time_of_day_mean_last_day_1,begin_session_time_of_day_mean_last_day_2,end_session_time_of_day_mean_last_day_2,begin_session_time_of_day_mean_last_day_3,end_session_time_of_day_mean_last_day_3,...,cohort_id_2024_09_11,cohort_id_2024_09_15,cohort_id_2024_09_12,cohort_id_2024_09_13,cohort_id_2024_09_16,cohort_id_2024_09_14,cohort_id_2024_09_08,cohort_id_2024_09_09,cohort_id_2024_09_17,event_time
0,bb8d901a78a645539ac4fb5cc3629225,7233.0,1,0,84693.0,5527.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
1,6ae3e79c8b9c44b5aba3d38eddde7406,7270.0,1,0,81743.0,2614.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
2,5f432744a08f4c18ae44f450f7f4b709,199911.0,2,0,72372.0,79530.0,0.0,0.0,52418.0,59618.0,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
3,f4f9b569a31a4afbb407bfcd1658aeb3,7172.0,1,0,12918.0,20090.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
4,9b67152c4b444453a655edff6b0c476b,7242.0,1,0,61431.0,68673.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1635,9ce6704e66354f8aac3574d72ae898b5,286151.0,3,0,85864.0,6562.0,0.0,0.0,68910.0,76217.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
1636,4beb659a69944b82a431f35eb7d90547,86810.0,2,0,69038.0,36410.0,75765.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
1637,8e4b2b87534d41fd92a7fca7c46e2f36,184561.0,3,1,72425.0,79630.0,70880.0,78135.0,67869.0,74976.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z
1638,03f5c1fde23c444cbad4f6505f2b86ae,7250.0,1,0,56384.0,63635.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2024-11-20T20:58:24.285Z


# Data Processing and Splitting Train Data 
In the previous step, we extracted the dataset required for training a model from SageMaker Feature Store. For our use case, we'll use [XGBoost](https://xgboost.readthedocs.io/en/stable/), a powerful and efficient ML algorithm, to train a classifier that predicts player churn. The data structure for XGBoost can be found [here](https://xgboost.readthedocs.io/en/stable/tutorials/input_format.html#id1). In the following section, we'll format the dataset to meet the requirements of the XGBoost model.

In [51]:
# Remove the Feature Store columns that are not relevant to training a model.
df_model_data = player_churn_fs_df.drop(columns=["player_id", "event_time"])

# Reorganize the data structure to make the label column as the first column in the dataset.
df_model_data = pd.concat([df_model_data["player_churn"], df_model_data.drop(columns=["player_churn"])], axis=1)
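
The reordering above matters because the SageMaker built-in XGBoost algorithm, when given CSV input, assumes that the label is in the first column and that the file has no header row. The sketch below shows the resulting layout on a made-up two-row frame. The sample values are illustrative and not taken from the workshop dataset.

```python
import pandas as pd

# Made-up sample with the label in the last column
sample = pd.DataFrame(
    {
        "session_count": [1, 3],
        "player_lifetime": [7233.0, 184561.0],
        "player_churn": [0, 1],
    }
)

label = "player_churn"
# Move the label to the first column, keeping the remaining columns in order
ordered = sample[[label] + [c for c in sample.columns if c != label]]

# With no path argument, to_csv returns the CSV text instead of writing a file
csv_text = ordered.to_csv(index=False, header=False)
print(csv_text)
```

Each output line starts with the churn label, followed by the feature values in their original column order.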

In [52]:
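# Shuffle the dataset, then split it 70/20/10 into train, validation, and test sets
shuffled = df_model_data.sample(frac=1, random_state=1729)
train_end = int(0.7 * len(shuffled))
validation_end = int(0.9 * len(shuffled))

# Positional slicing avoids the deprecation warning that np.split triggers on a DataFrame
train_data = shuffled.iloc[:train_end]
validation_data = shuffled.iloc[train_end:validation_end]
test_data = shuffled.iloc[validation_end:]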

print(f"Data split > train:{train_data.shape} | validation:{validation_data.shape} | test:{test_data.shape}")

Data split > train:(1148, 28) | validation:(328, 28) | test:(164, 28)
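
The row counts in the output follow directly from the two cut points at 70% and 90% of the shuffled dataset. A minimal sketch of that arithmetic, using a hypothetical helper named `split_sizes`:

```python
def split_sizes(n_rows, train_end_frac=0.7, validation_end_frac=0.9):
    """Return (train, validation, test) row counts for the given cut points."""
    train_end = int(train_end_frac * n_rows)
    validation_end = int(validation_end_frac * n_rows)
    return train_end, validation_end - train_end, n_rows - validation_end


# 1640 is the total number of rows implied by the split output above
print(split_sizes(1640))  # (1148, 328, 164)
```

Because `int()` truncates, any rows left over from rounding end up in the test set.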



In [53]:
train_data_output_s3_path = f"s3://{bucket_name}/{bucket_prefix}/train/train.csv"
validation_data_output_s3_path = f"s3://{bucket_name}/{bucket_prefix}/validation/validation.csv"
test_x_data_output_s3_path = f"s3://{bucket_name}/{bucket_prefix}/test/test_x.csv"
test_y_data_output_s3_path = f"s3://{bucket_name}/{bucket_prefix}/test/test_y.csv"
baseline_data_output_s3_path = f"s3://{bucket_name}/{bucket_prefix}/baseline/baseline.csv"

train_data.to_csv(train_data_output_s3_path, index=False, header=False)
validation_data.to_csv(validation_data_output_s3_path, index=False, header=False)
test_data[target_col].to_csv(test_y_data_output_s3_path, index=False, header=False)
test_data.drop([target_col], axis=1).to_csv(test_x_data_output_s3_path, index=False, header=False)

# We need the baseline dataset for model monitoring
df_model_data.drop([target_col], axis=1).to_csv(baseline_data_output_s3_path, index=False, header=False)

# Store Parameters
In the following cells, we'll store the relevant parameters in the S3 bucket so that they can be passed to subsequent steps.

In [54]:
params = {}
params['train_data'] = train_data_output_s3_path
params['validation_data'] = validation_data_output_s3_path
params['test_x_data'] = test_x_data_output_s3_path
params['test_y_data'] = test_y_data_output_s3_path
params['baseline_data'] = baseline_data_output_s3_path
params['experiment_name'] = experiment.name

In [55]:
s3_client = boto3.client("s3", region_name=region)
step_name = "02-preprocess"
write_params(s3_client, step_name, params, notebook_param_s3_bucket_prefix)

File 02-preprocess.json uploaded successfully to sagemaker-us-east-1-141310854322/player-churn/xgboost/params/02-preprocess.json
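
A later step can load these values back with the `read_params` helper defined earlier, which downloads the JSON file from S3 before parsing it. The sketch below shows only the JSON round trip, using a local temporary directory instead of S3. The bucket name is a placeholder and the dictionary is a shortened, illustrative version of the real parameters.

```python
import json
import os
import tempfile

# Illustrative values only; the real paths are produced earlier in the notebook
params = {
    "train_data": "s3://example-bucket/player-churn/xgboost/train/train.csv",
    "experiment_name": "player-churn-model-experiment",
}

with tempfile.TemporaryDirectory() as tmp_dir:
    local_file_path = os.path.join(tmp_dir, "02-preprocess.json")
    with open(local_file_path, "w") as f:
        json.dump(params, f)
    # A later step would download the file from S3 before this read
    with open(local_file_path, "r") as f:
        loaded = json.load(f)

print(loaded["train_data"])
```

Keeping the parameters in a plain JSON file means any later step can consume them without importing code from this notebook.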


In [56]:
mlflow.end_run()