In [None]:
import pandas as pd
# Load the train and test CSV files from the Kaggle input directory.
train_df = pd.read_csv("/kaggle/input/drawguess/train_sent_emo.csv")
test_df = pd.read_csv("/kaggle/input/drawguess/test_sent_emo.csv")

# Preview the first rows of the training frame (shown as the cell output).
train_df.head()

In [None]:
# Order the rows by dialogue, then by utterance position within each dialogue,
# so that consecutive rows of one dialogue are consecutive utterances.
train_df = train_df.sort_values(["Dialogue_ID", "Utterance_ID"])
test_df  = test_df.sort_values(["Dialogue_ID", "Utterance_ID"])

In [None]:
from collections import Counter

def create_windows(df, window_size=5):
    """Build sliding windows of consecutive utterances labelled by majority emotion.

    Each dialogue (rows sharing a ``Dialogue_ID``) is ordered by
    ``Utterance_ID`` and scanned with a stride-1 window of ``window_size``
    utterances. A window is kept only when one emotion is strictly more
    frequent than every other emotion in it; windows with a tie for the top
    count are skipped. Dialogues shorter than ``window_size`` yield no window.

    Args:
        df: DataFrame with the columns ``Dialogue_ID``, ``Utterance_ID``,
            ``Utterance`` and ``Emotion``.
        window_size: Number of consecutive utterances per window. Defaults to
            5, the value that was previously hard-coded.

    Returns:
        A pair ``(X, y)`` where ``X`` is a list of windows (each a list of
        ``window_size`` utterances) and ``y`` is the list of the corresponding
        majority emotions.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    X, y = [], []

    for _, dialog in df.groupby("Dialogue_ID"):
        # Make sure the utterances of this dialogue are in speaking order.
        dialog = dialog.sort_values("Utterance_ID")

        utts = dialog["Utterance"].tolist()
        emos = dialog["Emotion"].tolist()

        # range() is empty when the dialogue is shorter than the window.
        for start in range(len(utts) - window_size + 1):
            stop = start + window_size

            # Only the two most frequent emotions are needed to detect a tie.
            ranked = Counter(emos[start:stop]).most_common(2)

            # Skip the window when the top two emotions have the same count.
            if len(ranked) > 1 and ranked[0][1] == ranked[1][1]:
                continue

            X.append(utts[start:stop])
            y.append(ranked[0][0])

    return X, y
# Build the model inputs (utterance windows) and targets (majority emotion of
# each window) for the train and test frames.
train_windows, train_labels = create_windows(train_df)
test_windows, test_labels = create_windows(test_df)


In [None]:
import re

def clean_text(s):
    """Lower-case ``s`` and remove every character outside the allowed set.

    After lower-casing, only ``a-z``, ``0-9``, ``.``, ``,``, ``!``, ``?`` and
    the space character are kept. Anything else is deleted (replaced by the
    empty string, not by a space).
    """
    lowered = s.lower()
    return re.sub(r"[^a-z0-9.,!? ]", "", lowered)

def combine_window(window):
    """Clean every utterance of ``window`` and merge them into one string.

    Each utterance goes through ``clean_text``; the cleaned utterances are
    then joined with the marker ``<SEP>`` surrounded by single spaces.
    """
    cleaned_utterances = map(clean_text, window)
    return " <SEP> ".join(cleaned_utterances)
# Flatten every window into a single string: the cleaned utterances of the
# window joined by " <SEP> ". The result is one string per window.
train_texts = [combine_window(w) for w in train_windows]
test_texts  = [combine_window(w) for w in test_windows]


In [None]:
# Ids 0 and 1 are reserved for the padding and unknown-word tokens.
word2idx = {"<PAD>": 0, "<UNK>": 1}
idx = len(word2idx)  # next free id

# Walk the training texts only; every whitespace-separated token that has not
# been seen yet receives the next free id.
for text in train_texts:
    for w in text.split():
        if w in word2idx:
            continue
        word2idx[w] = idx
        idx += 1

# Vocabulary size, counting the two reserved tokens.
vocab_size = len(word2idx)
vocab_size

In [None]:
import torch

MAX_LEN = 80  # number of token ids in every encoded sequence


def encode(text, vocab=None, max_len=MAX_LEN):
    """Turn ``text`` into a list of exactly ``max_len`` token ids.

    The text is split on whitespace. Tokens missing from the vocabulary map
    to the id of ``"<UNK>"``. Sequences shorter than ``max_len`` are
    right-padded with the id of ``"<PAD>"``; longer ones are cut to the first
    ``max_len`` tokens.

    Args:
        text: Whitespace-separated string to encode.
        vocab: Token-to-id mapping. When omitted, the notebook-level
            ``word2idx`` dictionary is used.
        max_len: Length of the returned list. Defaults to ``MAX_LEN``.

    Returns:
        A list of ``max_len`` integer ids.
    """
    if vocab is None:
        vocab = word2idx

    # Fall back to the reserved ids 0 / 1 if the special tokens are absent.
    pad_id = vocab.get("<PAD>", 0)
    unk_id = vocab.get("<UNK>", 1)

    # Truncate before the lookup so overly long texts do no wasted work.
    ids = [vocab.get(token, unk_id) for token in text.split()[:max_len]]
    ids.extend([pad_id] * (max_len - len(ids)))
    return ids

# Encode every window string and stack the id lists into one tensor per split,
# of shape (number of window strings, MAX_LEN).
X_train = torch.tensor([encode(t) for t in train_texts])
X_test  = torch.tensor([encode(t) for t in test_texts])


In [None]:
# Number the distinct training emotions in alphabetical order: id -> emotion.
idx2emotion = dict(enumerate(sorted(set(train_labels))))
# Inverse lookup: emotion -> id.
emotion2idx = {emotion: index for index, emotion in idx2emotion.items()}

# Integer class targets for both splits. The mapping is built from the
# training labels only, so a test emotion never seen in training raises KeyError.
y_train = torch.tensor([emotion2idx[label] for label in train_labels])
y_test  = torch.tensor([emotion2idx[label] for label in test_labels])

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 10% of the training windows for validation (this is separate from
# the test split); random_state makes the shuffle reproducible.
# NOTE(review): this cell rebinds X_train / y_train, so re-running it without
# re-running the encoding cells splits the already-reduced training set again.
# NOTE(review): windows of one dialogue overlap (stride 1), so a purely random
# split can place near-identical windows in both train and validation —
# consider splitting by dialogue instead.
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.1, random_state=42
)

In [None]:
import torch.nn as nn

class SimpleRNNClassifier(nn.Module):
    """Embedding -> single-layer RNN -> linear classifier over token-id sequences.

    The sequence representation is the RNN output at the last non-padding
    token of each sequence. Reading the state after the final time step
    instead would feed the classifier a state that has also processed every
    trailing PAD token of a right-padded input.

    Args:
        vocab_size: Number of rows of the embedding table.
        embed_dim: Size of each token embedding.
        hidden_dim: Size of the RNN hidden state.
        num_classes: Number of output logits.
        pad_idx: Token id used for right-padding. Defaults to 0, the id of
            ``"<PAD>"`` in the notebook vocabulary.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes, pad_idx=0):
        super().__init__()
        self.pad_idx = pad_idx
        # padding_idx keeps the PAD embedding at zero and excludes it from updates.
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.rnn = nn.RNN(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for ids ``x`` of shape (batch, seq).

        Assumes padding, if any, is only at the end of each sequence.
        """
        # Number of real tokens per sequence; clamp so an all-PAD row still
        # selects a valid position (index 0).
        lengths = (x != self.pad_idx).sum(dim=1).clamp(min=1)

        out, _ = self.rnn(self.embed(x))  # out: (batch, seq, hidden_dim)

        # Pick out[b, lengths[b] - 1, :] for every sequence b in the batch.
        gather_index = (lengths - 1).view(-1, 1, 1).expand(-1, 1, out.size(2))
        last_state = out.gather(1, gather_index).squeeze(1)  # (batch, hidden_dim)

        return self.fc(last_state)


In [None]:
from torch.utils.data import TensorDataset, DataLoader

batch_size = 32

# Mini-batch iterators: the training batches are reshuffled every epoch, the
# validation batches keep the order of X_val / y_val.
train_loader = DataLoader(
    TensorDataset(X_train, y_train),
    batch_size=batch_size,
    shuffle=True,
)
val_loader = DataLoader(
    TensorDataset(X_val, y_val),
    batch_size=batch_size,
)

# Baseline model: 100-dimensional embeddings, 128 hidden units, one logit per emotion.
model = SimpleRNNClassifier(
    vocab_size=vocab_size,
    embed_dim=100,
    hidden_dim=128,
    num_classes=len(emotion2idx),
)

# Cross-entropy loss on the logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


In [None]:
# Per-epoch history of the mean loss and the accuracy on both splits.
train_losses, val_losses = [], []
train_accs, val_accs = [], []

# Training loop: 10 passes over the training batches, each followed by a
# validation pass.
for epoch in range(10):
    # Training mode
    model.train()
    total, correct, total_loss = 0, 0, 0.0

    for xb, yb in train_loader:
        optimizer.zero_grad()         # Resets gradients
        logits = model(xb)            # Forward pass
        loss = criterion(logits, yb)  # Mean loss over this batch
        loss.backward()               # Backprop
        optimizer.step()              # Updates parameters

        # The criterion returns a per-batch mean; weight it by the batch size
        # so a smaller final batch does not skew the epoch average.
        total_loss += loss.item() * len(yb)
        preds = logits.argmax(1)      # Predicted class = highest logit
        correct += (preds == yb).sum().item()
        total += len(yb)

    # Per-sample averages for the epoch
    train_losses.append(total_loss / total)
    train_accs.append(correct / total)

    # Validation mode: same bookkeeping, but no gradients and no parameter updates
    model.eval()
    v_total, v_correct, v_loss = 0, 0, 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            logits = model(xb)
            loss = criterion(logits, yb)
            v_loss += loss.item() * len(yb)
            preds = logits.argmax(1)
            v_correct += (preds == yb).sum().item()
            v_total += len(yb)

    val_losses.append(v_loss / v_total)
    val_accs.append(v_correct / v_total)

    print(f"Epoch {epoch+1} | Train Acc: {train_accs[-1]:.3f} | Val Acc: {val_accs[-1]:.3f}")


In [None]:
from sklearn.metrics import f1_score
# Macro-averaged F1 on the validation split, with the model in eval mode and
# gradient tracking disabled.
model.eval()
preds = []

with torch.no_grad():
    for xb, _ in val_loader:
        # Predicted class of every sample in the batch, appended in loader order.
        preds += model(xb).argmax(dim=1).tolist()

macro_f1 = f1_score(y_true=y_val.tolist(), y_pred=preds, average="macro")
print("Macro F1:", macro_f1)

Up to this point we have used the simple RNN model, and as a result the validation accuracy did not improve: it stayed low and constant. We now try to improve on it.

In [None]:
import torch.nn as nn

class ImprovedRNN(nn.Module):
    """Embedding -> single-layer RNN -> dropout -> linear classifier.

    The sequence representation is the RNN output at the last non-padding
    token of each sequence. Reading the state after the final time step
    instead would feed the classifier a state that has also processed every
    trailing PAD token of a right-padded input.

    Args:
        vocab_size: Number of rows of the embedding table.
        embed_dim: Size of each token embedding.
        hidden_dim: Size of the RNN hidden state.
        num_classes: Number of output logits.
        pad_idx: Token id used for right-padding. Defaults to 0, the id of
            ``"<PAD>"`` in the notebook vocabulary.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes, pad_idx=0):
        super().__init__()
        self.pad_idx = pad_idx

        # Embedding table; padding_idx keeps the PAD embedding at zero and
        # excludes it from updates. The embedding size is chosen by the caller.
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)

        # Same recurrent structure as the baseline model.
        self.rnn = nn.RNN(embed_dim, hidden_dim, batch_first=True)

        # Dropout on the sequence representation to reduce overfitting.
        self.dropout = nn.Dropout(0.4)

        # Final classifier
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for ids ``x`` of shape (batch, seq).

        Assumes padding, if any, is only at the end of each sequence.
        """
        # Number of real tokens per sequence; clamp so an all-PAD row still
        # selects a valid position (index 0).
        lengths = (x != self.pad_idx).sum(dim=1).clamp(min=1)

        embedded = self.embed(x)    # (batch, seq, embed_dim)
        out, _ = self.rnn(embedded)  # (batch, seq, hidden_dim)

        # Pick out[b, lengths[b] - 1, :] for every sequence b in the batch.
        gather_index = (lengths - 1).view(-1, 1, 1).expand(-1, 1, out.size(2))
        last_state = out.gather(1, gather_index).squeeze(1)  # (batch, hidden_dim)

        last_state = self.dropout(last_state)  # active in train mode only
        logits = self.fc(last_state)           # (batch, num_classes)
        return logits


In [None]:
# Replace the baseline with the dropout model; the embedding size goes from
# 100 to 200 while the hidden size stays at 128.
model = ImprovedRNN(
    vocab_size=vocab_size,
    embed_dim=200,
    hidden_dim=128,
    num_classes=len(emotion2idx),
)

# Fresh loss and optimizer bound to the parameters of the new model.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Per-epoch history of the mean loss and the accuracy on both splits.
train_losses, val_losses = [], []
train_accs, val_accs = [], []

# Training loop: 10 passes over the training batches, each followed by a
# validation pass.
for epoch in range(10):
    # Training mode
    model.train()
    total, correct, total_loss = 0, 0, 0.0

    for xb, yb in train_loader:
        optimizer.zero_grad()         # Resets gradients
        logits = model(xb)            # Forward pass
        loss = criterion(logits, yb)  # Mean loss over this batch
        loss.backward()               # Backprop
        optimizer.step()              # Updates parameters

        # The criterion returns a per-batch mean; weight it by the batch size
        # so a smaller final batch does not skew the epoch average.
        total_loss += loss.item() * len(yb)
        preds = logits.argmax(1)      # Predicted class = highest logit
        correct += (preds == yb).sum().item()
        total += len(yb)

    # Per-sample averages for the epoch
    train_losses.append(total_loss / total)
    train_accs.append(correct / total)

    # Validation mode: same bookkeeping, but no gradients and no parameter updates
    model.eval()
    v_total, v_correct, v_loss = 0, 0, 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            logits = model(xb)
            loss = criterion(logits, yb)
            v_loss += loss.item() * len(yb)
            preds = logits.argmax(1)
            v_correct += (preds == yb).sum().item()
            v_total += len(yb)

    val_losses.append(v_loss / v_total)
    val_accs.append(v_correct / v_total)

    print(f"Epoch {epoch+1} | Train Acc: {train_accs[-1]:.3f} | Val Acc: {val_accs[-1]:.3f}")


In [None]:
# Macro-averaged F1 on the validation split, with the model in eval mode and
# gradient tracking disabled.
model.eval()
preds = []

with torch.no_grad():
    for xb, _ in val_loader:
        # Predicted class of every sample in the batch, appended in loader order.
        preds += model(xb).argmax(dim=1).tolist()

macro_f1 = f1_score(y_true=y_val.tolist(), y_pred=preds, average="macro")
print("Macro F1:", macro_f1)