In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd
import numpy as np
import torch
import sys
import torch.nn as nn

## Ensure TensorFlow is not used
import os
os.environ["USE_TF"] = "0"

## Set Random State for Reproducability
random_state = 42

In [3]:
!pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.6-py3-none-any.whl.metadata (9.5 kB)
Downloading evaluate-0.4.6-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: evaluate
Successfully installed evaluate-0.4.6


In [4]:
# Import Hugging Face Tooling
from transformers import BertTokenizer
from transformers import BertModel
from transformers import Trainer, TrainingArguments
from transformers.modeling_outputs import SequenceClassifierOutput
import evaluate
from datasets import Dataset

In [5]:
# Use CPU/MPS if possible
device = None
if "google.colab" in sys.modules:
    # Running in Colab
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
else:
    # Not in Colab (e.g., Mac)
    device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")

print("Using device:", device)

Using device: cuda


In [6]:
train_df = pd.read_csv('/content/drive/MyDrive/CIS 5300 Term Project/Final_Data/train_data.csv')
dev_df = pd.read_csv('/content/drive/MyDrive/CIS 5300 Term Project/Final_Data/dev_data.csv')
test_df = pd.read_csv('/content/drive/MyDrive/CIS 5300 Term Project/Final_Data/test_data.csv')

In [7]:
# Compute Class Proportions
p0 = (train_df['label'] == 0).mean() # Computes the percentage of our training dataset that has label = 0
p1 = (train_df['label'] == 1).mean() # Computes the percentage of our training dataset that has label = 1
print(f"{p0  * 100}% of our dataset has label = 0 and {p1  * 100}% of our dataset has label = 1")

78.81711885571573% of our dataset has label = 0 and 21.182881144284256% of our dataset has label = 1


In [8]:
# Define Custom Loss Criterion to Address Class Imbalance
class_weights = torch.tensor([p1, p0]).float().to(device)
custom_criterion = nn.CrossEntropyLoss(weight = class_weights)
print(f"Class Weights: {class_weights}")

Class Weights: tensor([0.2118, 0.7882], device='cuda:0')


In [9]:
# Fetch BERT Tokenizer from HuggingFace
bert_model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(bert_model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [10]:
class BertLSTMClassifier(nn.Module):
    def __init__(
        self,
        bert_model_name="bert-base-uncased",
        num_labels=2,
        hidden_dim=256,
        num_layers=2,
        bidirectional=True,
        dropout=0.3
    ):
        super().__init__()

        # Load pretrained BERT encoder
        self.bert = BertModel.from_pretrained(bert_model_name)
        bert_hidden = self.bert.config.hidden_size

        # LSTM configuration
        self.bidirectional = bidirectional
        self.lstm = nn.LSTM(
            input_size=bert_hidden,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional
        )

        lstm_output_dim = hidden_dim * (2 if bidirectional else 1)

        # Fully connected classifier layers
        self.fc1 = nn.Linear(lstm_output_dim, 128)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, num_labels)

    def forward(self, input_ids, attention_mask, token_type_ids=None, labels=None):
        # BERT encoder
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True
        )

        # Extract sequence embeddings
        x = outputs.last_hidden_state  # (batch, seq_len, hidden_size)

        # Feed into LSTM
        lstm_out, (h_n, c_n) = self.lstm(x)     # (batch, seq_len, hidden_dim * num_directions)

        if self.bidirectional:
            # Take the last hidden state of LSTM for forward and backward directions
            x = torch.cat((h_n[-2], h_n[-1]), dim=1)
        else:
            # Take the last hidden state of LSTM
            x = h_n[-1]

        # Classifier layers
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        logits = self.fc2(x)

        return SequenceClassifierOutput(logits=logits)

In [11]:
model = BertLSTMClassifier(bert_model_name, num_labels = 2) # num_labels = 2 since we have 2 classes!

# Create `Hugging Face` Datasets [Train + Dev + Test]
train_hf_dataset = Dataset.from_pandas(train_df)
dev_hf_dataset = Dataset.from_pandas(dev_df)
test_hf_dataset = Dataset.from_pandas(test_df)

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

In [12]:
# Tokenize Text Data
def tokenize_function(row):
    tokens = tokenizer(row['text'], truncation = True, padding = 'max_length', max_length = tokenizer.model_max_length)
    row['input_ids'] = tokens['input_ids']
    row['attention_mask'] = tokens['attention_mask']
    row['token_type_ids'] = tokens['token_type_ids']
    return row

train_hf_dataset = train_hf_dataset.map(tokenize_function)
dev_hf_dataset = dev_hf_dataset.map(tokenize_function)
test_hf_dataset = test_hf_dataset.map(tokenize_function)

# Define Accuracy, Precision, Recall, and F1 Metrics from Hugging Face
accuracy_metric = evaluate.load("accuracy")
precision_metric = evaluate.load("precision")
recall_metric = evaluate.load('recall')
f1_metric = evaluate.load("f1")

Map:   0%|          | 0/26427 [00:00<?, ? examples/s]

Map:   0%|          | 0/3303 [00:00<?, ? examples/s]

Map:   0%|          | 0/3304 [00:00<?, ? examples/s]

Downloading builder script: 0.00B [00:00, ?B/s]

Downloading builder script: 0.00B [00:00, ?B/s]

Downloading builder script: 0.00B [00:00, ?B/s]

Downloading builder script: 0.00B [00:00, ?B/s]

In [13]:
# Define a compute_metrics function
def compute_metrics(eval_pred):
    # Get the model predictions
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    # Return Metrics
    return {
        "accuracy": accuracy_metric.compute(predictions=predictions, references=labels)['accuracy'], # Accuracy
        "pos_precision": precision_metric.compute(predictions=predictions, references=labels, pos_label = 1, average = 'binary', zero_division = 0)["precision"], # Precision on the Class w/ Label = 1 [Hate Samples]
        "pos_recall": recall_metric.compute(predictions=predictions, references=labels, pos_label = 1, average = 'binary', zero_division = 0)['recall'], # Recall on the Class w/ Label = 1 [Hate Samples]
        "pos_f1": f1_metric.compute(predictions=predictions, references=labels, pos_label = 1, average = 'binary')["f1"], # F1 Score on the Class w/ Label = 1 [Hate Samples]
        "neg_precision": precision_metric.compute(predictions=predictions, references=labels, pos_label = 0, average = 'binary', zero_division = 0)['precision'], # Precision on the Class w/ Label = 0 [Non-Hate Samples]
        "neg_recall": recall_metric.compute(predictions=predictions, references=labels, pos_label = 0, average = 'binary', zero_division = 0)['recall'], # Recall on the Class w/ Label = 0 [Non-Hate Samples]
        "neg_f1": f1_metric.compute(predictions=predictions, references=labels, pos_label = 0, average = 'binary')['f1'], # F1 Score on the Class w/ Label = 0 [Non-Hate Samples]
        "f1_macro": f1_metric.compute(predictions=predictions, references=labels, average='macro')['f1'], # Macro F1 Score
        "f1_micro": f1_metric.compute(predictions=predictions, references=labels, average='micro')['f1'], # Micro F1 Score
        "f1_weighted": f1_metric.compute(predictions=predictions, references=labels, average='weighted')['f1'], # Weighted F1 Score
    }

In [14]:
# Subclass the `Trainer` Class from HuggingFace to use Custom Loss Criterion
# Create a subclassed Trainer that enables us to use the custom loss function defined earlier
class SubTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs = False, **kwargs):
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        loss = custom_criterion(logits, labels)
        return (loss, outputs) if return_outputs else loss

# **Initialize the `TrainingArguments` and `Trainer`**

In [15]:
training_args = TrainingArguments(
    output_dir="Milestone3-BERT-BiLSTM-FineTuning",
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    learning_rate=5e-5,
    num_train_epochs=3,
    save_strategy="steps",      # save checkpoints every N steps
    save_steps=50,             # save every 50 steps
    eval_strategy="steps",      # evaluate every N steps
    eval_steps=50,             # evaluate every 50 steps
    logging_strategy="steps",
    logging_steps=50,          # log every 50 steps
    report_to="none",
    full_determinism=True,
    load_best_model_at_end=True, # Load best model(i.e. one that has highest dev F1 score)
    metric_for_best_model="pos_f1", # Metric used for model comparison
    greater_is_better=True # Greater F1 score is better!
)

trainer = SubTrainer(
    model=model,
    args=training_args,
    train_dataset=train_hf_dataset,
    eval_dataset=dev_hf_dataset,
    compute_metrics=compute_metrics,
)

# **Train the Model: `Fine-Tuning`**

In [16]:
trainer.train() # Always Resume from Last Checkpoint to Save Time
trainer.save_model('Milestone3-BERT-BiLSTM-FinalModel') # Save the Final Model
trainer.save_state() # Save the State of the Trainer (e.g. Losses, etc)

Step,Training Loss,Validation Loss,Accuracy,Pos Precision,Pos Recall,Pos F1,Neg Precision,Neg Recall,Neg F1,F1 Macro,F1 Micro,F1 Weighted
50,0.6114,0.592201,0.831971,0.631103,0.498571,0.557063,0.872364,0.921629,0.89632,0.726691,0.831971,0.824422
100,0.5162,0.453224,0.825008,0.562887,0.78,0.653892,0.933991,0.837111,0.882901,0.768397,0.825008,0.834368
150,0.4397,0.435746,0.779594,0.488618,0.858571,0.622798,0.952243,0.758356,0.844311,0.733555,0.779594,0.797366
200,0.4523,0.483843,0.831365,0.585629,0.698571,0.637134,0.914506,0.867076,0.89016,0.763647,0.831365,0.836536
250,0.442,0.413849,0.801998,0.52014,0.848571,0.644951,0.950949,0.789474,0.86272,0.753836,0.801998,0.816569
300,0.4052,0.424093,0.810475,0.534007,0.83,0.649888,0.946275,0.805225,0.870071,0.759979,0.810475,0.823408
350,0.4629,0.423127,0.750227,0.456017,0.925714,0.611033,0.97237,0.703035,0.816054,0.713543,0.750227,0.772604
400,0.4594,0.409776,0.823191,0.556974,0.81,0.66007,0.941794,0.826738,0.880524,0.770297,0.823191,0.833803
450,0.426,0.407511,0.80442,0.524021,0.841429,0.645833,0.949059,0.794468,0.86491,0.755372,0.80442,0.818481
500,0.3869,0.443512,0.832879,0.580435,0.762857,0.659259,0.93034,0.85171,0.88929,0.774275,0.832879,0.84054


# **Evaluate on Train, Dev, and Test Datasets**

In [17]:
# Split: Train, Dev, or Test
def generate_evaluation_results(split):
    dataset = None
    if split == "train":
        dataset = train_hf_dataset
    elif split == "dev" or split == "validation" or split == "val":
        dataset = dev_hf_dataset
    elif split == "test":
        dataset = test_hf_dataset

    results = trainer.evaluate(eval_dataset=dataset, metric_key_prefix=split)
    df_results = pd.DataFrame([results])
    df_results.to_csv(f"BERT-BiLSTM-{split}-results.csv", index=False)
    print(f"Saved {split} evaluation metrics to BERT-BiLSTM-{split}-results.csv")

# Generate Evaluation Results on Train, Dev, and Test Splits
generate_evaluation_results("train")
generate_evaluation_results("dev")
generate_evaluation_results("test")

Saved train evaluation metrics to BERT-BiLSTM-train-results.csv
Saved dev evaluation metrics to BERT-BiLSTM-dev-results.csv
Saved test evaluation metrics to BERT-BiLSTM-test-results.csv


In [18]:
import shutil
import os

# Replace 'my_folder' with the actual name of the folder you want to download
folder_to_zip = '/content/Milestone3-BERT-BiLSTM-FinalModel'
zip_file_name = f'{folder_to_zip}.zip'

# Create the zip archive
shutil.make_archive(folder_to_zip, 'zip', folder_to_zip)

'/content/Milestone3-BERT-BiLSTM-FinalModel.zip'

In [None]:
# Replace 'my_folder' with the actual name of the folder you want to download
folder_to_zip = '/content/Milestone3-BERT-BiLSTM-FineTuning'
zip_file_name = f'{folder_to_zip}.zip'

# Create the zip archive
shutil.make_archive(folder_to_zip, 'zip', folder_to_zip)