In [None]:
!pip install torchtext

In [349]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence

import string as st

import nltk
from nltk.corpus import wordnet
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.tokenize import word_tokenize

nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [55]:
import torchtext
from torchtext.data import get_tokenizer
from torchtext.vocab import GloVe
from torchtext import data
from torchtext import vocab

In [343]:
dataset = pd.read_csv('Restaurant_Reviews.tsv', delimiter = '\t', quoting = 3)

In [358]:
dataset.tail()

Unnamed: 0,Review,Liked
995,I think food should have flavor and texture an...,0
996,Appetite instantly gone.,0
997,Overall I was not impressed and would not go b...,0
998,"The whole experience was underwhelming, and I ...",0
999,"Then, as if I hadn't wasted enough of my life ...",0


In [345]:
stop_words = set(stopwords.words('english'))
ps = PorterStemmer()

In [346]:
reviews = dataset.Review.str.lower()
labels = dataset.Liked

In [356]:
punctuations = st.punctuation

X=[]

for review in list(reviews):
  temp_list = []
  tokens = word_tokenize(review)

  for token in tokens:
    if token not in punctuations:
      if token == 'not':
        temp_list.append(token)
      elif token not in stop_words and '...' not in token:
        stem = ps.stem(token)
        temp_list.append(stem)

  X.append(' '.join(temp_list))

# DL part

In [360]:
unique_words = set()

for string in X:
    words = string.split()
    unique_words.update(words)

unique_words = list(unique_words)
print("Corpus size:", len(unique_words))

embedding_dim = 100
global_vectors = GloVe(name='6B', dim=embedding_dim) # 42B, 840B

corpus_size = len(unique_words)
weights_matrix = np.zeros((corpus_size, embedding_dim))

found_word = 0
for i, word in enumerate(unique_words):
  word_vector = global_vectors.get_vecs_by_tokens(word)

  if word_vector.sum().item() == '0':
    weights_matrix[i] = np.random.normal(scale=0.6, size=(embedding_dim, ))
  else:
    weights_matrix[i] = word_vector
    found_word += 1

Corpus size: 1625


## GRU Model

In [361]:
class Classifier(nn.Module):
  def __init__(self, corpus_size, embedding_dim, hidden_size, load_embed=False, weights_matrix=None, trainable_embedding=False):
    super().__init__()
    self.num_layers = 2
    self.hidden_size = hidden_size

    self.embedding = nn.Embedding(corpus_size, embedding_dim)

    if load_embed and weights_matrix is not None:
      self.embedding.load_state_dict({'weight': torch.tensor(weights_matrix)})

    self.embedding.weight.requires_grad = trainable_embedding

    self.gru = nn.GRU(embedding_dim, hidden_size, self.num_layers, dropout=0.3, bidirectional=True, batch_first=True)

    self.linear = nn.Linear(embedding_dim, hidden_size)
    self.output = nn.Linear(hidden_size*2, 1)

    self.relu = nn.ReLU()
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    embedding = self.embedding(x)
    # pooled = embedding.mean(1)

    out, hidden = self.gru(embedding)
    out = out[:, -1, :]

    # out = self.relu(self.linear(pooled))
    out = self.output(out)

    return out

## HAN Model

In [362]:
class WordAttention(nn.Module):
  def __init__(self, hidden_size, embedding_dim):
    super().__init__()

    self.lin1 = nn.Linear(hidden_size, hidden_size)
    self.lin2 = nn.Linear(hidden_size, 1, bias=False)

  def forward(self, x):
    u = torch.tanh(self.lin1(x))
    attention = F.softmax(self.lin2(x), dim=1)

    output = torch.sum(
        attention * x, dim=1
    )

    return attention, output

class SentenceAttention(nn.Module):
  def __init__(self, hidden_size, embedding_dim):
    super().__init__()

    self.lin1 = nn.Linear(hidden_size, hidden_size)
    self.lin2 = nn.Linear(hidden_size, 1, bias=False)

  def forward(self, x):
    u = torch.tanh(self.lin1(x))
    attention = F.softmax(self.lin2(x), dim=1)

    output = torch.sum(
        attention * x, dim=1
    )

    return attention, output

class WordEncoder(nn.Module):
  def __init__(self, corpus_size, embedding_dim, hidden_size, load_embed=False, weights_matrix=None, trainable_embedding=False):
    super().__init__()

    self.embedding = nn.Embedding(corpus_size, embedding_dim)

    if load_embed and weights_matrix is not None:
      self.embedding.load_state_dict({'weight': torch.tensor(weights_matrix)})

    self.embedding.weight.requires_grad = trainable_embedding

    self.gru = nn.GRU(embedding_dim, hidden_size, 2, dropout=0.3, bidirectional=True, batch_first=True)
    self.attention = WordAttention(hidden_size*2, embedding_dim)

  def forward(self, x):
    embeddings = self.embedding(x)
    out, hidden = self.gru(embeddings)
    attention, out = self.attention(out)

    return out

class HAN(nn.Module):
  def __init__(self, corpus_size, embedding_dim, hidden_size, load_embed=False, weights_matrix=None, trainable_embedding=False):
    super().__init__()

    self.wordEncoder = WordEncoder(corpus_size=corpus_size, embedding_dim=embedding_dim, hidden_size=50, load_embed=True, weights_matrix=weights_matrix, trainable_embedding=True)

    self.sentGRU = nn.GRU(hidden_size*2, hidden_size, bidirectional=True, batch_first=True)
    self.sentence_attention = SentenceAttention(hidden_size * 2, hidden_size)

    self.sigmoid = nn.Sigmoid()
    self.classifier = nn.Linear(hidden_size*2, 1)

  def forward(self, x):
    word_output = self.wordEncoder(x) # 16, 100

    sent_out, _ = self.sentGRU(word_output.unsqueeze(1)) # 16, 100
    _, sent_output = self.sentence_attention(sent_out)

    return self.classifier(sent_output)

## Training

In [363]:
class CustomDataset(Dataset):
    def __init__(self, X, y, unique_words, weights_matrix):
        self.X = X
        self.y = y
        self.unique_words = unique_words
        self.weights_matrix = weights_matrix

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        sentence = self.X[idx]
        label = self.y[idx]

        indices = [self.unique_words.index(word) for word in sentence.split()]

        return {
            'input': torch.tensor(indices, dtype=torch.long),
            'label': torch.tensor(label, dtype=torch.float)
        }

def collate_fn(batch):
    inputs = [item['input'] for item in batch]
    labels = [item['label'] for item in batch]

    inputs_padded = pad_sequence(inputs, batch_first=True, padding_value=0)

    return {
        'input': inputs_padded,
        'label': torch.stack(labels)
    }

batch_size = 16

dataset = CustomDataset(X=X, y=labels, unique_words=unique_words, weights_matrix=weights_matrix)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False, collate_fn=collate_fn)

In [364]:
def train(model, optim, loss_fn, epochs=50, print_loss=False):
  for epoch in range(50):
    epoch_loss = 0

    model.train()
    for i in train_loader:
      optim.zero_grad()

      output = model(i["input"])
      target = i["label"]
      target = target.unsqueeze(1)

      loss = loss_fn(target, output)
      epoch_loss += loss.item()

      loss.backward()
      optim.step()

    model.eval()
    for i in test_loader:
      output = model(i["input"])
      target = i["label"]
      target = target.unsqueeze(1)

      l = loss_fn(target, output)

    if print_loss:
      if epoch % 10 == 0:
        print("Epoch loss:", round(epoch_loss/len(train_loader), 4))
        print("Eval Loss:", round(l.item(), 4))

  print("Eval Loss:", round(l.item(), 4))
  return model

In [367]:
corpus_size, embedding_dim = weights_matrix.shape

gru_model = Classifier(corpus_size=corpus_size, embedding_dim=embedding_dim, hidden_size=50, load_embed=True, weights_matrix=weights_matrix, trainable_embedding=False)
optim =  torch.optim.Adam(gru_model.parameters(), 0.001)
loss_fn = torch.nn.MSELoss()
gru_model = train(gru_model, optim, loss_fn)

han_model = HAN(corpus_size=corpus_size, embedding_dim=embedding_dim, hidden_size=50, load_embed=True, weights_matrix=weights_matrix, trainable_embedding=False)
optim =  torch.optim.Adam(han_model.parameters(), 0.001)
loss_fn = torch.nn.MSELoss()
han_model = train(han_model, optim, loss_fn)

Eval Loss: 0.2554
Eval Loss: 0.2383


In [368]:
def evaluate(model):
  for i in test_loader:
      output = model(i["input"])
      target = i["label"]


  from sklearn.metrics import accuracy_score

  print(accuracy_score(
      target,
      np.where(output.squeeze(1).detach().numpy() > 0.5, 1, 0)
  ))

evaluate(gru_model)
evaluate(han_model)

0.725
0.76
