In [2]:
import re
import os
import torch
import random
import torch.nn as nn

from os import listdir
from tqdm.notebook import tqdm
from collections import Counter
from gensim.models import Word2Vec
from torch.utils.data import Dataset, DataLoader

In [None]:
class RNN(nn.Module):
    """Two-layer LSTM binary classifier over pre-embedded token sequences.

    The network consumes float tensors of shape ``(batch, seq_len, 64)``
    (``batch_first=True``, ``embedding_dim == 64``) and produces one score per
    class for each sequence, shape ``(batch, 2)``, read off the last time step.

    The returned scores are raw logits: ``nn.CrossEntropyLoss`` applies
    log-softmax itself, so squashing the scores through a sigmoid first would
    confine them to ``[0, 1]`` and cap how confident the loss can become.
    Because sigmoid is monotonic, ``argmax`` predictions are unaffected.
    """

    def __init__(self):
        super(RNN, self).__init__()

        self.embedding_dim = 64
        self.output_dim = 2

        self.hidden_dim = 25
        self.no_layers = 2

        # Recurrent encoder; batch_first=True means inputs are (batch, seq, feature).
        self.lstm = nn.LSTM(input_size=self.embedding_dim, hidden_size=self.hidden_dim,
                            num_layers=self.no_layers, batch_first=True)

        # Regularisation applied to the encoder output before classification.
        self.dropout = nn.Dropout(0.3)

        # Classification head mapping the hidden state to per-class scores.
        self.fc = nn.Linear(self.hidden_dim, self.output_dim)

        # Retained only so code that accesses ``model.sig`` keeps working; it has
        # no parameters (state_dict is unchanged) and is not used in forward().
        self.sig = nn.Sigmoid()

    def forward(self, x):
        """Return class logits of shape ``(batch, output_dim)`` for ``x``.

        Args:
            x: float tensor of shape ``(batch, seq_len, embedding_dim)``.
        """
        batch_size = x.size(0)

        # Fresh zero state for every batch, created on the input's device and
        # dtype so the model also works after ``.to(device)`` / ``.double()``.
        hidden = self.init_hidden(batch_size, device=x.device, dtype=x.dtype)
        lstm_out, _ = self.lstm(x, hidden)

        # Only the final time step is classified, so dropout and the linear
        # layer are applied to that slice instead of the whole sequence.
        last_step = lstm_out[:, -1]
        return self.fc(self.dropout(last_step))

    def init_hidden(self, batch_size, device=None, dtype=None):
        """Create zero-initialised ``(h0, c0)`` LSTM state.

        Args:
            batch_size: number of sequences in the batch.
            device: optional device for the tensors (default: torch default).
            dtype: optional dtype for the tensors (default: torch default).

        Returns:
            Tuple ``(h0, c0)``, each of shape
            ``(no_layers, batch_size, hidden_dim)``.
        """
        shape = (self.no_layers, batch_size, self.hidden_dim)
        h0 = torch.zeros(shape, device=device, dtype=dtype)
        c0 = torch.zeros(shape, device=device, dtype=dtype)
        return (h0, c0)

In [None]:
class RnnDataset(Dataset):
    """Fixed-length, word2vec-embedded view of ``(label, tokens)`` samples.

    Every sample is cut or padded to the mean token count of ``data``; tokens
    seen at most ``threshold`` (5) times in ``data`` are replaced by ``"unk"``.
    Indexing returns ``(embedding, label)`` where ``embedding`` is a float
    tensor with one row per token and ``label`` is a long tensor.

    Args:
        data: sequence of ``(label, tokens)`` pairs, ``tokens`` a list of str.
        model: optional pre-trained embedding model exposing ``.wv``. When
            omitted, a new ``Word2Vec`` model (64-dimensional vectors) is
            trained on the processed sentences and saved to
            ``"w2vemb_rnn.model"``.

    Raises:
        ValueError: if ``data`` is empty.
    """

    def __init__(self, data, model = None):
        #https://radimrehurek.com/gensim/models/word2vec.html for w2v embedding
        padding_string = "PADDING___PADDING"
        threshold = 5

        self.data = data
        self.length = len(self.data)
        self.words2index = {}

        if self.length == 0:
            # The mean sentence length below is undefined without samples.
            raise ValueError("RnnDataset requires at least one (label, tokens) sample")

        # Token frequencies and total token count over the whole dataset.
        cnt = Counter()
        total_tokens = 0
        for _, s in data:
            total_tokens += len(s)
            cnt.update(s)

        # Common sequence length = mean sentence length, but never 0 so that
        # every sample yields at least one embedding row.
        max_size = max(1, int(total_tokens / self.length))

        sentences = []
        new_train_data = []
        for l, s in data:
            # Rare tokens -> "unk", then truncate / right-pad to max_size.
            new_s = [w if cnt[w] > threshold else "unk" for w in s][:max_size]
            new_s = new_s + [padding_string] * (max_size - len(new_s))
            new_train_data.append((l, new_s))
            sentences.append(new_s)

        self.data = new_train_data
        if model is None:
            # The extra two-token sentence guarantees that the fallback token
            # "unk" and the padding token are in the vocabulary even when the
            # corpus never produced them; __getitem__ relies on "unk" existing.
            # (Calling build_vocab with bare strings would treat each string as
            # a sentence and register its single characters as words.)
            # NOTE(review): `size=` is the gensim 3.x keyword; gensim 4 renamed
            # it to `vector_size` - confirm the installed version.
            corpus = sentences + [["unk", padding_string]]
            self.model = Word2Vec(sentences=corpus, min_count=-1, workers=4, size=64)
            self.model.save("w2vemb_rnn.model")
        else:
            self.model = model

    def __len__(self):
        """Number of samples."""
        return self.length

    def __getitem__(self, index):
        """Return ``(embedding, label)`` for the sample at ``index``."""
        label, sentence = self.data[index]
        # Membership and lookup both go through the keyed vectors (`.wv`);
        # tokens missing from the embedding vocabulary fall back to "unk".
        vectors = self.model.wv
        sentence = [w if w in vectors else "unk" for w in sentence]
        out = torch.tensor(vectors[sentence], dtype=torch.float)
        label = torch.tensor(label, dtype=torch.long)
        return out, label

In [None]:
def preprocess_token(s):
    """Strip punctuation, whitespace and digits from ``s``.

    Three substitutions are applied in order, each deleting its matches:
    characters that are neither word characters nor whitespace, then runs of
    whitespace, then digits. Underscores are word characters and survive.
    """
    removal_patterns = (
        r"[^\w\s]",  # anything that is not a word character or whitespace
        r"\s+",      # whitespace runs
        r"\d",       # digits
    )
    for pattern in removal_patterns:
        s = re.sub(pattern, '', s)
    return s

In [None]:
def tokenize(x_train):
    """Turn raw text into a list of cleaned, lower-cased tokens.

    The text is lower-cased and split on whitespace; each piece is passed
    through ``preprocess_token`` and pieces that end up empty are dropped.
    """
    cleaned_pieces = (preprocess_token(piece) for piece in x_train.lower().split())
    return [token for token in cleaned_pieces if token != '']

def get_rnn_dataset(path : str, optional_file : str = None):
    """Load a labelled text corpus from disk and wrap it in an ``RnnDataset``.

    ``path`` is resolved relative to the current working directory (leading
    path separators are ignored) and must contain the sub-folders ``0`` and
    ``1``; the folder name is the label of every ``*.txt`` file inside it.
    Files are read as UTF-8, tokenized with ``tokenize`` and the resulting
    ``(label, tokens)`` pairs are shuffled.

    Args:
        path: folder holding the ``0`` and ``1`` label sub-folders.
        optional_file: optional path of a saved ``Word2Vec`` model to reuse;
            when ``None`` the dataset trains its own embedding.

    Returns:
        An ``RnnDataset`` over the shuffled samples.
    """
    num_folder = 2
    # Leading separators are stripped so that a value such as "/train" still
    # resolves below the working directory; os.path.join would otherwise
    # discard the working directory for an absolute-looking component.
    base_dir = os.path.join(os.getcwd(), path.lstrip("/\\"))

    data = []
    for label in range(num_folder):
        folder_name = os.path.join(base_dir, str(label))
        # Sorted for a platform-independent order before shuffling.
        for file_name in sorted(listdir(folder_name)):
            # Skip anything that is not a text sample, e.g. editor/OS
            # artefacts such as .ipynb_checkpoints or .DS_Store.
            if not file_name.lower().endswith(".txt"):
                continue
            file_path = os.path.join(folder_name, file_name)
            # Explicit encoding: the platform default is not UTF-8 everywhere.
            # The context manager closes the file even if reading fails.
            with open(file_path, "r", encoding="utf-8") as f:
                data.append((label, tokenize(f.read())))

    random.shuffle(data)

    if optional_file is None:
        return RnnDataset(data)

    return RnnDataset(data, Word2Vec.load(optional_file))

In [None]:
def train_rnn(rnn_instance: RNN, dataloader, epochs = 10, lr = 0.005, clip = 5):
    """Train ``rnn_instance`` in place with Adam and cross-entropy loss.

    Args:
        rnn_instance: model to optimise; called as ``rnn_instance(inputs)`` and
            expected to return one score per class for each sample.
        dataloader: iterable of ``(inputs, labels)`` batches.
        epochs: number of passes over ``dataloader``.
        lr: Adam learning rate.
        clip: maximum gradient norm used for gradient clipping.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(rnn_instance.parameters(), lr=lr)

    # Make sure dropout is active even if the model was last used in eval mode.
    rnn_instance.train()

    for epoch in range(epochs):
        iterator = tqdm(dataloader, desc="epoch {}/{}".format(epoch + 1, epochs))

        for inputs, labels in iterator:
            # Clear gradients left over from the previous step.
            optimizer.zero_grad()

            output = rnn_instance(inputs)
            loss = criterion(output, labels)
            loss.backward()

            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(rnn_instance.parameters(), clip)
            optimizer.step()

            # Show the running batch loss on the progress bar.
            iterator.set_postfix(loss=loss.item())

In [None]:
# Instantiate the (still untrained) LSTM classifier.
rnn_inst = RNN() # make sure this works
# Build the dataset from the "0"/"1" label sub-folders of the given folder;
# get_rnn_dataset resolves the folder relative to the current working directory.
rnn_dataset = get_rnn_dataset("/train") # make sure this works (potentially also different foldername)

In [3]:
# Split sizes: 80% for training; the remainder is halved into validation and
# test (test receives the extra sample when the remainder is odd).
train_size = int(0.8 * len(rnn_dataset))
rem_size = len(rnn_dataset) - train_size
valid_size = int(rem_size/2)
test_size = rem_size - valid_size

# Two-stage random split: carve off the training portion first, then divide
# what is left into validation and test.
train_dataset, rem_dataset = torch.utils.data.random_split(
    rnn_dataset, [train_size, rem_size]
)
validation_dataset, test_dataset = torch.utils.data.random_split(
    rem_dataset, [valid_size, test_size]
)

# One shuffling DataLoader per split, all with the same batch size.
batch_size = 50

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [5]:
# Train the model in place for 10 epochs on the training split.
train_rnn(rnn_inst, train_loader, 10) # make sure this works

  0%|          | 0/480 [00:00<?, ?it/s]



  0%|          | 0/480 [00:00<?, ?it/s]

  0%|          | 0/480 [00:00<?, ?it/s]

  0%|          | 0/480 [00:00<?, ?it/s]

  0%|          | 0/480 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# Persist only the learned parameters (the state_dict), not the module object itself.
torch.save(rnn_inst.state_dict(), "rnn.pt")# save model after training

In [None]:
# code here for demonstration, may remove
rnn_loaded = RNN() # make sure this works
rnn_loaded.load_state_dict(torch.load("rnn.pt"))#  make sure this works (potentially also different filename)
rnn_loaded.eval() # make sure this works

In [None]:
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader
def evaluate(clf, test_data):
    """Return the classification accuracy of ``clf`` on ``test_data``.

    Args:
        clf: callable mapping a batch of inputs to per-class scores of shape
            ``(batch, n_classes)``. If it exposes ``training`` / ``eval()`` /
            ``train()`` (as ``nn.Module`` does) it is put in eval mode for the
            duration of the call and restored afterwards.
        test_data: dataset of ``(input, label)`` samples; it is batched
            internally with ``batch_size=100``.

    Returns:
        Fraction of samples whose arg-max class equals the label, as a float.

    Raises:
        ValueError: if ``test_data`` yields no samples.
    """
    was_training = getattr(clf, "training", False)
    if was_training:
        # Dropout must be disabled while measuring accuracy.
        clf.eval()

    correct = 0
    total = 0
    try:
        # No gradients are needed for inference; skipping them saves memory.
        with torch.no_grad():
            for data, labels in DataLoader(test_data, batch_size=100):
                out = clf(data)
                # argmax over the raw scores; a softmax would not change it.
                predicted = torch.argmax(out, dim=1)
                correct += (predicted == labels).sum().item()
                total += labels.numel()
    finally:
        if was_training:
            clf.train()

    if total == 0:
        raise ValueError("cannot compute accuracy on an empty dataset")

    return correct / total

In [None]:
# Accuracy on the held-out validation split. evaluate() takes the classifier
# first and a dataset second (it builds its own DataLoader), so the model and
# the validation subset are passed - not a loader and the full dataset, which
# would also contain the training samples.
evaluate(rnn_loaded, validation_dataset)

In [None]:
# Accuracy on the held-out test split. evaluate() takes the classifier first
# and a dataset second (it builds its own DataLoader), so the model and the
# test subset are passed - not a loader and the full dataset, which would also
# contain the training samples.
evaluate(rnn_loaded, test_dataset)