In [None]:
# Mount Google Drive at /content/drive (Colab-only) so files stored there can be read and written.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install transformers

In [None]:
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from gensim.utils import simple_preprocess
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report, ConfusionMatrixDisplay

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader

from transformers import AutoTokenizer, AutoModel, logging

import warnings

# Suppress every Python warning and restrict transformers logging to errors only.
# NOTE(review): the blanket "ignore" filter also hides deprecation warnings — consider narrowing it.
warnings.filterwarnings("ignore")
logging.set_verbosity_error()

## LOAD DATA

In [None]:
# Folder on the mounted Drive that holds the CSV splits and .npy audio arrays;
# the trained weights and the confusion-matrix figure are written here as well.
sub_path = '/content/drive/MyDrive/khoa_luan/data_04/'

In [None]:
# Training split: load the stored audio arrays, then attach them to the CSV rows as a new column.
train_emb = np.load(f'{sub_path}/train_embedding_1024_vi.npy', allow_pickle=True)
train_df = pd.read_csv(f'{sub_path}/train.csv').assign(audio_embedding=train_emb)

# Number of training rows per author.
train_df['author'].value_counts()

In [None]:
# Test split: CSV rows plus the matching audio arrays stored in the .npy file.
test_df = pd.read_csv(sub_path + '/test.csv')
test_emb = np.load(sub_path + '/test_embedding_1024_vi.npy', allow_pickle=True)
test_df['audio_embedding'] = test_emb

# Number of test rows per author (the cell's last expression is displayed).
test_df['author'].value_counts()

In [None]:
# Validation split: load the stored audio arrays, then attach them to the CSV rows as a new column.
val_emb = np.load(f'{sub_path}/val_embedding_1024_vi.npy', allow_pickle=True)
val_df = pd.read_csv(f'{sub_path}/val.csv').assign(audio_embedding=val_emb)

# Number of validation rows per author.
val_df['author'].value_counts()

## MODELING

In [None]:
# Pretrained checkpoint identifiers passed to `from_pretrained` for the text
# encoder (and its tokenizer) and for the audio encoder.
phobert_type = 'vinai/phobert-base-v2'
audio_model_type = 'nguyenvulebinh/wav2vec2-base-vietnamese-250h'

In [None]:
# Tokenizer matching the text checkpoint; use_fast=False requests the slow (Python) implementation.
# NOTE(review): PhoBERT is normally fed word-segmented text — confirm the `lyric` column is pre-segmented.
tokenizer = AutoTokenizer.from_pretrained(phobert_type, use_fast=False)

### NN MODEL

In [None]:
class Multimodal_Dataset(Dataset):
    """Dataset that pairs a tokenized lyric with the audio array stored on the same row.

    Every item is a dict with the keys ``audio_input_values``,
    ``audio_attention_mask``, ``input_ids``, ``attention_mask`` and ``target``.

    Args:
        df: DataFrame providing the ``lyric``, ``audio_embedding`` and ``author`` columns.
        tokenizer: Callable tokenizer invoked once per item.
        max_len: Length the tokenizer pads / truncates to.
        label_encoder: Mapping from author name to integer class id.
        device: Stored on the instance; not read anywhere inside this class.
    """

    def __init__(self, df, tokenizer, max_len, label_encoder, device):
        self.df = df
        self.max_len = max_len
        self.tokenizer = tokenizer
        self.label_encoder = label_encoder
        self.device = device

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return len(self.df)

    def __getitem__(self, index):
        """Build the model-ready dict for the row at positional ``index``."""
        lyric, audio_array, class_id = self.get_input_data(self.df.iloc[index])

        # Index 0 of the stored array is served as the input values and index 1
        # as the attention mask; both come out as float32 after the cast.
        audio_tensor = torch.tensor(audio_array).float()

        tokens = self.tokenizer(
            lyric,
            truncation=True,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_attention_mask=True,
            return_token_type_ids=False,
            return_tensors='pt',
        )

        item = {}
        item['audio_input_values'] = audio_tensor[0]
        item['audio_attention_mask'] = audio_tensor[1]
        # The tokenizer output is flattened to 1-D before being returned.
        item['input_ids'] = tokens['input_ids'].flatten()
        item['attention_mask'] = tokens['attention_mask'].flatten()
        item['target'] = torch.tensor(class_id, dtype=torch.long)
        return item

    def get_input_data(self, row):
        """Return ``(lyric, audio array, class id)`` for one DataFrame row."""
        return (
            row['lyric'],
            row['audio_embedding'],
            self.label_encoder[row['author']],
        )

In [None]:
class Multimodal_Classifier(nn.Module):
    """Text + audio classifier that fuses pooled PhoBERT and Wav2Vec2 features.

    Both encoders are loaded with ``AutoModel.from_pretrained`` from the
    module-level ``phobert_type`` and ``audio_model_type`` identifiers. Each
    encoder's pooled output goes through ``tanh``; the two vectors are
    multiplied element-wise and fed to a small linear head that ends in
    ``n_classes`` logits.

    Args:
        n_classes: Number of logits produced by the final linear layer.

    Raises:
        ValueError: If the two encoders report different feature widths, because
            element-wise fusion needs both vectors to have the same size.
    """

    def __init__(self, n_classes):
        super(Multimodal_Classifier, self).__init__()

        self.bert = AutoModel.from_pretrained(phobert_type)
        bert_out_size = self.bert.config.hidden_size
        self.text = nn.Sequential(
            nn.Tanh()
        )

        self.w2v2 = AutoModel.from_pretrained(audio_model_type)
        # Turn off the encoder's time-masking probability.
        self.w2v2.config.mask_time_prob = 0
        w2v2_out_size = self.w2v2.config.output_hidden_size
        self.audio = nn.Sequential(
            nn.Tanh()
        )
        self.tanh = nn.Tanh()

        # torch.mul in forward() needs both pooled vectors to have the same width;
        # fail here with a clear message instead of deep inside a training step.
        if bert_out_size != w2v2_out_size:
            raise ValueError(
                f"Element-wise fusion requires equal feature sizes, got "
                f"text={bert_out_size} and audio={w2v2_out_size}"
            )
        fused_size = bert_out_size

        # Head sized from the encoder config rather than a hard-coded width.
        # (A concatenation fusion would need bert_out_size + w2v2_out_size inputs.)
        self.dense = nn.Sequential(
            nn.Linear(fused_size, 512),
            nn.Linear(512, 512),
            nn.Dropout(p=0.1)
        )
        self.last_fc = nn.Linear(512, n_classes)

    def merged_strategy(self, hidden_states, mode=None):
        """Reduce ``hidden_states`` along dim 1.

        Args:
            hidden_states: Tensor to pool; dim 1 is reduced.
            mode: ``'mean'``, ``'sum'`` or ``'max'``. Any other value selects
                index 0 along dim 1.

        Returns:
            The pooled tensor with dim 1 removed.
        """
        if mode == 'mean':
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == 'sum':
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == 'max':
            # torch.max along a dim returns (values, indices); keep the values.
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            outputs = hidden_states[:, 0]
        return outputs

    def forward(self, input):
        """Compute class logits for one batch.

        Args:
            input: Dict with the keys ``input_ids``, ``attention_mask``,
                ``audio_input_values`` and ``audio_attention_mask``.

        Returns:
            Output of the final linear layer (``n_classes`` values per sample).
        """
        ## TEXT LAYERS
        # With return_dict=False the encoder returns a tuple; only its first
        # element is used below.
        last_hidden_state, output_bert = self.bert(
            input_ids=input['input_ids'],
            attention_mask=input['attention_mask'],
            return_dict=False
        )
        # NOTE(review): the mean runs over every position of dim 1 without using
        # the attention mask, so padded positions (if any) contribute as well.
        text_pooler = self.merged_strategy(last_hidden_state, mode='mean')
        x_text = self.text(text_pooler)

        ## AUDIO LAYERS
        output_w2v2 = self.w2v2(
            input_values=input['audio_input_values'],
            attention_mask=input['audio_attention_mask'],
        )
        audio_pooler = self.merged_strategy(output_w2v2.last_hidden_state, mode='mean')
        x_audio = self.audio(audio_pooler)

        ## FUSION: element-wise product of the two pooled vectors.
        x_cat = torch.mul(x_text, x_audio)

        ## LAST LINEARS
        x = self.dense(x_cat)
        x = self.tanh(x)
        x = self.last_fc(x)
        return x

In [None]:
def _move_batch_to_device(data, device):
    """Split a collated batch into ``(model input dict, targets)``, both on ``device``."""
    input_keys = ('input_ids', 'attention_mask', 'audio_input_values', 'audio_attention_mask')
    model_input = {key: data[key].to(device) for key in input_keys}
    return model_input, data['target'].to(device)


def train_single_epoch(model, train_loader, val_loader, loss_func, optimizer, device):
    """Run one optimisation pass over ``train_loader`` and one evaluation pass over ``val_loader``.

    Args:
        model: Module called with a dict of input tensors; returns per-class scores.
        train_loader: Iterable of batches exposing ``.dataset`` (used for its length).
        val_loader: Same contract as ``train_loader``; evaluated without gradient tracking.
        loss_func: Callable ``(outputs, targets) -> loss``.
        optimizer: Optimizer stepped once per training batch.
        device: Device the batch tensors are moved to.

    Returns:
        ``(train_loss, val_loss)``: mean per-batch loss of each pass.

    Side effects:
        Prints accuracy and loss for both passes and leaves ``model`` in eval mode.
    """
    model.train()
    train_losses = []
    val_losses = []
    train_correct = 0
    val_correct = 0

    for data in train_loader:
        model_input, targets = _move_batch_to_device(data, device)

        # forward pass and loss
        outputs = model(model_input)
        loss = loss_func(outputs, targets)
        train_correct += (outputs.argmax(dim=1) == targets).sum().item()

        # backpropagate error and update weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    model.eval()
    # Validation needs no gradients: skipping graph construction saves memory and time.
    with torch.no_grad():
        for data in val_loader:
            model_input, targets = _move_batch_to_device(data, device)
            outputs = model(model_input)
            loss = loss_func(outputs, targets)
            val_correct += (outputs.argmax(dim=1) == targets).sum().item()
            val_losses.append(loss.item())

    # Accuracies are plain Python floats, so they print as numbers rather than tensor reprs.
    train_acc = train_correct / len(train_loader.dataset)
    val_acc = val_correct / len(val_loader.dataset)
    train_loss = np.mean(train_losses)
    val_loss = np.mean(val_losses)

    print(f'Train Accuracy: {train_acc}')
    print(f'Validation Accuracy: {val_acc}')
    print(f'Train Loss: {train_loss}')
    print(f'Validate Loss: {val_loss}')
    return train_loss, val_loss


In [None]:
def train(model, train_loader, val_loader, loss_func, optimizer, epochs, device):
    """Call ``train_single_epoch`` ``epochs`` times and collect the losses it returns.

    Returns:
        ``(train_losses, val_losses)``: two lists with one entry per epoch.
    """
    history = {'train': [], 'val': []}
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        epoch_train_loss, epoch_val_loss = train_single_epoch(
            model, train_loader, val_loader, loss_func, optimizer, device
        )
        history['train'].append(epoch_train_loss)
        history['val'].append(epoch_val_loss)
        print("---------------------------")
    print("Finished training")
    return history['train'], history['val']

In [None]:
def prepare_loaders(train_df, val_df, test_df, max_len, batch_size, label_encoder, device):
    """Wrap the three DataFrames in ``Multimodal_Dataset`` objects and build their DataLoaders.

    The module-level ``tokenizer`` is used for all three datasets.

    Args:
        train_df, val_df, test_df: DataFrames for the respective splits.
        max_len: Tokenizer padding / truncation length.
        batch_size: Batch size shared by the three loaders.
        label_encoder: Mapping from author name to class id.
        device: Forwarded to each dataset.

    Returns:
        ``(train_loader, test_loader, val_loader)`` — note that the order differs
        from the order of the DataFrame parameters.
    """
    def _make_loader(df, shuffle):
        dataset = Multimodal_Dataset(
            df=df,
            tokenizer=tokenizer,
            max_len=max_len,
            label_encoder=label_encoder,
            device=device)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=2)

    # Only the training data is reshuffled each epoch, so mini-batches do not follow
    # the row order of the CSV; evaluation loaders keep a fixed order.
    train_loader = _make_loader(train_df, shuffle=True)
    val_loader = _make_loader(val_df, shuffle=False)
    test_loader = _make_loader(test_df, shuffle=False)

    return train_loader, test_loader, val_loader

In [None]:
# Maps each value of the `author` column to its integer class id (15 classes, ids 0-14).
label_encoder = {
    'khắc hưng': 0,
    'châu đăng khoa': 1,
    'khắc việt': 2,
    'phúc trường': 3,
    'nguyễn đình vũ': 4,
    'mr siro': 5,
    'vương anh tú': 6,
    'trịnh công sơn': 7,
    'phan mạnh quỳnh': 8,
    'nguyên chấn phong': 9,
    'nguyễn hồng thuận':10,
    'nguyễn văn chung': 11,
    'phạm trưởng': 12,
    'khánh đơn': 13,
    'tiên cookie': 14,
}

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# Tokenizer padding length and batch size for all three loaders.
MAX_LEN, BATCH_SIZE = 256, 300
# One output class per entry of the author -> id mapping.
n_classes = len(label_encoder)

# prepare_loaders returns its loaders in (train, test, val) order.
train_loader, test_loader, val_loader = prepare_loaders(
    train_df=train_df,
    val_df=val_df,
    test_df=test_df,
    max_len=MAX_LEN,
    batch_size=BATCH_SIZE,
    label_encoder=label_encoder,
    device=device,
)

In [None]:
# Instantiate the classifier (its constructor loads both pretrained encoders) and move it to `device`.
model = Multimodal_Classifier(n_classes=n_classes).to(device)

In [None]:
### Freeze every parameter of the pretrained PhoBERT encoder.
model.bert.requires_grad_(False)

### Freeze every parameter of the pretrained Wav2Vec2 encoder.
model.w2v2.requires_grad_(False)

In [None]:
# Number of epochs and AdamW learning rate.
EPOCHS = 350
LR = 2e-3

loss_func = nn.CrossEntropyLoss()
# Every model parameter is handed to the optimizer, including the encoder
# parameters whose requires_grad was set to False in the freezing cell.
optimizer = AdamW(model.parameters(), lr=LR)

# `train` returns two lists: the per-epoch training and validation losses.
train_loss, val_loss = train(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    loss_func=loss_func,
    optimizer=optimizer,
    epochs=EPOCHS,
    device=device
)

In [None]:
# Derive the x-axis from the recorded history so the plot matches the losses
# that were actually collected, even if EPOCHS is edited afterwards.
n_epochs = len(train_loss)
epochs = range(1, n_epochs + 1)

# Plot and label the training and validation loss values
plt.plot(epochs, train_loss, label='Training Loss')
plt.plot(epochs, val_loss, label='Validation Loss')

# Add in a title and axes labels
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')

# Set the tick locations; the step is clamped to 1 because range() rejects a
# step of 0, which n_epochs // 10 yields for runs shorter than 10 epochs.
tick_step = max(1, n_epochs // 10)
plt.xticks(range(0, n_epochs + 1, tick_step))

# Display the plot
plt.legend(loc='best')
plt.show()

In [None]:
# Output files under sub_path: a checkpoint (model + optimizer state) and a weights-only file.
checkpoint_path = sub_path + 'multimodal_model_mul_checkpoints.pth'
model_path = sub_path + 'multimodal_model_mul.pth'

# Snapshot of the current model and optimizer state.
# NOTE(review): 'epoch' stores EPOCHS + 1 — presumably the epoch to resume from; confirm.
checkpoint = {
    'epoch': EPOCHS + 1,
    'state_dict': model.state_dict(),
    'optimizer': optimizer.state_dict()
}

In [None]:
# Write the checkpoint dict and, separately, only the model weights.
torch.save(checkpoint, checkpoint_path)
torch.save(model.state_dict(), model_path)

In [None]:
def load_ckp(checkpoint_fpath, model, optimizer, device):
    """Restore model and optimizer state from a checkpoint file.

    Args:
        checkpoint_fpath: Path of a file holding a dict with the keys
            ``'state_dict'``, ``'optimizer'`` and ``'epoch'``.
        model: Module whose state is overwritten in place.
        optimizer: Optimizer whose state is overwritten in place.
        device: Device (name or ``torch.device``) the stored tensors are mapped to.

    Returns:
        ``(model, optimizer, epoch)`` where ``epoch`` is the stored ``'epoch'`` value.
    """
    target_device = torch.device(device)
    saved_state = torch.load(checkpoint_fpath, map_location=target_device)

    model.load_state_dict(saved_state['state_dict'])
    optimizer.load_state_dict(saved_state['optimizer'])

    stored_epoch = saved_state['epoch']
    return model, optimizer, stored_epoch

def load_model(model_path, model, device=None):
    """Load a saved ``state_dict`` from ``model_path`` into ``model``.

    Args:
        model_path: Path of a file written with ``torch.save(model.state_dict(), ...)``.
        model: Module whose parameters are overwritten in place.
        device: Device the stored tensors are mapped to. When omitted, CUDA is
            used if available and the CPU otherwise.

    Returns:
        The same ``model`` instance, with the loaded weights.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    weights = torch.load(model_path, map_location=torch.device(device))
    # Load the weights that were just read from disk, not an unrelated global.
    model.load_state_dict(weights)
    return model

In [None]:
# model, optimizer, epoch = load_ckp(checkpoint_path, model, optimizer, device)
# model = load_model(model_path, model)

In [None]:
def test(model, data_loader):
  model.eval()
  predicts = []
  real_values = []
  for data in data_loader:
      input_ids = data['input_ids'].to(device)
      attention_mask = data['attention_mask'].to(device)
      audio_input_values = data['audio_input_values'].to(device)
      audio_attention_mask = data['audio_attention_mask'].to(device)

      input = {
          'input_ids':input_ids,
          'attention_mask':attention_mask,
          'audio_input_values':audio_input_values,
          'audio_attention_mask':audio_attention_mask
      }
      targets = data['target'].to(device)

      total_outs = []
      with torch.no_grad():
        outputs = model(input)
        total_outs.append(outputs)

      total_outs = torch.stack(total_outs)
      _, pred = torch.max(total_outs.mean(0), dim=1)
      predicts.extend(pred)
      real_values.extend(targets)

  predicts = torch.stack(predicts).cpu()
  real_values = torch.stack(real_values).cpu()
  return real_values, predicts


In [None]:
# Ground-truth class ids and predicted class ids for the test loader.
real_values, pred_values = test(model, test_loader)

In [None]:
# Macro-averaged precision, recall and F1 on the test predictions, rounded to 6 decimals.
for metric_label, metric_fn in (
    ('Precision Score: ', precision_score),
    ('Recall Score: ', recall_score),
    ('F1 Score: ', f1_score),
):
    print(metric_label, round(metric_fn(real_values, pred_values, average='macro'), 6))

In [None]:
# Pass `labels` explicitly so every name is tied to its class id and the report
# still works when some class does not occur in the test predictions/targets.
target_names = list(label_encoder.keys())
class_ids = list(label_encoder.values())
print(classification_report(real_values, pred_values, labels=class_ids, target_names=target_names))

In [None]:
# Author names for the axis ticks and the class ids they correspond to.
labels = list(label_encoder.keys())
class_ids = list(label_encoder.values())

# `labels=class_ids` fixes the row/column order, so the tick labels stay aligned
# with their classes even if a class is missing from the test results.
disp = ConfusionMatrixDisplay.from_predictions(
    real_values,
    pred_values,
    labels=class_ids,
    display_labels=labels,
    cmap='Oranges',
)

plt.xticks(fontsize=8)

# Rotate the x tick labels so the author names do not overlap.
plt.gcf().autofmt_xdate()
plt.show()

# Save the rendered figure next to the data files.
fig_name = 'multi_mul_cm.png'
disp.figure_.savefig(sub_path + fig_name, format='png', bbox_inches='tight')