# Training

#### Introduction

This notebook uses the XGBoost algorithm to train and host a model for the Telco-Customer-Churn dataset from [Kaggle](https://www.kaggle.com/datasets/blastchar/telco-customer-churn). \
The 01_preprocessing notebook splits the dataset into the train, validation and test sets used by this notebook.

#### Prerequisites and Preprocessing

This notebook was tested in Amazon SageMaker Studio on an ml.t3.medium instance with the Python 3 (Data Science) kernel.

#### Permissions and environment variables

Here we set up the linkage and authentication to AWS services:
1. The IAM role that gives the training and hosting jobs access to your data. See the SageMaker documentation for how to specify it.
2. The S3 bucket used for training and model data, and where the preprocessed data is located.

#### Imports:

In [68]:
# Standard library
import copy
import os
import re
import time
from time import gmtime, strftime

# Third-party
import boto3
import pandas as pd
import sagemaker
from sagemaker import get_execution_role

# Show every column when displaying DataFrames (the test set is inspected below).
pd.set_option('display.max_columns', None)

#### Sessions:

In [69]:
# IAM role that the training job and the hosted model run under.
role = get_execution_role()
# Region of the default boto3 session; used to pick the XGBoost image and build clients.
region = boto3.session.Session().region_name
# High-level SageMaker session (the cells below talk to SageMaker via boto3 clients).
sess = sagemaker.Session()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


#### Bucket paths:

In [70]:
prefix = "model"
bucket = "markos-telco-churn"
bucket_path = f"s3://{bucket}"
input_data_path = "ingest/ingest-2023-10-14-21-32-51"

In [71]:
 from sagemaker.image_uris import retrieve

container = retrieve("xgboost", region, version="1.2-1")

#### Training parameters:

In [72]:
# The train/validation folders written by the 01_preprocessing notebook must match the
# "S3Uri" values in "InputDataConfig" below. The UTC timestamp makes every job name unique.
job_name = f'telco-churn-xgboost-{strftime("%Y-%m-%d-%H-%M-%S", gmtime())}'

common_training_params = {
    "AlgorithmSpecification": {"TrainingImage": container, "TrainingInputMode": "File"},
    "RoleArn": role,
    # CreateTrainingJob requires an s3:// URI here, not a bare bucket name.
    # Each job variant below overrides this with its own sub-folder.
    "OutputDataConfig": {"S3OutputPath": f"{bucket_path}/{prefix}/{job_name}"},
    "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.m4.xlarge", "VolumeSizeInGB": 5},
    "StoppingCondition": {"MaxRuntimeInSeconds": 1200},
    # Binary churn classification. XGBoost's parameter is `num_class` (not `num_classes`,
    # which is rejected as an unknown hyperparameter), and it is only valid with a multi-class
    # objective. multi:softmax makes the endpoint return class labels (0.0 / 1.0), which the
    # prediction cells convert to ints.
    "HyperParameters": {
        "objective": "multi:softmax",
        "num_class": "2",
        "max_depth": "4",
        "num_round": "100",
    },
    # CSV channels. NOTE(review): SageMaker XGBoost expects the label in the first column and
    # no header row; confirm the preprocessing notebook writes the files that way.
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{bucket_path}/{input_data_path}/train/train.csv",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "csv",
            "CompressionType": "None",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{bucket_path}/{input_data_path}/val/val.csv",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "csv",
            "CompressionType": "None",
        },
    ],
}

Next we create either a single-machine or a distributed training job, depending on the flag below, updating the parameters that are unique to each.
    
#### Training on a single or multiple instances

In [75]:
# True: train on two instances with the input sharded by S3 key.
# False: train on a single instance.
distrubuted_training: bool = False

In [76]:
sm = boto3.Session(region_name=region).client("sagemaker")
if distrubuted_training:
   
     # distributed job params
    distributed_job_name = f'distributed-machine-{job_name}'
    print("Job name is:", distributed_job_name)

    distributed_job_params = copy.deepcopy(common_training_params)
    distributed_job_params["TrainingJobName"] = distributed_job_name
    distributed_job_params["OutputDataConfig"][
        "S3OutputPath"
    ] = f"{bucket_path}/{prefix}/{job_name}/xgboost-distributed"
    # number of instances used for training
    distributed_job_params["ResourceConfig"][
        "InstanceCount"
    ] = 2  # no more than 5 if there are total 5 partition files generated above

    # data distribution type for train channel
    distributed_job_params["InputDataConfig"][0]["DataSource"]["S3DataSource"][
        "S3DataDistributionType"
    ] = "ShardedByS3Key"
    # data distribution type for validation channel
    distributed_job_params["InputDataConfig"][1]["DataSource"]["S3DataSource"][
        "S3DataDistributionType"
    ] = "ShardedByS3Key"
    
    # distrubited training
    sm.create_training_job(**distributed_job_params)

    status = sm.describe_training_job(TrainingJobName=distributed_job_name)["TrainingJobStatus"]
    print(status)
    sm.get_waiter("training_job_completed_or_stopped").wait(TrainingJobName=distributed_job_name)
    status = sm.describe_training_job(TrainingJobName=distributed_job_name)["TrainingJobStatus"]
    print(f"Training job ended with status: {status}")
    if status == "Failed":
        message = sm.describe_training_job(TrainingJobName=distributed_job_name)["FailureReason"]
        print(f"Training failed with the following error: {message}")
        raise Exception("Training job failed")
    
else:
    # single machine job params
    single_machine_job_name = f'single-machine-{job_name}'
    print("Job name is:", single_machine_job_name)

    single_machine_job_params = copy.deepcopy(common_training_params)
    single_machine_job_params["TrainingJobName"] = single_machine_job_name
    single_machine_job_params["OutputDataConfig"]["S3OutputPath"] = f"{bucket_path}/{prefix}/{job_name}/xgboost-single"
    single_machine_job_params["ResourceConfig"]["InstanceCount"] = 1
    # single training
    sm.create_training_job(**single_machine_job_params)

    print(status)
    sm.get_waiter("training_job_completed_or_stopped").wait(TrainingJobName=single_machine_job_name)
    status = sm.describe_training_job(TrainingJobName=single_machine_job_name)["TrainingJobStatus"]
    print(f"Training job ended with status: {status}")
    if status == "Failed":
        message = sm.describe_training_job(TrainingJobName=single_machine_job_name)["FailureReason"]
        print(f"Training failed with the following error: {message}")
        raise Exception("Training job failed")

Job name is: single-machine-telco-churn-xgboost-2023-10-14-22-20-50
InProgress


WaiterError: Waiter TrainingJobCompletedOrStopped failed: Waiter encountered a terminal failure state: For expression "TrainingJobStatus" we matched expected path: "Failed"

#### Confirm the training job has finished:

In [None]:
# Report the final status of whichever training job was launched above.
label, launched_job = (
    ("Distributed:", distributed_job_name)
    if distrubuted_training
    else ("Single Machine:", single_machine_job_name)
)
print(label, sm.describe_training_job(TrainingJobName=launched_job)["TrainingJobStatus"])

#### Set up hosting for the model:

To set up hosting, we import the model from training into hosting. The steps below host the model produced by whichever training job ran above (distributed or single machine).

##### Import model into hosting

Next, you register the model with hosting. This allows you the flexibility of importing models trained elsewhere. 

In [None]:
%%time
import boto3
from time import gmtime, strftime

if distrubuted_training:
    model_name = f"{distributed_job_name}-mod"
    print(model_name)

    info = sm.describe_training_job(TrainingJobName=distributed_job_name)
    model_data = info["ModelArtifacts"]["S3ModelArtifacts"]
    print(model_data)
else:
    model_name = f"{single_machine_job_name}-mod"
    print(model_name)

    info = sm.describe_training_job(TrainingJobName=single_machine_job_name)
    model_data = info["ModelArtifacts"]["S3ModelArtifacts"]
    print(model_data)
    

primary_container = {"Image": container, "ModelDataUrl": model_data}

create_model_response = sm.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

print(create_model_response["ModelArn"])

##### Create endpoint configuration

SageMaker supports configuring REST endpoints in hosting with multiple models, e.g. for A/B testing. To support this, you create an endpoint configuration that describes how traffic is distributed across the models, whether split, shadowed, or sampled in some way. The endpoint configuration also specifies the instance type used for model deployment.

In [None]:
from time import gmtime, strftime

# UTC timestamp keeps the endpoint configuration name unique per run.
timestamp = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
endpoint_config_name = f'churn-demo-feature-engineered-xgbpconfig-{timestamp}'
print(endpoint_config_name)

# A single production variant that receives all traffic for the model registered above.
all_traffic_variant = {
    "VariantName": "AllTraffic",
    "ModelName": model_name,
    "InstanceType": "ml.m4.xlarge",
    "InitialInstanceCount": 1,
    "InitialVariantWeight": 1,
}
create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[all_traffic_variant],
)

print(f'Endpoint Config Arn: {create_endpoint_config_response["EndpointConfigArn"]}')

####  Create endpoint:

In [None]:
%%time
import time

endpoint_name = f'churn-demo-feature-engineered-xgb-class-{strftime("%Y-%m-%d-%H-%M-%S", gmtime())}'
print(endpoint_name)
create_endpoint_response = sm.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
print(create_endpoint_response["EndpointArn"])

resp = sm.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
print(f"Status: {status}")

while status == "Creating":
    time.sleep(60)
    resp = sm.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
    print(f"Status: {status}")

print(f'Arn: {resp["EndpointArn"]}')
print(f"Status: {status}")

#### Read the test data:

In [None]:
 runtime_client = boto3.client("runtime.sagemaker", region_name=region)
test = pd.read_csv(f"{bucket_path}/{input_data_path}/test/test.csv")

In [None]:
# Preview the first rows of the test split.
test.head(n=5)

#### Predict:

In [None]:
# Rows sent per invoke_endpoint request; keeps each CSV payload well under the
# endpoint's request size limit.
step = 10000
result = []

# Features are every column except the label in column 0 (the same layout as the training
# CSV). A 'pred' column left by a previous run of the next cell must not be sent as a feature.
feature_cols = [col for col in test.columns[1:] if col != "pred"]

for start in range(0, test.shape[0], step):
    batch = test.iloc[start:start + step][feature_cols]
    # No header row: the XGBoost container scores every CSV line as a record, so a header
    # would produce an extra prediction and shift all the others by one.
    payload = batch.to_csv(index=False, header=False)

    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        Body=payload,
        ContentType='text/csv')

    body = response["Body"].read().decode("utf-8")
    # Predictions may be separated by commas or newlines and may end with a trailing
    # separator. Drop empty tokens instead of blindly slicing off the last value.
    preds = [token for token in re.split(r"[,\n]", body) if token.strip()]
    if len(preds) != len(batch):
        raise ValueError(f"Endpoint returned {len(preds)} predictions for {len(batch)} rows")
    result += preds

In [None]:
# Align the endpoint predictions with the test rows. A length mismatch means the batching
# or parsing in the prediction cell went wrong, so fail loudly.
if len(result) != len(test):
    raise ValueError(f"Got {len(result)} predictions for {len(test)} test rows")
# Round rather than truncate: identical for class labels such as "1.0", but maps
# probability or regression outputs (e.g. "0.97") to the nearest class instead of 0.
test['pred'] = pd.Series(result, index=test.index).astype('float').round().astype('int')
test.head()

#### Metrics:

In [None]:
from sklearn.metrics import classification_report

# Per-class precision, recall and F1 of the endpoint predictions on the test split.
cr = classification_report(y_true=test['Churn'], y_pred=test['pred'])
print(cr)

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# Headline metrics: overall accuracy and the unweighted (macro) mean of per-class F1.
y_true, y_pred = test['Churn'], test['pred']
accuracy = accuracy_score(y_true, y_pred)
macro_f1 = f1_score(y_true, y_pred, average='macro')

print(f"Accuracy: {accuracy:.1%}")
print(f"F1 Score {macro_f1:.1%}")

In [None]:
# Tear down every hosting resource this notebook created: the endpoint (billed while it
# runs), then its endpoint configuration and the registered model.
sm.delete_endpoint(EndpointName=endpoint_name)
sm.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm.delete_model(ModelName=model_name)