<a href="https://colab.research.google.com/github/GangitiNeeraj4120/Spider_ML_Lats/blob/main/Task1_NLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import ast
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import math

In [None]:
class Vocab:
  """Whitespace-token vocabulary with a fixed set of special tokens.

  The special tokens take the first indices in the order
  [PAD]=0, [CLS]=1, [SEP]=2, [UNK]=3. Every other word receives the next
  free index in order of first appearance.
  """

  def __init__(self, texts):
    """Seed both mappings with the special tokens, then index `texts`."""
    self.special = ['[PAD]', '[CLS]', '[SEP]', '[UNK]']
    self.word2idx = {}
    self.idx2word = {}
    for position, token in enumerate(self.special):
      self.word2idx[token] = position
      self.idx2word[position] = token
    self.build(texts)

  def build(self, texts):
    """Register every unseen whitespace-separated word found in `texts`."""
    for sentence in texts:
      for token in sentence.split():
        if token in self.word2idx:
          continue
        next_id = len(self.word2idx)
        self.word2idx[token] = next_id
        self.idx2word[next_id] = token

  def encode(self, text, max_len):
    """Return a list of ids for `text`, prefixed with [CLS].

    Unknown words map to [UNK]. The list is cut to `max_len` entries and
    then filled with 0 (the [PAD] index) up to `max_len`.
    """
    unk_id = self.word2idx['[UNK]']
    ids = [self.word2idx.get(token, unk_id) for token in ['[CLS]', *text.split()]]
    ids = ids[:max_len]  # truncate
    ids.extend([0] * (max_len - len(ids)))  # pad
    return ids

In [None]:
class DailyDialogDataset(Dataset):
  """Dataset yielding (token ids, emotion label) tensor pairs.

  Each row of `df` carries a stringified Python list under 'dialog' and a
  bracketed, whitespace-separated run of integers under 'emotion'
  (for example "[0 4 4]").
  """

  def __init__(self, df, vocab, max_len=128):
    """Keep the two columns plus the encoder settings."""
    self.dialogs = df['dialog']
    self.labels = df['emotion']
    self.vocab = vocab
    self.max_len = max_len

  def __len__(self):
    """Number of rows in the dialog column."""
    return len(self.dialogs)

  def __getitem__(self, idx):
    """Encode the first parsed dialog element and pick its majority label."""
    from collections import Counter

    # Only element 0 of the parsed dialog list is fed to the encoder.
    first_utterance = ast.literal_eval(self.dialogs.iloc[idx])[0]
    token_ids = self.vocab.encode(first_utterance, self.max_len)

    # "[0 4 4]" -> [0, 4, 4]; the label is the most frequent value.
    raw_emotions = self.labels.iloc[idx]
    emotion_ids = [int(piece) for piece in raw_emotions.strip("[]").split()]
    majority = Counter(emotion_ids).most_common(1)[0][0]
    return torch.tensor(token_ids), torch.tensor(majority)

In [None]:
class MultiHeadSelfAttention(nn.Module):
  """Multi-head scaled dot-product self-attention.

  Computes Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V independently
  for `num_heads` heads of width d_k = d_model // num_heads, then merges the
  heads and applies an output projection.
  """

  def __init__(self, d_model, num_heads):
    super().__init__()
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
    self.num_heads = num_heads
    self.d_k = d_model // num_heads

    self.Wq = nn.Linear(d_model, d_model)
    self.Wk = nn.Linear(d_model, d_model)
    self.Wv = nn.Linear(d_model, d_model)
    self.Wo = nn.Linear(d_model, d_model)

  def forward(self, x):
    """Apply self-attention to `x` of shape (B, T, D); returns (B, T, D)."""
    B, T, D = x.shape  # batch size, sequence length, embedding dimension

    # (B, T, D) -> (B, num_heads, T, d_k)
    Q = self.Wq(x).view(B, T, self.num_heads, self.d_k).transpose(1, 2)
    K = self.Wk(x).view(B, T, self.num_heads, self.d_k).transpose(1, 2)
    V = self.Wv(x).view(B, T, self.num_heads, self.d_k).transpose(1, 2)

    # scores[b, h, i, j] = <Q_i, K_j> / sqrt(d_k), shape (B, num_heads, T, T)
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
    # Normalise over the key axis (last dim) so each query's weights sum to 1.
    # dim=1 would normalise across heads instead, which is not attention.
    attn = torch.softmax(scores, dim=-1)

    out = torch.matmul(attn, V)  # weighted sum of values per query
    # (B, num_heads, T, d_k) -> (B, T, D): merge the heads back together
    out = out.transpose(1, 2).contiguous().view(B, T, D)

    return self.Wo(out)

In [None]:
class EncoderLayer(nn.Module):
  """One post-norm encoder block: self-attention, then a feed-forward net.

  Both sub-layers are wrapped as LayerNorm(input + sublayer(input)).
  """

  def __init__(self, d_model, num_heads, d_ff):
    super().__init__()
    self.attn = MultiHeadSelfAttention(d_model, num_heads)
    self.norm1 = nn.LayerNorm(d_model)
    # Position-wise feed-forward: d_model -> d_ff -> d_model with ReLU between.
    expand = nn.Linear(d_model, d_ff)
    activation = nn.ReLU()
    project = nn.Linear(d_ff, d_model)
    self.ff = nn.Sequential(expand, activation, project)
    self.norm2 = nn.LayerNorm(d_model)

  def forward(self, x):
    """Run both residual sub-layers on `x` and return the result."""
    attended = self.attn(x)
    x = self.norm1(x + attended)  # residual connection around attention
    transformed = self.ff(x)
    return self.norm2(x + transformed)  # residual connection around the FFN

In [None]:
class TransformerClassifier(nn.Module):
  """Encoder-only classifier: embedding, positional encoding, encoder layers.

  The logits are computed from the hidden state at sequence position 0.
  """

  def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, num_classes):
    super().__init__()
    self.embedding = nn.Embedding(vocab_size, d_model)
    self.positional = PositionalEncoding(d_model)
    self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)])
    self.classifier = nn.Linear(d_model, num_classes)

  def forward(self, x):
    """Map a batch of token-id sequences `x` to class logits.

    `x` is moved to the device of the embedding weights before the lookup.
    """
    x = x.to(self.embedding.weight.device)

    x = self.embedding(x)
    # The positional output must replace x. Binding it to a throwaway name
    # would pass un-positioned embeddings to the encoder layers.
    x = self.positional(x)

    for layer in self.layers:
      x = layer(x)

    cls_token = x[:, 0]  # hidden state at the first sequence position
    return self.classifier(cls_token)

In [None]:
class PositionalEncoding(nn.Module):
  """Fixed sinusoidal positional encoding added to the input embeddings.

  pe[pos, 2i]   = sin(pos / 10000^(2i / d_model))
  pe[pos, 2i+1] = cos(pos / 10000^(2i / d_model))

  The table is precomputed for `max_len` positions and stored as a buffer
  named "pe" with shape (1, max_len, d_model).
  """

  def __init__(self, d_model, max_len=512):
    super().__init__()
    pe = torch.zeros(max_len, d_model)
    position = torch.arange(0, max_len).unsqueeze(1)  # (max_len, 1)
    div = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
    angles = position * div  # (max_len, ceil(d_model / 2))
    pe[:, 0::2] = torch.sin(angles)
    # For odd d_model there is one fewer odd column than even columns, so
    # the cosine table is trimmed; for even d_model the slice is a no-op.
    pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
    self.register_buffer("pe", pe.unsqueeze(0))

  def forward(self, x):
    """Return `x` plus the encoding for its first x.size(1) positions."""
    return x + self.pe[:, :x.size(1)]

In [None]:
# Load the train/test splits from CSV files in the working directory.
train_df = pd.read_csv("train.csv")
test_df = pd.read_csv("test.csv")

# Build the vocabulary from the training dialogs only. Each 'dialog' cell is
# a stringified Python list, parsed with ast.literal_eval (the same parser
# DailyDialogDataset uses) rather than eval, so file contents are never
# executed as code.
all_texts = [" ".join(ast.literal_eval(d)) for d in train_df['dialog']]

vocab = Vocab(all_texts)

dataset = DailyDialogDataset(train_df, vocab)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# The test split reuses the training vocabulary; unseen words map to [UNK].
test_dataset = DailyDialogDataset(test_df, vocab)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

model = TransformerClassifier(
    vocab_size=len(vocab.word2idx),
    d_model=128,
    num_heads=4,
    d_ff=256,
    num_layers=2,
    num_classes=7,
)

In [None]:
# Prefer the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

TransformerClassifier(
  (embedding): Embedding(25191, 128)
  (positional): PositionalEncoding()
  (layers): ModuleList(
    (0-1): 2 x EncoderLayer(
      (attn): MultiHeadSelfAttention(
        (Wq): Linear(in_features=128, out_features=128, bias=True)
        (Wk): Linear(in_features=128, out_features=128, bias=True)
        (Wv): Linear(in_features=128, out_features=128, bias=True)
        (Wo): Linear(in_features=128, out_features=128, bias=True)
      )
      (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (ff): Sequential(
        (0): Linear(in_features=128, out_features=256, bias=True)
        (1): ReLU()
        (2): Linear(in_features=256, out_features=128, bias=True)
      )
      (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
    )
  )
  (classifier): Linear(in_features=128, out_features=7, bias=True)
)

In [None]:
from collections import Counter

# Tally the last emotion id listed for each training dialog.
labels = [int(e.strip("[]").split()[-1]) for e in train_df['emotion']]

print(Counter(labels))


Counter({0: 7595, 4: 3031, 6: 159, 5: 141, 1: 130, 2: 47, 3: 15})


In [None]:
from collections import Counter

# Derive each training label with the same rule DailyDialogDataset.__getitem__
# applies (the most frequent emotion id in the dialog), so the class weights
# describe the label distribution the loss actually sees during training.
labels = []
for e in train_df['emotion']:
  emotion_ids = [int(tok) for tok in e.strip("[]").split()]
  labels.append(Counter(emotion_ids).most_common(1)[0][0])

Counts = Counter(labels)
total = sum(Counts.values())

# Log inverse-frequency weight per class id 0..6: rarer classes weigh more.
# A class with no training examples is counted as 1 so the division cannot
# fail; it has no samples, so its weight never contributes to the loss.
class_weights = [math.log(total / max(Counts[i], 1)) for i in range(7)]

class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)


In [None]:
# Adam over every model parameter, with class-weighted cross-entropy as the
# training objective.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss(weight=class_weights)

In [None]:
def evaluate(model, DataLoader, device):
  """Return the accuracy of `model` over the batches in `DataLoader`.

  Args:
    model: module mapping a batch of inputs to (batch, num_classes) logits.
    DataLoader: iterable of (inputs, labels) tensor pairs. The parameter
      name is kept for backward compatibility; inside this function it
      shadows the torch DataLoader class.
    device: device the batches are moved to before the forward pass.

  Returns:
    Fraction of correctly predicted labels, or 0.0 if no samples were seen.
  """
  model.eval()
  correct = 0
  total = 0
  with torch.no_grad():
    # Iterate the loader that was passed in, not a module-level global.
    for batch_x, batch_y in DataLoader:
      batch_x = batch_x.to(device, non_blocking=True)
      batch_y = batch_y.to(device, non_blocking=True)

      logits = model(batch_x)
      predictions = torch.argmax(logits, dim=1)

      correct += (predictions == batch_y).sum().item()
      total += batch_y.size(0)

  if total == 0:
    return 0.0  # empty loader: avoid dividing by zero
  return correct / total

In [None]:
from sklearn.metrics import f1_score
def evaluate_with_f1(model, DataLoader, device):
  """Return the macro-averaged F1 score of `model` over `DataLoader`.

  `DataLoader` is iterated as (inputs, labels) batches; predictions are the
  argmax of the model output along dim 1.
  """
  model.eval()
  predicted, expected = [], []

  with torch.no_grad():
    for inputs, targets in DataLoader:
      inputs = inputs.to(device, non_blocking=True)
      targets = targets.to(device, non_blocking=True)

      batch_preds = torch.argmax(model(inputs), dim=1)

      # Collect on the CPU so the metric can be computed after the loop.
      predicted.extend(batch_preds.cpu().numpy())
      expected.extend(targets.cpu().numpy())

  return f1_score(expected, predicted, average='macro')

In [None]:
num_epochs = 5

# Train for a fixed number of epochs, then score the model on test_loader
# after each one.
for epoch in range(num_epochs):
  model.train()  # evaluate() leaves the model in eval mode; switch back
  total_loss = 0

  for x, y in train_loader:
    x = x.to(device, non_blocking=True)
    y = y.to(device, non_blocking=True)

    optimizer.zero_grad()

    logits = model(x)
    loss = criterion(logits, y)

    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  # Mean of the per-batch losses for this epoch.
  avg_loss = total_loss / len(train_loader)

  test_accuracy = evaluate(model, test_loader, device)
  macro_f1 = evaluate_with_f1(model, test_loader, device)

  # ".4f" prints four decimals; the former "4f" was only a minimum width.
  print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}, Test Accuracy: {test_accuracy:.2f}, Macro F1:{macro_f1:.3f}")

Epoch 1, Loss: 0.519332, Test Accuracy: 0.89, Macro F1:0.261
Epoch 2, Loss: 0.434792, Test Accuracy: 0.84, Macro F1:0.305
Epoch 3, Loss: 0.386275, Test Accuracy: 0.85, Macro F1:0.337
Epoch 4, Loss: 0.344056, Test Accuracy: 0.85, Macro F1:0.384
Epoch 5, Loss: 0.323491, Test Accuracy: 0.90, Macro F1:0.419
