In [3]:
import json
import math
import os
from pathlib import Path
import random
import time
# from tqdm.notebook import tqdm, trange
from typing import Dict, List, Set, Tuple

import numpy as np
import torch
import torch.nn as nn
from torch.nn import init
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler


In [17]:
from tqdm.notebook import trange, tqdm

In [19]:
from tqdm import tqdm as tqdm

In [4]:
emotion_to_idx = {
    "anger": 0,
    "fear": 1,
    "joy": 2,
    "love": 3,
    "sadness": 4,
}
idx_to_emotion = {v: k for k, v in emotion_to_idx.items()}
UNK = "<UNK>"

In [5]:
def fetch_data(train_data_path, val_data_path, test_data_path):
    with open(train_data_path) as training_f:
        training = training_f.read().split("\n")[1:-1]
    with open(val_data_path) as valid_f:
        validation = valid_f.read().split("\n")[1:-1]
    with open(test_data_path) as testing_f:
        testing = testing_f.read().split("\n")[1:-1]
    tra = []
    val = []
    test = []
    for elt in training:
        if elt == '':
            continue
        txt, emotion = elt.split(",")
        tra.append((txt.split(" "), emotion_to_idx[emotion]))
    for elt in validation:
        if elt == '':
            continue
        txt, emotion = elt.split(",")
        val.append((txt.split(" "), emotion_to_idx[emotion]))
    for elt in testing:
        if elt == '':
            continue
        txt = elt
        test.append(txt.split(" "))
    return tra, val, test

In [6]:
def make_vocab(data):
    vocab = set()
    for document, _ in data:
        for word in document:
            vocab.add(word)
    return vocab 


def make_indices(vocab):
	vocab_list = sorted(vocab)
	vocab_list.append(UNK)
	word2index = {}
	index2word = {}
	for index, word in enumerate(vocab_list):
		word2index[word] = index 
		index2word[index] = word 
	vocab.add(UNK)
	return vocab, word2index, index2word 


def convert_to_vector_representation(data, word2index, test=False):
	if test:
		vectorized_data = []
		for document in data:
			vector = torch.zeros(len(word2index)) 
			for word in document:
				index = word2index.get(word, word2index[UNK])
				vector[index] += 1
			vectorized_data.append(vector)
	else:
		vectorized_data = []
		for document, y in data:
			vector = torch.zeros(len(word2index)) 
			for word in document:
				index = word2index.get(word, word2index[UNK])
				vector[index] += 1
			vectorized_data.append((vector, y))
	return vectorized_data

In [7]:
class EmotionDataset(Dataset):
    """EmotionDataset is a torch dataset to interact with the emotion data.

    :param data: The vectorized dataset with input and expected output values
    :type data: List[Tuple[List[torch.Tensor], int]]
    """
    def __init__(self, data):
        self.X = torch.cat([X.unsqueeze(0) for X, _ in data])
        self.y = torch.LongTensor([y for _, y in data])
        self.len = len(data)
    
    def __len__(self):
        """__len__ returns the number of samples in the dataset.

        :returns: number of samples in dataset
        :rtype: int
        """
        return self.len
    
    def __getitem__(self, index):
        """__getitem__ returns the tensor, output pair for a given index

        :param index: index within dataset to return
        :type index: int
        :returns: A tuple (x, y) where x is model input and y is our label
        :rtype: Tuple[torch.Tensor, int]
        """
        return self.X[index], self.y[index]

def get_data_loaders(train, val, batch_size=16):
    dataset = EmotionDataset(train + val)
    train_indices = [i for i in range(len(train))]
    val_indices = [i for i in range(len(train), len(train) + len(val))]
    train_sampler = SubsetRandomSampler(train_indices)
    train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
    
    val_sampler = SubsetRandomSampler(val_indices)
    val_loader = DataLoader(dataset, batch_size=batch_size, sampler=val_sampler)

    return train_loader, val_loader

In [8]:
train, val, test = fetch_data('dataset/train.txt', 'dataset/val.txt', 'dataset/test.txt')

In [9]:
vocab = make_vocab(train)
vocab, word2index, index2word = make_indices(vocab)
train_vectorized = convert_to_vector_representation(train, word2index)
val_vectorized = convert_to_vector_representation(val, word2index)
test_vectorized = convert_to_vector_representation(test, word2index, True)

In [10]:
train_loader, val_loader = get_data_loaders(train_vectorized, val_vectorized, batch_size=1)

In [11]:
get_device = lambda : "cuda:0" if torch.cuda.is_available() else "cpu"

In [24]:
unk = '<UNK>'

class FFNN(nn.Module):
	def __init__(self, input_dim, h, output_dim):
		super(FFNN, self).__init__()
		self.h = h
		self.W1 = nn.Linear(input_dim, h)
		self.activation = nn.ReLU()
		self.W2 = nn.Linear(h, output_dim)
		self.softmax = nn.LogSoftmax(dim=1) 
		self.loss = nn.NLLLoss() 

	def compute_Loss(self, predicted_vector, gold_label):
		return self.loss(predicted_vector, gold_label)

	def forward(self, input_vector):
		z1 = self.W1(input_vector)
		z2 = self.activation(z1)
		z3 = self.W2(z2)
		return self.softmax(z3)
	
	def load_model(self, save_path):
		self.load_state_dict(torch.load(save_path))
	
	def save_model(self, save_path):
		torch.save(self.state_dict(), save_path)


def train_epoch(model, train_loader, optimizer):
	model.train()
	total = 0
	loss = 0
	correct = 0
	for (input_batch, expected_out) in train_loader:
		optimizer.zero_grad() 
		output = model(input_batch.to(get_device()))
		#print(output)
		total += output.size()[0]
		_, predicted = torch.max(output, 1)
		correct += (expected_out == predicted.to("cpu")).cpu().numpy().sum()
		loss = model.compute_Loss(output, expected_out.to(get_device()))
		loss.backward()
		optimizer.step()
	print('Accuracy: ' + str(correct/total))
	return


def evaluation(model, val_loader, optimizer):
	model.eval()
	loss = 0
	correct = 0
	total = 0
	for (input_batch, expected_out) in val_loader:
		output = model(input_batch.to(get_device()))
		total += output.size()[0]
		_, predicted = torch.max(output, 1)
		correct += (expected_out.to("cpu") == predicted.to("cpu")).cpu().numpy().sum()

		loss += model.compute_Loss(output, expected_out.to(get_device()))
	loss /= len(val_loader)
	print('Validation Accuracy: ' + str(correct/total))
	pass

def train_and_evaluate(number_of_epochs, model, train_loader, val_loader):
	optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
	for epoch in range(number_of_epochs):
		train_epoch(model, train_loader, optimizer)
	evaluation(model, val_loader, optimizer)
	return

In [25]:
h = 512
model = FFNN(len(vocab), h, len(emotion_to_idx)).to(get_device())
train_and_evaluate(2, model, train_loader, val_loader)
# model.save_model("ffnn_fixed.pth")

Accuracy: 0.3485
Accuracy: 0.7208
Validation Accuracy: 0.7889328063241107


In [26]:
def rnn_preprocessing(data, test=False):
    """rnn_preprocessing

    :param data:
    :type data:
    :param test:
    :type test:
    """
    # Do some preprocessing similar to convert_to_vector_representation
    # For the RNN, remember that instead of a single vector per training
    # example, you will have a sequence of vectors where each vector
    # represents some information about a specific token.

    # Add padding to ensure sequences have the same lengths, used in the 
    # second lstm model
    seq_length = 200
    for item in data:
        if len(item) >= seq_length:
            data.append(data[:seq_length])
        else:
            data.append([0]*(seq_length-len(data)) + data)
    return data

In [27]:
class RNN(nn.Module):
	def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
		super(RNN, self).__init__()
		# Fill in relevant parameters 
		self.embedding = nn.Embedding(input_dim, embedding_dim)
		self.rnn = nn.RNN(embedding_dim, hidden_dim)
		self.fc = nn.Linear(hidden_dim, output_dim)
		self.softmax = nn.LogSoftmax(dim=1) 
		self.loss = nn.NLLLoss()
	
	def compute_Loss(self, predicted_vector, gold_label):
		return self.loss(predicted_vector, gold_label)

	def forward(self, inputs):
		embedded = self.embedding(torch.Tensor.long(inputs))
		output, hidden = self.rnn(embedded)
		rst = self.fc(hidden.squeeze(0))
		return self.softmax(rst)

	def load_model(self, save_path):
		self.load_state_dict(torch.load(save_path))
	
	def save_model(self, save_path):
		torch.save(self.state_dict(), save_path)

In [28]:
rnn_model = RNN(len(vocab), 100, 256, len(emotion_to_idx)).to(get_device())
train_and_evaluate(2, rnn_model, train_loader, val_loader)
# rnn_model.save_model("rnn_fixed.pth") 

Accuracy: 0.3301
Accuracy: 0.7098
Validation Accuracy: 0.7779


In [29]:
class LSTM(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, n_layers, dropout_p = 0.3):
        super(LSTM, self).__init__()       
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, batch_first = True, dropout = dropout_p)
        self.dropout = nn.Dropout(dropout_p)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.LogSoftmax(dim=1)

    def compute_Loss(self, predicted_vector, gold_label):
        return self.loss(predicted_vector, gold_label)
           
    def forward(self, inputs):                                        
        embedded = self.embedding(inputs)   
        lstm_out, h = self.lstm(embedded)        
        lstm_out = self.dropout(lstm_out)
        fc_out = self.fc(lstm_out)
        return self.softmax(fc_out)[:, -1], h
    
    def init_hidden(self, batch_size): 
        device = "cuda" if torch.cuda.is_available() else "cpu"
        weights = next(self.parameters()).data
        h = (weights.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device),
             weights.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device))
        return h

    def load_model(self, save_path):
        self.load_state_dict(torch.load(save_path))

    def save_model(self, save_path):
        torch.save(self.state_dict(), save_path)

In [30]:
lstm_model = LSTM(len(vocab), 100, 256, len(emotion_to_idx), 2).to(get_device())
train_and_evaluate(2, lstm_model, train_loader, val_loader)
# lstm_model.save_model("lstm_fixed.pth") # Save our model!

Accuracy: 0.3677
Accuracy: 0.7512
Validation Accuracy: 0.8391


In [31]:
def categorical_accuracy(preds, y):
    max_preds = preds.argmax(dim = 1, keepdim = True)
    correct = max_preds.squeeze(1).eq(y)
    return correct.sum() / torch.FloatTensor([y.shape[0]])