# Dataset Preprocessing

In [None]:
# Mount Google Drive so that files under /content/drive (the dataset CSV and the
# model checkpoints used later in this notebook) are reachable from the runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import re
import torch
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
# Select the compute device: CUDA when a GPU is visible, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
# Load the labelled tweets from the project folder on the mounted Drive.
df = pd.read_csv('/content/drive/MyDrive/NLP_PJ/labeled_data.csv')

In [None]:
# Discard the columns that are not referenced anywhere else in this notebook.
df = df.drop(
    columns=[
        'count',
        'hate_speech',
        'offensive_language',
        'neither',
    ]
)

In [None]:
def preprocess_text(text):
    """Normalise a raw tweet into lowercase, space-separated alphanumeric words.

    Structured tokens (URLs, numeric HTML entities, retweet headers, mentions
    and hashtags) are removed *before* generic punctuation is stripped.
    Stripping punctuation first would destroy the very characters
    ('&', '#', ';', ':', '/') those patterns rely on, leaving residue such as
    the digits of '&#128517;' or the word of a hashtag in the text.

    Args:
        text: The raw tweet as a string.

    Returns:
        The cleaned tweet: lowercase, only ``[a-z0-9]`` words, single spaces
        between words and no leading/trailing whitespace.
    """
    # URLs first: they need ':' and '/' intact to be matched as one token.
    text = re.sub(r'http\S+', ' ', text)
    # Numeric HTML entities, e.g. emojis encoded as '&#128517;'.
    text = re.sub(r'&#\d+;', ' ', text)
    # Retweet headers such as '!!! RT @username:' (leading '!' is optional;
    # \b keeps words that merely end in 'rt' from matching).
    text = re.sub(r'!*\s*\bRT\s*@\w+\s*:?', ' ', text, flags=re.IGNORECASE)
    # Remaining user mentions.
    text = re.sub(r'@\w+', ' ', text)
    # Hashtags (the tag word is removed together with the '#').
    text = re.sub(r'#\w+', ' ', text)
    # Everything that is not a letter, digit or whitespace (this also removes
    # colons without gluing the neighbouring words together).
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    # Convert tweet text to lowercase.
    text = text.lower()
    # Collapse runs of whitespace and trim both ends.
    return ' '.join(text.split())

In [None]:
# Clean every tweet, then keep only the rows whose cleaned text is longer
# than two characters.
df['tweet'] = df['tweet'].map(preprocess_text)
df = df.loc[df['tweet'].str.len() > 2]

In [None]:
# Expose the 'Unnamed: 0' column under the name 'index' (the dataset class
# reads it back under that name).
df = df.rename(columns={'Unnamed: 0': 'index'})

# Spot-check the cleaned text of the row labelled 15534.
print(df.loc[15534, 'tweet'])


In [None]:
# Hold out 20% of the rows for testing, then carve 20% of the remainder off
# for validation (roughly 64% train / 16% validation / 20% test).
# The fixed random_state keeps both splits reproducible.
# NOTE(review): neither split is stratified on 'class'; if the labels are
# imbalanced, consider passing stratify= — TODO confirm.
train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)
train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=42)

In [None]:
import matplotlib.pyplot as plt

# Number of whitespace-separated words in every cleaned tweet
sequence_lengths = [len(words) for words in map(str.split, df['tweet'])]

# Histogram of the word counts
plt.hist(sequence_lengths, bins=50)
plt.title('Distribution of Sequence Lengths')
plt.ylabel('Frequency')
plt.xlabel('Sequence Length')
plt.show()

# Length (in words) at the chosen percentile of the distribution
percentile_value = 95
max_length = int(np.percentile(sequence_lengths, percentile_value))
print(f"Chosen maximum length: {max_length}")

# BERT model

In [None]:
%pip install transformers

In [None]:
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, BertConfig
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from tqdm import tqdm

In [None]:
# Load the pretrained 'bert-base-uncased' tokenizer and a sequence-classification
# model built on the same checkpoint, configured for three output labels.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels = 3)

# Dropout probability used when the model's regularisation is adjusted.
dropout_prob = 0.4  # Adjust dropout probability as needed

In [None]:
config = model.config
# Keep the config in sync with the value that is actually applied below.
config.hidden_dropout_prob = dropout_prob

# The model has already been constructed, so editing the config alone does not
# change the nn.Dropout modules that were created from it; their probability
# has to be updated in place.
#
# Registering additional children with add_module() is not a substitute: the
# parent modules' forward() never calls children it does not know about, so
# such layers would be inert. Their parameters (e.g. BatchNorm weights and
# running statistics) would still be written by state_dict(), which makes the
# checkpoint fail a strict load into a stock BertForSequenceClassification.
model.bert.embeddings.dropout.p = dropout_prob
for layer in model.bert.encoder.layer:
    # Dropout applied to the attention block's output projection
    layer.attention.output.dropout.p = dropout_prob
    # Dropout applied to the feed-forward block's output projection
    layer.output.dropout.p = dropout_prob

# Dropout applied to the pooled representation before the classifier head
model.dropout.p = dropout_prob


In [None]:
class HateSpeechDataset(Dataset):
    """Map-style dataset that tokenizes tweets from a DataFrame on access.

    Each item is a 4-tuple of tensors:
    ``(index, input_ids, attention_mask, label)`` built from the row's
    'index', 'tweet' and 'class' columns.
    """

    def __init__(self, data, tokenizer, max_length = 25):
        # data: DataFrame with 'index', 'class' and 'tweet' columns
        # tokenizer: object exposing encode_plus(...)
        # max_length: fixed token length every tweet is padded/truncated to
        self.data, self.tokenizer, self.max_length = data, tokenizer, max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Positional row lookup; all three fields come from this row.
        row = self.data.iloc[idx]

        # Tokenize the tweet to a fixed-length sequence.
        encoded = self.tokenizer.encode_plus(
            row['tweet'],
            None,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_token_type_ids=True
        )

        fields = (
            row['index'],
            encoded['input_ids'],
            encoded['attention_mask'],
            row['class'],
        )
        return tuple(torch.tensor(value) for value in fields)


In [None]:
# Wrap each split in a HateSpeechDataset sharing the same tokenizer
# (max_length is left at the class default).
train_dataset, val_dataset, test_dataset = (
    HateSpeechDataset(split, tokenizer)
    for split in (train_data, val_data, test_data)
)

In [None]:
# Sanity check: walk the training set until the sample whose 'index' value is
# 15534 is found, print its label and the text decoded back from its token ids,
# then stop. Nothing is printed if that row is not part of the training split.
for idx, input_ids, attention_mask, label in train_dataset:
    if idx == 15534:
        print(f"Index: {idx}, Label: {label}, Text: {tokenizer.decode(input_ids)}")
        break


In [None]:
# Batches of four samples: the training loader draws samples in random order,
# the validation and test loaders walk their datasets in order.
batch_size = 4
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Move the model to the selected device before the optimizer is created.
model.to(device)

# AdamW over all model parameters with a learning rate of 5e-6.
# NOTE(review): this AdamW is the one imported from transformers; newer
# transformers releases reportedly drop it in favour of torch.optim.AdamW —
# TODO confirm against the installed version.
optimizer = AdamW(model.parameters(), lr=5e-6)
# NOTE(review): the training loop takes its loss from outputs.loss, and
# evaluate_test_dataset accepts loss_fn but does not call it, so this object
# does not appear to influence any reported number.
loss_fn = torch.nn.CrossEntropyLoss()

In [None]:
epochs = 7

# Per-epoch history, consumed by the plotting cell.
train_losses = []
val_losses = []
val_accuracy_list = []

for epoch in range(epochs):
    print(f'Epoch {epoch + 1}/{epochs}')
    print('-' * 10)

    # ---- Training pass -------------------------------------------------
    model.train()
    total_loss = 0

    for batch in tqdm(train_dataloader):
        # batch = (row index, input_ids, attention_mask, label); the row
        # index is not needed for optimisation.
        input_ids = batch[1].to(device)
        attention_masks = batch[2].to(device)
        labels = batch[3].to(device)

        optimizer.zero_grad()

        # Passing labels makes the model compute the classification loss itself.
        outputs = model(input_ids, attention_mask=attention_masks, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    avg_train_loss = total_loss / len(train_dataloader)
    print(f'Training loss: {avg_train_loss}')
    train_losses.append(avg_train_loss)

    # ---- Validation pass -----------------------------------------------
    model.eval()
    val_loss = 0
    val_correct = 0
    val_samples = 0

    with torch.no_grad():
        for batch in tqdm(val_dataloader):
            input_ids = batch[1].to(device)
            attention_masks = batch[2].to(device)
            labels = batch[3].to(device)

            outputs = model(input_ids, attention_mask=attention_masks, labels=labels)

            val_loss += outputs.loss.item()

            preds = torch.argmax(outputs.logits, dim=1)
            # Count correct predictions per sample rather than averaging
            # per-batch accuracies: a smaller final batch would otherwise be
            # over-weighted, and this matches how evaluate_test_dataset
            # computes accuracy.
            val_correct += (preds == labels).sum().item()
            val_samples += labels.size(0)

    avg_val_loss = val_loss / len(val_dataloader)
    avg_val_accuracy = val_correct / val_samples
    print(f'Validation loss: {avg_val_loss}')
    print(f'Validation accuracy: {avg_val_accuracy}')
    val_losses.append(avg_val_loss)
    val_accuracy_list.append(avg_val_accuracy)


In [None]:
from sklearn.metrics import f1_score

def calculate_f1_score(model, dataloader):
    """Compute weighted and macro F1 of ``model`` over ``dataloader``.

    Each batch is expected to carry input ids, attention mask and labels at
    positions 1, 2 and 3. The model is put in eval mode and run without
    gradient tracking.

    Returns:
        A ``(weighted_f1, macro_f1)`` tuple.
    """
    model.eval()
    predicted, expected = [], []

    with torch.no_grad():
        for batch in tqdm(dataloader):
            token_ids, mask, targets = (batch[pos].to(device) for pos in (1, 2, 3))

            logits = model(token_ids, attention_mask=mask).logits

            # Highest-scoring class per sample
            predicted.extend(logits.argmax(dim=1).cpu().numpy())
            expected.extend(targets.cpu().numpy())

    return tuple(
        f1_score(expected, predicted, average=mode)
        for mode in ('weighted', 'macro')
    )

In [None]:
def evaluate_test_dataset(model, dataloader, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and report accuracy and mean loss.

    Each batch is expected to carry input ids, attention mask and labels at
    positions 1, 2 and 3. The loss is the one the model returns when given
    labels; ``loss_fn`` is accepted but not called.

    Returns:
        ``(accuracy, avg_loss)`` where accuracy is correct / total samples and
        avg_loss is the summed batch loss divided by the number of batches.
    """
    model.eval()
    loss_sum, hits, seen = 0, 0, 0

    with torch.no_grad():
        for batch in tqdm(dataloader):
            token_ids, mask, targets = (batch[pos].to(device) for pos in (1, 2, 3))

            result = model(token_ids, attention_mask=mask, labels=targets)
            loss_sum += result.loss.item()

            guesses = result.logits.argmax(dim=1)
            hits += (guesses == targets).sum().item()
            seen += targets.size(0)

    mean_loss = loss_sum / len(dataloader)
    return hits / seen, mean_loss

In [None]:
# Weighted and macro F1 of the fine-tuned model on the held-out test split.
weighted_f1, macro_f1 = calculate_f1_score(model, test_dataloader)
print("Weighted F1 Score:", weighted_f1)
print("Macro F1 Score:", macro_f1)

In [None]:
# Accuracy and mean per-batch loss of the fine-tuned model on the test split.
test_accuracy, test_loss = evaluate_test_dataset(model, test_dataloader, loss_fn)
print("Test Accuracy:", test_accuracy)
print("Test Loss:", test_loss)

In [None]:
import matplotlib.pyplot as plt
# Plot training and validation loss per epoch on a shared axis.
# The x-axis is built from `epochs`, so the history lists must hold exactly
# one entry per epoch (i.e. the training loop has to have run to completion).
plt.figure(figsize=(10, 5))
plt.plot(range(1, epochs + 1), train_losses, label='Train Loss')
plt.plot(range(1, epochs + 1), val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training vs Validation Loss over Epochs')
plt.legend()
plt.show()

# Plot validation accuracy per epoch in a separate figure.
plt.figure(figsize=(10, 5))
plt.plot(range(1, epochs + 1), val_accuracy_list, label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Validation Accuracy over Epochs')
plt.legend()
plt.show()


# Checkpoints and Demo

In [None]:
import os

def save_checkpoint(model, filepath=None, default_dir="/content/drive/MyDrive/NLP_PJ", default_filename="bert_model_checkpoint.pth"):
    """Write ``model.state_dict()`` to disk and print where it was saved.

    Args:
        model: Module whose ``state_dict()`` is saved.
        filepath: Destination file. Used as-is when its parent directory
            exists; a bare filename (no directory part) refers to the current
            working directory.
        default_dir: Directory used when ``filepath`` is ``None`` or its
            parent directory does not exist. Created if missing.
        default_filename: File name used together with ``default_dir``.
    """
    use_default = filepath is None
    if not use_default:
        # os.path.dirname() returns '' for a bare filename, and
        # os.path.exists('') is False; treat '' as the current directory so
        # that an explicit filename is not silently replaced by the default.
        parent_dir = os.path.dirname(filepath) or os.curdir
        use_default = not os.path.exists(parent_dir)

    if use_default:
        # Create the default directory if it doesn't exist
        os.makedirs(default_dir, exist_ok=True)
        filepath = os.path.join(default_dir, default_filename)

    torch.save(model.state_dict(), filepath)
    print("Model checkpoint saved successfully at:", filepath)

In [None]:
def load_checkpoint(model, filepath = "/content/drive/MyDrive/NLP_PJ/bert_model_checkpoint.pth"):
    """Load a saved ``state_dict`` from ``filepath`` into ``model`` in place.

    Tensors are first loaded onto the CPU so that a checkpoint written on a
    GPU runtime can be restored on a CPU-only one; ``load_state_dict`` then
    copies them into the model's existing parameters.

    Checkpoint entries that ``model`` has no parameter or buffer for are
    skipped and reported. Entries the model needs but the checkpoint lacks
    still raise ``RuntimeError`` from the strict load.

    Args:
        model: Module to restore the weights into.
        filepath: Path of the checkpoint file written by ``torch.save``.
    """
    state_dict = torch.load(filepath, map_location="cpu")

    expected_keys = set(model.state_dict().keys())
    ignored_keys = [key for key in state_dict if key not in expected_keys]
    if ignored_keys:
        print("Ignoring checkpoint entries not present in the model:", ignored_keys)

    matching = {key: value for key, value in state_dict.items() if key in expected_keys}
    model.load_state_dict(matching)
    print("Model checkpoint loaded successfully.")

In [None]:
# Persist the fine-tuned weights to the default checkpoint location.
save_checkpoint(model)

In [None]:
# Rebuild the stock architecture and restore the fine-tuned weights into it.
loaded_model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=3)
load_checkpoint(loaded_model)
# The evaluation helpers move every batch to `device`; the model must live on
# the same device or the forward pass fails with a device-mismatch error.
loaded_model.to(device)

In [None]:
# Re-score the reloaded model on the test split; the numbers should match the
# in-memory model if the checkpoint round-trip preserved the weights.
weighted_f1, macro_f1 = calculate_f1_score(loaded_model, test_dataloader)
print("Weighted F1 Score (Loaded Model):", weighted_f1)
print("Macro F1 Score (Loaded Model):", macro_f1)