In [None]:
import torch
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import Trainer, TrainingArguments
from datasets import Dataset
import pandas as pd
from sklearn.model_selection import train_test_split

# Seeds for reproducibility: RANDOM_SEED is handed to the scikit-learn
# train/eval split, TORCH_SEED to PyTorch's global RNG.
RANDOM_SEED = 69
TORCH_SEED = 69

# Maximum number of tokens kept per example when the tokenizer truncates.
TOKEN_LIMIT = 512

In [None]:
from google.colab import drive

# Attach Google Drive to the Colab filesystem at a fixed mount point
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

In [None]:
# Seed PyTorch's global random number generator with the configured seed
torch.manual_seed(seed=TORCH_SEED)

In [None]:
# Read the emotions CSV with pandas, then wrap the frame as a Hugging Face Dataset
EMOTIONS_CSV_PATH = 'emotions_dataset.csv'
data = pd.read_csv(EMOTIONS_CSV_PATH)
dataset = Dataset.from_pandas(data)

In [None]:
# Tokenizer of the stock uncased BERT checkpoint, used for the baseline run
BASE_CHECKPOINT = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(BASE_CHECKPOINT)

In [None]:
def tokenize_function(data):
    """Tokenize the ``'text'`` entry of ``data`` with the module-level tokenizer.

    Args:
        data: Mapping with a ``'text'`` entry (a batch of strings when this is
            called through ``Dataset.map(..., batched=True)``).

    Returns:
        Whatever ``tokenizer`` returns for those texts, truncated to at most
        ``TOKEN_LIMIT`` tokens per example.
    """
    texts = data['text']
    encoded = tokenizer(texts, truncation=True, max_length=TOKEN_LIMIT)
    return encoded

# Run the tokenizer over the whole dataset, a batch of rows per call
tokenized_dataset = dataset.map(function=tokenize_function, batched=True)

In [None]:
# Go through pandas so scikit-learn can make an 80/20 split stratified on `source`.
df = tokenized_dataset.to_pandas()

train_df, eval_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df['source'],
    random_state=RANDOM_SEED
)

# Drop the shuffled pandas index left behind by the split.
train_df = train_df.reset_index(drop=True)
eval_df = eval_df.reset_index(drop=True)

# Copy `polarity` into a `labels` column with one vectorized assignment per
# split. This yields the same columns as mapping a per-row lambda over the
# Dataset, without one Python call per example.
train_dataset = Dataset.from_pandas(train_df.assign(labels=train_df['polarity']))
eval_dataset = Dataset.from_pandas(eval_df.assign(labels=eval_df['polarity']))

print(f"Training size: {len(train_dataset)}, Evaluation size: {len(eval_dataset)}")

In [None]:
from transformers import DataCollatorWithPadding

# Collator that pads the examples of each batch using the current tokenizer
data_collator = DataCollatorWithPadding(tokenizer)

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(pred):
    """Compute accuracy and macro-averaged precision/recall/F1 for one evaluation pass.

    Args:
        pred: Object exposing ``label_ids`` (gold class ids) and ``predictions``
            (per-class scores; the arg-max over the last axis is taken as the
            predicted class).

    Returns:
        dict with the keys ``'accuracy'``, ``'f1'``, ``'precision'`` and ``'recall'``.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    # zero_division=0 states explicitly what to report when a class is never
    # predicted (or never present): it scores 0, the same value scikit-learn
    # falls back to by default, but without an UndefinedMetricWarning on
    # every evaluation.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average='macro', zero_division=0
    )
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

In [None]:
batch_size = 16
epochs = 3
# Round up: Trainer keeps the final, partially filled batch of each epoch
# unless `dataloader_drop_last` is set (this notebook does not set it), so
# floor division would undercount the number of optimizer steps.
steps_per_epoch = (len(train_dataset) + batch_size - 1) // batch_size
total_steps = steps_per_epoch * epochs

# Base BERT model

In [None]:
# Baseline: stock uncased BERT with a six-way sequence-classification head
base_model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=6,
)

In [None]:
# Fine-tuning configuration for the baseline classifier.
training_args = TrainingArguments(
    output_dir="./emotions_base_model_results",
    # Evaluate and checkpoint on the same per-epoch schedule so the best
    # checkpoint can be restored when training ends.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=5,
    load_best_model_at_end=True,
    learning_rate=3e-5,
    # Reuse `batch_size` so the batch size actually used matches the one that
    # `total_steps` (and therefore the warm-up length) was derived from.
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=epochs,
    weight_decay=0.01,
    warmup_steps=int(0.1 * total_steps),  # warm up over the first 10% of steps
    logging_dir="./logs",
    logging_steps=500,
    report_to="none",
)

# Wire the baseline model, both data splits and the helper objects into a Trainer
trainer = Trainer(
    args=training_args,
    model=base_model,
    processing_class=tokenizer,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
)

trainer.train()

# Keep and show the final evaluation metrics: as a bare expression in the
# middle of the cell, the returned dict was computed and then discarded
# without being displayed.
base_eval_metrics = trainer.evaluate()
print(base_eval_metrics)

# Persist the fine-tuned weights together with the tokenizer files
base_model.save_pretrained('./emotions_base_model')
tokenizer.save_pretrained('./emotions_base_model')

# Proposed Model

In [None]:
# Unzip pre trained BERT
!unzip -q '/content/drive/MyDrive/Trained Models/mlm_pretraining_6.zip' -d ./

In [None]:
tokenizer = BertTokenizer.from_pretrained("./mlm_pretraining_6")

# Optional sanity check that the tokenizer vocabulary was expanded properly:
# look up one slang term and one emoji and see whether either resolves to the
# unknown-token id. Safe to skip.
test_slang = "gratz"
test_emoji = "✅"

slang_id, emoji_id = tokenizer.convert_tokens_to_ids([test_slang, test_emoji])

print(f"Token ID for slang '{test_slang}': {slang_id}")
print(f"Token ID for emoji '{test_emoji}': {emoji_id}")

unk_id = tokenizer.unk_token_id

if slang_id != unk_id:
    print(f"Slang '{test_slang}' is in the vocabulary.")
else:
    print(f"Slang '{test_slang}' is not in the vocabulary.")

if emoji_id != unk_id:
    print(f"Emoji '{test_emoji}' is in the vocabulary.")
else:
    print(f"Emoji '{test_emoji}' is not in the vocabulary.")

In [None]:
def tokenize_function(data):
    """Tokenize the ``'text'`` entry of ``data`` with the tokenizer currently bound at module level.

    Args:
        data: Mapping with a ``'text'`` entry (a batch of strings when this is
            called through ``Dataset.map(..., batched=True)``).

    Returns:
        The tokenizer's output for those texts, truncated to at most
        ``TOKEN_LIMIT`` tokens per example.
    """
    raw_texts = data['text']
    return tokenizer(
        raw_texts,
        truncation=True,
        max_length=TOKEN_LIMIT,
    )

# Re-tokenize the full dataset in batches with the tokenizer loaded for the proposed model
tokenized_dataset = dataset.map(function=tokenize_function, batched=True)

In [None]:
# Go through pandas so scikit-learn can make an 80/20 split stratified on `source`.
df = tokenized_dataset.to_pandas()

train_df, eval_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df['source'],
    random_state=RANDOM_SEED
)

# Drop the shuffled pandas index left behind by the split.
train_df = train_df.reset_index(drop=True)
eval_df = eval_df.reset_index(drop=True)

# Copy `polarity` into a `labels` column with one vectorized assignment per
# split. This yields the same columns as mapping a per-row lambda over the
# Dataset, without one Python call per example.
train_dataset = Dataset.from_pandas(train_df.assign(labels=train_df['polarity']))
eval_dataset = Dataset.from_pandas(eval_df.assign(labels=eval_df['polarity']))

print(f"Training size: {len(train_dataset)}, Evaluation size: {len(eval_dataset)}")

In [None]:
from transformers import DataCollatorWithPadding

# Rebuild the padding collator around the tokenizer loaded for the proposed model
data_collator = DataCollatorWithPadding(tokenizer)

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(pred):
    """Compute accuracy and macro-averaged precision/recall/F1 for one evaluation pass.

    Args:
        pred: Object exposing ``label_ids`` (gold class ids) and ``predictions``
            (per-class scores; the arg-max over the last axis is taken as the
            predicted class).

    Returns:
        dict with the keys ``'accuracy'``, ``'f1'``, ``'precision'`` and ``'recall'``.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    # zero_division=0 states explicitly what to report when a class is never
    # predicted (or never present): it scores 0, the same value scikit-learn
    # falls back to by default, but without an UndefinedMetricWarning on
    # every evaluation.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average='macro', zero_division=0
    )
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

In [None]:
batch_size = 16
epochs = 3
# Round up: Trainer keeps the final, partially filled batch of each epoch
# unless `dataloader_drop_last` is set (this notebook does not set it), so
# floor division would undercount the number of optimizer steps.
steps_per_epoch = (len(train_dataset) + batch_size - 1) // batch_size
total_steps = steps_per_epoch * epochs

In [None]:
from torch.nn import CrossEntropyLoss
from transformers import BertModel
import torch.nn as nn

class WHLA_BERT(nn.Module):
    """BERT encoder whose classifier reads a gated mix of its last four hidden layers.

    The position-0 (``[CLS]``) vectors of the last four entries of the encoder's
    ``hidden_states`` are each scaled by a learnable scalar gate, summed,
    layer-normalised, passed through dropout and projected to ``num_labels`` logits.

    Args:
        pretrained_model: Name or path handed to ``BertModel.from_pretrained``.
        num_labels: Size of the classifier output.
    """

    def __init__(self, pretrained_model="bert-base-uncased", num_labels=2):
        super().__init__()

        # output_hidden_states=True makes the encoder return every layer's
        # output, not just the last one.
        self.bert = BertModel.from_pretrained(pretrained_model, output_hidden_states=True)
        self.hidden_size = self.bert.config.hidden_size

        # One learnable scalar per mixed layer, all starting at 1.
        self.gates = nn.Parameter(torch.ones(4))
        self.fc = nn.Linear(self.hidden_size, num_labels)
        self.dropout = nn.Dropout(0.5)
        self.layer_norm = nn.LayerNorm(self.hidden_size)

    def forward(self, input_ids, attention_mask, labels=None, token_type_ids=None):
        """Run the encoder and classify from the gated position-0 representation.

        Args:
            input_ids: Token ids forwarded to the BERT encoder.
            attention_mask: Attention mask forwarded to the BERT encoder.
            labels: Optional target class ids; when given, a cross-entropy loss
                is computed.
            token_type_ids: Optional segment ids forwarded to the BERT encoder
                (``None`` leaves the encoder's own default in effect).

        Returns:
            ``{"loss": ..., "logits": ...}`` when ``labels`` is given, otherwise
            the logits tensor alone.
        """
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )
        hidden_states = outputs.hidden_states

        # Only position 0 feeds the classifier, and both the gated sum and
        # LayerNorm act independently per position. Slicing first therefore
        # gives the same result while mixing and normalising one vector per
        # example instead of the whole sequence.
        cls_states = [hidden_states[idx][:, 0, :] for idx in (-4, -3, -2, -1)]

        weighted_sum = (
            self.gates[0] * cls_states[0]
            + self.gates[1] * cls_states[1]
            + self.gates[2] * cls_states[2]
            + self.gates[3] * cls_states[3]
        )
        cls_representation = self.layer_norm(weighted_sum)

        logits = self.fc(self.dropout(cls_representation))

        if labels is not None:
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
            return {"loss": loss, "logits": logits}
        return logits

In [None]:
# Build the gated-layer classifier on top of the unzipped checkpoint, with six output classes
emotions_proposed_model = WHLA_BERT(
    num_labels=6,
    pretrained_model="./mlm_pretraining_6",
)

In [None]:
# Fine-tuning configuration for the proposed model.
emotions_proposed_model_training_args = TrainingArguments(
    output_dir="./emotions_proposed_model_results",
    # Evaluate and checkpoint on the same per-epoch schedule so the best
    # checkpoint can be restored when training ends.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=5,
    load_best_model_at_end=True,
    learning_rate=3e-5,
    # Reuse `batch_size` so the batch size actually used matches the one that
    # `total_steps` (and therefore the warm-up length) was derived from.
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=epochs,
    weight_decay=0.01,
    warmup_steps=int(0.1 * total_steps),  # warm up over the first 10% of steps
    logging_dir="./logs",
    logging_steps=500,
    report_to="none",
)

# Wire the proposed model, both data splits and the helper objects into a Trainer
emotions_proposed_model_trainer = Trainer(
    args=emotions_proposed_model_training_args,
    model=emotions_proposed_model,
    processing_class=tokenizer,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
)

emotions_proposed_model_trainer.train()

# Final pass over the evaluation split; the metrics stay the last expression
# of the cell so the notebook displays them
proposed_eval_metrics = emotions_proposed_model_trainer.evaluate()
proposed_eval_metrics