In [1]:
!pip install torchtext



In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence

import string as st

import nltk
from nltk.corpus import wordnet
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.tokenize import word_tokenize

nltk.download('stopword')
nltk.download('wordnet')
nltk.download('punkt')


[nltk_data] Error loading stopword: Package 'stopword' not found in
[nltk_data]     index
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [4]:
import torchtext
from torchtext.data import get_tokenizer
from torchtext.vocab import GloVe
from torchtext import data
from torchtext import vocab


In [5]:
yelp_path_text="/content/drive/MyDrive/yelp_2013_texts.txt"
yelp_path_score="/content/drive/MyDrive/yelp_2013_score.txt"
texts = []
scores = []

texts = []
scores = []
with open(yelp_path_text, 'r', encoding='utf-8', errors="ignore") as file:
    for line in file:
        texts.append(line.strip())
with open(yelp_path_score, 'r') as file:
    for line in file:
        scores.append(line.strip())

In [6]:
def clean_str(string):
    string = re.sub(r'[^\x00-\x7F]+', r'', string)
    string = re.sub(r"\\", "", string)
    string = re.sub(r"\'", "", string)
    string = re.sub(r"\"", "", string)
    string = re.sub(r"<sssss>", "", string)
    string = re.sub(r"-lrb-", "", string)
    string = re.sub(r"-rrb-", "", string)
    string = re.sub(r"\.\.\.", "", string)
    string = string.strip().lower()
    return string

paired = list(zip(texts, scores))
np.random.shuffle(paired)
texts, scores = zip(*paired)
texts = list(texts)
scores = list(scores)


In [7]:
texts = texts[:5000]
scores = scores[:5000]
scores = [int(score) -1  for score in scores]

In [8]:
unique_words = set()

for string in texts:
    words = string.split()
    unique_words.update(words)

unique_words = list(unique_words)
print("Corpus size:", len(unique_words))

embedding_dim = 100
global_vectors = GloVe(name='6B', dim=embedding_dim) # 42B, 840B

corpus_size = len(unique_words)
weights_matrix = np.zeros((corpus_size, embedding_dim))

found_word = 0
for i, word in enumerate(unique_words):
  word_vector = global_vectors.get_vecs_by_tokens(word)

  if word_vector.sum().item() == '0':
    weights_matrix[i] = np.random.normal(scale=0.6, size=(embedding_dim, ))
  else:
    weights_matrix[i] = word_vector
    found_word += 1

Corpus size: 22548


In [9]:
class CustomDataset(Dataset):
    def __init__(self, X, y, unique_words, weights_matrix):
        self.X = X
        self.y = y
        self.unique_words = unique_words
        self.weights_matrix = weights_matrix

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        sentence = self.X[idx]
        label = self.y[idx]

        indices = [self.unique_words.index(word) for word in sentence.split()]

        return {
            'input': torch.tensor(indices, dtype=torch.long),
            'label': torch.tensor(label, dtype=torch.long)
        }
def collate_fn(batch):
    inputs = [item['input'] for item in batch]
    labels = [item['label'] for item in batch]

    inputs_padded = pad_sequence(inputs, batch_first=True, padding_value=0)

    return {
        'input': inputs_padded,
        'label': torch.stack(labels)
    }

In [10]:
class WordAttention(nn.Module):
  def __init__(self,hidden_size, word_embedd_dim) :
    super(WordAttention, self).__init__()
    self.hidden_size = hidden_size
    self.word_embedd_dim= word_embedd_dim
    self.lin1 = nn.Linear(hidden_size,hidden_size)
    self.lin2 =nn.Linear(hidden_size,1,bias = False)
  def forward(self,x):
    u = torch.tanh(self.lin1(x))
    attention = F.softmax(self.lin2(x),dim=1)

    output = torch.sum(attention*x, dim =1)

    return attention, output

class SenAttention(nn.Module):
  def __init__(self, hidden_size, embedding_dim):
    super(SenAttention, self).__init__()
    self.lin1=nn.Linear(hidden_size,hidden_size)
    self.lin2 = nn.Linear(hidden_size,1,bias=False)
  def forward(self, x):
    u = torch.tanh(self.lin1(x))
    attention = F.softmax(self.lin2(u),dim =1)

    output = torch.sum(attention * x, dim =1)

    return attention, output

class WordEncoder(nn.Module):
  def __init__(self, corpus_size, embedding_dim, hidden_size, load_embed=False, weights_matrix=None, trainable_embedding= False):
    super(WordEncoder, self).__init__()

    self.embedding = nn.Embedding(corpus_size, embedding_dim)

    if load_embed and weights_matrix is not None :
      self.embedding.load_state_dict({'weight': torch.tensor(weights_matrix)})

    self.embedding.weight.requires_grad = trainable_embedding
    self.gru = nn.GRU(embedding_dim, hidden_size,2, dropout=0.3, bidirectional= True,batch_first = True)
    self.attention = WordAttention(hidden_size * 2, embedding_dim)
  def forward(self, x):
    embeddings = self.embedding(x)
    out, hidden = self.gru(embeddings)
    attention, out = self.attention(out)
    return out

class HAN(nn.Module):
  def __init__(self, corpus_size, embedding_dim, hidden_size,load_embed= False, weights_matrix=None,trainable_embedding=False):
    super(HAN,self).__init__()

    self.wordEncoder=WordEncoder(corpus_size=corpus_size,embedding_dim=embedding_dim,hidden_size=50, load_embed=True, weights_matrix=weights_matrix, trainable_embedding=True)

    self.sentGRU= nn.GRU(hidden_size*2, hidden_size, bidirectional=True, batch_first=True)
    self.sentence_att = SenAttention(hidden_size * 2, hidden_size)

    self.sigmoid = nn.Sigmoid()
    self.classifer= nn.Linear(hidden_size*2,5)
  def forward(self, x):
    word_output= self.wordEncoder(x)

    sen_out,_ = self.sentGRU(word_output.unsqueeze(1))
    _, sen_output = self.sentence_att(sen_out)

    out = self.classifer(sen_output)
    return F.softmax(out,dim=1)







**Tranning**

In [11]:
batch_size = 16
dataset = CustomDataset(X=texts,y=scores,unique_words = unique_words, weights_matrix = weights_matrix)
train_size = int(0.8*len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset,[train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size = len(test_dataset), shuffle = False, collate_fn=collate_fn)


In [12]:
def train(model, optim, loss_fn, epochs=50, print_loss=True):
  for epoch in range(50):
    epoch_loss = 0

    model.train()
    for i in train_loader:
      optim.zero_grad()

      output = model(i["input"])
      target = torch.tensor(i["label"], dtype=torch.long)
      target = target.unsqueeze(1)

      loss = loss_fn(output, target.squeeze())
      epoch_loss += loss.item()

      loss.backward()
      optim.step()

    model.eval()
    with torch.no_grad():
        eval_loss = 0
        for i in test_loader:
            output = model(i["input"])
            target = torch.tensor(i["label"], dtype=torch.long)

            loss = loss_fn(output, target)
            eval_loss += loss.item()

    if print_loss:
        if epoch % 10 == 0:
            print("Epoch loss:", round(epoch_loss / len(train_loader), 4))
            print("Eval Loss:", round(eval_loss / len(test_loader), 4))

    print("Eval Loss:", round(eval_loss / len(test_loader), 4))
    return model

In [13]:
corpus_size, embedding_dim = weights_matrix.shape
han_model = HAN(corpus_size=corpus_size, embedding_dim=embedding_dim, hidden_size=50, load_embed=True, weights_matrix=weights_matrix, trainable_embedding=False)
optim =  torch.optim.Adam(han_model.parameters(), 0.001)
loss_fn = torch.nn.CrossEntropyLoss()
han_model = train(han_model, optim, loss_fn)

  target = torch.tensor(i["label"], dtype=torch.long)


Epoch loss: 1.5074
Eval Loss: 1.4994
Eval Loss: 1.4994


  target = torch.tensor(i["label"], dtype=torch.long)


In [14]:
def evaluate(model):
  for i in test_loader:
      output = model(i["input"])
      target = i["label"]


  from sklearn.metrics import accuracy_score

  print(accuracy_score(
      target,
      torch.argmax(output, dim=1)
  ))

evaluate(han_model)

0.352
