# Toxicity Type Detection

![SageMaker](https://img.shields.io/badge/SageMaker-%23FF9900.svg?style=for-the-badge&logo=amazon-aws&logoColor=white)

This notebook is part of the [ToChiquinho](https://dougtrajano.github.io/ToChiquinho/) project. It trains a model to detect toxicity types in Portuguese text using the [OLID-BR](https://dougtrajano.github.io/olid-br/) dataset.

The model is trained using [Amazon SageMaker](https://aws.amazon.com/sagemaker/).

- [Parameters](#Parameters)
- [Setup](#Setup)
- [Prepare the data](#Prepare-the-data)
  - [Uploading the data to S3](#Uploading-the-data-to-S3)
- [Training process](#Training-process)
  - [Define the estimator](#Define-the-estimator)
  - [Hyperparameter tuning](#Hyperparameter-tuning)
  - [Training model with best hyperparameters](#Training-model-with-best-hyperparameters)
- [Documentation](#Documentation)

## Parameters

In this section, we define the parameters used in the notebook.

The `Parameters` dataclass stores these parameters. Each field carries its description in the `help` entry of its metadata.

In [1]:
import os
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class Parameters:
    num_train_epochs: Optional[int] = field(
        default=5,
        metadata={
            "help": "Number of epochs used to train the final model. An epoch is an iteration over the entire training set."
        }
    )

    num_train_epochs_per_child: Optional[int] = field(
        default=1,
        metadata={
            "help": "Number of epochs used to train each child job of the hyperparameter tuning process."
        }
    )

    batch_size: Optional[int] = field(
        default=8,
        metadata={
            "help": "Batch size for training and evaluation."
        }
    )
    
    validation_split: Optional[float] = field(
        default=0.2,
        metadata={
            "help": "The percentage of the training set to use as validation set."
        }
    )
    
    seed: Optional[int] = field(
        default=1993,
        metadata={
            "help": "The seed to use for random number generation."
        }
    )

    mlflow_tracking_uri: Optional[str] = field(
        default=os.environ.get("MLFLOW_TRACKING_URI"),
        repr=False,
        metadata={
            "help": "The URI of the MLFlow tracking server."
        }
    )

    mlflow_experiment_name: Optional[str] = field(
        default=os.environ.get("MLFLOW_EXPERIMENT_NAME", "Default"),
        metadata={
            "help": "The name of the MLFlow experiment."
        }
    )

    mlflow_tags: Optional[Dict[str, Any]] = field(
        default=None,
        metadata={
            "help": "The tags to use for the MLFlow run."
        }
    )
    
    mlflow_tracking_username: Optional[str] = field(
        default=None,
        repr=False,
        metadata={
            "help": "The username to use to authenticate with the MLFlow tracking server."
        }
    )

    mlflow_tracking_password: Optional[str] = field(
        default=None,
        repr=False,
        metadata={
            "help": "The password to use to authenticate with the MLFlow tracking server."
        }
    )

    mlflow_run_id: Optional[str] = field(
        default=os.environ.get("MLFLOW_RUN_ID"),
        repr=False,
        metadata={
            "help": "The ID of the MLFlow run."
        }
    )

    sagemaker_tuning_job_name: Optional[str] = field(
        default=None,
        repr=False,
        metadata={
            "help": "The name of the SageMaker hyperparameter tuning job."
        }
    )

    sagemaker_execution_role_arn: Optional[str] = field(
        default=None,
        repr=False,
        metadata={
            "help": "The ARN of the SageMaker execution role."
        }
    )

    aws_profile_name: Optional[str] = field(
        default="default",
        repr=False,
        metadata={
            "help": "The name of the AWS profile to use."
        }
    )

    def __post_init__(self):
        if self.mlflow_tracking_uri is not None:
            os.environ["MLFLOW_TRACKING_URI"] = self.mlflow_tracking_uri
        if self.mlflow_experiment_name is not None:
            os.environ["MLFLOW_EXPERIMENT_NAME"] = self.mlflow_experiment_name
        if self.mlflow_tracking_username is not None:
            os.environ["MLFLOW_TRACKING_USERNAME"] = self.mlflow_tracking_username
        if self.mlflow_tracking_password is not None:
            os.environ["MLFLOW_TRACKING_PASSWORD"] = self.mlflow_tracking_password        
        if self.mlflow_run_id is not None:
            os.environ["MLFLOW_RUN_ID"] = self.mlflow_run_id
        
        if isinstance(self.mlflow_tags, dict):
            self.mlflow_tags = json.dumps(self.mlflow_tags)
            os.environ["MLFLOW_TAGS"] = self.mlflow_tags
        elif self.mlflow_tags is not None:
            raise ValueError("The mlflow_tags parameter must be a dictionary.")
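
Because each description lives in the field metadata, it can be read back programmatically. The following is a minimal sketch of that idea, not code from the project: `DemoParameters` is a hypothetical two-field stand-in for `Parameters`, and `describe` is a helper name introduced here for illustration.

```python
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class DemoParameters:
    # Hypothetical two-field stand-in for the notebook's Parameters class.
    batch_size: Optional[int] = field(
        default=8,
        metadata={"help": "Batch size for training and evaluation."},
    )
    seed: Optional[int] = field(
        default=1993,
        metadata={"help": "The seed to use for random number generation."},
    )


def describe(cls) -> dict:
    """Map each field name to the help text stored in its metadata."""
    return {f.name: f.metadata.get("help", "") for f in fields(cls)}


# Print one "name: description" line per field.
for name, help_text in describe(DemoParameters).items():
    print(f"{name}: {help_text}")
```

The same `describe` helper should work on the full `Parameters` class, since it only relies on `dataclasses.fields`.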

In the next cell, you can change the parameters to fit your needs.

In [2]:
params = Parameters()
params

Parameters(num_train_epochs=5, num_train_epochs_per_child=1, batch_size=8, validation_split=0.2, seed=1993, mlflow_experiment_name='toxicity-type-detection', mlflow_tags='{"project": "ToChiquinho", "dataset": "OLID-BR", "model_type": "bert", "problem_type": "multi_label_classification"}')
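
To illustrate what changing the parameters looks like, here is a small sketch under stated assumptions: `DemoParameters` is a hypothetical, reduced copy of `Parameters` that keeps only two fields and the tag handling from `__post_init__`. It shows that a dictionary passed as `mlflow_tags` ends up stored as a JSON string and exported to the `MLFLOW_TAGS` environment variable.

```python
import json
import os
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class DemoParameters:
    # Hypothetical reduced copy of Parameters: only the fields needed here.
    num_train_epochs: int = 5
    mlflow_tags: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Same idea as the notebook: serialize the tags and export them.
        if isinstance(self.mlflow_tags, dict):
            self.mlflow_tags = json.dumps(self.mlflow_tags)
            os.environ["MLFLOW_TAGS"] = self.mlflow_tags
        elif self.mlflow_tags is not None:
            raise ValueError("The mlflow_tags parameter must be a dictionary.")


# Override the defaults by passing keyword arguments.
demo = DemoParameters(
    num_train_epochs=3,
    mlflow_tags={"project": "ToChiquinho", "dataset": "OLID-BR"},
)
print(demo.num_train_epochs)
print(demo.mlflow_tags)
```

Passing anything other than a dictionary (or `None`) as `mlflow_tags` raises a `ValueError`.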

## Setup

In [3]:
import boto3
import sagemaker

sagemaker_session = sagemaker.Session(
    boto_session=boto3.Session(profile_name=params.aws_profile_name)
)

bucket_name = sagemaker_session.default_bucket()
prefix = "ToChiquinho/toxicity-type-detection"

if params.sagemaker_execution_role_arn is None:
    params.sagemaker_execution_role_arn = sagemaker.get_execution_role(sagemaker_session)

In [4]:
def remove_checkpoints(
        bucket_name: str,
        checkpoint_prefix: str,
        aws_profile_name: str = "default"):
    """
    Remove all checkpoints from the specified S3 bucket and prefix.

    Args:
    - bucket_name: The name of the S3 bucket.
    - checkpoint_prefix: The prefix of the checkpoints to remove.
    - aws_profile_name: The name of the AWS profile to use.
    """
    session = boto3.Session(profile_name=aws_profile_name)
    s3 = session.resource("s3")
    bucket = s3.Bucket(bucket_name)
    # delete() returns one response per batch of deleted objects
    responses = bucket.objects.filter(Prefix=checkpoint_prefix).delete()

    if len(responses) == 0:
        print("No checkpoints found.")
    else:
        # Sum over all batches instead of counting only the first one
        count = sum(len(response.get("Deleted", [])) for response in responses)
        print(f"Deleted {count} checkpoints.")

## Prepare the data

In this section, we will prepare the data to be used in the training process.

We will download the OLID-BR dataset from [Hugging Face Datasets](https://huggingface.co/datasets/olidbr), process it, and upload it to S3.

In [None]:
from datasets import load_dataset

dataset = load_dataset("dougtrajano/olid-br")

In [None]:
import datasets

def prepare_dataset(
    dataset: datasets.DatasetDict,
    test_size: float = 0.2,
    seed: int = 42
) -> datasets.DatasetDict:
    """
    Keep offensive comments with at least one toxicity label and
    split the train set into train and validation sets.
    """

    # Keep only rows where is_offensive == "OFF" (offensive comments)
    dataset = dataset.filter(lambda example: example["is_offensive"] == "OFF")

    # Keep only offensive comments with at least one toxicity label
    labels = [
        "health",
        "ideology",
        "insult",
        "lgbtqphobia",
        "other_lifestyle",
        "physical_aspects",
        "profanity_obscene",
        "racism",
        "sexism",
        "xenophobia"
    ]

    dataset = dataset.filter(
        lambda example: any([example[label] == True for label in labels])
    )

    # Keep only toxicity labels columns and text column
    dataset = dataset.remove_columns(
        [
            col for col in dataset["train"].column_names if col not in labels + ["text"]
        ]
    )

    train_dataset = dataset["train"].train_test_split(
        test_size=test_size,
        shuffle=True,
        seed=seed
    )

    dataset["train"] = train_dataset["train"]
    dataset["validation"] = train_dataset["test"]

    return dataset


dataset = prepare_dataset(
    dataset,
    test_size=params.validation_split,
    seed=params.seed
)

dataset
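
The row selection performed by `prepare_dataset` can be sketched in plain Python, without the `datasets` library. This is only an illustration of the filtering and column projection logic: the toy rows, the `id` column, and the helpers `make_row`, `keep_row`, and `project` are hypothetical and do not come from OLID-BR.

```python
LABELS = [
    "health", "ideology", "insult", "lgbtqphobia", "other_lifestyle",
    "physical_aspects", "profanity_obscene", "racism", "sexism", "xenophobia",
]


def make_row(text: str, is_offensive: str, *active_labels: str) -> dict:
    """Build a toy row with every label column set to True or False."""
    row = {"id": text, "text": text, "is_offensive": is_offensive}
    row.update({label: label in active_labels for label in LABELS})
    return row


def keep_row(row: dict) -> bool:
    """Keep offensive rows that carry at least one toxicity label."""
    return row["is_offensive"] == "OFF" and any(row[label] for label in LABELS)


def project(row: dict) -> dict:
    """Keep only the text column and the toxicity label columns."""
    return {key: row[key] for key in ["text"] + LABELS}


rows = [
    make_row("example a", "OFF", "insult", "profanity_obscene"),
    make_row("example b", "OFF"),  # offensive, but no toxicity label
    make_row("example c", "NOT"),  # not offensive
]

kept = [project(row) for row in rows if keep_row(row)]
print([row["text"] for row in kept])
```

Only the first toy row survives: the second has no toxicity label and the third is not offensive.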

In [None]:
dataset.save_to_disk("data")

### Uploading the data to S3

We are going to use the `sagemaker.Session.upload_data` function to upload our datasets to an S3 location.

The return value, `inputs`, identifies that location. We will use it later when we start the training job.

In [5]:
# inputs = sagemaker_session.upload_data(
#     path="data",
#     bucket=bucket_name,
#     key_prefix=f"{prefix}/data"
# )

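# Reuse the S3 location of a previous upload. To upload the data again,
# uncomment the call above and remove this line.
inputs = "s3://sagemaker-us-east-1-215993976552/ToChiquinho/toxicity-type-detection/data"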

print("input spec (in this case, just an S3 path): {}".format(inputs))

input spec (in this case, just an S3 path): s3://sagemaker-us-east-1-215993976552/ToChiquinho/toxicity-type-detection/data


In [None]:
import shutil
shutil.rmtree("data")

## Training process

In this section, we will run the training process.

Amazon SageMaker runs the training inside a Docker container, so we need to provide a Python script for the container to run. In our case, all the code is in the `modeling` folder, including the `train.py` script.

We will start with a hyperparameter tuning process to find the best hyperparameters for our model.

Then, we will train the model using the best hyperparameters found.
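
The `train.py` script itself is not shown in this notebook. As a rough sketch of how such an entry point usually receives its inputs: SageMaker passes each hyperparameter as a `--name value` command-line argument and exposes the data and model locations through environment variables such as `SM_CHANNEL_TRAINING` and `SM_MODEL_DIR`. The argument names `--data_dir` and `--model_dir`, the local fallback paths, and the defaults below are assumptions made for illustration, not the project's actual interface.

```python
import argparse
import os


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the hyperparameters and paths a training script might receive."""
    parser = argparse.ArgumentParser()

    # Hyperparameters arrive as "--name value" pairs.
    parser.add_argument("--num_train_epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=8)
    parser.add_argument("--seed", type=int, default=42)

    # Inside a training job these environment variables are set by SageMaker;
    # the fallbacks are hypothetical local paths for running outside of it.
    parser.add_argument(
        "--data_dir", default=os.environ.get("SM_CHANNEL_TRAINING", "data")
    )
    parser.add_argument(
        "--model_dir", default=os.environ.get("SM_MODEL_DIR", "model")
    )

    # Ignore hyperparameters that this sketch does not declare.
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_args(
    ["--num_train_epochs", "1", "--batch_size", "8", "--seed", "1993"]
)
print(args.num_train_epochs, args.batch_size, args.seed)
```

Using `parse_known_args` keeps the sketch tolerant of extra hyperparameters, such as the ones added by the tuner.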

In [12]:
import os
import mlflow

mlflow.start_run()

print(f"MLFlow run ID: {mlflow.active_run().info.run_id}")

MLFlow run ID: f8e16dd693f14067a215d503da7f85d0


### Define the estimator

We will use the `sagemaker.pytorch.PyTorch` class to define our estimator.

In [13]:
from sagemaker.pytorch import PyTorch

checkpoint_s3_uri = f"s3://{bucket_name}/{prefix}/checkpoints"

# instance_type = "ml.m5.4xlarge" # 16 vCPUs, 64 GB RAM, no GPU - $ 0.922 per hour
# instance_type = "ml.m4.4xlarge" # 16 vCPUs, 64 GB RAM, no GPU - $ 0.966 per hour
# instance_type = "ml.c4.8xlarge" # 36 vCPUs, 60 GB RAM, no GPU - $ 1.909 per hour
# instance_type = "ml.c5.9xlarge" # 36 vCPUs, 72 GB RAM, no GPU - $ 1.836 per hour

instance_type = "ml.g4dn.2xlarge" # 8 vCPUs, 32 GB RAM, 1 x NVIDIA T4 GPU - $ 0.94 per hour
# instance_type = "ml.p3.2xlarge" # 8 vCPUs, 61 GB RAM, 1 x NVIDIA Tesla V100 GPU - $ 3.825 per hour

estimator = PyTorch(
    entry_point="train.py",
    source_dir="modeling",
    role=params.sagemaker_execution_role_arn,
    sagemaker_session=sagemaker_session,
    py_version="py38",
    framework_version="1.12.0",
    instance_count=1,
    instance_type=instance_type,
    use_spot_instances=True,
    max_wait=10800,
    max_run=10800,
    checkpoint_s3_uri=checkpoint_s3_uri,
    checkpoint_local_path="/opt/ml/checkpoints",
    environment={
        "MLFLOW_TRACKING_URI": params.mlflow_tracking_uri,
        "MLFLOW_EXPERIMENT_NAME": params.mlflow_experiment_name,
        "MLFLOW_TRACKING_USERNAME": params.mlflow_tracking_username,
        "MLFLOW_TRACKING_PASSWORD": params.mlflow_tracking_password,
        "MLFLOW_TAGS": params.mlflow_tags,
        "MLFLOW_RUN_ID": mlflow.active_run().info.run_id,
        "MLFLOW_FLATTEN_PARAMS": "True",
        "WANDB_DISABLED": "True"
    },
    hyperparameters={
        ## If you want to test the code, uncomment the following lines to use smaller datasets
        # "max_train_samples": 50,
        # "max_val_samples": 50,
        # "max_test_samples": 50,
        "num_train_epochs": params.num_train_epochs_per_child,
        "batch_size": params.batch_size,
        "seed": params.seed
    },
)
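
Several values passed to `environment` come from parameters that default to `None` (for example `mlflow_tracking_username`), while environment variables are expected to be strings. A minimal sketch of one way to guard against that, assuming it is acceptable to simply omit unset variables; `clean_environment` and the sample values are hypothetical and not part of the notebook:

```python
from typing import Dict, Optional


def clean_environment(env: Dict[str, Optional[object]]) -> Dict[str, str]:
    """Drop unset (None) entries and convert the remaining values to strings."""
    return {key: str(value) for key, value in env.items() if value is not None}


raw_environment = {
    "MLFLOW_TRACKING_URI": "http://localhost:5000",  # placeholder value
    "MLFLOW_TRACKING_USERNAME": None,  # unset parameter
    "MLFLOW_FLATTEN_PARAMS": "True",
}

print(clean_environment(raw_environment))
```

The cleaned dictionary could then be passed as the `environment` argument in place of the raw one.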

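To test our training job before hyperparameter tuning, we run it once on its own. For a faster test, uncomment the `max_*_samples` hyperparameters in the estimator above to use a small number of samples.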

In [14]:
estimator.fit(inputs, wait=False)

### Hyperparameter tuning

We will use the `sagemaker.tuner.HyperparameterTuner` class to run a hyperparameter tuning process.

We use MLflow to track the training process, so we can analyze the results through the MLflow UI.

#### Workaround for boto3 issue #3488

Because of the issue [Estimator.environment not using in SageMaker.Client.create_hyper_parameter_tuning_job() · Issue #3488 · boto/boto3](https://github.com/boto/boto3/issues/3488), the estimator's `environment` is not applied to the jobs created by the tuner, so we need to include our environment variables in the `hyperparameters` parameter.

In [None]:
for k, v in estimator.environment.items():
    if k != "MLFLOW_TAGS":
        estimator._hyperparameters[k] = v
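
With this workaround the settings reach the training script as hyperparameters instead of environment variables, so the script has to export them itself before MLflow is used. How `train.py` does this is not shown here. The following is a hypothetical sketch of that step: `export_to_environment` and the sample values are assumptions, and the key list simply mirrors the variables forwarded by the loop above.

```python
import os
from typing import Dict, Iterable

# Variables forwarded by the workaround loop (MLFLOW_TAGS is skipped there).
FORWARDED_KEYS = (
    "MLFLOW_TRACKING_URI",
    "MLFLOW_EXPERIMENT_NAME",
    "MLFLOW_TRACKING_USERNAME",
    "MLFLOW_TRACKING_PASSWORD",
    "MLFLOW_RUN_ID",
    "MLFLOW_FLATTEN_PARAMS",
    "WANDB_DISABLED",
)


def export_to_environment(
    hyperparameters: Dict[str, str],
    keys: Iterable[str] = FORWARDED_KEYS,
) -> Dict[str, str]:
    """Move forwarded settings into os.environ; return the real hyperparameters."""
    remaining = dict(hyperparameters)
    for key in keys:
        value = remaining.pop(key, None)
        if value is not None:
            os.environ[key] = str(value)
    return remaining


received = {
    "batch_size": "8",
    "MLFLOW_RUN_ID": "abc123",  # placeholder run ID
    "WANDB_DISABLED": "True",
}
model_hyperparameters = export_to_environment(received)
print(model_hyperparameters)
```

Only `batch_size` remains as a model hyperparameter; the other two entries are now available through `os.environ`.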

In [None]:
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)

estimator._hyperparameters.pop("max_train_samples", None)
estimator._hyperparameters.pop("max_val_samples", None)
estimator._hyperparameters.pop("max_test_samples", None)

tuner = HyperparameterTuner(
    estimator,
    max_jobs=18,
    max_parallel_jobs=3,
    objective_type="Maximize",
    objective_metric_name="test_f1",
    metric_definitions=[
        {
            "Name": "test_f1",
            "Regex": "Test F1-score: ([0-9\\.]+)"
        }
    ],
    hyperparameter_ranges={
        "learning_rate": ContinuousParameter(1e-5, 1e-3, scaling_type="Logarithmic"),
        "weight_decay": ContinuousParameter(0.0, 0.1),
        "adam_beta1": ContinuousParameter(0.8, 0.999),
        "adam_beta2": ContinuousParameter(0.8, 0.999),
        "adam_epsilon": ContinuousParameter(1e-8, 1e-6, scaling_type="Logarithmic")
    }
)
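
The objective metric is read from the training logs using the regular expression in `metric_definitions`, so the training script must print a line that matches it. SageMaker performs this extraction itself; the sketch below only illustrates how the pattern behaves. The helper `extract_metric` and the sample log lines are hypothetical.

```python
import re
from typing import Optional

# Same pattern as in metric_definitions above.
METRIC_REGEX = r"Test F1-score: ([0-9\.]+)"


def extract_metric(log_line: str) -> Optional[float]:
    """Return the captured metric value, or None if the line does not match."""
    match = re.search(METRIC_REGEX, log_line)
    return float(match.group(1)) if match else None


print(extract_metric("Test F1-score: 0.7612"))
print(extract_metric("Epoch 1 finished"))
```

Note that the character class `[0-9\.]` also matches a period that directly follows the number, so this sketch assumes the logged value is not followed by sentence punctuation.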

In [16]:
tuner.fit(inputs, wait=False)

params.sagemaker_tuning_job_name = tuner.latest_tuning_job.name

print(f"SageMaker tuning job name: {params.sagemaker_tuning_job_name}")

SageMaker tuning job name: pytorch-training-221110-0849


In [17]:
mlflow.log_params({
    "instance_type": estimator.instance_type,
    "instance_count": estimator.instance_count,
    "num_train_epochs_per_child": params.num_train_epochs_per_child,
    "batch_size": params.batch_size,
    "seed": params.seed
})

In [18]:
import pandas as pd

tuner_metrics: pd.DataFrame = sagemaker.HyperparameterTuningJobAnalytics(
    hyperparameter_tuning_job_name=params.sagemaker_tuning_job_name,
    sagemaker_session=sagemaker_session
).dataframe()

tuner_metrics.sort_values("FinalObjectiveValue", ascending=False, inplace=True)
tuner_metrics[["TrainingJobName", "FinalObjectiveValue", "TrainingJobStatus"]]

| | TrainingJobName | FinalObjectiveValue | TrainingJobStatus |
|---|---|---|---|
| 2 | pytorch-training-221110-0849-011-b73293fa | 0.761191 | Completed |
| 1 | pytorch-training-221110-0849-012-94a0142f | 0.750948 | Completed |
| 0 | pytorch-training-221110-0849-013-c71b29a3 | 0.743556 | Completed |
| 7 | pytorch-training-221110-0849-006-3955426c | 0.737553 | Completed |
| 9 | pytorch-training-221110-0849-004-a1003b73 | 0.694309 | Completed |
| 3 | pytorch-training-221110-0849-010-39dcc340 | 0.694248 | Completed |
| 10 | pytorch-training-221110-0849-003-2b58641f | 0.694248 | Completed |
| 11 | pytorch-training-221110-0849-002-404d00c0 | 0.694248 | Completed |
| 12 | pytorch-training-221110-0849-001-286e660c | 0.694248 | Completed |
| 8 | pytorch-training-221110-0849-005-dfe707c1 | 0.672951 | Completed |


The results are already sorted by `FinalObjectiveValue` in descending order, so the first row holds the best hyperparameters found.

In [19]:
best_job = tuner_metrics.iloc[0]
best_job.to_dict()

{'adam_beta1': 0.9057509584600282,
 'adam_beta2': 0.9548132227438636,
 'adam_epsilon': 1e-08,
 'learning_rate': 1e-05,
 'weight_decay': 0.027187699510002696,
 'TrainingJobName': 'pytorch-training-221110-0849-011-b73293fa',
 'TrainingJobStatus': 'Completed',
 'FinalObjectiveValue': 0.7611912488937378,
 'TrainingStartTime': Timestamp('2022-11-10 14:44:24-0300', tz='tzlocal()'),
 'TrainingEndTime': Timestamp('2022-11-10 16:01:50-0300', tz='tzlocal()'),
 'TrainingElapsedTimeSeconds': 4646.0}

### Training model with best hyperparameters

After analyzing the results of the hyperparameter tuning process, we can train the model using the best hyperparameters found.

In [20]:
remove_checkpoints(
    bucket_name=bucket_name,
    checkpoint_prefix=f"{prefix}/checkpoints",
    aws_profile_name=params.aws_profile_name
)

estimator = PyTorch(
    entry_point="train.py",
    source_dir="modeling",
    role=params.sagemaker_execution_role_arn,
    sagemaker_session=sagemaker_session,
    py_version="py38",
    framework_version="1.12.0",
    instance_count=1,
    instance_type=instance_type,
    use_spot_instances=True,
    max_wait=21600,
    max_run=21600,
    checkpoint_s3_uri=checkpoint_s3_uri,
    checkpoint_local_path="/opt/ml/checkpoints",
    environment={
        "MLFLOW_TRACKING_URI": params.mlflow_tracking_uri,
        "MLFLOW_EXPERIMENT_NAME": params.mlflow_experiment_name,
        "MLFLOW_TRACKING_USERNAME": params.mlflow_tracking_username,
        "MLFLOW_TRACKING_PASSWORD": params.mlflow_tracking_password,
        "MLFLOW_TAGS": params.mlflow_tags,
        "MLFLOW_RUN_ID": mlflow.active_run().info.run_id,
        "MLFLOW_FLATTEN_PARAMS": "True",
        "HF_MLFLOW_LOG_ARTIFACTS": "True",
        "WANDB_DISABLED": "True"
    },
    hyperparameters={
        "num_train_epochs": params.num_train_epochs,
        "batch_size": params.batch_size,
        "adam_beta1": best_job["adam_beta1"],
        "adam_beta2": best_job["adam_beta2"],
        "adam_epsilon": best_job["adam_epsilon"],
        "learning_rate": best_job["learning_rate"],
        "weight_decay": best_job["weight_decay"],
        "seed": params.seed
    },
)

estimator.fit(inputs, wait=False)

Deleted 68 checkpoints.


In [21]:
mlflow.log_params(
    {
        "best_adam_beta1": best_job["adam_beta1"],
        "best_adam_beta2": best_job["adam_beta2"],
        "best_adam_epsilon": best_job["adam_epsilon"],
        "best_learning_rate": best_job["learning_rate"],
        "best_weight_decay": best_job["weight_decay"]
    }
)

In [16]:
mlflow.end_run()

## Documentation

- [Estimators — sagemaker documentation](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html)
- [HyperparameterTuner — sagemaker documentation](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html)
- [Configure and Launch a Hyperparameter Tuning Job - Amazon SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-ex-tuning-job.html)
- [Managed Spot Training in Amazon SageMaker - Amazon SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html)