In [0]:
import os
import copy
import re
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,f1_score

In [0]:
import collections
from collections import Counter

In [0]:
from sklearn.utils import shuffle

In [0]:
import sys
import string

In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
import torch.optim as optim

In [0]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [0]:
def preprocess(text):
  text = text.translate(str.maketrans('', '', string.punctuation))
  words = text.split()
  return " ".join(word.lower() for word in words)

In [0]:
data = []

poscount = 0
negcount = 0

with open('rt-polarity.pos','r',encoding='latin1') as f:
  for line in f.readlines():
    data.append(preprocess(line[:-1]))
    poscount+=1
    
with open('rt-polarity.neg','r',encoding='latin1') as f:
  for line in f.readlines():
    data.append(preprocess(line[:-1]))
    negcount+=1

In [0]:
labels = np.zeros(poscount+negcount)
labels[:poscount] = 1

In [0]:
data,labels = shuffle(data,labels,random_state=0)

In [0]:
traincorpus,testcorpus,trainlabels,testlabels = train_test_split(data,labels,test_size=0.2,random_state=0)

In [0]:
traincorpus,valcorpus,trainlabels,vallabels = train_test_split(traincorpus,trainlabels,test_size=0.1,random_state=0)

In [0]:
ytrain = torch.from_numpy(trainlabels)
yval = torch.from_numpy(vallabels)
ytest = torch.from_numpy(testlabels)

In [0]:
trainlen = len(traincorpus)
vallen = len(valcorpus)
testlen = len(testcorpus)

In [0]:
words = []
for sentence in traincorpus:
  words+=sentence.split()

In [0]:
counter = Counter(words).most_common()
vocabulary = {}
vocabulary['<PAD>'] = 0
index = 1
for word,_ in counter:
  vocabulary[word] = index
  index+=1

In [0]:
def get_vectors(sentence):
  temp = [vocabulary[word] for word in sentence.split() if word in vocabulary]
  vector = [0] * maxlen
  curlen = len(temp)
  if(maxlen-curlen<0):
    vector = temp[:maxlen]
  else:
    vector[maxlen-curlen:] = temp

  return torch.from_numpy(np.asarray(vector,dtype='int32'))

In [0]:
maxlen = 40
embeddim = 300

In [0]:
Xtrain = torch.zeros(trainlen,maxlen)
for i in range(trainlen):
  Xtrain[i] = get_vectors(traincorpus[i])

In [0]:
Xval = torch.zeros(vallen,maxlen)
for i in range(vallen):
  Xval[i] = get_vectors(valcorpus[i])

In [0]:
Xtest = torch.zeros(testlen,maxlen)
for i in range(testlen):
  Xtest[i] = get_vectors(testcorpus[i])

In [0]:
embeddingindex = {}
with open('glove.42B.300d.txt','r',encoding='utf-8') as f:
  for line in f.readlines():
    vectors = line.split()
    word = vectors[0]
    embedding = torch.from_numpy(np.asarray(vectors[1:],'float32'))
    embeddingindex[word] = embedding

In [0]:
embeddingmatrix = torch.zeros(len(vocabulary),embeddim)
for word,i in list(vocabulary.items()):
  if(word in embeddingindex):
    embeddingmatrix[i] = embeddingindex[word]

In [0]:
hiddendim = 50

In [0]:
class modelwithattention(nn.Module):
  def __init__(self,maxlen,embeddim,embedmatrix,hiddendim,numclasses=1):
    super(modelwithattention,self).__init__()
    self.maxlen = maxlen
    self.embeddim = embeddim
    self.embedmatrix = embedmatrix
    self.hiddendim = hiddendim
    self.numclasses = numclasses
    
    self.embed = nn.Embedding.from_pretrained(self.embedmatrix)
    self.lstm = nn.LSTM(self.embeddim,self.hiddendim,batch_first=True)
    self.attlinear = nn.Linear(self.hiddendim,1)
    self.linear = nn.Linear(self.hiddendim,self.numclasses)
    self.tanh = nn.Tanh()
    self.soft = nn.Softmax(dim=1)
    self.sig = nn.Sigmoid()
    
  def attention(self,outputs):
    outputs_flat = outputs.contiguous().view(-1,self.hiddendim)
    alpha = self.attlinear(outputs_flat).view(-1,self.maxlen)
    alpha = self.tanh(alpha)
    alpha = self.soft(alpha)
    att_feature_map = outputs * alpha.unsqueeze(2)
    att_feature_map = torch.sum(att_feature_map,dim=1)
    return att_feature_map
  
  def forward(self,x):
    embedout = self.embed(x)
    lstmout,_ = self.lstm(embedout,None)
    attout = self.attention(lstmout)
    out = self.sig(self.linear(attout))
    return out

In [0]:
attmodel = modelwithattention(maxlen,embeddim,embeddingmatrix,hiddendim).to(device)

In [43]:
x = torch.ones(4,maxlen).long().to(device)
outx = attmodel(x)
print(outx.size())

torch.Size([4, 1])


In [0]:
batchsize = 32

In [0]:
trainarray = torch.utils.data.TensorDataset(Xtrain,ytrain)
trainloader = torch.utils.data.DataLoader(trainarray,batchsize)

In [0]:
valarray = torch.utils.data.TensorDataset(Xval,yval)
valloader = torch.utils.data.DataLoader(valarray,batchsize)

In [0]:
testarray = torch.utils.data.TensorDataset(Xtest,ytest)
testloader = torch.utils.data.DataLoader(testarray,batchsize)

In [0]:
numepochs = 25

In [0]:
def get_accuracy(model,loader):
  acc = 0
  total = 0
  model.eval()
  with torch.no_grad():
    for indices,labels in loader:
      indices = indices.long().to(device)
      labels = labels.long().to(device)
      
      total+=indices.size(0)
      output = model(indices).view(labels.size(0))
      acc+=torch.sum(labels==(output>0.5).long()).item()
      
    return ((acc/total)*100)

In [0]:
def get_loss(model,loader):
  curloss = 0.0
  model.eval()
  with torch.no_grad():
    for indices,labels in loader:
      indices = indices.long().to(device)
      labels = labels.view(-1,1).float().to(device)
      
      output = model(indices)
      curloss+=loss(output,labels)
      
    return (curloss/len(loader))

In [0]:
optimizer = optim.Adam(attmodel.parameters(),lr=0.001)
loss = nn.BCELoss()

In [52]:
bestloss = np.Inf
best_model_wts = copy.deepcopy(attmodel.state_dict())
for epoch in range(numepochs):
  attmodel.train()
  epochloss = 0.0
  epochacc = 0
  for indices,labels in trainloader:
      indices = indices.long().to(device)
      labels = labels.view(-1,1).float().to(device)
      
      outputs = attmodel(indices)
      criterion = loss(outputs,labels)
      
      epochloss+=criterion.item()
      criterion.backward()
      optimizer.step()
      
  curloss = get_loss(attmodel,valloader)
  curacc = get_accuracy(attmodel,valloader)
  print("Epoch {} ValLoss {} Val Accuracy {} ".format(epoch+1,curloss,curacc))
  if(curloss<bestloss):
    bestloss = curloss
    best_model_wts = copy.deepcopy(attmodel.state_dict())

Epoch 1 ValLoss 0.5528085827827454 Val Accuracy 72.21570926143025 
Epoch 2 ValLoss 0.5373021364212036 Val Accuracy 73.5052754982415 
Epoch 3 ValLoss 0.5255476832389832 Val Accuracy 74.67760844079719 
Epoch 4 ValLoss 0.5187567472457886 Val Accuracy 75.61547479484173 
Epoch 5 ValLoss 0.5742065906524658 Val Accuracy 72.21570926143025 
Epoch 6 ValLoss 0.5154369473457336 Val Accuracy 75.49824150058618 
Epoch 7 ValLoss 0.5653463006019592 Val Accuracy 74.79484173505276 
Epoch 8 ValLoss 0.5636205673217773 Val Accuracy 75.96717467760844 
Epoch 9 ValLoss 0.6146457195281982 Val Accuracy 76.08440797186401 
Epoch 10 ValLoss 0.6108680367469788 Val Accuracy 76.55334114888629 
Epoch 11 ValLoss 0.6515982747077942 Val Accuracy 75.61547479484173 
Epoch 12 ValLoss 0.6415785551071167 Val Accuracy 75.3810082063306 
Epoch 13 ValLoss 0.7198540568351746 Val Accuracy 74.67760844079719 
Epoch 14 ValLoss 0.6670474410057068 Val Accuracy 76.31887456037515 
Epoch 15 ValLoss 0.7782196402549744 Val Accuracy 74.7948417

In [53]:
attmodel.load_state_dict(best_model_wts)
testacc = get_accuracy(attmodel,testloader)
print("Test Accuracy {} ".format(testacc))

Test Accuracy 76.74636661978434 
