In [None]:
import torch 
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torchtext.vocab import GloVe
from torch.utils.data import Dataset, DataLoader
import math
import spacy 
import sys 
import os
import pandas as pd
from typing import List, Tuple
import csv
from tqdm.notebook import tqdm

In [None]:
# Select the compute device: a CUDA GPU when PyTorch reports one, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"device = {device}")

In [None]:
# Part-of-speech tag -> integer index. The index of a tag is its position in the
# list below. '<UNK>' is the fallback entry for tags that are not listed and
# '<PAD>' is the tag given to padded positions.
POS_VOCAB = {
    tag: index
    for index, tag in enumerate([
        'ADJ', 'ADP', 'ADV', 'AUX', 'CONJ', 'CCONJ', 'DET',
        'INTJ', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN',
        'PUNCT', 'SCONJ', 'SYM', 'VERB', '<UNK>', '<PAD>',
    ])
}

In [None]:
# UTILS.py

class JustDataset(Dataset):
    """Thin map-style Dataset around an already-built, indexable collection."""

    def __init__(self, data):
        # Only a reference is stored; the collection is neither copied nor transformed.
        self.data = data

    def __len__(self):
        """Return the number of samples in the wrapped collection."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the sample stored at ``index`` of the wrapped collection."""
        return self.data[index]

# FUNCTIONS:

def convert_pos_tags_list_into_list_of_indices_given_vocab(pos_tags_list: List[List[str]], pos_tag_vocab: dict = POS_VOCAB) -> List[List[int]]:
    '''
    Map every part-of-speech tag of every sentence to its integer vocabulary index.

    Args:
        pos_tags_list: one list of tag strings per sentence.
        pos_tag_vocab: tag -> index mapping. Tags that are not keys of this mapping
            are replaced by the index stored under '<UNK>', so that key must exist
            whenever an unlisted tag can occur.

    Returns:
        One list of integer indices per sentence, in the same order as the input.
        The result holds plain Python ints; no tensors are created here.
    '''

    print("Started to map pos_tags into indices using vocab")

    all_indices_list = []
    for pos_sent in pos_tags_list:
        # Test membership on the dict itself (a hash lookup) instead of scanning a
        # list of its keys for every single token.
        all_indices_list.append([
            pos_tag_vocab[pos] if pos in pos_tag_vocab else pos_tag_vocab['<UNK>']
            for pos in pos_sent
        ])

    print("Done mapping pos_tags to index ")
    print(len(all_indices_list))

    return all_indices_list

def get_positional_encodings(masking_list: List[List[int]], sent_len: int = 64, input_dim: int=100) -> List[torch.Tensor]:
    '''
    Build one positional-encoding tensor per sentence.

    Every entry of masking_list is a list of 0/1 flags (0 marks a padding token).
    Each mask is forwarded, together with sent_len and input_dim, to
    get_positional_encoding_single_sent and the resulting tensors are collected in
    input order. Progress is printed for every sentence.
    '''

    banner = '*' * 100
    encodings_per_sentence = []

    for sentence_number, sentence_mask in enumerate(masking_list):
        print(banner)
        print('\n\n')
        print(f"my_index = {sentence_number}")
        encodings_per_sentence.append(
            get_positional_encoding_single_sent(sentence_mask, sent_len, input_dim)
        )
        print("got the positional_encoding")

    # The tensors are handed back as a list; they are not stacked into one tensor.
    return encodings_per_sentence

def get_positional_encoding_single_sent(mask: List[int], sent_len: int = 64, input_dim: int = 100) -> torch.Tensor:
    '''
    Build the sinusoidal positional encoding of one padded sentence.

    Even columns hold sin(position * frequency) and odd columns hold
    cos(position * frequency); every row whose mask value is 0 is zeroed.

    Args:
        mask: per-position flags, 0 for padded positions. It is broadcast against
            the rows of the encoding, so it should contain sent_len entries.
        sent_len: number of positions (rows of the result).
        input_dim: width of the encoding (columns of the result). Odd widths are
            supported.

    Returns:
        Float tensor of shape (sent_len, input_dim).
    '''

    print("Start generating positional encodings")

    position = torch.arange(0, sent_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, input_dim, 2, dtype=torch.float) * -(math.log(10000.0) / input_dim))
    angles = position * div_term  # shape (sent_len, ceil(input_dim / 2))

    encodings = torch.zeros((sent_len, input_dim))
    encodings[:, 0::2] = torch.sin(angles)
    # An odd input_dim has one odd-indexed column fewer than even-indexed ones, so
    # trim the angles to the number of odd columns instead of failing on a shape
    # mismatch. For even widths the slice is a no-op.
    encodings[:, 1::2] = torch.cos(angles[:, : input_dim // 2])

    # Apply the mask to zero out the padded tokens
    encodings = encodings.masked_fill(torch.LongTensor(mask).unsqueeze(1) == 0, 0)

    print("Got the positional encoding tensors")

    return encodings

def convert_lemmas_list_into_tensor_embeddings(lemmas_list: List[List[str]], input_dim: int = 100) -> List[torch.Tensor]:
    '''
    Look up a pre-trained GloVe ('6B') vector for every lemma of every sentence.

    input_dim has to be one of 50, 100, 200 or 300 (enforced with an assert). The
    vectors are obtained through torchtext's GloVe with 'glove.6B' as the cache
    location. The result holds one tensor per sentence, made by stacking that
    sentence's lemma vectors in order.
    '''

    assert input_dim in [50, 100, 200, 300]

    print("Started converting the lemmas into tensor embeddings")

    # to convert text into embeddings
    glove = GloVe(name='6B', dim=input_dim, cache='glove.6B')

    lemmas_embeddings_list = [
        torch.stack([glove[lemma] for lemma in sentence_lemmas])
        for sentence_lemmas in lemmas_list
    ]

    print("Converted all the lemmas into embeddings")

    return lemmas_embeddings_list

def prepare_data(my_dataset: "My_Dataset", input_dim: int = 100, save_path: str = 'my_dataset_max_len_64_input_dim_100.pt'):
    '''
    Pre-process a dataset, turn it into model-ready samples and save them to disk.

    The annotation of my_dataset is a string (forward reference) because the
    My_Dataset class is defined in a later cell; a bare name would raise NameError
    when this cell runs on a fresh kernel.

    Args:
        my_dataset: object providing pre_process_raw_text() and max_len_sent.
        input_dim: embedding width, forwarded to the GloVe lookup and to the
            positional encodings.
        save_path: file the samples are written to with torch.save.

    Side effects:
        Writes a list of (lemma_embedding + positional_encoding, pos_indices, label)
        tuples, one per sentence, to save_path. Nothing is returned.
    '''

    lemmas_list, pos_tags_list, masking_list, labels_list = my_dataset.pre_process_raw_text()
    labels_list = list(map(int, labels_list))

    lemma_tensors = convert_lemmas_list_into_tensor_embeddings(lemmas_list, input_dim)
    positional_tensors = get_positional_encodings(masking_list, my_dataset.max_len_sent, input_dim)
    pos_indices = convert_pos_tags_list_into_list_of_indices_given_vocab(pos_tags_list)

    # Element-wise sum of the word embedding and the positional encoding of each sentence.
    lemma_combine_positional = [
        lemma + positional for lemma, positional in zip(lemma_tensors, positional_tensors)
    ]

    # One (features, pos_indices, label) tuple per sentence.
    data = list(zip(lemma_combine_positional, pos_indices, labels_list))

    torch.save(data, save_path)
    print("Successfully saved the dataset")
    


In [None]:
# My_dataset.py

class My_Dataset():
    '''
    Reads (sentence, label) rows from a CSV file and converts every sentence into
    fixed-length lists of lemmas, part-of-speech tags and padding-mask flags.
    '''

    def __init__(self, path_to_train_dataset_csv: str = 'train.csv', max_len_sent: int = 64):
        '''
        Args:
            path_to_train_dataset_csv: path of the CSV file holding the raw data.
            max_len_sent: fixed sentence length; longer sentences are truncated and
                shorter ones are padded with the padding token.
        '''

        # ATTRIBUTES

        self.train_csv_path = path_to_train_dataset_csv
        self.nlp = spacy.load('en_core_web_sm') # for tokenization, lemmas and pos tags
        self.max_len_sent = max_len_sent
        self.padding_token = '<PAD>'

        # READING THE DATASET

        # Lazy, single-use generator kept for backward compatibility; nothing is read
        # from disk until it is iterated. pre_process_raw_text opens its own reader.
        self.tuple_generator = self.read_csv()


    def read_csv(self, colname_1='sentence', colname_2='gold_label'):
        '''
        Generator yielding one (text, label) tuple per CSV row.

        colname_1 / colname_2 are the header names of the text and label columns.
        Both values are yielded as the strings found in the file.
        '''

        print("Started to read the data line by line")

        # newline='' is what the csv module asks for, so that newlines embedded in
        # quoted fields are parsed correctly.
        with open(self.train_csv_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                yield row[colname_1], row[colname_2]

    def pre_process_raw_text(self) -> Tuple[List[List[str]], List[List[str]], List[List[int]], List[int]]:
        '''
        Pre-process every row of the CSV file.

        Each sentence goes through pre_process_single_sent (lower-casing,
        lemmatisation, alphabetic tokens only, padding / truncation to
        self.max_len_sent). Each label is converted with int(label) + 1
        (presumably to turn labels such as -1/0/1 into class indices 0/1/2 - TODO confirm).

        The results are also stored on the instance (lemmas_list, pos_tags_list,
        masking_list, label_list). The file is re-read on every call, so calling
        this method more than once yields the same result each time.

        Returns:
            (lemmas_list, pos_tags_list, masking_list, label_list)
        '''

        print("Started pre-processing the data")

        self.lemmas_list = [] # this is a list of lists, containing the lemmas for each sent in a list
        self.pos_tags_list = [] # list of lists, containing the pos_tags for each sent in a list
        self.masking_list = [] # list of lists, where each list contains 0 or 1 : 0 if the token is self.padding_token
        self.label_list = [] # list containing the labels for each sentence

        # A fresh generator per call: the one stored in __init__ is exhausted after a
        # single pass, which made a second call silently return empty lists.
        for my_index, (sent, label) in enumerate(self.read_csv()):
            print('*' * 20)
            print('\n\n')
            print(f"my_index = {my_index}, STARTED PRE_PROCESSING\n")
            my_lemmas, my_pos_tags, my_masking = self.pre_process_single_sent(sent)
            self.lemmas_list.append(my_lemmas)
            self.pos_tags_list.append(my_pos_tags)
            self.masking_list.append(my_masking)
            self.label_list.append(int(label)+1)
            print(f"my_index = {my_index}, COMPLETED PRE_PROCESSING\n\n")

        print("Pre-processing complete")

        return (self.lemmas_list, self.pos_tags_list, self.masking_list, self.label_list)

    def pre_process_single_sent(self, sent: str) -> Tuple[List[str], List[str], List[int]]:
        '''
        Convert one sentence into (lemmas, pos_tags, mask).

        The sentence is lower-cased and run through self.nlp; only tokens whose
        is_alpha flag is set are kept. All three returned lists have exactly
        self.max_len_sent entries: longer sentences are truncated, shorter ones are
        padded with self.padding_token (in both the lemma and the tag list). The
        mask holds 1 for real tokens and 0 for padded positions.
        '''

        doc = self.nlp(sent.lower())

        # Single pass over the document; truncation happens here as well.
        kept_tokens = [token for token in doc if token.is_alpha][:self.max_len_sent]
        my_lemmas = [token.lemma_ for token in kept_tokens]
        my_pos = [token.pos_ for token in kept_tokens]

        num_real = len(kept_tokens)
        num_pad = self.max_len_sent - num_real

        my_lemmas += [self.padding_token] * num_pad
        my_pos += [self.padding_token] * num_pad
        my_mask = [1] * num_real + [0] * num_pad

        return my_lemmas, my_pos, my_mask

In [None]:
# Encoder.py

class Encoder_Sentiment_Analysis(nn.Module):
    '''
    Transformer-encoder sentence classifier.

    A learned part-of-speech embedding is added to the incoming token features,
    the sum is passed through a stack of Transformer encoder layers, the sequence
    dimension is pooled (max, min and mean, concatenated) and a small feed-forward
    head turns the pooled vector into raw class scores (logits).
    '''

    def __init__(self, input_dim: int, num_heads: int, pos_tags_vocab_len: int = len(POS_VOCAB), output_dim: int = 3, dim_feedfwd: int = 2048, dropout_prob: float = 0.1, activation: str = 'relu', num_layers: int = 4):
        '''
        Args:
            input_dim: feature width of every token (d_model of the encoder).
            num_heads: number of attention heads of every encoder layer.
            pos_tags_vocab_len: number of rows of the part-of-speech embedding table.
            output_dim: number of output scores per sentence.
            dim_feedfwd: hidden width of the feed-forward part inside each encoder layer.
            dropout_prob: dropout probability used inside the encoder layers.
            activation: activation used inside the encoder layers.
            num_layers: number of stacked encoder layers.
        '''

        super().__init__()

        self.num_heads = num_heads
        self.dim_feedfwd = dim_feedfwd
        self.input_dim = input_dim
        self.dropout_prob = dropout_prob
        self.activation = activation
        self.num_layers = num_layers
        self.pos_tags_vocab_len = pos_tags_vocab_len
        self.output_dim = output_dim

        # batch_first=True: tensors are laid out as (batch, seq_len, input_dim).
        self.encoder_layer = TransformerEncoderLayer(
            d_model=self.input_dim,
            nhead=self.num_heads,
            dim_feedforward=self.dim_feedfwd,
            dropout=self.dropout_prob,
            activation=self.activation,
            batch_first=True,
        )
        self.encoder = TransformerEncoder(self.encoder_layer, self.num_layers)

        # The head consumes the concatenation of three pooled vectors, hence input_dim * 3.
        self.feed_fwd_part_end = nn.Sequential(
            nn.Linear(self.input_dim * 3, self.input_dim),
            nn.ReLU(), nn.Dropout(p=0.3),
            nn.Linear(self.input_dim, self.output_dim)
        )

        # Learned embedding per part-of-speech index; the '<PAD>' row is the padding_idx.
        self.pos_embedding = nn.Embedding(num_embeddings=self.pos_tags_vocab_len, embedding_dim=self.input_dim, padding_idx=POS_VOCAB['<PAD>'])

    def forward(self, inputs):
        '''
        Compute the class scores for a batch.

        Args:
            inputs: sequence of two tensors.
                inputs[0]: lemma embeddings + positional encodings,
                    shape (batch_size, seq_len, input_dim).
                inputs[1]: part-of-speech indices of every token,
                    expected shape (batch_size, seq_len).

        Returns:
            Tensor of shape (batch_size, output_dim) holding raw scores (logits).

        NOTE(review): no padding mask is given to the encoder and the pooling below
        runs over every position, padded ones included - confirm this is intended.
        '''

        token_features, pos_indices = inputs[0], inputs[1]

        features = self.pos_embedding(pos_indices) + token_features
        out = self.encoder(features)

        # Collapse the sequence dimension three different ways.
        max_pool = out.max(dim=1).values
        min_pool = out.min(dim=1).values
        avg_pool = out.mean(dim=1)

        pooled = torch.cat([max_pool, min_pool, avg_pool], dim=1)  # (batch_size, 3 * input_dim)

        # Feed-forward head producing the raw scores consumed by the loss function.
        return self.feed_fwd_part_end(pooled)


In [None]:
# Hyper-parameters of the training run.
input_dim = 100    # feature width per token; given to the model as its input_dim
num_heads = 10     # attention heads per encoder layer
batch_size = 128   # samples per DataLoader batch
max_epochs = 10    # number of passes over the training data
lr = 7e-5          # learning rate handed to the Adam optimizer

In [None]:
# Prefer Apple's Metal (MPS) backend, but only when this PyTorch build can really
# use it. Otherwise fall back to CUDA and finally to the CPU, so the cells below
# still run on machines without MPS instead of failing on the first .to(device).
if getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Load the pre-computed samples: a list of
# (lemma_embedding + positional_encoding, pos_indices, label) tuples.
data = torch.load('my_dataset_max_len_64_input_dim_100.pt')
train_loader = DataLoader(data, batch_size, shuffle=True)

model = Encoder_Sentiment_Analysis(input_dim, num_heads)
# To resume from saved weights:
# model.load_state_dict(torch.load('test_set_sub_1/sentiment.params'))
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr)

loss_list = []  # epoch_loss of every finished epoch

for epoch in tqdm(range(max_epochs)):

    print('*' * 100)
    print('\n\n')
    print(f"epoch = {epoch}/{max_epochs}")

    epoch_loss = 0.0  # sum of the per-batch loss values of this epoch

    # Wrap the loader (not the enumerate object) so the progress bar knows the
    # number of batches.
    for step, temp in enumerate(tqdm(train_loader)):
        # temp is a list of length 3: features, pos indices, labels
        lemma_tensors = temp[0]
        # The pos indices are stored as Python lists, so the default collate returns
        # one tensor per sequence position; stack them into (batch_size, seq_len).
        pos_tensors = torch.stack(temp[1], dim=1)
        label_tensors = temp[2]

        # fwd_pass
        output = model([lemma_tensors.to(device), pos_tensors.to(device)])
        loss = criterion(output, label_tensors.to(device))

        epoch_loss += loss.item()

        # backward pass
        optimizer.zero_grad()
        loss.backward()

        # update the params
        optimizer.step()

    print(f"epoch = {epoch}/{max_epochs}, epoch_loss = {epoch_loss}")
    print('\n\n')
    print('*' * 100)

    with open("loss.txt_mps", "a") as f:
        my_dict = {'epoch' : epoch, 'max_epochs': max_epochs, 'epoch_loss': epoch_loss}
        f.write(f"{my_dict}\n")

    loss_list.append(epoch_loss)

    # state_dict() returns an ordered mapping of tensors, which has no .cpu() method;
    # move every tensor to the CPU individually before saving the checkpoint.
    cpu_state_dict = {name: tensor.cpu() for name, tensor in model.state_dict().items()}
    torch.save(cpu_state_dict, 'sentiment.params_mps')