In [1]:
WORD_EMBEDDING_MODEL_PATH = "models/skipgram_emotion_text_transfer/best_val_model_5.98.pt"
VOCAB_PATH = "models/skipgram_emotion_text_transfer/vocab.pt"

DATA_SET_SIZE = 3000
BATCH_SIZE = 64
LEARING_RATE = 0.01
EPOCHS = 10

In [2]:
import copy

import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import random_split
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchtext.data.utils import get_tokenizer

# from model import SentenceModel

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Try to use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

# Read Data

In [4]:
# Load the emotions dataset.
sentenceDf = pd.read_csv("data/text-emotion.zip")
sentenceDf.head()

Unnamed: 0,Text,Emotion
0,i didnt feel humiliated,sadness
1,i can go from feeling so hopeless to so damned...,sadness
2,im grabbing a minute to post i feel greedy wrong,anger
3,i am ever feeling nostalgic about the fireplac...,love
4,i am feeling grouchy,anger


In [5]:
# Sample the first 5 sentences.
sentence = sentenceDf["Text"][0]
sentence

'i didnt feel humiliated'

# Read Word Embedding Model and Vocab

In [6]:
embedding_model = torch.load(WORD_EMBEDDING_MODEL_PATH, map_location=device)
# Get the weight of the embedding layer.
embedding_weight = embedding_model["embeddings.weight"]
vocab = torch.load(VOCAB_PATH, map_location=device)
tokenizer = get_tokenizer("basic_english")

# Dataset and DataLoader

In [7]:
# One-hot encode the labels.
emotion_encoder = OneHotEncoder(handle_unknown="ignore")
emotion_encoder.fit(sentenceDf[["Emotion"]].values)
emotion_encoder.categories_

[array(['anger', 'fear', 'happy', 'love', 'sadness', 'surprise'],
       dtype=object)]

In [8]:
# Emotion Dataset.
class EmotionDataset(Dataset):
    def __init__(
        self,
        df: pd.DataFrame,
        vocab,
        tokenizer,
        emotion_encoder: OneHotEncoder,
        max_length=20,
        size=-1
    ):
        self.df = df
        if size > 0:
            self.df = self.df.sample(size).reset_index(drop=True)
        self.vocab = vocab
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.emotion_encoder = emotion_encoder

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        sentence = self.df["Text"][idx]
        emotion = self.df["Emotion"][idx]
        # Convert the sentence to a list of tokens.
        tokens = self.tokenizer(sentence)
        # Convert the tokens to indices.
        indices = [self.vocab[token] for token in tokens]
        # Crop the indices to the max length.
        if len(indices) > self.max_length:
            indices = indices[:self.max_length]
        # Padding the indices to the max length.
        if len(indices) < self.max_length:
            indices = indices + [0] * (self.max_length - len(indices))
        # Convert the indices to a tensor.
        indices = torch.tensor(indices)
        # One-hot encode the label.
        label = self.emotion_encoder.transform([[emotion]]).toarray()
        # Convert the label to a tensor.
        label = torch.tensor(label, dtype=torch.float32)
        return indices, label

In [9]:
# Generate the emotion dataset.
dataset = EmotionDataset(sentenceDf, vocab, tokenizer, emotion_encoder, max_length=20, size=DATA_SET_SIZE)
# Split the dataset into train and validation.
train_set, val_set = train_test_split(dataset, test_size=0.2, random_state=42)
len(train_set), len(val_set)

(800, 200)

In [10]:
train_dataloader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
val_data_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=True)

# Train

In [18]:
# LSTM model to classify the sentiment of a given sentence
class SentenceModel(nn.Module):
    def __init__(
        self,
        embedding: nn.Embedding,
        input_word_num,
        hidden_dim,
        output_dim,
        n_layers
    ):
        super().__init__()
        self.embedding = embedding
        self.embedding_dim = embedding.embedding_dim
        self.lstm = nn.LSTM(
            self.embedding_dim * input_word_num,
            hidden_dim,
            num_layers=n_layers,
        )
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        embedded = self.embedding(text)
        # print(f"Embedded shape: {embedded.shape}")
        # Read the documentation.
        for i in range(0, embedded.shape[1]-1):
            input_word = embedded[:, i:i+2, :]
            input_word = input_word.view(input_word.shape[0], -1)
            # print(f"Input word shape: {input_word.shape}")
            if i == 0:
                output, (hidden, cell) = self.lstm(input_word)
            else:
                output, (hidden, cell) = self.lstm(input_word, (hidden, cell))
        
        x = self.fc(output.squeeze(0))
        x = F.softmax(x, dim=1)
        x = x.view(x.shape[0], 1, -1)
        return x

In [19]:
# Load the model.
model = SentenceModel(
    embedding=nn.Embedding.from_pretrained(embedding_weight),
    input_word_num=2,
    hidden_dim=128,
    output_dim=emotion_encoder.categories_[0].shape[0],
    n_layers=2
)
model.to(device)

SentenceModel(
  (embedding): Embedding(6826, 300)
  (lstm): LSTM(600, 128, num_layers=2)
  (fc): Linear(in_features=128, out_features=6, bias=True)
)

In [20]:
# Specify optimizer and loss function.
optimizer = optim.Adam(model.parameters(), lr=LEARING_RATE)
loss_function = nn.BCEWithLogitsLoss()

In [21]:
train_losses = []
val_loss = []
# Train the model.
for epoch in range(EPOCHS):
    train_loss = 0.0
    val_loss = 0.0
    model.train()
    for batch in train_dataloader:
        # Get the input and label.
        input = batch[0].to(device)
        label = batch[1].to(device)
        # Reset the gradients.
        optimizer.zero_grad()
        # Forward pass.
        output = model(input)
        # Calculate the loss.
        loss = loss_function(output, label)
        # Backward pass.
        loss.backward()
        # Update the weights.
        optimizer.step()
        # Accumulate the loss.
        train_loss += loss.item() * input.size(0)

    # Evaluate the model.
    model.eval()
    for batch in val_data_loader:
        # Get the input and label.
        input = batch[0].to(device)
        label = batch[1].to(device)
        # Forward pass.
        output = model(input)
        # Calculate the loss.
        loss = loss_function(output, label)
        # Accumulate the loss.
        val_loss += loss.item() * input.size(0)

    # Calculate the average losses.
    train_loss = train_loss / len(train_dataloader.dataset)
    val_loss = val_loss / len(val_data_loader.dataset)

    # Print the progress.
    print(f"Epoch: {epoch+1}/{EPOCHS}")
    print(f"Training Loss: {train_loss:.6f}")
    print(f"Validation Loss: {val_loss:.6f}")
    train_losses.append(train_loss)
    val_loss.append(val_loss)

Epoch: 1/10
Training Loss: 0.434757
Validation Loss: 0.413351
Epoch: 2/10
Training Loss: 0.408804
Validation Loss: 0.411448
Epoch: 3/10
Training Loss: 0.406781
Validation Loss: 0.410742
Epoch: 4/10
Training Loss: 0.405024
Validation Loss: 0.409863
Epoch: 5/10
Training Loss: 0.396665
Validation Loss: 0.421414
Epoch: 6/10
Training Loss: 0.384015
Validation Loss: 0.418144
Epoch: 7/10
Training Loss: 0.368931
Validation Loss: 0.432806
Epoch: 8/10
Training Loss: 0.351597
Validation Loss: 0.444981
Epoch: 9/10
Training Loss: 0.343085
Validation Loss: 0.462324
Epoch: 10/10
Training Loss: 0.331921
Validation Loss: 0.483819
