In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import seaborn as sns

from collections import Counter
from pyvi import ViTokenizer
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from torch.optim import lr_scheduler
from pytorch_model_summary import summary

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn.utils import resample

import re
import os
import itertools

In [None]:
from torch.utils.tensorboard import SummaryWriter

# Shared TensorBoard writer: handed to train_one_epoch / evaluate_one_epoch and
# flushed and closed once the training loop has finished.
writer = SummaryWriter()

In [None]:
# Load the raw reviews and keep only the review text and its star rating.
reviews = pd.read_csv("C:/Users/Admin/OneDrive - Đại học FPT- FPT University/This PC/Documents/Datasets/reviews.csv")
# .copy() detaches the two-column selection from `reviews`, so the column
# assignments made on `df` here and in later cells write to an independent
# frame instead of a slice (no SettingWithCopyWarning, no silently lost writes).
df = reviews[['content', 'score']].copy()
# Missing review text becomes the empty string so the string cleaning below
# never receives NaN.
df['content'] = df['content'].fillna('')
df.head()

In [None]:
# Horizontal bar chart of how many reviews exist for each star rating.
score_counts = df['score'].value_counts().sort_index()
ax = score_counts.plot(kind='barh', title='Count Reviews by Stars', figsize=(10, 5))
ax.set_xlabel("Counts")
ax.set_ylabel("Ratings")

In [None]:
class AttentionModel(torch.nn.Module):
	"""LSTM sentence classifier that pools the LSTM outputs with dot-product attention.

	The final hidden state scores every time step of the LSTM output; the
	softmax-weighted sum of those outputs is passed to a linear layer that
	produces one logit per class.

	Args:
		batch_size: Stored on the instance for backward compatibility. forward()
			reads the batch size from its input, so this value is not needed
			to run the model.
		output_size: Number of output logits.
		hidden_size: Size of the LSTM hidden state.
		vocab_size: Number of rows in the embedding table.
		embedding_length: Dimension of each word embedding.
	"""

	def __init__(self, batch_size, output_size, hidden_size, vocab_size, embedding_length):
		super(AttentionModel, self).__init__()
		self.batch_size = batch_size
		self.output_size = output_size
		self.hidden_size = hidden_size
		self.vocab_size = vocab_size
		self.embedding_length = embedding_length

		self.word_embeddings = nn.Embedding(vocab_size, embedding_length)
		self.lstm = nn.LSTM(embedding_length, hidden_size)
		self.label = nn.Linear(hidden_size, output_size)

	def attention_net(self, lstm_output, final_state):
		"""Pool the LSTM outputs using the final hidden state as the query.

		Args:
			lstm_output: Tensor of shape (batch, seq_len, hidden_size).
			final_state: Tensor of shape (1, batch, hidden_size).

		Returns:
			Tensor of shape (batch, hidden_size): the attention-weighted sum
			of the time steps in `lstm_output`.
		"""
		hidden = final_state.squeeze(0)
		# (batch, seq_len, hidden) x (batch, hidden, 1) -> one score per time step.
		attn_weights = torch.bmm(lstm_output, hidden.unsqueeze(2)).squeeze(2)
		soft_attn_weights = F.softmax(attn_weights, 1)
		# (batch, hidden, seq_len) x (batch, seq_len, 1) -> weighted sum over time.
		return torch.bmm(
			lstm_output.transpose(1, 2), soft_attn_weights.unsqueeze(2)
		).squeeze(2)

	def forward(self, input_sentences, batch_size=None):
		"""Compute class logits for a batch of token-id sequences.

		Args:
			input_sentences: Integer tensor of shape (batch, seq_len).
			batch_size: Optional explicit batch size for the initial LSTM
				state. When omitted it is taken from `input_sentences`, so
				batches of any size (including a smaller final batch) work.

		Returns:
			Tensor of shape (batch, output_size) holding unnormalised logits.
		"""
		embedded = self.word_embeddings(input_sentences)
		# The LSTM is built without batch_first, so it expects (seq_len, batch, emb).
		embedded = embedded.permute(1, 0, 2)
		if batch_size is None:
			batch_size = input_sentences.size(0)
		# new_zeros keeps the initial state on the same device/dtype as the
		# embeddings, so the model also runs after being moved off the CPU.
		h_0 = embedded.new_zeros(1, batch_size, self.hidden_size)
		c_0 = embedded.new_zeros(1, batch_size, self.hidden_size)

		output, (final_hidden_state, final_cell_state) = self.lstm(embedded, (h_0, c_0))
		output = output.permute(1, 0, 2)

		attn_output = self.attention_net(output, final_hidden_state)
		return self.label(attn_output)

In [None]:
# Two character tables: accented Vietnamese letters and the plain ASCII letters
# that appear intended to replace them position by position.
# Neither string is referenced by the code visible in this notebook.
# NOTE(review): the uppercase I/O runs of `unsignChars` look misaligned with
# `uniChars` — verify the alignment before using these tables.
uniChars = "àáảãạâầấẩẫậăằắẳẵặèéẻẽẹêềếểễệđìíỉĩịòóỏõọôồốổỗộơờớởỡợùúủũụưừứửữựỳýỷỹỵÀÁẢÃẠÂẦẤẨẪẬĂẰẮẲẴẶÈÉẺẼẸÊỀẾỂỄỆĐÌÍỈĨỊÒÓỎÕỌÔỒỐỔỖỘƠỜỚỞỠỢÙÚỦŨỤƯỪỨỬỮỰỲÝỶỸỴÂĂĐÔƠƯ"
unsignChars = "aaaaaaaaaaaaaaaaaeeeeeeeeeeediiiiiooooooooooooooooouuuuuuuuuuuyyyyyAAAAAAAAAAAAAAAAAEEEEEEEEEEEDIIIOOOOOOOOOOOOOOOOOOOUUUUUUUUUUUYYYYYAADOOU"

def load_dic_char() -> dict:
    """Build the character lookup table consumed by convert_unicode.

    Splits two '|'-separated lists of accented letters and maps the i-th entry
    of the first list to the i-th entry of the second.

    NOTE(review): the two literals render identically; presumably the first
    holds a non-canonical (decomposed) encoding of each letter and the second
    the precomposed one. Confirm the file's encoding preserves that difference,
    otherwise this mapping is an identity.
    """
    char1252 = 'à|á|ả|ã|ạ|ầ|ấ|ẩ|ẫ|ậ|ằ|ắ|ẳ|ẵ|ặ|è|é|ẻ|ẽ|ẹ|ề|ế|ể|ễ|ệ|ì|í|ỉ|ĩ|ị|ò|ó|ỏ|õ|ọ|ồ|ố|ổ|ỗ|ộ|ờ|ớ|ở|ỡ|ợ|ù|ú|ủ|ũ|ụ|ừ|ứ|ử|ữ|ự|ỳ|ý|ỷ|ỹ|ỵ|À|Á|Ả|Ã|Ạ|Ầ|Ấ|Ẩ|Ẫ|Ậ|Ằ|Ắ|Ẳ|Ẵ|Ặ|È|É|Ẻ|Ẽ|Ẹ|Ề|Ế|Ể|Ễ|Ệ|Ì|Í|Ỉ|Ĩ|Ị|Ò|Ó|Ỏ|Õ|Ọ|Ồ|Ố|Ổ|Ỗ|Ộ|Ờ|Ớ|Ở|Ỡ|Ợ|Ù|Ú|Ủ|Ũ|Ụ|Ừ|Ứ|Ử|Ữ|Ự|Ỳ|Ý|Ỷ|Ỹ|Ỵ'.split('|')
    charutf8 = "à|á|ả|ã|ạ|ầ|ấ|ẩ|ẫ|ậ|ằ|ắ|ẳ|ẵ|ặ|è|é|ẻ|ẽ|ẹ|ề|ế|ể|ễ|ệ|ì|í|ỉ|ĩ|ị|ò|ó|ỏ|õ|ọ|ồ|ố|ổ|ỗ|ộ|ờ|ớ|ở|ỡ|ợ|ù|ú|ủ|ũ|ụ|ừ|ứ|ử|ữ|ự|ỳ|ý|ỷ|ỹ|ỵ|À|Á|Ả|Ã|Ạ|Ầ|Ấ|Ẩ|Ẫ|Ậ|Ằ|Ắ|Ẳ|Ẵ|Ặ|È|É|Ẻ|Ẽ|Ẹ|Ề|Ế|Ể|Ễ|Ệ|Ì|Í|Ỉ|Ĩ|Ị|Ò|Ó|Ỏ|Õ|Ọ|Ồ|Ố|Ổ|Ỗ|Ộ|Ờ|Ớ|Ở|Ỡ|Ợ|Ù|Ú|Ủ|Ũ|Ụ|Ừ|Ứ|Ử|Ữ|Ự|Ỳ|Ý|Ỷ|Ỹ|Ỵ".split('|')
    # Pair the lists by position; the first list's length drives the loop.
    return {char1252[i]: charutf8[i] for i in range(len(char1252))}

# Module-level table built once at cell execution time.
dicchar = load_dic_char()

def convert_unicode(text):
    """Return `text` with accented letters in canonical precomposed (NFC) form.

    The previous implementation substituted a fixed list of accented
    Vietnamese letters through a regex and a lookup table. That only works
    while the source file keeps the table's keys in a non-normalized encoding;
    once an editor or export normalizes the literals, the substitution silently
    becomes a no-op. Unicode NFC normalization composes every base letter with
    its combining marks, so it does not depend on how this file is stored.

    Args:
        text: String to normalize.

    Returns:
        The NFC-normalized string. Text that is already composed, and plain
        ASCII, is returned unchanged.
    """
    # Local import keeps this function self-contained within the notebook.
    import unicodedata

    # The module-level `dicchar` table is left in place but is not consulted here.
    return unicodedata.normalize('NFC', text)

def clean_text(text):
    """Normalize, lowercase and strip everything except word tokens.

    The text is passed through convert_unicode, lowercased, and then reduced
    to the runs of Latin letters, Vietnamese accented letters and digits that
    sit between word boundaries. Those runs are joined by single spaces.
    """
    lowered = convert_unicode(text).lower()
    token_pattern = r'(?i)\b[a-záàảãạăắằẳẵặâấầẩẫậéèẻẽẹêếềểễệóòỏõọôốồổỗộơớờởỡợíìỉĩịúùủũụưứừửữựýỳỷỹỵđ0-9]+\b'
    return ' '.join(re.findall(token_pattern, lowered))

# Replace every review with its cleaned form; .loc[:, 'content'] writes into
# the existing column of `df`.
df.loc[:, 'content'] = df['content'].apply(clean_text)
# Show the first 20 rows to eyeball the cleaning result.
df.head(20)

In [None]:
class CustomDataset(Dataset):
    """Pairs encoded samples with their labels for use with a DataLoader.

    Each entry of `x` is indexed with [0] before conversion, so entries are
    expected to be array-like with the feature vector in their first row.
    """

    def __init__(self, x, y):
        self.x, self.y = x, y

    def __len__(self):
        # The number of labels defines the dataset size.
        return len(self.y)

    def __getitem__(self, idx):
        # Features are cast to float32 before being wrapped as a tensor.
        features = self.x[idx][0].astype(np.float32)
        label = self.y[idx]
        return torch.from_numpy(features), label

In [None]:
# Collapse the five star ratings into three classes, then attach a readable name.
score_to_label = {5: 2, 4: 1, 3: 1, 2: 0, 1: 0}
label_to_sentiment = {2: 'positive', 1: 'neutral', 0: 'negative'}

df['label'] = df['score'].map(score_to_label)
df['sentiment'] = df['label'].map(label_to_sentiment)
df.head()

In [None]:
# Class balance before any resampling.
df['sentiment'].value_counts()

In [None]:
# Split the frame by class label (2 = positive, 1 = neutral, 0 = negative).
positive = df[df['label'] == 2]
neutral = df[df['label'] == 1]
negative = df[df['label'] == 0]

# Draw 2000 extra rows (with replacement) from each of the neutral and negative classes.
upsampling_neutral = resample(neutral, random_state=42, n_samples=2000, replace=True)
upsampling_negative = resample(negative, random_state=42, n_samples=2000, replace=True)

# NOTE(review): the duplicates are appended before train_test_split runs, so
# copies of the same review can end up in both the train and the test split.
# Consider splitting first and upsampling only the training part.
df = pd.concat([df, upsampling_neutral, upsampling_negative])
df['length'] = [len(text) for text in df['content']]
# str.split() without an argument returns [] for an empty review, so empty
# content counts as 0 words; split(' ') returned [''] and reported 1.
df['num_word'] = [len(text.split()) for text in df['content']]
df.shape

In [None]:
# Distribution of review lengths in words, plus the mean length.
mean_num_word = np.mean(df['num_word'])
sns.histplot(data=df['num_word'], bins=20, kde=True)
print(f"Mean: {mean_num_word}")

In [None]:
def tokenize(text):
    """Segment `text` with ViTokenizer and return the pieces split on single spaces."""
    return ViTokenizer.tokenize(text).split(' ')

# Count how often each token occurs across all reviews.
# Iterating the column directly avoids DataFrame.iterrows(), which builds a
# full Series for every row only to read a single field from it.
counts = Counter()
for text in df['content']:
    counts.update(tokenize(text))

print("num_words before:", len(counts.keys()))

# Drop tokens seen fewer than twice. The keys to delete are collected first so
# the Counter is not modified while it is being iterated.
rare_words = [word for word, freq in counts.items() if freq < 2]
for word in rare_words:
    del counts[word]

print("num_words after:", len(counts.keys()))

In [None]:
# Index 0 is reserved for padding ("") and index 1 for unknown words ("UNK").
vocab2index = {"": 0, "UNK": 1}
words = ["", "UNK"]
for word in counts:
    # A counted token equal to "" or "UNK" must not overwrite the reserved
    # entries: that would move the padding/unknown index and put a duplicate
    # string into `words`.
    if word in vocab2index:
        continue
    vocab2index[word] = len(words)
    words.append(word)

def encode_sentence(text, vocab2index, n=50):
    """Turn `text` into a fixed-length vector of vocabulary indices.

    Tokens missing from `vocab2index` are mapped to the "UNK" index. The
    sequence is truncated to `n` entries or right-padded with zeros.

    Returns:
        A one-element list holding the integer array of length `n`.
    """
    token_ids = np.array(
        [vocab2index.get(token, vocab2index["UNK"]) for token in tokenize(text)]
    )
    padded = np.zeros(n, dtype=int)
    keep = min(n, len(token_ids))
    padded[:keep] = token_ids[:keep]
    return [padded]

# Encode every review; encode_sentence returns a one-element list, which
# np.array() wraps into an array whose first row is the index vector.
df['encoded'] = df['content'].apply(lambda x: np.array(encode_sentence(x, vocab2index)))
df.head()

In [None]:
# Persist the processed frame to CSV.
# NOTE(review): absolute, user-specific path — consider a configurable data directory.
df.to_csv("C:/Users/Admin/OneDrive - Đại học FPT- FPT University/This PC/Documents/Datasets/sentiment.csv")

In [None]:
# Features are the encoded index arrays, targets the 3-class labels.
x, y = list(df['encoded']), list(df['label'])

# Hold out 30% of the rows for testing, with a fixed seed.
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.3, random_state=42
)

In [None]:
# Sizes of the train and test splits, one per line.
print(len(x_train), len(x_test), sep="\n")

In [None]:
# Inverse-frequency class weights: total / (number_of_classes * count_per_class).
class_counts = np.bincount(y_train)
total_samples = len(y_train)
num_classes = len(class_counts)
balanced_weights = total_samples / (num_classes * class_counts)
class_weights = torch.tensor(balanced_weights, dtype=torch.float)

print(class_weights)

In [None]:
# Wrap the split features/labels so DataLoader can index them.
train_dataset = CustomDataset(x_train, y_train)
test_dataset = CustomDataset(x_test, y_test)

In [None]:
# Hyperparameters for the data loaders and the model.
batch_size = 1
vocab_size = len(words)  # includes the two reserved entries at the start of `words`
output_size = 3  # one logit per class in the label mapping (0, 1, 2)
hidden_size = 128
embedding_length = 400

# Only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

model = AttentionModel(batch_size=batch_size, 
                        output_size=output_size, 
                        hidden_size=hidden_size, 
                        vocab_size=vocab_size, 
                        embedding_length=embedding_length
                        )

In [None]:
# Put the model in training mode before the training loop starts.
model = model.train()

In [None]:
# Adam over all model parameters.
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# Cross-entropy weighted by the class weights computed from the training labels.
criterion = nn.CrossEntropyLoss(weight=class_weights)
# StepLR configured with step_size=5 and gamma=0.8; the training loop calls
# exp_lr_scheduler.step() once per epoch.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.8)

In [None]:
def train_one_epoch(model, train_dl, optimizer, criterion, writer, epoch):
    """Run one optimisation pass over `train_dl`.

    Args:
        model: Module called as `model(x)` with integer input batches.
        train_dl: Sized iterable yielding `(x, y)` batches.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss called as `criterion(logits, targets)`.
        writer: Object exposing `add_scalar(tag, value, step)`.
        epoch: Epoch index, used as the logging step.

    Returns:
        Tuple `(mean_loss, list_pred, list_true)`. Predictions come before
        targets here, the reverse of the order evaluate_one_epoch returns.
    """
    # evaluate_one_epoch switches the model to eval mode, so training mode has
    # to be re-enabled at the start of every epoch.
    model.train()
    epoch_loss = 0
    list_pred = []
    list_true = []
    for x, y in train_dl:
        y = y.type(torch.int64)
        x = x.long()

        optimizer.zero_grad()
        y_pred = model(x)
        loss = criterion(y_pred, y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        list_pred.extend(y_pred.argmax(dim=1).tolist())
        list_true.extend(y.tolist())

    mean_loss = epoch_loss / len(train_dl)
    # Log one point per epoch. Logging every batch with the same step stacked
    # all batch losses on a single x-position of the curve.
    writer.add_scalar("Loss/train", mean_loss, epoch)
    return mean_loss, list_pred, list_true

In [None]:
def evaluate_one_epoch(model, valid_dl, criterion, writer, epoch):
    """Evaluate `model` on `valid_dl` without updating any parameters.

    Args:
        model: Module called as `model(x)` with integer input batches.
        valid_dl: Sized iterable yielding `(x, y)` batches.
        criterion: Loss called as `criterion(logits, targets)`.
        writer: Object exposing `add_scalar(tag, value, step)`.
        epoch: Epoch index, used as the logging step.

    Returns:
        Tuple `(mean_loss, list_true, list_pred)`. Targets come before
        predictions here, the reverse of the order train_one_epoch returns.
    """
    # Remember the caller's mode so evaluation does not leave the model stuck
    # in eval mode for the next training epoch.
    was_training = model.training
    model.eval()
    epoch_loss = 0
    list_true = []
    list_pred = []
    try:
        with torch.no_grad():
            for x, y in valid_dl:
                y = y.type(torch.int64)
                x = x.long()
                y_hat = model(x)
                loss = criterion(y_hat, y)
                epoch_loss += loss.item()
                list_true.extend(y.tolist())
                list_pred.extend(y_hat.argmax(dim=1).tolist())
    finally:
        model.train(was_training)

    mean_loss = epoch_loss / len(valid_dl)
    # Log one point per epoch instead of one per batch at the same step.
    writer.add_scalar("Loss/valid", mean_loss, epoch)
    return mean_loss, list_true, list_pred

In [None]:
# Bookkeeping names kept for compatibility with other cells; this loop does
# not fill them.
loss = []
acc = []
val_loss = []
acc_max = 0
# Start at +inf so the first epoch always writes a checkpoint. With the former
# threshold of 1.0 nothing was saved while the validation loss stayed above
# 1.0, leaving the checkpoint-loading cell with no file to pick up.
valid_loss_min = float('inf')

for epoch in tqdm(range(30)):
    train_loss, train_pred, train_true = train_one_epoch(model, train_loader, optimizer, criterion, writer, epoch)
    valid_loss, test_true, test_pred = evaluate_one_epoch(model, test_loader, criterion, writer, epoch)
    print('Epoch-{0} lr: {1}'.format(epoch, optimizer.param_groups[0]['lr']))
    print(f'\tTrain Loss: {train_loss:.3f} | Valid Loss: {valid_loss:.3f}')

    # Save a checkpoint whenever the validation loss improves; the loss value
    # is embedded in the file name so the best file can be found by sorting.
    if valid_loss < valid_loss_min:
        valid_loss_min = valid_loss
        checkpoint = {
            # The whole module object is stored because load_checkpoint reads
            # checkpoint['model'] before loading the state dict.
            'model': model,
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict()
        }
        torch.save(
            checkpoint,
            f'checkpoint_{valid_loss_min}.pth',
        )

    # One scheduler step per epoch.
    exp_lr_scheduler.step()

writer.flush()
writer.close()

In [None]:
def load_checkpoint(filepath):
    """Restore a frozen, eval-mode model from a checkpoint written by the training loop.

    The checkpoint is expected to be a dict with a pickled module under
    'model' and its weights under 'state_dict'.

    Args:
        filepath: Path of the checkpoint file.

    Returns:
        The restored module with every parameter's `requires_grad` set to
        False and the module switched to eval mode.
    """
    # SECURITY: the checkpoint embeds a pickled nn.Module, so loading executes
    # pickle code. Only load files you created yourself.
    try:
        # Recent PyTorch releases default to weights_only=True, which refuses
        # to unpickle the module object stored under 'model'.
        checkpoint = torch.load(filepath, weights_only=False)
    except TypeError:
        # Older PyTorch releases do not accept the weights_only keyword.
        checkpoint = torch.load(filepath)
    model = checkpoint['model']
    model.load_state_dict(checkpoint['state_dict'])
    for parameter in model.parameters():
        parameter.requires_grad = False

    model.eval()
    return model

In [None]:
# Directory searched for checkpoints, defined once and reused below.
# NOTE(review): the training loop saves 'checkpoint_<loss>.pth' relative to the
# working directory, while this cell reads from a fixed Drive folder — confirm
# the files actually end up here.
checkpoint_dir = "C:\\Users\\Admin\\Documents\\Google Drive\\huytvo.2003@gmail.com\\My Drive\\Colab Notebooks"

checkpoint_files = os.listdir(checkpoint_dir)

checkpoint_files = [filename for filename in checkpoint_files if filename.startswith('checkpoint_') and filename.endswith('.pth')]
# File names embed the validation loss, so the numerically smallest comes first.
checkpoint_files = sorted(checkpoint_files, key=lambda x: float(x.split('checkpoint_')[1].replace('.pth', '')))

# Stop with a clear error here; previously the cell printed a message and then
# failed with an IndexError on checkpoint_files[0].
if len(checkpoint_files) == 0:
    raise FileNotFoundError(f"No valid checkpoint files found in {checkpoint_dir!r}.")

best_checkpoint_file = checkpoint_files[0]
best_checkpoint_path = os.path.join(checkpoint_dir, best_checkpoint_file)

load_model = load_checkpoint(best_checkpoint_path)

In [None]:
# Summary of the restored model for a dummy integer input of shape (1, 50).
print(summary(load_model, torch.zeros([1, 50]).long(), show_input=True))

In [None]:
# Module structure of the restored model.
print(load_model)

In [None]:
# Per-class report on the test split.
# NOTE(review): test_true / test_pred are the values left over from the last
# training epoch's evaluation of `model`, not predictions of `load_model` —
# confirm this is the model you intend to report on.
print(classification_report(test_true, test_pred, target_names=['negative', 'neutral', 'positive']))

In [None]:
def plot_confusion_matrix(cm, target_names, title='Confusion Matrix', cmap='Blues', normalize=False):
    """Draw a confusion matrix as an annotated heat map.

    Args:
        cm: Square matrix of raw counts; rows are drawn as true labels and
            columns as predicted labels.
        target_names: Class names used for the tick labels on both axes.
        title: Figure title.
        cmap: Matplotlib colormap name.
        normalize: When True, each row is divided by its total before plotting
            and the cells are printed with four decimals.

    Accuracy and misclassification rate are always computed from the raw
    counts and shown under the x-axis label.
    """
    cm = np.asarray(cm)
    accuracy = np.trace(cm) / cm.sum()
    misclass = 1 - accuracy

    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Rows without any samples stay at 0 instead of producing NaN from 0/0.
        cm = np.divide(
            cm.astype('float'),
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    tick_marks = np.arange(len(target_names))
    plt.xticks(tick_marks, target_names, rotation=45)
    plt.yticks(tick_marks, target_names)

    # Cells darker than the threshold get white text so the number stays readable.
    thresh = (cm.max() + cm.min()) / 2.0 if normalize else cm.max() / 2.0
    # Raw counts print as-is; normalized ratios are rounded so they fit the cell.
    cell_format = "{:.4f}" if normalize else "{}"

    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        text_color = "white" if cm[i, j] > thresh else "black"
        plt.text(j, i, cell_format.format(cm[i, j]), horizontalalignment="center", color=text_color)

    plt.ylabel('True labels')
    plt.xlabel(f'Predicted labels\n\naccuracy = {accuracy:.4f}; Misclassification rate = {misclass:.4f}')
    # Lay out after the labels exist so they are included in the spacing.
    plt.tight_layout()
    plt.show()

# Raw-count confusion matrix for the test predictions.
plot_confusion_matrix(confusion_matrix(test_true, test_pred), target_names = ['negative', 'neutral', 'positive'])

In [None]:
# Compute the per-class F1 scores once instead of on every loop iteration.
# labels=[0, 1, 2] pins the order of the returned scores to the class names
# (0 = negative, 1 = neutral, 2 = positive), even if a class is absent.
target_names = ['negative', 'neutral', 'positive']
per_class_f1 = f1_score(test_true, test_pred, labels=[0, 1, 2], average=None)
for target_name, class_f1 in zip(target_names, per_class_f1):
    print(f"F1 score ({target_name}): {class_f1:.4f}")