<a href="https://colab.research.google.com/github/harini-si/saidl-assignment/blob/main/nlp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader
from torchtext.legacy import data
from torchtext.legacy import datasets

import nltk
nltk.download('averaged_perceptron_tagger')
nltk.download('punkt')
device='cuda'

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [2]:
text = data.Field(tokenize = 'spacy', lower = True)
label = data.LabelField()
train_data, val_data,test_data= datasets.SNLI.splits(text,label)

downloading snli_1.0.zip


snli_1.0.zip: 100%|██████████| 94.6M/94.6M [00:30<00:00, 3.12MB/s]


extracting


In [3]:
text.build_vocab(train_data, val_data)
label.build_vocab(train_data)
train_iter, val_iter, test_iter = data.BucketIterator.splits(
    (train_data, val_data, test_data), batch_size=64)

In [4]:
class Model(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, 300, padding_idx = 1)
        self.fc1 = nn.Linear(300, 300)
        self.lstm = nn.LSTM(300, 300, num_layers = 2, bidirectional = True, dropout=0.25 )
     
        self.fc2 = nn.Linear(1200 , 3)
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, prem, hypo):

        p = self.embedding(prem)
        h = self.embedding(hypo)
        
        p = F.relu(self.fc1(p))
        h = F.relu(self.fc1(h))
        
        o_p, (h_p, c_p) = self.lstm(p)
        o_h, (h_h, c_h) = self.lstm(h)

        h_p = torch.cat((h_p[-1], h_p[-2]), dim=-1)
        h_h = torch.cat((h_h[-1], h_h[-2]), dim=-1)
        
        hidden = torch.cat((h_p, h_h), dim=1)

        prediction = self.fc2(hidden)
        return prediction

input_dim = len(text.vocab)



In [15]:
device='cpu'

In [16]:
num_epochs=1
model = Model(input_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
model

In [None]:
def eval( iterator):
    
    loss = 0
    acc = 0
    correct=0
    model.eval()
    
    with torch.no_grad():
    
        for batch in iterator:

            prem = batch.premise.to(device)
            hypo = batch.hypothesis.to(device)
            labels = batch.label.to(device)
                        
            predictions = model(prem, hypo)
            
            loss += criterion(predictions, labels).item()
            correct += (predictions.argmax(1) == labels).type(torch.float).sum().item()
            
        acc= correct/len(val_data)*100
        
    return  acc 

In [None]:

n_total_steps = len(train_iter)
for epoch in range(num_epochs):
    model.train()
    ep_loss=0
    acc=0
    correct=0
    total=0
    for batch in train_iter:
        prem = batch.premise.to(device)
        hypo = batch.hypothesis.to(device)
        labels = batch.label.to(device)
                        
        outputs = model(prem, hypo)
            
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()        
        
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        
        ep_loss += loss.item()
     
    accu= 100 * correct / len(train_data)
    val_accu=eval(val_iter)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], train_accuracy: {accu:.4f} ,train_loss: {ep_loss/len(train_iter):.4f}, val_accuracy: {val_accu:.4f} ')  

Epoch [1/1], train_accuracy: 64.0421 ,train_loss: 0.8045, val_accuracy: 66.6938 


In [None]:
test_accu= eval(test_iter)

In [None]:
print(test_accu)

66.42958748221906


In [6]:

psent=[]
ptags=[]
hsent=[]
htags=[]
for i, batch in enumerate(train_data[0:10000]):
 
  ps, pt = zip(*nltk.pos_tag(batch.premise))
  psent.append(ps)
  ptags.append(pt)

  hs, ht = zip(*nltk.pos_tag(batch.hypothesis))
  hsent.append(hs)
  htags.append(ht)
  


In [7]:
import collections
from torchtext.data.functional import numericalize_tokens_from_iterator
from torchtext.vocab import build_vocab_from_iterator
vocab = build_vocab_from_iterator(psent+hsent, specials=["<unk>"])
tag_vocab = build_vocab_from_iterator(ptags+htags, specials=["<unk>"])

def convert_to_ids(sentences, taggings):
  
  for sentence, tagging in zip(sentences, taggings):
    
    sentence_iter = (numericalize_tokens_from_iterator(vocab,sentences))
    tagging_iter = (numericalize_tokens_from_iterator(tag_vocab,taggings))
    
  return list(sentence_iter), list(tagging_iter)

ptrain_sent, ptrain_tag = convert_to_ids(psent, ptags)
htrain_sent, htrain_tag = convert_to_ids(hsent, htags)



In [None]:
len(tag_vocab)

41

In [8]:
pts=[]
for i in ptrain_sent:
   pts.append([n for n in i])
ptt=[]
for i in ptrain_tag:
   ptt.append([n for n in i])
hts=[]
for i in htrain_sent:
   hts.append([n for n in i])

htt=[]
for i in htrain_tag:
   htt.append([n for n in i])

In [None]:
da=[]
for a,b in zip(pts,ptt):

  l=[a,b]
  l=torch.tensor(l)
  da.append(l)
db=[]
for a,b in zip(hts,htt):
 
  l=[a,b]
  l=torch.tensor(l)
  db.append(l)


In [12]:
def collate_fn(items):
  max_len = max(len(item[0]) for item in items)

  sentences = torch.zeros((len(items), max_len)).long()
  taggings = torch.zeros((len(items), max_len)).long()

  for i, (sentence, tagging) in enumerate(items):
    sentences[i][0:len(sentence)] = sentence
    taggings[i][0:len(tagging)] = tagging

  return sentences, taggings

  
pdataloader=DataLoader(da,batch_size=100, collate_fn=collate_fn)
hdataloader=DataLoader(db,batch_size=100, collate_fn=collate_fn)

In [77]:
probe_model

probe(
  (features): Embedding(33932, 300, padding_idx=1)
  (pr): Linear(in_features=300, out_features=41, bias=True)
)

In [75]:
class probe(nn.Module):
    def __init__(self, model):
        super(probe, self).__init__()
        
        self.features= model.embedding
        for p in model.parameters():
          p.requires_grad=False
        self.pr= nn.Linear(300,41)
    def forward(self, prem, hypo):
        p = self.features(prem)
        h = self.features(hypo)
        p= self.pr(p)
        h= self.pr(h)
        
        return p,h


probe_model = probe(model).to(device)


In [80]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(probe_model.parameters(), lr=0.01)

In [91]:
#create a test function
def evaluate_probe(model):
    epoch_loss = 0
    epoch_acc = 0
    correct=0
    total=0
    
    l1=0
    l2=0
    t1=0
    t2=0
    model.eval()
    
    with torch.no_grad():
    
        for i, (p,h) in enumerate(zip(pdataloader,hdataloader)) :

            ps=p[0].to(device)
            pt=p[1].to(device)
            hs=h[0].to(device)
            ht=h[1].to(device)
                
            tag1, tag2 = model(ps,hs)
         
          
            x=[]
            for i in range(100):
              _, predicted = tag1[i].max(1)
              x.append(predicted)
           
            x=torch.stack(x)
           
            
            y=[]
            for i in range(100):
              _, pred = tag2[i].max(1)
              y.append(pred)
      
            y=torch.stack(y)
            
            
            l1+= ((pt==x)|(pt==0)).sum()
            l2+= ((ht==y)|(ht==0)).sum()
            t1+= ((pt!=-1)|(pt==0)).sum()
            t2+= ((ht!=-1)|(ht==0)).sum()
            
            
        correct=l1+l2
        total=t1+t2
        epoch_acc=(correct/total)*100
        
    return  epoch_acc

In [87]:
def train_probe(model):
    epoch_loss = 0
    epoch_acc = 0
    correct=0
    total=0
    pc=0
    hc=0
    ptest_loss=0
    htest_loss=0
    l1=0
    l2=0
    t1=0
    t2=0
    
    model.train()
    
    
    
    for i, (p,h) in enumerate(zip(pdataloader,hdataloader)) :

       ps=p[0].to(device)
       pt=p[1].to(device)
       hs=h[0].to(device)
       ht=h[1].to(device)
                    
       tag1, tag2 = model(ps,hs)
         
          
       x=[]
       for i in range(100):
        _, predicted = tag1[i].max(1)
        x.append(predicted)
           
       x=torch.stack(x)
           
       pc +=x.eq(pt).sum().item()
       y=[]
       for i in range(100):
         _, pred = tag2[i].max(1)
         y.append(pred)
      
       y=torch.stack(y)
          
       
       l1+= ((pt==x)|(pt==0)).sum()
       l2+= ((ht==y)|(ht==0)).sum()
       t1+= ((pt!=-1)|(pt==0)).sum()
       t2+= ((ht!=-1)|(ht==0)).sum()
       pt = torch.zeros((len(pt),41)).long()
       ht = torch.zeros((len(ht),41)).long() 
       ploss=criterion(tag1,pt)
       hloss=criterion(tag2,ht)     
       optimizer.zero_grad()
       loss=(ploss+hloss)/2
       loss.backward()
       optimizer.step()     
       
            
    correct=l1+l2
    total=t1+t2
    epoch_acc=(correct/total)*100
        
    return  epoch_acc

In [None]:
for epoch in range(10):
  print(train_probe(probe_model),evaluate_probe(probe_model))