# Deploying to AWS

This notebook contains scripts that we will deploy to AWS, starting with the original dataset already in an S3 bucket.

First let's manage our imports and system paths.

In [2]:
%load_ext dotenv
%dotenv

import os
import sys
import boto3
from pathlib import Path

# Change to root directory
os.chdir('..')

# Create a folder for all our code
SRC_PATH = Path("src")
sys.path.extend([f"./{SRC_PATH}"])

# And we'll need our role's
glue_role = os.getenv('GLUE_ROLE')
sagemaker_role = os.getenv('SAGEMAKER_ROLE')
bucket = os.getenv('BUCKET')

## Glue

### ETL

In [21]:
(SRC_PATH / "etl").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{SRC_PATH}/etl"])

In [22]:
%%writefile {SRC_PATH}/etl/script.py

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pandas as pd
from io import StringIO
import boto3

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'INPUT_BUCKET', 'INPUT_KEY', 'OUTPUT_BUCKET', 'OUTPUT_KEY'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read data from S3
s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=args['INPUT_BUCKET'], Key=args['INPUT_KEY'])
df = pd.read_csv(StringIO(obj['Body'].read().decode('utf-8')))

#target label encoding
df['EngagementLevel'] = df['EngagementLevel'].map({'Low': 0, 'Medium': 1, 'High': 2})

# Perform transformations to independent variables
df['Gender'] = df['Gender'].map({'Male': 1, 'Female': 0})
df['GameDifficulty'] = df['GameDifficulty'].map({'Easy': 0, 'Medium': 1, 'Hard': 2})
df_encoded = pd.get_dummies(df, columns=['Location', 'GameGenre'], drop_first=True)

encoded_cols = list(set(df_encoded.columns) - set(df.columns))
df_encoded[encoded_cols] = df_encoded[encoded_cols].astype(int)

# Convert the DataFrame back to CSV
csv_buffer = StringIO()
df_encoded.to_csv(csv_buffer, index=False)

# Upload the transformed data to S3
s3_client.put_object(Bucket=args['OUTPUT_BUCKET'], Key=args['OUTPUT_KEY'], Body=csv_buffer.getvalue())

job.commit()

Overwriting src/etl/script.py


In [27]:
file_path = f"{(SRC_PATH / 'etl' / 'script.py').as_posix()}"
s3_client = boto3.client('s3')
bucket_name = 'gaming-behavior'
script_file_name = 'script.py'
s3_key = f'glue-scripts/{script_file_name}'

# Upload the script to S3
s3_client.upload_file(file_path, bucket_name, s3_key)
print(f'Script uploaded to s3://{bucket_name}/{s3_key}')

Script uploaded to s3://gaming-behavior/glue-scripts/script.py


In [29]:
glue_client = boto3.client('glue')

# Parameters for the Glue job
job_name = 'etl-job'
script_location = f's3://{bucket_name}/{s3_key}'

# S3 locations for input and output data
input_bucket = 'gaming-behavior'
input_key = 'raw_data/online_gaming_behavior_dataset.csv'
output_bucket = 'gaming-behavior'
output_key = 'transformed_data/transformed_online_gaming_behavior_dataset.csv'

# Create or update the Glue job
response = glue_client.create_job(
    Name=job_name,
    Role=glue_role,
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python',
        '--enable-continuous-cloudwatch-log': 'true',
        '--enable-spark-ui': 'true',
        '--INPUT_BUCKET': input_bucket,
        '--INPUT_KEY': input_key,
        '--OUTPUT_BUCKET': output_bucket,
        '--OUTPUT_KEY': output_key
    },
    MaxRetries=0,
    MaxCapacity=2.0,
    Timeout=2880,
    GlueVersion='2.0'
)

print(f'Glue job {job_name} created successfully')


Glue job etl-job created successfully


In [30]:
start_response = glue_client.start_job_run(JobName=job_name)
print(f'Glue job {job_name} started successfully with run ID: {start_response["JobRunId"]}')

Glue job etl-job started successfully with run ID: jr_d1cd5216b60496c48c2bc33ef3e33c44391d9a5e587b3d160ffaa58465a8e98c


## Sagemaker

### Pre-processing

In [34]:
import sagemaker
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.workflow.pipeline_context import PipelineSession

S3_LOCATION = f"s3://{bucket}"

In [32]:
f"{S3_LOCATION}/transformed_data"

's3://gaming-behavior/transformed_data'

In [35]:
sm_boto3 = boto3.client("sagemaker")
pipeline_session = PipelineSession(default_bucket=bucket)
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_session.region_name
print("Using bucket" + bucket)

Using bucketgaming-behavior


In [37]:
(SRC_PATH / "preprocessing").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{SRC_PATH}/preprocessing"])

In [14]:
import pandas as pd

df = pd.read_csv('online_gaming_behavior_dataset.csv')

#target label encoding
df['EngagementLevel'] = df['EngagementLevel'].map({'Low': 0, 'Medium': 1, 'High': 2})

# Perform transformations to independent variables
df['Gender'] = df['Gender'].map({'Male': 1, 'Female': 0})
df['GameDifficulty'] = df['GameDifficulty'].map({'Easy': 0, 'Medium': 1, 'Hard': 2})
df_encoded = pd.get_dummies(df, columns=['Location', 'GameGenre'], drop_first=True)

encoded_cols = list(set(df_encoded.columns) - set(df.columns))
df_encoded[encoded_cols] = df_encoded[encoded_cols].astype(int)

In [15]:
from sklearn.model_selection import train_test_split

df_encoded = df_encoded.drop(columns=['PlayerID'])
df_train, df_test = train_test_split(df_encoded, test_size=0.2)

y_train = df_train.EngagementLevel
y_test = df_test.EngagementLevel

X_train = df_train.drop("EngagementLevel", axis=1)
X_test = df_test.drop("EngagementLevel", axis=1)

In [42]:
train = pd.concat([X_train, y_train], axis=1)
test = pd.concat([X_test, y_test], axis=1)

In [36]:
%%writefile {SRC_PATH}/preprocessing/script.py

from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split



def preprocess(base_directory):
    """Load the supplied data, split it and transform it."""
    df = _read_data_from_input_csv_files(base_directory)

    # the only transformation we need to do is drop the player id and split the data
    # everything else was done in the etl script
    
    df.drop(columns=['PlayerID'])
    df_train, df_test = train_test_split(df, test_size=0.2)

    y_train = df_train.EngagementLevel
    y_test = df_test.EngagementLevel

    X_train = df_train.drop("EngagementLevel", axis=1)
    X_test = df_test.drop("EngagementLevel", axis=1)

    _save_splits(base_directory, X_train, y_train, X_test, y_test)


def _read_data_from_input_csv_files(base_directory):
    """Read the data from the input CSV files.

    This function reads every CSV file available and
    concatenates them into a single dataframe.
    """
    input_directory = Path(base_directory) / "input"
    files = list(input_directory.glob("*.csv"))

    if len(files) == 0:
        message = f"The are no CSV files in {input_directory.as_posix()}/"
        raise ValueError(message)

    raw_data = [pd.read_csv(file) for file in files]
    df = pd.concat(raw_data)

    # Shuffle the data
    return df.sample(frac=1, random_state=42)


def _save_splits(base_directory, X_train, y_train, X_test, y_test):
    """Save data splits to disk.

    This function concatenates the transformed features
    and the target variable, and saves each one of the split
    sets to disk.
    """
    train = pd.concat([X_train, y_train], axis=1)
    test = pd.concat([X_test, y_test], axis=1)

    train_path = Path(base_directory) / "train"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)


if __name__ == "__main__":
    preprocess(base_directory="/opt/ml/processing")


Overwriting src/preprocessing/script.py


In [37]:
pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/transformed_data",
)

In [38]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    base_job_name="preprocess-data",
    framework_version="1.2-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    role=sagemaker_role,
    sagemaker_session=pipeline_session,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [60]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

preprocessing_step = ProcessingStep(
    name="preprocess-data",
    step_args=processor.run(
        code=f"{(SRC_PATH / 'preprocessing' / 'script.py').as_posix()}",
        inputs=[
            ProcessingInput(
                source=dataset_location,
                destination="/opt/ml/processing/input",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train",
                destination=f"{S3_LOCATION}/preprocessing/train",
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/test",
                destination=f"{S3_LOCATION}/preprocessing/test",
            )
        ],
    )
)



In [40]:
from sagemaker.workflow.pipeline import Pipeline

preprocessing_pipeline = Pipeline(
    name="preprocessing-pipeline-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=pipeline_session,
)

preprocessing_pipeline.upsert(role_arn=sagemaker_role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:590184030535:pipeline/preprocessing-pipeline-pipeline',
 'ResponseMetadata': {'RequestId': '4b37ea0b-3e7e-48f4-b74d-95db0f20eb76',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4b37ea0b-3e7e-48f4-b74d-95db0f20eb76',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '99',
   'date': 'Wed, 07 Aug 2024 00:25:47 GMT'},
  'RetryAttempts': 0}}

In [41]:
preprocessing_pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:590184030535:pipeline/preprocessing-pipeline-pipeline/execution/g6zfmmlh822i', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x0000025DB778D190>)

### Modeling

In [49]:
(SRC_PATH / "modeling").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{SRC_PATH}/modeling"])

In [50]:
%%writefile {SRC_PATH}/modeling/script.py

import argparse
import os
import json
import pandas as pd
import xgboost as xgb
from sklearn.metrics import accuracy_score, 
from pathlib import Path
import joblib
import tarfile



def train(model_directory, train_path, test_path, pipeline_path, learning_rate=0.1, max_depth=7,):
    print(f"Keras version: {keras.__version__}")

    X_train = pd.read_csv(Path(train_path) / "train.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train = X_train.drop(X_train.columns[-1], axis=1)

    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test = X_test.drop(X_test.columns[-1], axis=1)

    model = XGBClassifier(objective='multi:softmax', num_class=3, eval_metric='mlogloss', learning_rate=learning_rate, max_depth=max_depth)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    kappa = cohen_kappa_score(y_test, y_pred)

    model_path = os.path.join(model_directory, "model.joblib")
    joblib.dump(model, model_path)



if __name__ =='__main__':
    print("[INFO] Extracting arguements")
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument('--learning_rate', type=float, default=0.1)
    parser.add_argument('--max_depth', type=int, default=7)

    args, _ = parser.parse_known_args()

    train(
        model_directory=os.environ["SM_MODEL_DIR"],
        train_path=os.environ["SM_CHANNEL_TRAIN"],
        validation_path=os.environ["SM_CHANNEL_TEST"],
        pipeline_path=os.environ["SM_CHANNEL_PIPELINE"],
        learning_rate=args.learning_rate,
        max_depth=args.max_depth,
    )

Overwriting src/modeling/script.py


In [61]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.xgboost import XGBoost

estimator = XGBoost(
    entry_point="script.py",
    source_dir=f"{(SRC_PATH / 'modeling').as_posix()}",
    hyperparameters={
        "learning_rate": 0.1,
        "max_depth": 7,
    },
    framework_version="1.2-1",
    py_version="py3",
    instance_type="ml.t3.medium",
    instance_count=1,
    role=sagemaker_role,
    sagemaker_session=sagemaker_session,
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.t3.medium.


In [64]:
def create_training_step(estimator):
    """Create a SageMaker TrainingStep using the provided estimator."""
    return TrainingStep(
        name="train-model",
        step_args=estimator.fit(
            inputs={
                "train": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "train"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "test": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "test"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                )
            },
        )
    )

train_model_step = create_training_step(estimator)

INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2024-08-07-02-02-38-054


TypeError: Object of type Properties is not JSON serializable

In [None]:
from sagemaker.workflow.pipeline import Pipeline

train_pipeline = Pipeline(
    name="train-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=sagemaker_session,
)

train_pipeline.upsert(role_arn=sagemaker_role)