<a href="https://colab.research.google.com/github/radonys/Deep-Learning-Assignments/blob/master/Assignment-1-Q3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip install torch torchvision
!pip install pandas nltk

In [0]:
import pandas as pd
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
import re
from bs4 import BeautifulSoup
from collections import Counter
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn

In [0]:
# Separator-like characters; clean_text replaces each match with a single space.
# Raw-string literals keep the regex escapes (\[ \] \|) from being interpreted as
# (invalid) Python string escapes, which newer interpreters warn about.
REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@,;]')
# Everything except digits, lowercase letters, space, '#', '+' and '_';
# clean_text deletes each match outright.
BAD_SYMBOLS_RE = re.compile(r'[^0-9a-z #+_]')
# NLTK's English stop-word list, held as a set because clean_text only does
# membership checks against it.
STOPWORDS = {*stopwords.words('english')}

def string_form(value):
    """Return ``value`` converted with ``str()`` and lower-cased."""
    as_text = str(value)
    return as_text.lower()

def clean_text(text):
    """Reduce a phrase to its informative, space-separated tokens.

    Steps, in order:
      1. parse ``text`` with BeautifulSoup (lxml parser) and keep only its text;
      2. replace every REPLACE_BY_SPACE_RE match with a space;
      3. delete every BAD_SYMBOLS_RE match;
      4. split on whitespace, drop tokens found in STOPWORDS, and re-join the
         remainder with single spaces.

    Returns the cleaned string (possibly empty).
    """
    plain = BeautifulSoup(text, "lxml").text
    spaced = REPLACE_BY_SPACE_RE.sub(' ', plain)
    stripped = BAD_SYMBOLS_RE.sub('', spaced)
    kept_tokens = [token for token in stripped.split() if token not in STOPWORDS]
    return ' '.join(kept_tokens)

def pad_features(reviews_int, seq_length=100):
    """Turn variable-length token-id sequences into a fixed-width integer matrix.

    Each sequence becomes one row of width ``seq_length``: shorter sequences are
    left-padded with zeros, longer ones are truncated to their first
    ``seq_length`` ids.

    Args:
        reviews_int: sequence of token-id sequences (lists, tuples or 1-D arrays).
        seq_length: number of columns in the returned matrix.

    Returns:
        ``numpy.ndarray`` of shape ``(len(reviews_int), seq_length)`` and integer dtype.
    """
    features = np.zeros((len(reviews_int), seq_length), dtype=int)

    for row, tokens in enumerate(reviews_int):
        kept = tokens[:seq_length]
        # The matrix is already zero-filled, so only the right-aligned ids need
        # writing; this avoids building a throwaway float padding list per row
        # and works for any sliceable sequence, not just lists.
        if len(kept) > 0:
            features[row, seq_length - len(kept):] = kept

    return features

In [0]:
# Load the tab-separated phrase table, then normalise the text column:
# lower-case first (string_form), then strip markup/symbols/stop words (clean_text).
data = pd.read_table('mrdata.tsv')
data['Phrase'] = data['Phrase'].apply(string_form).apply(clean_text)
# The identifier columns are not referenced by the later cells shown here.
data = data.drop(columns=['PhraseId', 'SentenceId'])
data.head(10)

In [0]:
# Corpus-wide word frequencies over the cleaned phrases.
all_text = ' '.join(data['Phrase'])
words = all_text.split()
count_words = Counter(words)
total_words = len(words)
# most_common() with no argument already returns every (word, count) pair,
# most frequent first.
sorted_words = count_words.most_common()
# Show a summary and the head of the ranking instead of dumping the whole Counter.
print("Tokens: {}, unique words: {}".format(total_words, len(count_words)))
sorted_words[:10]

In [0]:
# Word -> integer id by frequency rank. Ids start at 1, which leaves 0 free for
# the zero padding that pad_features writes.
vocab_to_int = {word: rank for rank, (word, _count) in enumerate(sorted_words, start=1)}
# Encode every cleaned phrase as the list of its token ids.
reviews_int = [
    [vocab_to_int[token] for token in phrase.split()]
    for phrase in data['Phrase']
]
print (reviews_int[0:3])

In [0]:
# Sentiment column converted value-by-value with int() and packed into a NumPy array.
encoded_labels = np.array([int(label) for label in data['Sentiment']])

In [0]:
# Distribution of phrase lengths (in tokens) after cleaning.
reviews_len = [len(x) for x in reviews_int]
pd.Series(reviews_len).hist()
plt.show()
# A bare expression in the middle of a cell is never displayed, so the summary
# statistics are printed explicitly.
print(pd.Series(reviews_len).describe())

# Drop phrases that ended up with no tokens, together with their labels, so the
# two sequences stay aligned.
non_empty = [length > 0 for length in reviews_len]
reviews_int = [review for review, keep in zip(reviews_int, non_empty) if keep]
encoded_labels = [label for label, keep in zip(encoded_labels, non_empty) if keep]
features = pad_features(reviews_int)
print(features[:10,:])

In [0]:
# Positional split: the first split_frac of the rows train, the remainder tests.
split_frac = 0.8
len_feat = len(features)
split_idx = int(split_frac * len_feat)
train_x, test_x = np.array(features[:split_idx]), np.array(features[split_idx:])
train_y, test_y = np.array(encoded_labels[:split_idx]), np.array(encoded_labels[split_idx:])

In [0]:
# Wrap the NumPy splits as (features, label) tensor datasets.
train_data = TensorDataset(torch.from_numpy(train_x), torch.from_numpy(train_y))
test_data = TensorDataset(torch.from_numpy(test_x), torch.from_numpy(test_y))

batch_size = 50

# drop_last=True keeps every training batch at exactly batch_size, so state that is
# sized for a full batch (such as LSTM hidden tensors) never meets a short final batch.
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size, drop_last=True)
# Evaluation order does not affect the metrics, so the test set is not shuffled;
# every test sample is kept (no drop_last).
test_loader = DataLoader(test_data, shuffle=False, batch_size=batch_size)

In [0]:
class SentimentLSTM(nn.Module):
    """Embedding -> stacked LSTM -> dropout -> linear classifier for token-id sequences.

    Args:
        vocab_size: number of rows in the embedding table.
        output_size: number of classes (width of the final linear layer).
        embedding_dim: size of each token embedding.
        hidden_dim: size of the LSTM hidden state.
        n_layers: number of stacked LSTM layers.
        drop_prob: dropout applied between stacked LSTM layers.
    """

    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5):

        super().__init__()

        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
                            dropout=drop_prob, batch_first=True)

        self.dropout = nn.Dropout(0.3)

        self.fc = nn.Linear(hidden_dim, output_size)
        # Retained so existing attribute access keeps working; forward() no longer
        # applies it because nn.CrossEntropyLoss expects raw, unnormalised logits.
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden):
        """Classify a batch of token-id sequences.

        Args:
            x: integer tensor of token ids, one row per sequence.
            hidden: ``(h_0, c_0)`` tuple as produced by ``init_hidden``.

        Returns:
            ``(logits, hidden)`` where ``logits`` has shape ``(batch, output_size)``
            and ``hidden`` is the LSTM's final ``(h_n, c_n)``.
        """
        embeds = self.embedding(x)
        lstm_out, hidden = self.lstm(embeds, hidden)

        # The LSTM is batch_first, so lstm_out is (batch, seq, hidden_dim). Select the
        # last time step of EVERY sequence; indexing with [-1] alone would pick the
        # last sample of the batch instead.
        last_step = lstm_out[:, -1, :]

        out = self.dropout(last_step)
        logits = self.fc(out)

        return logits, hidden

    def init_hidden(self, batch_size):
        """Return zeroed ``(h_0, c_0)`` tensors of shape ``(n_layers, batch_size, hidden_dim)``.

        The tensors take their dtype and device from the model's own parameters, so
        they match wherever the model currently lives.
        """
        weight = next(self.parameters()).data
        shape = (self.n_layers, batch_size, self.hidden_dim)

        return (weight.new_zeros(shape), weight.new_zeros(shape))

In [0]:
# Model configuration.
train_on_gpu = False
n_layers = 3
hidden_dim = 256
embedding_dim = 400
# presumably one output per distinct Sentiment value — confirm against the data
output_size = 5
# Token ids start at 1 and 0 is the padding id, so the embedding needs one extra row.
vocab_size = len(vocab_to_int) + 1
net = SentimentLSTM(vocab_size, output_size, embedding_dim, hidden_dim, n_layers)

In [0]:
lr=0.001

# Move the model before building the optimizer so both refer to the same device.
if train_on_gpu:
    net.cuda()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=lr)

epochs = 4

counter = 0
# Report every 100 steps; logging each step floods the cell output.
print_every = 100
# Maximum gradient norm, to keep LSTM gradients from exploding.
clip=5

net.train()

for e in range(epochs):

    for inputs, labels in train_loader:
        counter += 1

        # .long() converts dtype without changing device (unlike .type(torch.LongTensor),
        # which would pull a CUDA tensor back to the CPU); CrossEntropyLoss needs int64
        # class indices.
        inputs, labels = inputs.long(), labels.long()
        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()

        # Fresh zero state sized to THIS batch: the samples are shuffled and independent,
        # so nothing should carry over between batches, and a shorter final batch no
        # longer mismatches the hidden-state shape.
        h = net.init_hidden(inputs.size(0))

        optimizer.zero_grad()

        output, h = net(inputs, h)

        # No squeeze(): it would collapse the batch dimension for a batch of one.
        loss = criterion(output, labels)
        loss.backward()
        nn.utils.clip_grad_norm_(net.parameters(), clip)
        optimizer.step()

        # loss stats
        if counter % print_every == 0:
            print("Epoch: {}/{}...".format(e+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()))

In [0]:
test_losses = []
num_correct = 0

net.eval()

# Evaluation needs no gradients; no_grad saves memory and time.
with torch.no_grad():
    for inputs, labels in test_loader:

        # Keep dtype conversion device-neutral; CrossEntropyLoss requires int64 class
        # indices, not floats.
        inputs, labels = inputs.long(), labels.long()
        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()

        # Zero state sized to this batch, so the final (possibly shorter) batch works.
        h = net.init_hidden(inputs.size(0))
        output, h = net(inputs, h)

        test_loss = criterion(output, labels)
        test_losses.append(test_loss.item())

        # Multi-class prediction: index of the highest score per sample
        # (rounding only makes sense for a single binary output).
        pred = output.argmax(dim=1)
        num_correct += (pred == labels).sum().item()


print("Test loss: {:.3f}".format(np.mean(test_losses)))

test_acc = num_correct/len(test_loader.dataset)
print("Test accuracy: {:.3f}".format(test_acc))