# AWS Deployment

Here we will take everything above and modularize it to automate training in Sagemaker.

In [1]:
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

import os
import sys
import logging
import boto3
import sagemaker
from pathlib import Path

CODE_FOLDER = Path("code")
MLFLOW_FOLDER = Path("mlruns")
sys.path.extend([f"./{CODE_FOLDER}"])

DATA_FILEPATH = "../data/raw_data_2014-2017.csv"

logging.getLogger("sagemaker.config").setLevel(logging.ERROR)

sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\RaviB\AppData\Local\sagemaker\sagemaker\config.yaml


Loading evironment variables.

In [2]:
bucket = os.environ["BUCKET"]
role = os.environ["ROLE"]

mlflow_experiment_name = os.environ["MLFLOW_EXPERIMENT_NAME"]

S3_LOCATION = f"s3://{bucket}/price_history"

sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name

## Preprocessing

In [3]:
(CODE_FOLDER / "processing").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{CODE_FOLDER}/processing"])

In [26]:
%%writefile {CODE_FOLDER}/processing/script.py
# | filename: script.py
# | code-line-numbers: true

import ast
import pickle
from pathlib import Path
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer

def _read_data_from_input_csv_files(base_directory):
    """Read the data from the input CSV files.

    This function reads the input data from a CSV file 
    and does some simple cleaning.
    """
    input_directory = Path(base_directory) / "input"
    files = list(input_directory.glob("*.csv"))

    if len(files) == 0:
        message = f"The are no CSV files in {input_directory.as_posix()}/"
        raise ValueError(message)

    raw_data = [pd.read_csv(file) for file in files]

    raw_train_test_data = pd.concat(raw_data)
    raw_train_test_data = raw_train_test_data.drop(columns='Unnamed: 0')

    return raw_train_test_data

def _get_early_discount_target(df):
    """Filter for games that actually went on sale.

    This function uses a SaleType columns to only consider games that actually went on sale
    Then drops that column and creates a new binary one for games that went on sale on release or not
    """
    discounted_games = df[df['SaleType'] == 'went on sale']
    discounted_games = discounted_games.drop(columns='SaleType')
    discounted_games['DiscountedEarly'] = discounted_games['TimeDelta'].apply(lambda x: 'discounted within 2 days' if x < 3 else 'discounted after 3 days')
    return discounted_games

def _encoding_multilabel_column(df, feature, frequency_threshold):
    #replace labels with spacing with a dash so that they remain one word
    #then split into list based on commas
    df[feature] = df[feature].apply(lambda x: x.replace(' ', '-').split(','))

    all_labels = [label for sublist in df[feature] for label in sublist]
    labels_counter = Counter(all_labels)

    frequent_cats = {label for label, count in labels_counter.items() if count >= frequency_threshold}
    df[f"Filtered_{feature}"] = df[feature].apply(lambda x: [label for label in x if label in frequent_cats])

    mlb = MultiLabelBinarizer()

    # Fit and transform the data
    mlb.fit(df[f"Filtered_{feature}"])
    one_hot_encoded = mlb.transform(df[f"Filtered_{feature}"])

    # Create a DataFrame with the one-hot encoded data
    one_hot_df_labels = pd.DataFrame(one_hot_encoded, columns=mlb.classes_)

    return one_hot_df_labels, mlb

def _split_data_discount_on_release(df):
    """Split the data into train and test for classification.

    This function splits the data into training and testing sets
    for classification - predicting if a game went on sale within 2 days or not
    """
    # pass in preprocessed_tabular_df as input
    y = df['DiscountedEarly']
    X = df.drop(columns=['DiscountedEarly', 'TimeDelta'])

    # duplicate columns to manually drop
    X = X.drop(columns=['Co-op', 'PvP'])

    label_encoding = {'discounted within 2 days': 0, 'discounted after 3 days': 1}
    y = y.apply(lambda x: label_encoding[x])

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    return X_train, X_test, y_train, y_test, label_encoding

def _split_data_discount_after_release(df):
    """Split the data into train and test for regression.
    
    This function splits the data into training and testing sets 
    for predicting how many months until a game goes one sale,
    given that it didn't within the first 2 days.
    """
    regression_df = df[df['DiscountedEarly'] == 'discounted after 3 days']
    # target for classification task that we don't need anymore after filtering
    regression_df = regression_df.drop(columns='DiscountedEarly')

    #removing games that went on sale after 6 months for optimal model performance
    regression_df['TimeDelta'] = regression_df[['TimeDelta']] // 30 
    regression_df = regression_df[regression_df['TimeDelta'] < 6]

    # log transform since data has exponential decay
    y = np.log1p(regression_df['TimeDelta'])
    X = regression_df.drop(columns=['TimeDelta'])

    # duplicate columns to manually drop
    X = X.drop(columns=['Co-op', 'PvP'])

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    return X_train, X_test, y_train, y_test

def _save_splits(
    base_directory,
    X_train,  # noqa: N803
    y_train,
    X_test,  # noqa: N803
    y_test,
    train_path_name,
    test_path_name
):
    """Save data splits to disk.

    This function concatenates the transformed features
    and the target variable, and saves each one of the split
    sets to disk.

    train_path_name (str): should be either "train_clf" or train_reg"
    test_path_name (str): should be either "test_clf" or test_reg"
    """
    train = pd.concat([X_train, y_train], axis=1)
    test = pd.concat([X_test, y_test], axis=1)

    train_path = Path(base_directory) / train_path_name
    test_path = Path(base_directory) / test_path_name

    train_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    train.to_csv(train_path / f"{train_path_name}.csv", header=True, index=False)
    test.to_csv(test_path / f"{test_path_name}.csv", header=True, index=False)

def _save_mlb(base_directory, mlb, mlb_name):
    """Save the MultiLabelBinarizer object locally."""
    file_path = Path(base_directory) / mlb_name
    with open(file_path, 'wb') as f:
        pickle.dump(mlb, f)

def preprocess(base_directory):
    df = _read_data_from_input_csv_files(base_directory)

    discounted_games = _get_early_discount_target(df)

    tabular_df = discounted_games[
        [
        'Achievements', 'Supported languages',
        'Mac', 'Linux', 'Categories', 'Tags', 
        'ReleaseDate', 'TimeDelta', 'DiscountedEarly'
        ]
    ]

    # converting binary columns to ints
    tabular_df['Achievements'] = tabular_df['Achievements'].astype(int)
    tabular_df['Mac'] = tabular_df['Mac'].astype(int)
    tabular_df['Linux'] = tabular_df['Linux'].astype(int)

    # converting supported languages to just the number of them
    tabular_df['Supported languages'] = tabular_df['Supported languages'].apply(ast.literal_eval)
    tabular_df['Supported languages'] = tabular_df['Supported languages'].apply(lambda lst: len(lst))

    # converting release date to the month only
    tabular_df['ReleaseDate'] = pd.to_datetime(tabular_df['ReleaseDate'])
    tabular_df['month'] = tabular_df['ReleaseDate'].dt.month

    tabular_df = tabular_df.dropna(subset=['Categories', 'Tags'])
    tabular_df = tabular_df.reset_index(drop=True)

    one_hot_df_cats, mlb_cats = _encoding_multilabel_column(tabular_df, 'Categories', 50)
    one_hot_df_tags, mlb_tags = _encoding_multilabel_column(tabular_df, 'Tags', 100)

    preprocessed_tabular_df = tabular_df.drop(columns=['Categories', 'Tags', 'Filtered_Categories', 'Filtered_Tags', 'ReleaseDate'])
    preprocessed_tabular_df = pd.concat([preprocessed_tabular_df, one_hot_df_cats, one_hot_df_tags], axis=1)

    X_train_clf, X_test_clf, y_train_clf, y_test_clf, label_encoding = _split_data_discount_on_release(preprocessed_tabular_df)
    X_train_reg, X_test_reg, y_train_reg, y_test_reg = _split_data_discount_after_release(preprocessed_tabular_df)

    _save_mlb(base_directory, mlb_cats, "mlb_cats.pkl")
    _save_mlb(base_directory, mlb_tags, "mlb_tags.pkl")
    _save_splits(base_directory, X_train_clf, y_train_clf, X_test_clf, y_test_clf, "train_clf", "test_clf")
    _save_splits(base_directory, X_train_reg, y_train_reg, X_test_reg, y_test_reg, "train_reg", "test_reg")


if __name__ == "__main__":
    preprocess(base_directory="/opt/ml/processing")

Overwriting code/processing/script.py


### Testing Preprocessing Functions Locally

In [5]:
import ast
from pathlib import Path
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer

def _read_data_from_input_csv_files(base_directory):
    """Read the data from the input CSV files.

    This function reads the input data from a CSV file 
    and does some simple cleaning.
    """
    input_directory = Path(base_directory) / "input"
    files = list(input_directory.glob("*.csv"))

    if len(files) == 0:
        message = f"The are no CSV files in {input_directory.as_posix()}/"
        raise ValueError(message)

    raw_data = [pd.read_csv(file) for file in files]

    raw_train_test_data = pd.concat(raw_data)
    raw_train_test_data = raw_train_test_data.drop(columns='Unnamed: 0')

    return raw_train_test_data

def _get_early_discount_target(df):
    """Filter for games that actually went on sale.

    This function uses a SaleType columns to only consider games that actually went on sale
    Then drops that column and creates a new binary one for games that went on sale on release or not
    """
    discounted_games = df[df['SaleType'] == 'went on sale']
    discounted_games = discounted_games.drop(columns='SaleType')
    discounted_games['DiscountedEarly'] = discounted_games['TimeDelta'].apply(lambda x: 'discounted within 2 days' if x < 3 else 'discounted after 3 days')
    return discounted_games

def _encoding_multilabel_column(df, feature, frequency_threshold):
    #replace labels with spacing with a dash so that they remain one word
    #then split into list based on commas
    df[feature] = df[feature].apply(lambda x: x.replace(' ', '-').split(','))

    all_labels = [label for sublist in df[feature] for label in sublist]
    labels_counter = Counter(all_labels)

    frequent_cats = {label for label, count in labels_counter.items() if count >= frequency_threshold}
    df[f"Filtered_{feature}"] = df[feature].apply(lambda x: [label for label in x if label in frequent_cats])

    mlb = MultiLabelBinarizer()

    # Fit and transform the data
    one_hot_encoded = mlb.fit_transform(df[f"Filtered_{feature}"])

    # Create a DataFrame with the one-hot encoded data
    one_hot_df_labels = pd.DataFrame(one_hot_encoded, columns=mlb.classes_)

    return one_hot_df_labels

def _split_data_discount_on_release(df):
    """Split the data into train and test for classification.

    This function splits the data into training and testing sets
    for classification - predicting if a game went on sale within 2 days or not
    """
    # pass in preprocessed_tabular_df as input
    y = df['DiscountedEarly']
    X = df.drop(columns=['DiscountedEarly', 'TimeDelta'])

    # duplicate columns to manually drop
    X = X.drop(columns=['Co-op', 'PvP'])

    label_encoding = {'discounted within 2 days': 0, 'discounted after 3 days': 1}
    y = y.apply(lambda x: label_encoding[x])

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    return X_train, X_test, y_train, y_test, label_encoding

def _split_data_discount_after_release(df):
    """Split the data into train and test for regression.
    
    This function splits the data into training and testing sets 
    for predicting how many months until a game goes one sale,
    given that it didn't within the first 2 days.
    """
    regression_df = df[df['DiscountedEarly'] == 'discounted after 3 days']
    # target for classification task that we don't need anymore after filtering
    regression_df = regression_df.drop(columns='DiscountedEarly')

    #removing games that went on sale after 6 months for optimal model performance
    regression_df['TimeDelta'] = regression_df[['TimeDelta']] // 30 
    regression_df = regression_df[regression_df['TimeDelta'] < 6]

    # log transform since data has exponential decay
    y = np.log1p(regression_df['TimeDelta'])
    X = regression_df.drop(columns=['TimeDelta'])

    # duplicate columns to manually drop
    X = X.drop(columns=['Co-op', 'PvP'])

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    return X_train, X_test, y_train, y_test

def _save_splits(
    base_directory,
    X_train,  # noqa: N803
    y_train,
    X_test,  # noqa: N803
    y_test,
    train_path_name,
    test_path_name
):
    """Save data splits to disk.

    This function concatenates the transformed features
    and the target variable, and saves each one of the split
    sets to disk.

    train_path_name (str): should be either "train_clf" or train_reg"
    test_path_name (str): should be either "test_clf" or test_reg"
    """
    train = pd.concat([X_train, y_train], axis=1)
    test = pd.concat([X_test, y_test], axis=1)

    train_path = Path(base_directory) / train_path_name
    test_path = Path(base_directory) / test_path_name

    train_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    train.to_csv(train_path / f"{train_path_name}.csv", header=False, index=False)
    test.to_csv(test_path / f"{test_path_name}.csv", header=False, index=False)



df = pd.read_csv('../data/raw_data_2014-2017.csv')

discounted_games = _get_early_discount_target(df)

tabular_df = discounted_games[
    [
    'Achievements', 'Supported languages',
    'Mac', 'Linux', 'Categories', 'Tags', 
    'ReleaseDate', 'TimeDelta', 'DiscountedEarly'
    ]
]

# converting binary columns to ints
tabular_df['Achievements'] = tabular_df['Achievements'].astype(int)
tabular_df['Mac'] = tabular_df['Mac'].astype(int)
tabular_df['Linux'] = tabular_df['Linux'].astype(int)

# converting supported languages to just the number of them
tabular_df['Supported languages'] = tabular_df['Supported languages'].apply(ast.literal_eval)
tabular_df['Supported languages'] = tabular_df['Supported languages'].apply(lambda lst: len(lst))

# converting release date to the month only
tabular_df['ReleaseDate'] = pd.to_datetime(tabular_df['ReleaseDate'])
tabular_df['month'] = tabular_df['ReleaseDate'].dt.month

tabular_df = tabular_df.dropna(subset=['Categories', 'Tags'])
tabular_df = tabular_df.reset_index(drop=True)

one_hot_df_cats = _encoding_multilabel_column(tabular_df, 'Categories', 50)
one_hot_df_tags = _encoding_multilabel_column(tabular_df, 'Tags', 100)

preprocessed_tabular_df = tabular_df.drop(columns=['Categories', 'Tags', 'Filtered_Categories', 'Filtered_Tags', 'ReleaseDate'])
preprocessed_tabular_df = pd.concat([preprocessed_tabular_df, one_hot_df_cats, one_hot_df_tags], axis=1)

X_train_clf, X_test_clf, y_train_clf, y_test_clf, label_encoding = _split_data_discount_on_release(preprocessed_tabular_df)
#X_train_reg, X_test_reg, y_train_reg, y_test_reg = _split_data_discount_after_release(preprocessed_tabular_df)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  tabular_df['Achievements'] = tabular_df['Achievements'].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  tabular_df['Mac'] = tabular_df['Mac'].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  tabular_df['Linux'] = tabular_df['Linux'].astype(int)
A value is trying to be set on a 

In [6]:
X_test_clf.shape

(2540, 216)

In [7]:
y_test_clf.shape

(2540,)

In [None]:
pd.concat([X_train_clf, y_train_clf], axis=1).shape

(10159, 217)

In [8]:
pd.concat([X_train_clf, y_train_clf], axis=1).to_csv("../data/train_clf.csv", header=False, index=False)

In [7]:
from collections import Counter

In [10]:
train_clf = pd.read_csv("../data/train_clf.csv")
y_train_clf_saved = train_clf[train_clf.columns[-1]]
X_train_clf_saved = train_clf.drop(train_clf.columns[-1], axis=1)
Counter(y_train_clf_saved)

Counter({1: 6720, 0: 3438})

In [11]:
print('X_train shape:', X_train_clf_saved.shape)
print('y_train shape:', y_train_clf_saved.shape)

X_train shape: (10158, 216)
y_train shape: (10158,)


### Preprocessing Pipeline

Let's configure caching and a sagemaker pipeline.

In [27]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="15d")

In [28]:
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)

In [29]:
#check what version we are using
import xgboost as xgb
xgb.__version__

'1.7.1'

In [30]:
from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession
import xgboost as xgb

pipeline_session = PipelineSession(default_bucket=bucket)

config = {
    "session": pipeline_session,
    #"instance_type": "ml.t3.medium",
    "instance_type": "ml.m5.xlarge",
    "image": None,
}

# These specific settings refer to the SageMaker
# xgboost container we'll use, for some reason the version uses a dash
config["framework_version"] = "1.7-1"
config["py_version"] = "py3"

In [31]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    base_job_name="preprocess-data",
    framework_version="1.2-1",
    # By default, a new account doesn't have access to `ml.m5.xlarge` instances.
    # If you haven't requested a quota increase yet, you can use an
    # `ml.t3.medium` instance type instead. This will work out of the box, but
    # the Processing Job will take significantly longer than it should have.
    # To get access to `ml.m5.xlarge` instances, you can request a quota
    # increase under the Service Quotas section in your AWS account.
    #instance_type="ml.t3.medium",
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [32]:
dataset_location

ParameterString(name='dataset_location', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://steamgames/price_history/data')

In [33]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

preprocessing_step = ProcessingStep(
    name="preprocess-data",
    step_args=processor.run(
        code=f"{(CODE_FOLDER / 'processing' / 'script.py').as_posix()}",
        inputs=[
            ProcessingInput(
                source=dataset_location,
                destination="/opt/ml/processing/input",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="train_clf",
                source="/opt/ml/processing/train_clf",
                destination=f"{S3_LOCATION}/preprocessing/train_clf",
            ),
            ProcessingOutput(
                output_name="test_clf",
                source="/opt/ml/processing/test_clf",
                destination=f"{S3_LOCATION}/preprocessing/test_clf",
            ),
            ProcessingOutput(
                output_name="train_reg",
                source="/opt/ml/processing/train_reg",
                destination=f"{S3_LOCATION}/preprocessing/train_reg",
            ),
            ProcessingOutput(
                output_name="test_reg",
                source="/opt/ml/processing/test_reg",
                destination=f"{S3_LOCATION}/preprocessing/test_reg",
            ),
            ProcessingOutput(
                output_name="mlb_cats",
                source="/opt/ml/processing/mlb_cats",
                destination=f"{S3_LOCATION}/preprocessing/mlb_cats",
            ),
            ProcessingOutput(
                output_name="mlb_tags",
                source="/opt/ml/processing/mlb_tags",
                destination=f"{S3_LOCATION}/preprocessing/mlb_tags",
            ),
        ],
    ),
    cache_config=cache_config,
)

Now we can create the sagemaker pipeline, or update it.

In [34]:
from sagemaker.workflow.pipeline import Pipeline

preprocessing_pipeline = Pipeline(
    name="preprocessing-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

preprocessing_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:590184030535:pipeline/preprocessing-pipeline',
 'ResponseMetadata': {'RequestId': 'b1aeb716-5763-44e6-a819-af385ddf76d8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b1aeb716-5763-44e6-a819-af385ddf76d8',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '90',
   'date': 'Sat, 08 Jun 2024 15:23:24 GMT'},
  'RetryAttempts': 0}}

## Model Training

In [36]:
(CODE_FOLDER / "training").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{CODE_FOLDER}/training"])

In [37]:
%%writefile {CODE_FOLDER}/training/train_script.py
# | filename: train_script.py
# | code-line-numbers: true

import argparse
import os
import json
import pandas as pd
import xgboost as xgb
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, mean_absolute_error, r2_score
from pathlib import Path
import tarfile

def save_metrics(metrics, model_directory, model_name):
    metrics_filepath = Path(model_directory) / f"{model_name}_metrics.json"
    with open(metrics_filepath, 'w') as f:
        json.dump(metrics, f)

def train(
    model_directory,
    train_path,
    test_path,
    #pipeline_path,
    learning_rate=0.1,
    max_depth=3
):
    X_train = pd.read_csv(Path(train_path) / "train_clf.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train = X_train.drop(X_train.columns[-1], axis=1)

    X_test = pd.read_csv(Path(test_path) / "test_clf.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test = X_test.drop(X_test.columns[-1], axis=1)

    params = {
        'learning_rate': learning_rate,
        'max_depth': max_depth,
        'objective': 'binary:logistic',
        'eval_metric': 'logloss'
    }

    y_train = y_train.astype(int)


    model = xgb.XGBClassifier(**params)

    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    metrics = {
        'accuracy': accuracy,
        'f1_score': f1
    }

    save_metrics(metrics, model_directory, "discount_on_release-xgboost")

    model_filepath = (Path(model_directory) / "discount_on_release-xgboost")

    model.save_model(model_filepath)
    #pkl.dump(model, open(model_filepath, 'wb'))

def train_reg(
    model_directory,
    train_reg_path,
    test_reg_path,
    learning_rate=0.05,
    min_child_weight=5,
    n_estimators=50,
    reg_alpha=0.01,
    reg_lambda=1.5
):
    X_train = pd.read_csv(Path(train_reg_path) / "train_reg.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train = X_train.drop(X_train.columns[-1], axis=1)

    X_test = pd.read_csv(Path(test_reg_path) / "test_reg.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test = X_test.drop(X_test.columns[-1], axis=1)

    params = {
        'learning_rate': learning_rate,
        'min_child_weight': min_child_weight,
        'n_estimators': n_estimators,
        'reg_alpha': reg_alpha,
        'reg_lambda': reg_lambda,
        'objective': 'reg:squarederror',
    }

    model = xgb.XGBRegressor(**params)

    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    metrics = {
        'mse': mse,
        'mae': mae,
        'r2_score': r2
    }

    save_metrics(metrics, model_directory, "time_until_discount-xgboost")

    model_filepath = (Path(model_directory) / "time_until_discount-xgboost")

    model.save_model(model_filepath)
    #pkl.dump(model, open(model_filepath, 'wb'))

if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to
    # the entry point as script arguments.
    # default values came from a local grid search run
    print(f"SM_MODEL_DIR: {os.environ.get('SM_MODEL_DIR')}")
    print(f"SM_CHANNEL_TRAIN: {os.environ.get('SM_CHANNEL_TRAIN_CLF')}")
    print(f"SM_CHANNEL_TEST: {os.environ.get('SM_CHANNEL_TEST_CLF')}")
    #print(f"SM_CHANNEL_PIPELINE: {os.environ.get('SM_CHANNEL_PIPELINE')}")
    
    parser = argparse.ArgumentParser()
    # for the classifier model
    parser.add_argument("--learning_rate_clf", type=float, default=0.1)
    parser.add_argument("--max_depth_clf", type=int, default=3)
    #for the regression model
    parser.add_argument("--learning_rate_reg", type=float, default=0.05)
    parser.add_argument("--min_child_weight_reg", type=int, default=5)
    parser.add_argument("--n_estimators_reg", type=int, default=50)
    parser.add_argument("--reg_alpha_reg", type=float, default=0.01)
    parser.add_argument("--reg_lambda_reg", type=float, default=1.5)
    args, _ = parser.parse_known_args()

    training_env = json.loads(os.environ.get("SM_TRAINING_ENV", {}))
    job_name = training_env.get("job_name", None) if training_env else None

    train(
        # This is the location where we need to save our model.
        # SageMaker will create a model.tar.gz file with anything
        # inside this directory when the training script finishes.
        model_directory=os.environ["SM_MODEL_DIR"],
        # SageMaker creates one channel for each one of the inputs
        # to the Training Step.
        train_path=os.environ['SM_CHANNEL_TRAIN_CLF'],
        test_path=os.environ['SM_CHANNEL_TEST_CLF'],
        learning_rate=args.learning_rate_clf,
        max_depth=args.max_depth_clf,
    )

    train_reg(
        model_directory=os.environ["SM_MODEL_DIR"],
        train_reg_path=os.environ["SM_CHANNEL_TRAIN_REG"],
        test_reg_path=os.environ["SM_CHANNEL_TEST_REG"],
        learning_rate=args.learning_rate_reg,
        min_child_weight=args.min_child_weight_reg,
        n_estimators=args.n_estimators_reg,
        reg_alpha=args.reg_alpha_reg,
        reg_lambda=args.reg_lambda_reg
    )

    model_dir = Path(os.environ["SM_MODEL_DIR"])
    tar_path = model_dir / "model.tar.gz"
    with tarfile.open(tar_path, "w:gz") as tar:
        tar.add(model_dir / "discount_on_release-xgboost", arcname="discount_on_release-xgboost")
        tar.add(model_dir / "time_until_discount-xgboost", arcname="time_until_discount-xgboost")



Overwriting code/training/train_script.py


Now we need to create a Training Step

In [38]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.xgboost import XGBoost

estimator = XGBoost(
    entry_point="train_script.py",
    source_dir=f"{(CODE_FOLDER / 'training').as_posix()}",
    hyperparameters={
        "learning_rate_clf": 0.1,
        "max_depth_clf": 3,
        "learning_rate_reg": 0.05,
        "min_child_weight_reg": 5,
        "n_estimators_reg": 50,
        "reg_alpha_reg": 0.01,
        "reg_lambda_reg": 1.5,
    },
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


In [39]:
def create_training_step(estimator):
    """Create a SageMaker TrainingStep using the provided estimator."""
    return TrainingStep(
        name="train-model",
        step_args=estimator.fit(
            inputs={
                "train_clf": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "train_clf"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "test_clf": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "test_clf"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "train_reg": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "train_reg"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "test_reg": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "test_reg"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
            },
        ),
        cache_config=cache_config,
    )

train_model_step = create_training_step(estimator)

In [40]:
from sagemaker.workflow.pipeline import Pipeline

train_pipeline = Pipeline(
    name="train-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

train_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:590184030535:pipeline/train-pipeline',
 'ResponseMetadata': {'RequestId': 'e556a41e-881b-489d-85db-247c236fca04',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e556a41e-881b-489d-85db-247c236fca04',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Sat, 08 Jun 2024 16:18:44 GMT'},
  'RetryAttempts': 0}}

## Running a Pipeline

In [41]:
# don't forget to change pipeline name if needed
train_pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:590184030535:pipeline/train-pipeline/execution/2inc2b2gwi7a', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x000002CBA046A530>)

In [54]:
s3 = boto3.client('s3')

bucket_name = os.environ["BUCKET"]
mlb_cats_key = 'mlb_cats.pkl'
mlb_tags_key = 'mlb_tags.pkl'

local_mlb_cats_path = Path("/opt/ml/processing") / mlb_cats_key
local_mlb_tags_path = 'mlb_tags.pkl'

In [55]:
#s3.download_file(bucket_name, mlb_tags_key, local_mlb_tags_path)
s3.download_file(bucket_name, mlb_cats_key, local_mlb_cats_path)


ClientError: An error occurred (404) when calling the HeadObject operation: Not Found