In [None]:
!pip install transformers==4.16 --quiet
!pip install underthesea --quiet
!pip install seaborn --quiet
!pip install numpy
!pip install torch
!pip install pandas
!pip install pyvi
!pip install gensim

In [None]:
import json
import re
import string

import numpy as np
import pandas as pd

from sklearn.metrics import accuracy_score, classification_report
from underthesea import word_tokenize, text_normalize

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torch import optim

from transformers import AutoModel, AutoTokenizer, get_scheduler
from tqdm.auto import tqdm
from functools import partial

import seaborn as sns
import matplotlib.pyplot as plt
import requests
import gc
import random

from pyvi import ViTokenizer
from gensim.utils import simple_preprocess

from torch.utils.data import TensorDataset

In [None]:
# Register tqdm's pandas integration (progress-bar variants of apply-style calls).
tqdm.pandas()

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# from transformers import DistilBertModel, DistilBertConfig
# config = DistilBertConfig(
#     num_labels=7,
#     vocab_size=64000,
#     max_position_embeddings=258
# )
# class StudentModel(nn.Module):
#     def __init__(self, n_classes, drop_out=0.2):
#         super(StudentModel, self).__init__()
#         self.distillbert = DistilBertModel(config)
#         self.dense = nn.Linear(768, 768)
#         self.activation = nn.Tanh()
#         self.l1 = torch.nn.Linear(768, 256)
#         self.d1 = torch.nn.Dropout(drop_out)
#         self.l2 = torch.nn.Linear(256, n_classes)
#     def forward(self, input_ids=None, attention_mask=None, inputs_embeds=None, labels=None):
#         if inputs_embeds is None:
#             output = self.distillbert(input_ids=input_ids, attention_mask=attention_mask)
#         else:
#             output = self.distillbert(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
#         output = output[0][:, 0, :]
#         output = self.dense(output)
#         output = self.activation(output)
#         output = self.l1(output)
#         output = self.d1(output)
#         output = self.l2(output)
#         return output
# student_model = StudentModel(n_classes=7)
# student_model.to(device)

In [None]:
class StudentModel(nn.Module):
    """PhoBERT encoder ("vinai/phobert-base") followed by a small classification head.

    The head is Linear(768 -> 256) -> Dropout -> Linear(256 -> n_classes) and
    is applied to element ``[1]`` of the encoder output (presumably the pooled
    sentence representation -- confirm against the transformers version in use).

    Args:
        n_classes: Size of the last dimension of the returned logits.
        drop_out: Dropout probability used between the two linear layers.
    """

    def __init__(self, n_classes, drop_out=0.1):
        super(StudentModel, self).__init__()
        self.bert = AutoModel.from_pretrained("vinai/phobert-base")
        self.l1 = torch.nn.Linear(768, 256)
        self.l2 = torch.nn.Linear(256, n_classes)
        self.d1 = torch.nn.Dropout(drop_out)

    def forward(self, attention_mask, input_ids=None, inputs_embeds=None, labels=None):
        """Compute class logits from token ids or from pre-computed embeddings.

        Args:
            attention_mask: Mask forwarded unchanged to the encoder.
            input_ids: Token ids; used only when ``inputs_embeds`` is None.
            inputs_embeds: Embeddings fed directly to the encoder; takes
                precedence over ``input_ids`` when provided.
            labels: Accepted for interface compatibility; not used.

        Returns:
            The un-normalized logits produced by the classification head.

        Raises:
            ValueError: If neither ``input_ids`` nor ``inputs_embeds`` is given.
        """
        if input_ids is None and inputs_embeds is None:
            raise ValueError("Provide either input_ids or inputs_embeds")

        # The encoder is registered as ``self.bert`` in __init__; the previous
        # ``self.distillbert`` reference raised AttributeError on every call.
        if inputs_embeds is None:
            output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        else:
            output = self.bert(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        output = output[1]
        output = self.l1(output)
        output = self.d1(output)
        output = self.l2(output)
        return output

# Build the 7-class student and move its parameters to the selected device.
student_model = StudentModel(n_classes=7)
student_model.to(device)

In [None]:
class TeacherModel(nn.Module):
    """PhoBERT-v2 encoder ("vinai/phobert-base-v2") with a two-layer classifier head.

    Unlike a regular text classifier, ``forward`` consumes embeddings
    (``inputs_embeds``) rather than token ids.

    Args:
        n_classes: Size of the last dimension of the returned logits.
        drop_out: Dropout probability used between the two linear layers.
    """

    def __init__(self, n_classes, drop_out=0.1):
        super().__init__()
        self.bert = AutoModel.from_pretrained("vinai/phobert-base-v2")
        # Head: 768 -> 256 -> n_classes, with dropout in between.
        self.l1 = nn.Linear(768, 256)
        self.l2 = nn.Linear(256, n_classes)
        self.d1 = nn.Dropout(drop_out)

    def forward(self, inputs_embeds, attention_mask, labels=None):
        """Return class logits for a batch of input embeddings.

        ``labels`` is accepted but not used.
        """
        encoder_out = self.bert(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        # Element [1] of the encoder output feeds the head
        # (presumably the pooled representation -- TODO confirm).
        projected = self.l1(encoder_out[1])
        return self.l2(self.d1(projected))

# Build the 7-class teacher, move it to the selected device and restore its
# fine-tuned weights.
teacher_model = TeacherModel(n_classes=7)
teacher_model.to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# from a GPU run can still be loaded on a CPU-only machine.
# strict=False tolerates key mismatches; the returned report (displayed as the
# cell output) lists any missing/unexpected keys -- check that it is empty.
teacher_model.load_state_dict(
    torch.load('/workspace/phobert_fold10.pth', map_location=device),
    strict=False,
)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertModel, BertTokenizer, DistilBertModel, DistilBertConfig


# Width of the embedding vectors fed to both models (matches the 768-wide heads).
hidden_size = 768
embedding_size = hidden_size
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


teacher = teacher_model
student = student_model

# The teacher only provides targets: switch off its dropout so its logits are
# deterministic, and freeze its weights so backward passes neither compute nor
# accumulate gradients for them. Gradients w.r.t. the *inputs* still flow.
teacher.eval()
teacher.requires_grad_(False)
# The student is the only model being optimized.
student.train()


optimizer_student = optim.Adam(student.parameters(), lr=1e-5)

def generate_pseudo_samples(batch_size, seq_len, hidden_size, mu=0, sigma=0.35, target_device=None):
    """Draw a batch of Gaussian pseudo-embeddings.

    Args:
        batch_size: Number of samples in the batch.
        seq_len: Number of positions per sample.
        hidden_size: Width of each embedding vector.
        mu: Mean of the normal distribution.
        sigma: Standard deviation of the normal distribution.
        target_device: Device on which to allocate the tensor. Defaults to the
            notebook-level ``device`` when omitted, which preserves the
            original behaviour.

    Returns:
        A tensor of shape ``(batch_size, seq_len, hidden_size)`` sampled from
        ``N(mu, sigma**2)``.
    """
    if target_device is None:
        # Fall back to the notebook-wide device chosen in the setup cell.
        target_device = device
    return torch.normal(mu, sigma, size=(batch_size, seq_len, hidden_size), device=target_device)

def loss_knowledge_distillation(student_logits, teacher_logits):
    """KL divergence KL(teacher || student) between the two softmax distributions.

    Args:
        student_logits: Un-normalized student scores; classes on the last dim.
        teacher_logits: Un-normalized teacher scores with the same shape.

    Returns:
        A scalar tensor: the divergence summed over classes and averaged over
        the batch (first) dimension.
    """
    # reduction='batchmean' yields the actual KL divergence; the default
    # 'mean' additionally divides by the number of classes.
    criterion = nn.KLDivLoss(reduction="batchmean")
    return criterion(torch.log_softmax(student_logits, dim=-1), torch.softmax(teacher_logits, dim=-1))


num_epochs = 5000
batch_size = 32
seq_len = 120
num_refine_steps = 25  # gradient steps used to shape each pseudo batch
# Start at +inf so the first epoch always writes a checkpoint.
best_loss = float("inf")

for epoch in range(num_epochs):

    # Start from Gaussian noise in embedding space, with every position attended.
    pseudo_samples = generate_pseudo_samples(batch_size, seq_len, hidden_size)
    attention_mask = torch.ones(pseudo_samples.shape[:2], dtype=torch.long, device=device)

    # Random target class per sample; the noise is pushed towards these classes.
    random_class_indices = torch.randint(0, 7, (batch_size,), device=device)

    for i in range(num_refine_steps):
        pseudo_samples.requires_grad_(True)
        teacher_output = teacher(inputs_embeds=pseudo_samples, attention_mask=attention_mask)

        # Class-index targets are equivalent to the one-hot float targets used before.
        loss = F.cross_entropy(teacher_output, random_class_indices)
        gradient = torch.autograd.grad(loss, pseudo_samples)[0]

        # Step-size schedule: 0.01 for steps 0-9, 0.005 for 10-19, 0.001 after.
        # (The old `10 < i < 20` test dropped step 10 into the last bucket.)
        if i < 10:
            step_size = 0.01
        elif i < 20:
            step_size = 0.005
        else:
            step_size = 0.001

        # Detach so each refinement step starts from a fresh leaf tensor
        # instead of extending one autograd graph across all steps.
        pseudo_samples = (pseudo_samples - step_size * gradient).detach()

    optimizer_student.zero_grad()
    student_output_final = student(inputs_embeds=pseudo_samples, attention_mask=attention_mask)
    # The teacher only supplies targets here; no gradient is needed through it.
    with torch.no_grad():
        teacher_logits = teacher(inputs_embeds=pseudo_samples, attention_mask=attention_mask)
    loss_kd = loss_knowledge_distillation(student_output_final, teacher_logits)
    loss_kd.backward()
    optimizer_student.step()

    # Track the best loss as a plain float so no graph/tensor is kept alive.
    loss_value = loss_kd.item()
    if loss_value < best_loss:
        best_loss = loss_value
        torch.save(student_model.state_dict(), 'student_model.pth')
    if epoch % 100 == 0:
        print(f"Epoch {epoch}/{num_epochs}")
        print(f"Loss KD: {loss_value}")


In [None]:
# Emotion names indexed by class id; the order matches the ids returned by
# SentimentDataset.labelencoder (Enjoyment=0 ... Fear=5, anything else=6).
class_names = ['Enjoyment', 'Disgust', 'Sadness', 'Anger', 'Surprise', 'Fear', 'Other']

In [None]:
# Rebuild the student and restore the distilled weights for inference.
model = StudentModel(n_classes=7)
model.to(device)
# Inference only: disable dropout so predictions are deterministic.
model.eval()
# map_location lets a GPU-saved checkpoint load on a CPU-only machine.
# NOTE(review): the distillation cell saves to the relative path
# 'student_model.pth' -- confirm the working directory is /workspace.
# strict=False tolerates key mismatches; inspect the report shown as cell output.
model.load_state_dict(
    torch.load('/workspace/student_model.pth', map_location=device),
    strict=False,
)

In [None]:
def infer(text, tokenizer, max_len=120):
    """Print the predicted emotion for a single sentence.

    The text is normalized with ``simple_preprocess``, word-segmented with
    ``ViTokenizer``, encoded to a fixed length and classified by the
    notebook-level ``model``.

    Args:
        text: Raw input sentence.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_len: Length to which the encoded sequence is padded/truncated.

    Returns:
        None. The input text and the predicted class name are printed.
    """
    print(f'Text: {text}')
    text = ' '.join(simple_preprocess(text))
    text = ViTokenizer.tokenize(text)

    encoded_review = tokenizer.encode_plus(
        text,
        max_length=max_len,
        truncation=True,
        add_special_tokens=True,
        padding='max_length',
        return_attention_mask=True,
        return_token_type_ids=False,
        return_tensors='pt',
    )

    input_ids = encoded_review['input_ids'].to(device)
    attention_mask = encoded_review['attention_mask'].to(device)

    # Disable dropout and gradient tracking for prediction.
    model.eval()
    with torch.no_grad():
        # Keyword arguments are required: StudentModel.forward takes
        # attention_mask as its FIRST positional parameter, so the previous
        # positional call swapped the ids and the mask.
        output = model(input_ids=input_ids, attention_mask=attention_mask)

    # Exactly one sentence is encoded, so the argmax holds a single class id.
    y_pred = torch.argmax(output, dim=1).item()

    print(f'Sentiment: {class_names[y_pred]}')

In [None]:
# Tokenizer for the same "vinai/phobert-base" checkpoint StudentModel is built on
# (use_fast=False requests the non-fast tokenizer implementation).
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base", use_fast=False)

In [None]:
# Quick single-sentence check of the inference path.
infer('tôi mến bạn rất nhiều', tokenizer)

In [None]:
!pip install openpyxl

In [None]:
def get_data(path):
    """Load the 'Sheet1' worksheet of an Excel file as an (Emotion, Sentence) frame.

    The sheet's three columns are renamed to ``index``, ``Emotion`` and
    ``Sentence``; the ``index`` column is then discarded.

    Args:
        path: Location of the Excel workbook.

    Returns:
        A DataFrame with the columns ``Emotion`` and ``Sentence``.
    """
    workbook = pd.read_excel(path, sheet_name=None)
    sheet = workbook['Sheet1']
    sheet.columns = ['index', 'Emotion', 'Sentence']
    # The first column is not used downstream.
    return sheet.drop(columns=['index'])
# Load the evaluation split (columns: Emotion, Sentence).
test_df = get_data('/workspace/test_nor_811.xlsx')

In [None]:
class SentimentDataset(Dataset):
    """Map-style dataset turning (Emotion, Sentence) rows into model inputs.

    Args:
        df: DataFrame with the columns ``Emotion`` and ``Sentence``.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_len: Length every encoded sentence is padded/truncated to.
    """

    # Emotion names in label-id order (ids 0..5); any other value maps to 6.
    _KNOWN_EMOTIONS = ('Enjoyment', 'Disgust', 'Sadness', 'Anger', 'Surprise', 'Fear')

    def __init__(self, df, tokenizer, max_len=120):
        self.df = df
        self.max_len = max_len
        self.tokenizer = tokenizer

    def __len__(self):
        """Number of rows in the underlying frame."""
        return len(self.df)

    def __getitem__(self, index):
        """Return one encoded sample.

        The result is a dict with the keys:
            text            -- the preprocessed sentence
            input_ids       -- flattened token ids
            attention_masks -- flattened attention mask
            targets         -- label id as a ``torch.long`` tensor
        """
        sentence, target = self.get_input_data(self.df.iloc[index])

        # encode_plus tokenizes the sentence, adds the tokenizer's special
        # tokens, pads/truncates to max_len, and returns the ids together
        # with the attention mask as tensors.
        encoded = self.tokenizer.encode_plus(
            sentence,
            truncation=True,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_attention_mask=True,
            return_token_type_ids=False,
            return_tensors='pt',
        )

        sample = {'text': sentence}
        sample['input_ids'] = encoded['input_ids'].flatten()
        sample['attention_masks'] = encoded['attention_mask'].flatten()
        sample['targets'] = torch.tensor(target, dtype=torch.long)
        return sample

    def labelencoder(self, text):
        """Translate an emotion name into its integer id (unknown names -> 6)."""
        for label_id, emotion in enumerate(self._KNOWN_EMOTIONS):
            if text == emotion:
                return label_id
        return 6

    def get_input_data(self, row):
        """Return ``(preprocessed sentence, label id)`` for one DataFrame row."""
        # Normalize with simple_preprocess, then word-segment with ViTokenizer.
        normalized = ' '.join(simple_preprocess(row['Sentence']))
        return ViTokenizer.tokenize(normalized), self.labelencoder(row['Emotion'])

In [None]:
# NOTE(review): max_len=50 here differs from the 120 used by infer() and the
# distillation seq_len -- confirm this is intentional.
test_dataset = SentimentDataset(test_df, tokenizer, max_len=50)
# No shuffling for evaluation: batch composition (and therefore the averaged
# per-batch loss) stays identical from run to run.
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2)

In [None]:
# Evaluate the distilled student on the held-out test set.
model.eval()
# Defined here because no earlier cell creates a loss criterion; without it
# this cell fails with NameError on a fresh kernel.
eval_criterion = nn.CrossEntropyLoss()
losses = []
correct = 0
with torch.no_grad():
    for data in test_loader:
        input_ids = data['input_ids'].to(device)
        attention_mask = data['attention_masks'].to(device)
        targets = data['targets'].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        pred = torch.argmax(outputs, dim=1)

        loss = eval_criterion(outputs, targets)
        # Accumulate as plain Python numbers so the totals work even when the
        # loader yields no batches.
        correct += (pred == targets).sum().item()
        losses.append(loss.item())

num_samples = len(test_loader.dataset)
accuracy = correct / num_samples if num_samples else float('nan')
# Mean of the per-batch mean losses (a short final batch weighs the same as a full one).
mean_loss = np.mean(losses) if losses else float('nan')
print(f'Test Accuracy: {accuracy} Loss: {mean_loss}')