In [None]:
import pandas as pd
import numpy as np
import torch

import matplotlib.pyplot as plt
import neattext.functions as nfx
import torch.nn as nn

In [None]:
import pandas as pd
import torch
from torchtext.data import get_tokenizer
from collections import Counter
from keras_preprocessing.sequence import pad_sequences
import numpy as np
from sklearn.model_selection import train_test_split
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler
import tqdm

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
def to_categorical(y, num_classes=None, dtype="float32"):
    """One-hot encode an array of integer class ids.

    Args:
        y: Array-like of integer class ids. A trailing axis of length 1 on a
            multi-dimensional input is dropped before encoding.
        num_classes: Width of the one-hot axis. When falsy it is taken to be
            ``max(y) + 1``.
        dtype: dtype of the returned array.

    Returns:
        ``numpy.ndarray`` shaped like ``y`` (minus a trailing singleton axis)
        with one extra trailing axis of length ``num_classes``.
    """
    labels = np.array(y, dtype="int")
    leading_shape = labels.shape

    # Treat (..., 1) inputs the same as (...): drop the singleton last axis.
    if len(leading_shape) > 1 and leading_shape[-1] == 1:
        leading_shape = tuple(leading_shape[:-1])

    flat_labels = labels.reshape(-1)
    if not num_classes:
        num_classes = np.max(flat_labels) + 1

    row_count = flat_labels.shape[0]
    one_hot = np.zeros((row_count, num_classes), dtype=dtype)
    # One 1 per row, in the column given by that row's class id.
    one_hot[np.arange(row_count), flat_labels] = 1

    return np.reshape(one_hot, leading_shape + (num_classes,))

In [None]:
class EmotionModel(nn.Module):
    """LSTM classifier on top of a pretrained embedding table.

    NOTE: a later cell in this notebook defines ``EmotionModel`` again. When
    the cells are run in order, that later definition replaces this one before
    the model is instantiated.

    Args:
        vocab_size: Number of rows of the embedding table.
        output_size: Number of output classes.
        embedding_matrix: 2-D tensor of initial embedding weights; its second
            dimension is used as the embedding size.
        hidden_dim: Size of the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        drop_prob: Dropout probability used between LSTM layers and before
            the dense head.
    """

    def __init__(self, vocab_size, output_size, embedding_matrix, hidden_dim, n_layers, drop_prob=0.5):
        super().__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim

        embedding_dim = embedding_matrix.size(1)
        self.embedding = nn.Embedding(vocab_size, embedding_dim, _weight=embedding_matrix)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(hidden_dim, output_size)
        self.sigmoid = nn.Sigmoid()
        # Kept as an attribute for backward compatibility only. It is no
        # longer applied in forward(): a softmax over sigmoid outputs (all in
        # (0, 1)) limits the ratio between any two class probabilities to e,
        # so the model could never produce a confident prediction.
        self.Softmax = nn.Softmax(dim=1)

    def forward(self, x, hidden):
        """Score a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids, batch first.
            hidden: ``(h_0, c_0)`` tuple as produced by :meth:`init_hidden`.

        Returns:
            ``(out, hidden)`` where ``out`` has shape
            ``(batch, output_size)`` with sigmoid scores in [0, 1] computed
            from the last timestep, and ``hidden`` is the LSTM's final
            ``(h_n, c_n)`` tuple.
        """
        embeds = self.embedding(x)
        lstm_out, hidden = self.lstm(embeds, hidden)

        # Only the last timestep feeds the prediction, so select it before
        # the dense head instead of scoring every timestep and discarding
        # all but one.
        last_step = lstm_out[:, -1, :]
        out = self.sigmoid(self.fc(self.dropout(last_step)))

        return out, hidden

    def init_hidden(self, batch_size, device):
        """Return zero-filled float32 ``(h_0, c_0)`` states on ``device``.

        Each tensor has shape ``(n_layers, batch_size, hidden_dim)``.
        """
        state_shape = (self.n_layers, batch_size, self.hidden_dim)
        hidden = (torch.zeros(state_shape, dtype=torch.float32, device=device),
                  torch.zeros(state_shape, dtype=torch.float32, device=device))
        return hidden

In [None]:
# --- Load the raw text and label tables --------------------------------------
text_data = pd.read_csv("../data/data.csv")
text_labels = pd.read_csv("../data/labels.csv")

label_emotion = text_labels['Emotion'].tolist()
label_encoded = text_labels['emotion_encoded'].tolist()

# Integer class id -> emotion name, plus one-hot targets for training.
label_mapping = dict(zip(label_encoded, label_emotion))
vectorized_labels = to_categorical(label_encoded)

# --- Build the vocabulary ----------------------------------------------------
sentences = text_data['Clean_Text'].tolist()
full_doc = " ".join([str(s) for s in sentences])
tokenizer = get_tokenizer("basic_english")

tokenized_doc = tokenizer(full_doc)

# Sorted so that token ids are identical on every kernel restart; iterating a
# bare set of strings gives an order that can change between Python runs.
vocab = sorted(set(tokenized_doc))

# Id 0 is reserved for the empty padding token; real tokens start at 1.
tokens_map_wn = {token: idx + 1 for idx, token in enumerate(vocab)}
tokens_map_nw = {idx + 1: token for idx, token in enumerate(vocab)}
tokens_map_nw[0] = ""
tokens_map_wn[""] = 0

# --- Encode and pad every sentence ------------------------------------------
sentence_tokenized = [tokenizer(str(s)) for s in sentences]
sentence_tokenized = [[tokens_map_wn[w] for w in s] for s in sentence_tokenized]

max_sequence_length = max(len(s) for s in sentence_tokenized)
vocab_size = len(vocab) + 1

# pad_sequences is used with its default padding/truncating settings.
sentence_tokenized_p = pad_sequences(sentence_tokenized, maxlen=max_sequence_length)
sentence_tokenized_p = np.array(sentence_tokenized_p)

# --- Load the pretrained GloVe vectors ---------------------------------------
glove_file = "glove.6B.200d.txt"
glove_path = "../data/glove/" + glove_file

glove = {}

with open(glove_path, "r", encoding="utf-8") as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        glove[word] = coefs

# Row i holds the vector of token id i; tokens without a GloVe entry (and the
# padding row 0) stay all-zero.
embedding_matrix = np.zeros((vocab_size, 200), dtype="float32")

# One lookup per unique token is enough; looping over every occurrence in the
# corpus would just rewrite the same rows repeatedly.
for token in vocab:
    embedding_vector = glove.get(token, None)

    if embedding_vector is not None:
        embedding_matrix[tokens_map_wn[token]] = embedding_vector

# Vectorised row lookup: result has one embedding row per token position,
# i.e. shape (num_sentences, max_sequence_length, 200).
sentence_embedded_p = embedding_matrix[sentence_tokenized_p]

In [None]:
# Copy the NumPy arrays into tensors on the target device. torch.tensor always
# copies, so later in-place updates of the embedding weights cannot write back
# into the NumPy arrays. Token ids are created directly as int64 rather than
# being round-tripped through float32.
vectorized_labels_t = torch.tensor(vectorized_labels, dtype=torch.float32, device=device)
embedding_matrix_t = torch.tensor(embedding_matrix, dtype=torch.float32, device=device)
training_data = torch.tensor(sentence_tokenized_p, dtype=torch.long, device=device)

# Fixed seed so the 70/30 split is the same after a kernel restart.
SPLIT_SEED = 42
train_X, sec_X, train_y, sec_y = train_test_split(
    training_data, vectorized_labels_t, test_size=0.3, random_state=SPLIT_SEED
)


In [None]:
from collections import Counter
def get_balance_weight(y):
    """Return inverse-frequency weights, one per class, for one-hot labels.

    Args:
        y: 2-D tensor of one-hot (or score) rows; the class of each row is its
            argmax along dim 1.

    Returns:
        float32 tensor of length ``y.shape[1]`` on the notebook's ``device``.
        Entry ``k`` is ``1 / (number of rows whose class is k)``. A class that
        does not occur in ``y`` gets weight 0, so position ``k`` always
        corresponds to class ``k`` even when some classes are missing.
    """
    class_ids = torch.argmax(y, dim=1)
    # minlength keeps one slot per class even for classes with no samples.
    counts = torch.bincount(class_ids, minlength=y.shape[1]).to(torch.float32)

    # Clamp only guards the division; absent classes are zeroed explicitly.
    inverse = 1.0 / counts.clamp(min=1)
    weights = torch.where(counts > 0, inverse, torch.zeros_like(inverse))

    return weights.to(device)

In [None]:
class EmotionModel(nn.Module):
    """LSTM classifier on top of a pretrained embedding table.

    The prediction is made from the LSTM output at the last timestep, passed
    through dropout, a linear layer and a sigmoid.

    Args:
        vocab_size: Number of rows of the embedding table.
        output_size: Number of output classes.
        embedding_matrix: 2-D tensor of initial embedding weights; its second
            dimension is used as the embedding size.
        hidden_dim: Size of the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        drop_prob: Dropout probability used between LSTM layers and before
            the dense head.
    """

    def __init__(self, vocab_size, output_size, embedding_matrix, hidden_dim, n_layers, drop_prob=0.5):
        super().__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim

        embedding_dim = embedding_matrix.size(1)
        self.embedding = nn.Embedding(vocab_size, embedding_dim, _weight=embedding_matrix)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(hidden_dim, output_size)
        self.sigmoid = nn.Sigmoid()
        # Not applied in forward(); kept so the attribute remains available.
        self.Softmax = nn.Softmax(dim=1)

    def forward(self, x, hidden):
        """Score a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids, batch first.
            hidden: ``(h_0, c_0)`` tuple as produced by :meth:`init_hidden`.

        Returns:
            ``(out, (hidden, cell))`` where ``out`` has shape
            ``(batch, output_size)`` with sigmoid scores in [0, 1], and
            ``(hidden, cell)`` is the LSTM's final state.
        """
        embeds = self.embedding(x)
        lstm_out, (hidden, cell) = self.lstm(embeds, hidden)

        # Only the last timestep is used for the prediction, so pick it first
        # and run the dense head on (batch, hidden_dim) instead of on every
        # timestep of every sequence.
        last_step = lstm_out[:, -1, :]
        out = self.sigmoid(self.fc(self.dropout(last_step)))

        return out, (hidden, cell)

    def init_hidden(self, batch_size, device):
        """Return zero-filled float32 ``(h_0, c_0)`` states on ``device``.

        Each tensor has shape ``(n_layers, batch_size, hidden_dim)``.
        """
        state_shape = (self.n_layers, batch_size, self.hidden_dim)
        hidden = (torch.zeros(state_shape, dtype=torch.float32, device=device),
                  torch.zeros(state_shape, dtype=torch.float32, device=device))
        return hidden

In [None]:
# Fields for the model
model = EmotionModel(
    vocab_size=embedding_matrix_t.shape[0],
    output_size=vectorized_labels_t.shape[1],
    embedding_matrix=embedding_matrix_t,
    hidden_dim=128,
    n_layers=2,
    drop_prob=0.2,
).to(device=device)
loss_fn = nn.BCELoss().to(device)
optim = torch.optim.Adam(model.parameters(), lr=0.005)


def train_step(epochs, dataloader, batch_size):
    """Run one optimisation pass over ``dataloader``.

    Uses the module-level ``model``, ``loss_fn``, ``optim`` and ``device``.

    Args:
        epochs: Value supplied by the caller (the training loop below passes
            the 1-based epoch number). Not used inside the function.
        dataloader: Iterable yielding ``(x, y)`` batches.
        batch_size: Nominal batch size. Kept for backward compatibility; the
            hidden state is sized from each actual batch instead, so a final
            batch smaller than ``batch_size`` no longer breaks the LSTM.
    """
    model.train()

    for i, (x, y) in enumerate(dataloader):
        x, y = x.to(device), y.to(device)

        # Fresh zero state for every batch: the batches are independent,
        # randomly sampled sentences, so state must not leak between them,
        # and its batch dimension has to match this batch exactly.
        h = model.init_hidden(batch_size=x.size(0), device=device)

        optim.zero_grad()
        output, _ = model(x, h)
        loss = loss_fn(output, y.float())

        if i % 200 == 0:
            print(f"Curr Loss: {loss.item()}")

        loss.backward()
        optim.step()


epochs = 1
batch_size = 4

# Balancing the data set
# Per-class sample counts over the full label list, ordered by class id.
count = Counter(label_encoded)
count = sorted([(k,v) for k,v in count.items()])
count = np.array([c[1] for c in count])

# One weight per CLASS (inverse frequency in the training split).
weights = get_balance_weight(train_y)
if weights.shape[0] != train_y.shape[1]:
    raise ValueError(
        "get_balance_weight returned one weight per observed class, but the "
        "training split does not contain every class; class weights cannot "
        "be aligned with class ids."
    )

# WeightedRandomSampler needs one weight per SAMPLE. Handing it the per-class
# vector makes it draw only dataset indices 0..num_classes-1, so look up each
# training row's class weight instead.
sample_weights = weights[torch.argmax(train_y, dim=1)]

sampler = WeightedRandomSampler(sample_weights.cpu(), train_X.shape[0], replacement=True)

train_data = TensorDataset(train_X, train_y)
train_loader = DataLoader(train_data, batch_size=batch_size, sampler=sampler)

for e in range(epochs):
    print(f"Epoch: {e + 1}:")
    train_step(e+1, train_loader, batch_size)