In [1]:
import sys
sys.path.append('/home/hice1/dnguyen409/.local/lib/python3.10/site-packages')
!{sys.executable} -m pip install transformers
from transformers import AutoTokenizer

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from torch.utils.data import random_split, Dataset, DataLoader 

In [3]:
# Load the three GoEmotions CSV shards and stack them into one frame.
df1 = pd.read_csv("data/go-emotions/goemotions_1.csv")
df2 = pd.read_csv("data/go-emotions/goemotions_2.csv")
df3 = pd.read_csv("data/go-emotions/goemotions_3.csv")

# Metadata columns that are dropped before modelling.
unused_columns = [
    'example_very_unclear', 'rater_id', 'created_utc', 'link_id',
    'parent_id', 'subreddit', 'author', 'id',
]
df = pd.concat([df1, df2, df3]).drop(columns=unused_columns)

# Every column after the first is treated as a label column.
class_names = list(df.columns[1:])

In [4]:
class LSTMModel(nn.Module):
    """Embedding -> (optionally bidirectional) LSTM -> dropout -> linear head.

    The forward pass returns raw logits of shape ``(batch, num_classes)``;
    no sigmoid/softmax is applied here.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding (LSTM input size).
        hidden_size: LSTM hidden size per direction.
        num_classes: Number of output logits.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability used before the linear head and, when
            ``num_layers > 1``, between LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_classes,
                 num_layers=2, dropout=0.3, bidirectional=True):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # Inter-layer dropout is only passed to the LSTM when it is stacked.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
            bidirectional=bidirectional,
        )
        self.bidirectional = bidirectional

        # A bidirectional LSTM yields one final state per direction, which
        # are concatenated in forward(), doubling the head's input width.
        num_directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_size * num_directions, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_ids, attention_mask=None):
        """Return logits for ``input_ids``.

        ``attention_mask`` is accepted for call-site compatibility but is not
        used by this implementation.
        """
        token_vectors = self.embedding(input_ids)
        _, (final_states, _) = self.lstm(token_vectors)

        if not self.bidirectional:
            summary = final_states[-1]
        else:
            # The last two entries of h_n are the top layer's forward and
            # backward final states, in that order.
            summary = torch.cat((final_states[-2], final_states[-1]), dim=1)

        return self.fc(self.dropout(summary))

def preprocess_texts(texts):
    """Tokenize ``texts`` with the module-level ``tokenizer``.

    The tokenizer is called with ``max_length=150``, ``padding='max_length'``
    and ``truncation=True``, and asked to return PyTorch tensors.
    """
    tokenizer_options = {
        "max_length": 150,
        "padding": 'max_length',
        "truncation": True,
        "return_tensors": 'pt',
    }
    return tokenizer(texts, **tokenizer_options)

class GoEmotionsDataset(Dataset):
    """Tokenized texts paired with multi-label float targets.

    Args:
        df: DataFrame whose columns after the first hold the label values.
            Not read when ``predict_only`` is true.
        text: Texts to tokenize (passed to ``preprocess_texts``).
        predict_only: When true, no labels are loaded and ``self.labels`` is
            ``None``.

    Raises:
        ValueError: If labels are loaded and their row count differs from the
            number of tokenized texts.

    Each item is a 3-tuple ``(input_ids, attention_mask, labels)``. In
    ``predict_only`` mode ``labels`` is a zero-length float tensor rather
    than ``None``: the default ``DataLoader`` collate function cannot batch
    ``None``, so a ``None`` placeholder would make the dataset unusable with
    ``DataLoader``. Callers that unpack three values keep working.
    """

    def __init__(self, df, text, predict_only=False):
        tokenized = preprocess_texts(text)
        self.input_ids = tokenized["input_ids"]
        self.attention_mask = tokenized["attention_mask"]
        if predict_only:
            self.labels = None
        else:
            self.labels = torch.tensor(df.iloc[:, 1:].values, dtype=torch.float32)
            # Labels and texts are aligned purely by position, so a length
            # mismatch would silently pair texts with the wrong targets.
            if len(self.labels) != len(self.input_ids):
                raise ValueError(
                    f"Got {len(self.input_ids)} texts but {len(self.labels)} label rows"
                )

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        if self.labels is not None:
            labels = self.labels[idx]
        else:
            # Collate-friendly stand-in for "no labels".
            labels = torch.empty(0, dtype=torch.float32)
        return self.input_ids[idx], self.attention_mask[idx], labels

def compute_accuracy(outputs, labels, threshold=0.5):
    """Element-wise accuracy of thresholded sigmoid outputs.

    Args:
        outputs: Logits; a sigmoid is applied before thresholding.
        labels: Targets of the same shape, compared after ``.int()``.
        threshold: Probability at or above which an element counts as positive.

    Returns:
        Fraction of individual elements predicted correctly, as a Python float.
    """
    predicted = torch.sigmoid(outputs) >= threshold
    matches = predicted == labels.int()
    return (matches.sum().float() / labels.numel()).item()

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, mean_accuracy)``.

    The model is put in training mode and updated when ``optimizer`` is given;
    otherwise it is put in eval mode and gradients are disabled. Both metrics
    are averaged over batches (not over samples). An empty loader yields
    ``(0.0, 0.0)`` instead of dividing by zero.
    """
    training = optimizer is not None
    model.train(training)
    total_loss = 0.0
    total_accuracy = 0.0

    with torch.set_grad_enabled(training):
        for input_ids, attention_mask, labels in loader:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            total_loss += loss.item()
            total_accuracy += compute_accuracy(outputs, labels)

    num_batches = max(len(loader), 1)
    return total_loss / num_batches, total_accuracy / num_batches


def train_lstm(model, dataset, criterion, optimizer, device, epochs=30,
               batch_size=32, num_workers=4, train_fraction=0.8, seed=42):
    """Train ``model`` on a seeded random train/validation split of ``dataset``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        dataset: Dataset yielding ``(input_ids, attention_mask, labels)``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        device: Device that batches are moved to.
        epochs: Number of passes over the training split.
        batch_size: Batch size for both loaders.
        num_workers: DataLoader worker processes for both loaders.
        train_fraction: Share of ``dataset`` used for training; the remainder
            is the validation split.
        seed: Seed for the split generator, so the split is reproducible.

    Returns:
        ``(history, val_loader)`` where ``history[0]`` holds one
        ``(train_loss, train_accuracy)`` tuple per epoch, ``history[1]`` holds
        the matching ``(val_loss, val_accuracy)`` tuples, and ``val_loader``
        iterates the validation split without shuffling.

    Raises:
        ValueError: If ``train_fraction`` is not strictly between 0 and 1.
    """
    if not 0.0 < train_fraction < 1.0:
        raise ValueError(f"train_fraction must be in (0, 1), got {train_fraction}")

    train_size = int(train_fraction * len(dataset))
    val_size = len(dataset) - train_size
    generator = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size], generator=generator)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    history = [[], []]

    for epoch in range(epochs):
        avg_train_loss, avg_train_accuracy = _run_epoch(
            model, train_loader, criterion, device, optimizer=optimizer
        )
        history[0].append((avg_train_loss, avg_train_accuracy))

        avg_val_loss, avg_val_accuracy = _run_epoch(model, val_loader, criterion, device)
        history[1].append((avg_val_loss, avg_val_accuracy))

        print(f"Epoch [{epoch+1}/{epochs}], "
              f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {avg_train_accuracy:.4f}, "
              f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {avg_val_accuracy:.4f}")

    return history, val_loader

def predict(model, dataloader, device, threshold=0.5):
    """Return boolean predictions for every batch in ``dataloader``.

    Each batch must unpack into three items; the third is ignored. The model
    is put in eval mode, called without gradients, and its sigmoid outputs
    are compared against ``threshold``. Per-batch results are stacked
    vertically into a single tensor.
    """
    model.eval()
    batch_masks = []
    with torch.no_grad():
        for ids, mask, _unused in dataloader:
            logits = model(ids.to(device), mask.to(device))
            batch_masks.append(torch.sigmoid(logits) >= threshold)
    return torch.vstack(batch_masks)

In [5]:
# Use the GPU when PyTorch can see one, otherwise the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print("Using device:", device)

# The tokenizer is configured to pad on the left.
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased', padding_side="left")

# Tokenize every text in df once, up front.
texts = df["text"].tolist()
dataset = GoEmotionsDataset(df, texts)

Using device: cuda


In [None]:
print('starting...')
import os
# Set before the DataLoader worker processes are created by train_lstm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Build the classifier and move it to the selected device.
model = LSTMModel(
    vocab_size=tokenizer.vocab_size,
    embedding_dim=300,
    hidden_size=256,
    num_classes=28,
)
model = model.to(device)

# BCEWithLogitsLoss takes the raw logits that the model returns.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

history, val_loader = train_lstm(
    model, dataset, criterion, optimizer, device, epochs=30
)

preds = predict(model, val_loader, device)
print("Predictions on validation set:")
print(preds)

starting...


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

In [None]:
# Persist only the parameters (state_dict), not the whole module object.
checkpoint_weights = model.state_dict()
torch.save(checkpoint_weights, "GoEmotionsLSTM.pth")
print('Model saved!')

In [None]:
# if you don't want to run Execute Code/Training.
# upload .pth file downloaded in Training if you're in a new session.

model_pth = "GoEmotionsLSTM.pth" # change this if you name things like a weirdo

# Resolve the device first so the checkpoint can be mapped onto it while loading.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The architecture must match the one the checkpoint was trained with
# (the training cell uses hidden_size=256); any other hidden size makes
# load_state_dict fail with a parameter shape mismatch.
loaded_model = LSTMModel(
    vocab_size=tokenizer.vocab_size,
    embedding_dim=300,
    hidden_size=256,
    num_classes=28
)

# map_location lets a checkpoint saved from a CUDA model load on a CPU-only session.
loaded_model.load_state_dict(torch.load(model_pth, map_location=device))
loaded_model.to(device)
loaded_model.eval()

# NOTE(review): despite the name, this wraps the *entire* dataset, not the
# held-out split produced inside train_lstm, so anything tuned on val_loader
# below also sees the training examples. Confirm whether that is intended.
val_dataset = GoEmotionsDataset(df, df["text"].tolist())
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# for predicting new text using given model
# requires you run Preprocess and Setup Code sections to define class_names

def predict_emotion(text, model=loaded_model, device=torch.device("cuda" if torch.cuda.is_available() else "cpu"), thresholds=None, showProbs=False, returnOne=False):
    """Predict emotion label(s) for a single piece of text.

    Args:
        text: The text to classify. The per-class logic below assumes the
            model yields one probability per entry of ``class_names``, i.e.
            a single input text.
        model: Model called as ``model(input_ids, attention_mask)``. The
            default is bound to ``loaded_model`` when this function is defined.
        device: Device the tokenized inputs are moved to; it should be the
            device ``model`` lives on. The default is evaluated once, at
            definition time.
        thresholds: Optional per-class probability cut-offs. When ``None``,
            0.5 is used for every class.
        showProbs: When true, print every class probability, highest first.
        returnOne: When true, return only the single most probable class
            name, ignoring ``thresholds``.

    Returns:
        A class name (``returnOne=True``) or the list of class names whose
        probability is at or above its threshold.
    """
    inputs = tokenizer(
        text,
        max_length=150,
        padding='max_length',
        truncation=True,
        return_tensors='pt',
    )

    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Inference must not run with dropout active, whatever mode the caller left the model in.
    model.eval()
    with torch.no_grad():
        outputs = model(inputs['input_ids'], inputs['attention_mask'])
        # Bring the probabilities to the CPU: Tensor.numpy() below raises on
        # CUDA tensors, so without this the function fails on a GPU.
        probs = torch.sigmoid(outputs).flatten().cpu()

    if thresholds is None:
        threshold_tensor = torch.full_like(probs, 0.5)
    else:
        threshold_tensor = torch.tensor(thresholds, dtype=probs.dtype, device=probs.device)

    preds = (probs >= threshold_tensor).int().numpy()

    if showProbs:  # showProbs displays the probability of each class.
        annotated_probs = {class_names[i]: probs[i].item() for i in range(len(class_names))}
        sorted_probs = dict(sorted(annotated_probs.items(), key=lambda item: item[1], reverse=True))
        print(f'Class probabilities for "{text}": {sorted_probs}')

    if returnOne:  # returnOne returns only the highest probability class, ignoring thresholds.
        return class_names[probs.argmax().item()]

    return [class_names[i] for i, val in enumerate(preds) if val == 1]

def tune_thresholds(model=loaded_model, val_loader=val_loader, step=0.05, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")):
    """Pick, per class, the probability threshold that maximizes binary F1.

    Args:
        model: Model called as ``model(input_ids, attention_mask)``.
        val_loader: Iterable of ``(input_ids, attention_mask, labels)``
            batches; a label value of 1 marks a positive.
        step: Spacing of the candidate thresholds scanned over ``[0, 1]``.
        device: Device the inputs are moved to for the forward pass.

    Returns:
        A list with one float threshold per class. For each class the lowest
        candidate achieving the best F1 is chosen; a class whose F1 is 0 at
        every candidate keeps the default 0.5.

    Raises:
        ValueError: If ``step`` is not positive or ``val_loader`` is empty.

    F1 is computed here as ``2*TP / (2*TP + FP + FN)`` with 0 when the
    denominator is 0, so no external metrics package is required.
    """
    if step <= 0:
        raise ValueError(f"step must be positive, got {step}")

    model.eval()
    all_outputs = []
    all_labels = []

    with torch.no_grad():
        for input_ids, attention_mask, labels in val_loader:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)

            outputs = model(input_ids, attention_mask)
            probs = torch.sigmoid(outputs)

            # Labels are only needed on the CPU, so they never visit the device.
            all_outputs.append(probs.cpu())
            all_labels.append(labels.cpu())

    if not all_outputs:
        raise ValueError("val_loader produced no batches")

    all_outputs = torch.cat(all_outputs)
    truth = torch.cat(all_labels).int() == 1

    num_classes = truth.shape[1]
    best_f1 = torch.zeros(num_classes, dtype=torch.float64)
    best_threshold = torch.full((num_classes,), 0.5, dtype=torch.float64)

    # Floating-point rounding can make arange emit one candidate past 1.0;
    # such a value can never win (nothing is predicted positive), so drop it.
    candidates = np.arange(0.0, 1.0 + step, step)
    candidates = candidates[candidates <= 1.0 + 1e-9]

    for candidate in candidates:
        candidate = float(candidate)
        preds = all_outputs >= candidate

        # Per-class confusion counts, computed for all classes at once.
        tp = (preds & truth).sum(dim=0).double()
        fp = (preds & ~truth).sum(dim=0).double()
        fn = (~preds & truth).sum(dim=0).double()

        denom = 2 * tp + fp + fn
        current_f1 = torch.where(denom > 0, 2 * tp / denom.clamp(min=1), torch.zeros_like(denom))

        # Strict '>' keeps the first (lowest) threshold on ties.
        improved = current_f1 > best_f1
        best_f1[improved] = current_f1[improved]
        best_threshold[improved] = candidate

    thresholds = best_threshold.tolist()

    print("Optimal thresholds per class:")
    for idx, thr in enumerate(thresholds):
        print(f"Label {idx}: {thr:.2f}")
    return thresholds

# Tune the per-class decision thresholds once; the cells below reuse them.
thresholds = tune_thresholds(loaded_model, val_loader, device=device)

In [None]:
# batch/basic phrase test

test = [
    "oh my god that is amazing!",
    "fuck you dude.",
    "politics are so scary man",
    "what am i even doing here...?",
    "oh my god",
    "oh my god!",
    "oh my god?",
    "i know you're trying your best.",
    "i hope you're trying your best.",
    "i think you're trying your best.",
    "i know you're not trying your best.",
]

# Print each phrase followed by every label that clears its tuned threshold.
for phrase in test:
    emotions = predict_emotion(text=phrase, thresholds=thresholds)
    print(f'"{phrase}"\n', emotions)

In [None]:
# returnOne phrase test

test = [
    "oh my god that is amazing!",
    "fuck you dude.",
    "politics are so scary man",
    "what am i even doing here...?",
    "oh my god",
    "oh my god!",
    "oh my god?",
    "i know you're trying your best.",
    "i hope you're trying your best.",
    "i think you're trying your best.",
    "i know you're not trying your best.",
]

# Print each phrase followed by its single most probable label.
for phrase in test:
    top_emotion = predict_emotion(text=phrase, thresholds=thresholds, returnOne=True)
    print(f'"{phrase}"\n', top_emotion)

In [None]:
# threshold option test

test = "i hope you're trying your best"
# Labels whose probability clears the tuned per-class thresholds.
tuned_prediction = predict_emotion(text=test, thresholds=thresholds)
print(tuned_prediction)

In [None]:
# showProbs option test

test = "hello everyone i'm very cool because i'm an uno draw 4 card irl"
# showProbs prints the full probability table; returnOne yields the top label.
top_label = predict_emotion(text=test, thresholds=thresholds, showProbs=True, returnOne=True)
print(top_label)