In [1]:
from spacy.lang.en import English
import re
import string
from collections import Counter

In [2]:
# Blank English pipeline; nothing from it is used below except its tokenizer.
nlp = English()
# Module-level tokenizer handle; SNLIDataModule.tokenize calls it as `tok(text)`.
tok = nlp.tokenizer

In [3]:
import torch
from torch import nn
import pytorch_lightning as pl
from torch.utils.data import DataLoader,Dataset
from torch.nn import functional as F
from torch.nn import Sequential,Conv1d,Linear,MaxPool1d,ReLU,LogSoftmax,Tanh
from torch import flatten
import torchmetrics
from datasets import load_dataset
import os
import numpy as np
from tqdm.notebook import tqdm

class SNLI_LSTM(pl.LightningModule):
    """Sentence-pair classifier for SNLI built on a shared LSTM encoder.

    Premise and hypothesis token ids are looked up in a frozen embedding
    table, each sequence is encoded by the same single-layer LSTM, and the two
    final hidden states are concatenated and classified into ``num_classes``
    labels by an MLP ending in ``LogSoftmax``.

    Args:
        embedding: numpy array copied into the embedding layer; it must have
            shape ``(len(vocab), 300)``. Row 0 is the padding row.
        vocab: numpy array of vocabulary entries; only its length is used here.
        vocab2idx: token -> embedding-row mapping, stored on the module.

    Attributes:
        test_preds / test_labels: python lists that ``test_step`` extends with
            the predicted and true class ids of every test batch.
    """

    def __init__(self, embedding, vocab, vocab2idx):
        super().__init__()
        self.vocab = vocab
        self.vocab_to_idx = vocab2idx
        self.vocab_size = self.vocab.shape[0]
        self.embedding_dim = 300
        self.hidden_dim = 100
        self.num_classes = 3

        # Pre-trained vectors are copied in and kept frozen during training.
        self.embeddings = nn.Embedding(self.vocab_size, self.embedding_dim, padding_idx=0)
        self.embeddings.weight.data.copy_(torch.from_numpy(embedding))
        self.embeddings.weight.requires_grad = False

        self.lstm = nn.LSTM(self.embedding_dim, self.hidden_dim, batch_first=True)

        # The classifier input is the premise and hypothesis states side by side.
        pair_dim = 2 * self.hidden_dim
        self.dense = Sequential(
            Linear(pair_dim, pair_dim),
            ReLU(inplace=True),
            Linear(pair_dim, pair_dim),
            ReLU(inplace=True),
            Linear(pair_dim, self.num_classes),
            LogSoftmax(dim=1),
        )

        self.train_acc = torchmetrics.Accuracy()
        self.train_prec = torchmetrics.Precision()
        self.train_rec = torchmetrics.Recall()
        self.train_f1 = torchmetrics.F1()

        self.val_acc = torchmetrics.Accuracy()
        self.val_prec = torchmetrics.Precision()
        self.val_rec = torchmetrics.Recall()
        self.val_f1 = torchmetrics.F1()

        self.test_acc = torchmetrics.Accuracy()
        self.test_prec = torchmetrics.Precision()
        self.test_rec = torchmetrics.Recall()
        self.test_f1 = torchmetrics.F1()

        self.test_preds = []
        self.test_labels = []

    def forward(self, x, x1):
        """Classify a batch of already-embedded sentence pairs.

        Args:
            x: embedded premises, ``(batch, seq_len, embedding_dim)``.
            x1: embedded hypotheses, same layout as ``x``.

        Returns:
            Log-probabilities of shape ``(batch, num_classes)``.
        """
        _, (premise_state, _) = self.lstm(x)
        _, (hypothesis_state, _) = self.lstm(x1)
        # Final hidden states are (1, batch, hidden_dim); join them on the feature axis.
        pair_state = torch.cat((premise_state, hypothesis_state), dim=2)
        # Drop only the leading layer axis. A bare squeeze() would also drop
        # the batch axis for a batch of one and break LogSoftmax(dim=1).
        pair_state = pair_state.squeeze(0)
        return self.dense(pair_state)

    def cross_entropy_loss(self, logits, labels):
        """Return the NLL loss of log-probabilities ``logits`` against ``labels``.

        Labels outside ``[0, num_classes)`` are printed as a diagnostic before
        the loss is computed.
        """
        # Vectorised range check instead of a Python loop over every label.
        out_of_range = (labels < 0) | (labels >= self.num_classes)
        if bool(out_of_range.any()):
            print("Something wrong")
            print(labels[out_of_range])
        return F.nll_loss(logits, labels)

    def _run_batch(self, batch):
        """Embed both sentences of ``batch`` and return ``(logits, labels)``."""
        premise_ids, hypothesis_ids, labels = batch
        # .long() is a no-op for int64 ids and keeps the tensors on their device.
        premise = self.embeddings(premise_ids.long())
        hypothesis = self.embeddings(hypothesis_ids.long())
        return self.forward(premise, hypothesis), labels

    def _update_and_log_metrics(self, stage, logits, labels, **log_kwargs):
        """Update and log the accuracy/precision/recall/F1 metrics of ``stage``.

        Each value is logged under ``f"{stage}_{name}"`` using the metric object
        of the same name, so every key reports its own metric.
        """
        # Hand the metrics probabilities rather than log-probabilities;
        # the arg-max class is the same either way.
        probabilities = logits.detach().exp()
        for name in ("acc", "prec", "rec", "f1"):
            metric = getattr(self, f"{stage}_{name}")
            metric(probabilities, labels)
            self.log(f"{stage}_{name}", metric, **log_kwargs)

    def training_step(self, train_batch, batch_idx):
        """Compute and log the training loss and metrics; returns the loss."""
        logits, labels = self._run_batch(train_batch)
        loss = self.cross_entropy_loss(logits, labels)
        self.log('train_loss', loss)
        self._update_and_log_metrics('train', logits, labels, on_step=False, on_epoch=True)
        return loss

    def validation_step(self, val_batch, batch_idx):
        """Compute and log the validation loss and metrics."""
        logits, labels = self._run_batch(val_batch)
        loss = self.cross_entropy_loss(logits, labels)
        self.log('val_loss', loss)
        self._update_and_log_metrics('val', logits, labels, on_step=False, on_epoch=True)

    def test_step(self, test_batch, batch_idx):
        """Log test loss and metrics and record predictions for later reporting."""
        logits, labels = self._run_batch(test_batch)
        self.test_labels += labels.detach().cpu().numpy().tolist()
        self.test_preds += np.argmax(logits.detach().cpu().numpy(), axis=1).flatten().tolist()
        loss = self.cross_entropy_loss(logits, labels)
        self.log('test_loss', loss)
        self._update_and_log_metrics('test', logits, labels)

    def configure_optimizers(self):
        """Adam over the module's parameters with a learning rate of 1e-3."""
        return torch.optim.Adam(self.parameters(), lr=1e-3)

In [4]:
class SNLIDataModule(pl.LightningDataModule):
    """Lightning data module for SNLI with GloVe-initialised embeddings.

    Construction loads the SNLI train and test splits (examples labelled -1
    are dropped), builds a vocabulary from both splits, and creates the
    embedding matrix ``W`` together with ``vocab`` and ``vocab_to_idx``.
    ``prepare_data`` encodes every sentence to a fixed-length id vector, and
    ``setup`` wraps the encodings in tensor datasets and carves a validation
    split (15%) out of the training data.

    Args:
        batch_size: batch size used by all three dataloaders (default 512).
    """

    # Compiled once instead of on every tokenize() call.
    _NON_ASCII_RE = re.compile(r"[^\x00-\x7F]+")
    # Punctuation, digits and \r \t \n
    _PUNCT_DIGIT_RE = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')

    def __init__(self, batch_size=512):
        super().__init__()
        # Set here so the dataloaders work even before prepare_data() has run.
        self.batch_size = batch_size

        dataset = load_dataset("snli")
        dataset = dataset.filter(lambda x: x['label'] != -1)
        self.train = dataset['train'].shuffle(seed=42)
        self.test = dataset['test'].shuffle(seed=42)
        # Slicing with [:] materialises each split as a dict of column lists.
        self.train = self.train[:]
        self.test = self.test[:]

        self.train_labels, counter = self.get_counter(self.train)
        self.test_labels, counter = self.get_counter(self.test, counter)
        # Only vocabulary words are ever looked up, so skip every other GloVe row.
        pretrained = self.load_glove_vectors(keep=counter)
        self.W, self.vocab, self.vocab_to_idx = self.get_emb_matrix(pretrained, counter)

    def load_glove_vectors(self, glove_file="../data/glove.42B.300d.txt", keep=None):
        """Read a GloVe text file into a ``{word: float32 vector}`` dict.

        Args:
            glove_file: path to a whitespace-separated GloVe text file.
            keep: optional container of words; when given, only words it
                contains are loaded. ``None`` loads every row.

        Returns:
            dict mapping each loaded word to a 1-D float32 numpy array.
        """
        word_vectors = {}
        # Explicit UTF-8: the platform default encoding cannot be relied on.
        with open(glove_file, encoding="utf-8") as f:
            for line in f:
                fields = line.split(None, 1)
                if len(fields) < 2:
                    continue  # blank or malformed line
                word, values = fields
                if keep is not None and word not in keep:
                    continue
                word_vectors[word] = np.array(
                    [float(v) for v in values.split()], dtype="float32"
                )
        return word_vectors

    def tokenize(self, text):
        """Lower-case ``text``, blank out non-ASCII characters, punctuation and
        digits, and return the token strings from the module-level tokenizer."""
        ascii_only = self._NON_ASCII_RE.sub(" ", text)
        nopunct = self._PUNCT_DIGIT_RE.sub(" ", ascii_only.lower())
        return [token.text for token in tok(nopunct)]

    def get_counter(self, dataset, counts=None):
        """Count tokens over every premise and hypothesis of ``dataset``.

        Args:
            dataset: mapping with ``'premise'``, ``'hypothesis'`` and ``'label'``
                columns.
            counts: optional ``Counter`` to update in place; a fresh one is
                created when omitted, so separate calls do not share state.

        Returns:
            ``(labels, counts)`` - the label column and the updated counter.
        """
        if counts is None:
            counts = Counter()
        labels = dataset['label']
        for premise, hypothesis in zip(dataset['premise'], dataset['hypothesis']):
            counts.update(self.tokenize(premise))
            counts.update(self.tokenize(hypothesis))
        return labels, counts

    def encode_sentence(self, text, vocab2index, N=20):
        """Encode ``text`` as exactly ``N`` vocabulary ids.

        Tokens missing from ``vocab2index`` map to the ``"UNK"`` id; longer
        sentences are truncated and shorter ones are right-padded with 0.
        """
        unk_id = vocab2index["UNK"]
        token_ids = [vocab2index.get(word, unk_id) for word in self.tokenize(text)][:N]
        encoded = np.zeros(N, dtype=int)
        encoded[:len(token_ids)] = token_ids
        return encoded

    def get_emb_matrix(self, pretrained, word_counts, emb_size=300):
        """Create the embedding matrix from word vectors.

        Row 0 is the all-zero padding row, row 1 is a random vector for unknown
        words, and each word of ``word_counts`` gets the next row: its
        pretrained vector when available, otherwise a random one.

        Returns:
            ``(W, vocab, vocab_to_idx)`` - the float32 matrix, the vocabulary as
            a numpy array (``""`` and ``"UNK"`` first) and the token -> row map.
        """
        vocab_size = len(word_counts) + 2
        W = np.zeros((vocab_size, emb_size), dtype="float32")  # row 0 stays zero (padding)
        W[1] = np.random.uniform(-0.25, 0.25, emb_size)  # vector for unknown words
        vocab = ["", "UNK"]
        vocab_to_idx = {"UNK": 1}
        for row, word in enumerate(word_counts, start=2):
            if word in pretrained:
                W[row] = pretrained[word]
            else:
                W[row] = np.random.uniform(-0.25, 0.25, emb_size)
            vocab_to_idx[word] = row
            vocab.append(word)
        return W, np.array(vocab), vocab_to_idx

    def _encode_split(self, split):
        """Encode both sentence columns of ``split`` into lists of id vectors."""
        return {
            column: [
                self.encode_sentence(text, vocab2index=self.vocab_to_idx)
                for text in split[column]
            ]
            for column in ('premise', 'hypothesis')
        }

    def prepare_data(self):
        """Encode all train and test sentences into fixed-length id vectors.

        NOTE(review): this stores state on ``self`` inside ``prepare_data``,
        which this notebook's single-process CPU run relies on - confirm
        before using a multi-process trainer.
        """
        self.train_X = self._encode_split(self.train)
        self.train_y = list(self.train_labels)
        self.test_X = self._encode_split(self.test)
        self.test_y = list(self.test_labels)

    def setup(self, stage=None):
        """Build the tensor datasets and split train into train/validation (85/15)."""
        from torch.utils.data import TensorDataset, random_split

        def as_dataset(encoded, labels):
            return TensorDataset(
                torch.from_numpy(np.asarray(encoded['premise'], dtype=np.int64)),
                torch.from_numpy(np.asarray(encoded['hypothesis'], dtype=np.int64)),
                torch.from_numpy(np.asarray(labels, dtype=np.int64)),
            )

        full_train = as_dataset(self.train_X, self.train_y)
        self.test_ds = as_dataset(self.test_X, self.test_y)

        n_train = int(0.85 * len(full_train))
        # Seeded generator: setup() may run more than once (fit, then test)
        # and must produce the same train/validation partition every time.
        self.train_ds, self.val_ds = random_split(
            full_train,
            [n_train, len(full_train) - n_train],
            generator=torch.Generator().manual_seed(42),
        )

    def train_dataloader(self):
        """Shuffled batches over the training split."""
        return DataLoader(self.train_ds, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        """Batches over the held-out validation split (not the training split)."""
        return DataLoader(self.val_ds, batch_size=self.batch_size)

    def test_dataloader(self):
        """Batches over the SNLI test split."""
        return DataLoader(self.test_ds, batch_size=self.batch_size)
    

In [5]:
from sklearn.metrics import classification_report, accuracy_score
import pandas as pd
def get_metrics(y_true, y_pred, report_path='report_crepe.txt'):
    """Print a classification report and the accuracy, and save both to a file.

    Args:
        y_true: sequence of ground-truth class labels.
        y_pred: sequence of predicted class labels, aligned with ``y_true``.
        report_path: text file to (over)write with the report; defaults to
            ``'report_crepe.txt'`` in the working directory.
    """
    # Compute both metrics before touching the file, so a failure here does
    # not leave a truncated report behind.
    result1 = classification_report(y_true, y_pred)
    result2 = accuracy_score(y_true, y_pred)

    print('Classification Report: ', result1)
    print('Accuracy: ', result2, "\n\n")

    # The context manager guarantees the report is flushed and closed.
    with open(report_path, 'w') as f:
        f.write('\nClassification Report: \n')
        f.write(result1)
        f.write('\nAccuracy Report: \n')
        f.write(str(result2))

In [6]:
# Constructing the data module loads SNLI and the GloVe file and builds the
# vocabulary and embedding matrix (all done in SNLIDataModule.__init__).
data_module = SNLIDataModule()

Reusing dataset snli (C:\Users\KAWSHIK\.cache\huggingface\datasets\snli\plain_text\1.0.0\1f60b67533b65ae0275561ff7828aad5ee4282d0e6f844fd148d05d3c6ea251b)
100%|██████████| 3/3 [00:00<00:00, 66.84it/s]
Loading cached processed dataset at C:\Users\KAWSHIK\.cache\huggingface\datasets\snli\plain_text\1.0.0\1f60b67533b65ae0275561ff7828aad5ee4282d0e6f844fd148d05d3c6ea251b\cache-9e96a95d39194edc.arrow
Loading cached processed dataset at C:\Users\KAWSHIK\.cache\huggingface\datasets\snli\plain_text\1.0.0\1f60b67533b65ae0275561ff7828aad5ee4282d0e6f844fd148d05d3c6ea251b\cache-f1e3131db2e212eb.arrow
Loading cached processed dataset at C:\Users\KAWSHIK\.cache\huggingface\datasets\snli\plain_text\1.0.0\1f60b67533b65ae0275561ff7828aad5ee4282d0e6f844fd148d05d3c6ea251b\cache-58c78ab3118c5108.arrow
Loading cached shuffled indices for dataset at C:\Users\KAWSHIK\.cache\huggingface\datasets\snli\plain_text\1.0.0\1f60b67533b65ae0275561ff7828aad5ee4282d0e6f844fd148d05d3c6ea251b\cache-bcc69591eb61c6ef.arrow


In [7]:
# Build the classifier from the data module's embedding matrix and vocabulary.
model = SNLI_LSTM(data_module.W,data_module.vocab,data_module.vocab_to_idx)
# gpus=0 requests no GPUs; train for 10 epochs, logging every 100 steps.
trainer = pl.Trainer(gpus= 0,max_epochs= 10,log_every_n_steps=100)

# Train, run the test split, then report on the labels/predictions that
# SNLI_LSTM.test_step accumulated on the model.
trainer.fit(model, data_module)
trainer.test(datamodule= data_module,model=model)
get_metrics(model.test_labels,model.test_preds)

  stream(template_mgs % msg_args)
GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


In [69]:
# Scratch shape check: concatenating two (1, 2, 25) tensors along dim=2 gives
# (1, 2, 50), the same dim=2 concatenation SNLI_LSTM.forward applies to its two LSTM states.
torch.cat((torch.randn((1,2,25)),torch.randn((1,2,25))),dim=2).shape

torch.Size([1, 2, 50])