In [None]:
# Install necessary libraries
!pip install torch -U
!pip install plotly
!pip install livelossplot
!pip install imageio
!pip install sacrebleu
!pip install seaborn
!pip install datasets
!pip install accelerate -U
!pip install transformers[torch]
!pip install fasttext
!pip install huggingface_hub

## PhoBERT

In [None]:
# Mount Google Drive at /content/drive; the CSV files read later in this cell
# live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
from torch.optim import AdamW
from tqdm.notebook import tqdm
import seaborn as sns
from IPython.display import Markdown

# Function to calculate class weights
def calculate_class_weights(class_counts):
    """Return one loss weight per class, inversely proportional to its count.

    Each weight is ``sum(class_counts) / class_counts[i]``. The result is a
    float32 tensor in the same order as ``class_counts``.
    """
    n_total = sum(class_counts)
    inverse_freq = []
    for n_in_class in class_counts:
        inverse_freq.append(n_total / n_in_class)
    return torch.tensor(inverse_freq, dtype=torch.float32)

# Pretrained PhoBERT tokenizer plus a sequence-classification model with 2 labels
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")
model = AutoModelForSequenceClassification.from_pretrained("vinai/phobert-base-v2", num_labels=2)

# Read the training and testing CSVs from the mounted Drive folder
train_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_train_1.csv')
test_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_test_1.csv')

# Tokenize the 'review' column of both frames with identical settings:
# truncate to 256 tokens, pad, and return PyTorch tensors (incl. attention masks)
tokenize_kwargs = dict(truncation=True, padding=True, max_length=256, return_tensors="pt")
train_encodings = tokenizer(train_df['review'].tolist(), **tokenize_kwargs)
test_encodings = tokenizer(test_df['review'].tolist(), **tokenize_kwargs)

# Define a custom dataset
class SentimentDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with their labels."""

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # One sample per label
        return len(self.labels)

    def __getitem__(self, idx):
        # Copy every encoded field of row `idx` so callers never alias the
        # tensors stored on the dataset.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx].clone().detach()
        # The label is passed through as stored (no conversion)
        sample['labels'] = self.labels[idx]
        return sample

# Labels as tensors, so the dataset can index them directly
train_labels = torch.tensor(train_df['label'].tolist())
test_labels = torch.tensor(test_df['label'].tolist())

# Inverse-frequency class weights from the number of training rows labelled 0 and 1
n_label_0 = sum(train_labels == 0)
n_label_1 = sum(train_labels == 1)
class_weights = calculate_class_weights([n_label_0, n_label_1])

# Wrap encodings + labels for the DataLoaders
train_dataset = SentimentDataset(train_encodings, train_labels)
test_dataset = SentimentDataset(test_encodings, test_labels)

# DataLoader
def collate_fn(batch):
    """Stack per-sample tensors into one batch tensor per key.

    The keys are taken from the first sample in ``batch``.
    """
    collated = {}
    for key in batch[0]:
        collated[key] = torch.stack([sample[key] for sample in batch])
    return collated

# Batches of 4 samples; only the training loader is shuffled
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

# Cross-entropy weighted by the class weights computed earlier
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)

# AdamW with a linear learning-rate decay (no warmup) spanning 3 passes over the training loader
optimizer = AdamW(model.parameters(), lr=2e-5)
total_training_steps = len(train_loader) * 3
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_training_steps)

# Per-epoch history: one list per tracked metric
epoch_metrics = {name: [] for name in ('loss', 'accuracy', 'precision', 'recall', 'f1')}

# Train and evaluate the model
for epoch in range(3):  # More epochs for better training
    # Switch back to training mode at the start of every epoch. The evaluation
    # pass at the end of the previous epoch leaves the model in eval mode, which
    # would otherwise silently disable dropout for all later epochs.
    model.train()
    running_loss = 0.0
    running_corrects = 0
    total_examples = 0
    loop = tqdm(train_loader, leave=True)

    for batch in loop:
        optimizer.zero_grad()

        # Include attention mask in the model's forward pass
        input_ids = batch['input_ids'].to(torch.long)
        attention_mask = batch['attention_mask'].to(torch.long)
        labels = batch['labels'].to(torch.long)

        # Labels are not passed to the model: the loss is computed right below
        # with the class-weighted criterion, so the model's own loss is unused.
        outputs = model(input_ids, attention_mask=attention_mask)
        loss = loss_function(outputs.logits, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient clipping
        optimizer.step()
        scheduler.step()  # Update the learning rate

        # Accumulate the loss (weighted by batch size) and the number of correct predictions
        running_loss += loss.item() * input_ids.size(0)
        running_corrects += torch.sum(torch.argmax(outputs.logits, axis=1) == labels)
        total_examples += input_ids.size(0)

        loop.set_description(f'Epoch {epoch}')
        loop.set_postfix(loss=loss.item())

    # Training-set averages for this epoch
    epoch_loss = running_loss / total_examples
    epoch_acc = running_corrects.double() / total_examples

    epoch_metrics['loss'].append(epoch_loss)
    epoch_metrics['accuracy'].append(epoch_acc.item())

    # Evaluate model performance on the test loader after each epoch
    model.eval()
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(torch.long)
            attention_mask = batch['attention_mask'].to(torch.long)
            labels = batch['labels'].to(torch.long)

            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            predictions.extend(torch.argmax(logits, axis=1).tolist())
            true_labels.extend(labels.tolist())

    # Calculate performance metrics on the test predictions.
    # NOTE: `accuracy` (test-set accuracy) is computed but not recorded;
    # epoch_metrics['accuracy'] holds the training accuracy from above.
    accuracy = accuracy_score(true_labels, predictions)
    precision = precision_score(true_labels, predictions, zero_division=0)
    recall = recall_score(true_labels, predictions, zero_division=0)
    f1 = f1_score(true_labels, predictions, zero_division=0)

    epoch_metrics['precision'].append(precision)
    epoch_metrics['recall'].append(recall)
    epoch_metrics['f1'].append(f1)

    print(f'Epoch {epoch} - Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}')

# Plot confusion matrix (built from the predictions of the last evaluated epoch)
cm = confusion_matrix(true_labels, predictions)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

# Number of epochs actually recorded, so the charts and the table stay in sync
# with the training loop instead of relying on a hard-coded 3.
num_epochs = len(epoch_metrics['loss'])

# Line chart for each metric over epochs
plt.figure(figsize=(10, 5))
for metric in ['accuracy', 'precision', 'recall', 'f1']:
    plt.plot(range(1, num_epochs + 1), epoch_metrics[metric], label=metric.capitalize())
plt.xlabel('Epoch')
plt.ylabel('Score')
plt.title('Metric Scores Over Epochs')
plt.legend()
plt.show()

# Bar chart summarizing overall performance (mean of each metric across epochs)
overall_metrics = {metric: sum(epoch_metrics[metric]) / len(epoch_metrics[metric]) for metric in epoch_metrics}
plt.bar(overall_metrics.keys(), overall_metrics.values(), color=['blue', 'green', 'red', 'purple', 'orange'])
plt.xlabel('Metrics')
plt.ylabel('Values')
plt.title('Overall Model Performance')
plt.ylim([0, 1])
plt.show()

# Markdown table for performance metrics per epoch.
# Every row is assembled from a full list of cells so that it has exactly as
# many columns as the header (the first cell is the epoch number / "Overall").
metric_columns = ['loss', 'accuracy', 'precision', 'recall', 'f1']
markdown_table = "| Epoch | Loss | Accuracy | Precision | Recall | F1 |\n"
markdown_table += "|-------|------|----------|-----------|--------|----|\n"
for i in range(num_epochs):
    row_cells = [str(i + 1)] + [f"{epoch_metrics[metric][i]:.4f}" for metric in metric_columns]
    markdown_table += "| " + " | ".join(row_cells) + " |\n"

# Last row for overall metrics
overall_cells = ["Overall"] + [f"{overall_metrics[metric]:.4f}" for metric in metric_columns]
markdown_table += "| " + " | ".join(overall_cells) + " |\n"

display(Markdown(markdown_table))


## PhoBERT + FastText

In [None]:
# Mount Google Drive at /content/drive; the CSV files read later in this cell
# live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
from torch.optim import AdamW
from tqdm.notebook import tqdm
import seaborn as sns
from IPython.display import Markdown
import fasttext
from huggingface_hub import hf_hub_download
from sklearn.utils.class_weight import compute_class_weight

# Load the training and testing CSVs from the mounted Drive folder
# NOTE(review): an earlier comment described these as 10-row subsets; the code
# reads each file in full — confirm the files' size.
train_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_train_1.csv')
test_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_test_1.csv')

# Load the PhoBERT tokenizer and a 2-label sequence-classification model
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")
model = AutoModelForSequenceClassification.from_pretrained("vinai/phobert-base-v2", num_labels=2)

# Download the fastText Vietnamese model file from the Hugging Face Hub and load it.
# NOTE(review): ft_model is not referenced anywhere else in the visible code of
# this cell, so the fastText vectors do not take part in training — confirm
# whether this load is still needed.
model_path = hf_hub_download(repo_id="facebook/fasttext-vi-vectors", filename="model.bin")
ft_model = fasttext.load_model(model_path)

# Function to calculate class weights
def calculate_class_weights(class_counts):
    """Return "balanced" per-class loss weights computed from ``class_counts``.

    The weight of class ``i`` is ``total / (n_classes * class_counts[i])``,
    i.e. the same formula scikit-learn uses for ``class_weight='balanced'``.
    The weights are derived from the argument itself rather than from a
    module-level DataFrame, so the function has no hidden dependencies.

    Args:
        class_counts: Number of samples per class, ordered by class index.

    Returns:
        A float32 tensor with one weight per entry of ``class_counts``.

    Raises:
        ValueError: If ``class_counts`` is empty or contains a non-positive count.
    """
    # float() accepts plain numbers as well as 0-d tensors / numpy scalars
    counts = [float(count) for count in class_counts]
    if not counts:
        raise ValueError("class_counts must not be empty")
    if any(count <= 0 for count in counts):
        raise ValueError("every class needs at least one sample to compute a weight")

    total = sum(counts)
    n_classes = len(counts)
    weights = [total / (n_classes * count) for count in counts]
    return torch.tensor(weights, dtype=torch.float32)

# Tokenize the 'review' column of both frames with identical settings:
# truncate to 256 tokens, pad, and return PyTorch tensors (incl. attention masks)
tokenize_kwargs = dict(truncation=True, padding=True, max_length=256, return_tensors="pt")
train_encodings = tokenizer(train_df['review'].tolist(), **tokenize_kwargs)
test_encodings = tokenizer(test_df['review'].tolist(), **tokenize_kwargs)

# Define a custom dataset
class SentimentDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with their labels."""

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # One sample per label
        return len(self.labels)

    def __getitem__(self, idx):
        # Copy every encoded field of row `idx` so callers never alias the
        # tensors stored on the dataset; the label is passed through as stored.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx].clone().detach()
        sample['labels'] = self.labels[idx]
        return sample

# Labels of both splits as tensors
train_labels, test_labels = (
    torch.tensor(frame['label'].tolist()) for frame in (train_df, test_df)
)

# Class weights, computed before the loss function that uses them is built.
# The counts are the number of training rows labelled 0 and 1.
label_counts = [len(train_df[train_df['label'] == label]) for label in (0, 1)]
class_weights = calculate_class_weights(label_counts)

# Wrap encodings + labels for the DataLoaders
train_dataset = SentimentDataset(train_encodings, train_labels)
test_dataset = SentimentDataset(test_encodings, test_labels)

# DataLoader
def collate_fn(batch):
    """Stack per-sample tensors into one batch tensor per key.

    The keys are taken from the first sample in ``batch``.
    """
    collated = {}
    for key in batch[0]:
        collated[key] = torch.stack([sample[key] for sample in batch])
    return collated

# Batches of 4 samples; only the training loader is shuffled
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

# Cross-entropy weighted by the class weights computed earlier
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)

# AdamW with a more conservative learning rate, and a linear decay (no warmup)
# spanning 3 passes over the training loader
optimizer = AdamW(model.parameters(), lr=1e-5)
total_training_steps = len(train_loader) * 3
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_training_steps)

# Per-epoch history: one list per tracked metric
epoch_metrics = {name: [] for name in ('loss', 'accuracy', 'precision', 'recall', 'f1')}

# Trackers for the lowest validation loss seen so far and the matching weights
best_model_state = None
best_loss = float('inf')

for epoch in range(3):  # More epochs for better training
    # Switch back to training mode at the start of every epoch. The evaluation
    # pass at the end of the previous epoch leaves the model in eval mode, which
    # would otherwise silently disable dropout for all later epochs.
    model.train()
    running_loss = 0.0
    running_corrects = 0
    total_examples = 0
    predictions, true_labels = [], []  # Filled by this epoch's evaluation pass
    loop = tqdm(train_loader, leave=True)

    for batch in loop:
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to(torch.long)
        attention_mask = batch['attention_mask'].to(torch.long)
        labels = batch['labels'].to(torch.long)

        # Labels are not passed to the model: the loss is computed right below
        # with the class-weighted criterion, so the model's own loss is unused.
        outputs = model(input_ids, attention_mask=attention_mask)
        loss = loss_function(outputs.logits, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient clipping
        optimizer.step()
        scheduler.step()  # Update the learning rate

        # Accumulate the loss (weighted by batch size) and the number of correct predictions
        running_loss += loss.item() * input_ids.size(0)
        running_corrects += torch.sum(torch.argmax(outputs.logits, axis=1) == labels)
        total_examples += input_ids.size(0)

        loop.set_description(f'Epoch {epoch}')
        loop.set_postfix(loss=loss.item())

    # Training-set averages for this epoch
    epoch_loss = running_loss / total_examples
    epoch_acc = running_corrects.double() / total_examples

    epoch_metrics['loss'].append(epoch_loss)
    epoch_metrics['accuracy'].append(epoch_acc.item())

    # Evaluate model performance on the test loader after each epoch
    model.eval()
    eval_loss = 0.0
    eval_steps = 0
    for batch in test_loader:
        with torch.no_grad():
            input_ids = batch['input_ids'].to(torch.long)
            attention_mask = batch['attention_mask'].to(torch.long)
            labels = batch['labels'].to(torch.long)

            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            tmp_eval_loss = loss_function(logits, labels)
            eval_loss += tmp_eval_loss.mean().item()
            eval_steps += 1
            predictions.extend(torch.argmax(logits, axis=1).tolist())
            true_labels.extend(labels.tolist())

    # Calculate and print the average evaluation loss (mean over batches)
    avg_eval_loss = eval_loss / eval_steps
    print(f"Validation loss: {avg_eval_loss}")

    # Save the best model state based on validation loss.
    # state_dict() returns references to the live parameter tensors, which keep
    # changing as training continues, so each tensor is cloned to take a real
    # snapshot of the weights at this point.
    if avg_eval_loss < best_loss:
        best_loss = avg_eval_loss
        best_model_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}

    # Calculate the evaluation metrics using the validation predictions and true labels.
    # NOTE: `val_accuracy` is computed but not recorded; epoch_metrics['accuracy']
    # holds the training accuracy from above.
    val_accuracy = accuracy_score(true_labels, predictions)
    val_precision = precision_score(true_labels, predictions, zero_division=0)
    val_recall = recall_score(true_labels, predictions, zero_division=0)
    val_f1 = f1_score(true_labels, predictions, zero_division=0)

    epoch_metrics['precision'].append(val_precision)
    epoch_metrics['recall'].append(val_recall)
    epoch_metrics['f1'].append(val_f1)

    print(f'Epoch {epoch} - Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}')

# Load the best model state (skipped if no epoch ever produced a finite,
# improving validation loss and therefore nothing was saved)
if best_model_state is not None:
    model.load_state_dict(best_model_state)

# Plot confusion matrix.
# NOTE: `predictions` / `true_labels` come from the last evaluated epoch, which
# is not necessarily the epoch whose weights were just restored.
cm = confusion_matrix(true_labels, predictions)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

# Number of epochs actually recorded, so the charts and the table stay in sync
# with the training loop instead of relying on a hard-coded 3.
num_epochs = len(epoch_metrics['loss'])

# Line chart for each metric over epochs
plt.figure(figsize=(10, 5))
for metric in ['accuracy', 'precision', 'recall', 'f1']:
    plt.plot(range(1, num_epochs + 1), epoch_metrics[metric], label=metric.capitalize())
plt.xlabel('Epoch')
plt.ylabel('Score')
plt.title('Metric Scores Over Epochs')
plt.legend()
plt.show()

# Bar chart summarizing overall performance (mean of each metric across epochs)
overall_metrics = {metric: sum(epoch_metrics[metric]) / len(epoch_metrics[metric]) for metric in epoch_metrics}
plt.bar(overall_metrics.keys(), overall_metrics.values(), color=['blue', 'green', 'red', 'purple', 'orange'])
plt.xlabel('Metrics')
plt.ylabel('Values')
plt.title('Overall Model Performance')
plt.ylim([0, 1])
plt.show()

# Markdown table for performance metrics per epoch.
# Every row is assembled from a full list of cells so that it has exactly as
# many columns as the header (the first cell is the epoch number / "Overall").
metric_columns = ['loss', 'accuracy', 'precision', 'recall', 'f1']
markdown_table = "| Epoch | Loss | Accuracy | Precision | Recall | F1 |\n"
markdown_table += "|-------|------|----------|-----------|--------|----|\n"
for i in range(num_epochs):
    row_cells = [str(i + 1)] + [f"{epoch_metrics[metric][i]:.4f}" for metric in metric_columns]
    markdown_table += "| " + " | ".join(row_cells) + " |\n"

# Last row for overall metrics
overall_cells = ["Overall"] + [f"{overall_metrics[metric]:.4f}" for metric in metric_columns]
markdown_table += "| " + " | ".join(overall_cells) + " |\n"

display(Markdown(markdown_table))


## PhoBERT + LSTM

In [None]:
import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModel, AutoTokenizer, get_linear_schedule_with_warmup, AutoModelForSequenceClassification  # Notice the change here to AutoModel
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
from torch.optim import AdamW
from tqdm.notebook import tqdm
import seaborn as sns
from IPython.display import Markdown

# Function to calculate class weights
def calculate_class_weights(class_counts):
    """Return one loss weight per class, inversely proportional to its count.

    Each weight is ``sum(class_counts) / class_counts[i]``. The result is a
    float32 tensor in the same order as ``class_counts``.
    """
    n_total = sum(class_counts)
    inverse_freq = []
    for n_in_class in class_counts:
        inverse_freq.append(n_total / n_in_class)
    return torch.tensor(inverse_freq, dtype=torch.float32)

# PhoBERT tokenizer and the bare encoder (AutoModel, i.e. without a
# classification head) that the LSTM classifier is built on
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")
base_model = AutoModel.from_pretrained("vinai/phobert-base-v2")

# Read the training and testing CSVs from the Drive folder
train_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_train_1.csv')
test_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_test_1.csv')

# Tokenize the 'review' column of both frames with identical settings:
# truncate to 256 tokens, pad, and return PyTorch tensors (incl. attention masks)
tokenize_kwargs = dict(truncation=True, padding=True, max_length=256, return_tensors="pt")
train_encodings = tokenizer(train_df['review'].tolist(), **tokenize_kwargs)
test_encodings = tokenizer(test_df['review'].tolist(), **tokenize_kwargs)

# Define a custom dataset
class SentimentDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with their labels."""

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # One sample per label
        return len(self.labels)

    def __getitem__(self, idx):
        # Copy every encoded field of row `idx` so callers never alias the
        # tensors stored on the dataset.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx].clone().detach()
        # The label is passed through as stored (no conversion)
        sample['labels'] = self.labels[idx]
        return sample

# Labels of both splits as tensors
train_labels, test_labels = (
    torch.tensor(frame['label'].tolist()) for frame in (train_df, test_df)
)

# Wrap encodings + labels for the DataLoaders
train_dataset = SentimentDataset(train_encodings, train_labels)
test_dataset = SentimentDataset(test_encodings, test_labels)

# DataLoader
def collate_fn(batch):
    """Stack per-sample tensors into one batch tensor per key.

    The keys are taken from the first sample in ``batch``.
    """
    collated = {}
    for key in batch[0]:
        collated[key] = torch.stack([sample[key] for sample in batch])
    return collated

# Batches of 4 samples for both splits; only the training loader is shuffled.
# collate_fn stacks the per-sample tensors of a batch key by key.
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

# Add LSTM layer
class LSTMClassifier(torch.nn.Module):
    """Encoder + LSTM + linear head for sequence classification.

    The encoder's token representations are run through an LSTM, and the LSTM's
    final hidden state(s) are projected to ``num_labels`` logits.

    Args:
        base_model: Encoder exposing ``config.hidden_size`` and returning an
            object with ``last_hidden_state`` when called with ``input_ids``
            and ``attention_mask``.
        hidden_dim: Hidden size of the LSTM (per direction).
        num_labels: Number of output classes.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability applied to the encoder output.
    """

    def __init__(self, base_model, hidden_dim, num_labels, bidirectional=True, dropout=0.1):
        super().__init__()
        self.base_model = base_model
        self.bidirectional = bidirectional
        self.lstm = torch.nn.LSTM(
            input_size=self.base_model.config.hidden_size,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=bidirectional
        )
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.dropout = torch.nn.Dropout(dropout)
        self.fc = torch.nn.Linear(lstm_output_dim, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return logits of shape (batch, num_labels).

        Padding positions are excluded from the LSTM, so the result does not
        depend on how much padding a batch carries.
        """
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = self.dropout(outputs.last_hidden_state)

        # Number of real tokens per sequence. Assumes the padding is on the
        # right (mask of the form 1..1 0..0) — TODO confirm for the tokenizer in use.
        # clamp(min=1) keeps pack_padded_sequence valid for an all-padding row;
        # the lengths must live on the CPU.
        lengths = attention_mask.sum(dim=1).clamp(min=1).to('cpu')
        packed = torch.nn.utils.rnn.pack_padded_sequence(
            sequence_output, lengths, batch_first=True, enforce_sorted=False
        )

        # h_n holds the final hidden state of each direction, taken at the last
        # real token (forward) and the first token (backward). Indexing
        # lstm_out[:, -1, :] instead would read a padded position.
        _, (h_n, _) = self.lstm(packed)
        if self.bidirectional:
            summary = torch.cat((h_n[-2], h_n[-1]), dim=1)
        else:
            summary = h_n[-1]
        logits = self.fc(summary)
        return logits

# Wrap the PhoBERT encoder with the LSTM classifier (128 hidden units, 2 output labels)
lstm_model = LSTMClassifier(base_model, hidden_dim=128, num_labels=2)

# Cross-entropy weighted by inverse class frequency in the training labels
n_label_0 = sum(train_labels == 0)
n_label_1 = sum(train_labels == 1)
class_weights = calculate_class_weights([n_label_0, n_label_1])
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)

# AdamW with a linear learning-rate decay (no warmup) spanning 3 passes over the training loader
optimizer = AdamW(lstm_model.parameters(), lr=2e-5)
total_training_steps = len(train_loader) * 3
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_training_steps)

# Per-epoch history: one list per tracked metric
epoch_metrics = {name: [] for name in ('loss', 'accuracy', 'precision', 'recall', 'f1')}

# Train and evaluate the model
for epoch in range(3):  # More epochs for better training
    # Switch back to training mode at the start of every epoch. The evaluation
    # pass at the end of the previous epoch leaves the model in eval mode, which
    # would otherwise silently disable dropout for all later epochs.
    lstm_model.train()
    running_loss = 0.0
    running_corrects = 0
    total_examples = 0
    loop = tqdm(train_loader, leave=True)

    for batch in loop:
        optimizer.zero_grad()

        # Include attention mask in the model's forward pass
        input_ids = batch['input_ids'].to(torch.long)
        attention_mask = batch['attention_mask'].to(torch.long)
        labels = batch['labels'].to(torch.long)

        logits = lstm_model(input_ids, attention_mask)
        loss = loss_function(logits, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(lstm_model.parameters(), max_norm=1.0)  # Gradient clipping
        optimizer.step()
        scheduler.step()  # Update the learning rate

        # Accumulate the loss (weighted by batch size) and the number of correct predictions
        running_loss += loss.item() * input_ids.size(0)
        running_corrects += torch.sum(torch.argmax(logits, axis=1) == labels)
        total_examples += input_ids.size(0)

        loop.set_description(f'Epoch {epoch}')
        loop.set_postfix(loss=loss.item())

    # Training-set averages for this epoch
    epoch_loss = running_loss / total_examples
    epoch_acc = running_corrects.double() / total_examples

    epoch_metrics['loss'].append(epoch_loss)
    epoch_metrics['accuracy'].append(epoch_acc.item())

    # Evaluate model performance on the test loader after each epoch
    lstm_model.eval()
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(torch.long)
            attention_mask = batch['attention_mask'].to(torch.long)
            labels = batch['labels'].to(torch.long)

            logits = lstm_model(input_ids, attention_mask)
            predictions.extend(torch.argmax(logits, axis=1).tolist())
            true_labels.extend(labels.tolist())

    # Calculate performance metrics on the test predictions.
    # NOTE: `accuracy` (test-set accuracy) is computed but not recorded;
    # epoch_metrics['accuracy'] holds the training accuracy from above.
    accuracy = accuracy_score(true_labels, predictions)
    precision = precision_score(true_labels, predictions, zero_division=0)
    recall = recall_score(true_labels, predictions, zero_division=0)
    f1 = f1_score(true_labels, predictions, zero_division=0)

    epoch_metrics['precision'].append(precision)
    epoch_metrics['recall'].append(recall)
    epoch_metrics['f1'].append(f1)

    print(f'Epoch {epoch} - Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}')

# Plot confusion matrix (built from the predictions of the last evaluated epoch)
cm = confusion_matrix(true_labels, predictions)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

# Number of epochs actually recorded, so the charts and the table stay in sync
# with the training loop instead of relying on a hard-coded 3.
num_epochs = len(epoch_metrics['loss'])

# Line chart for each metric over epochs
plt.figure(figsize=(10, 5))
for metric in ['accuracy', 'precision', 'recall', 'f1']:
    plt.plot(range(1, num_epochs + 1), epoch_metrics[metric], label=metric.capitalize())
plt.xlabel('Epoch')
plt.ylabel('Score')
plt.title('Metric Scores Over Epochs')
plt.legend()
plt.show()

# Bar chart summarizing overall performance (mean of each metric across epochs)
overall_metrics = {metric: sum(epoch_metrics[metric]) / len(epoch_metrics[metric]) for metric in epoch_metrics}
plt.bar(overall_metrics.keys(), overall_metrics.values(), color=['blue', 'green', 'red', 'purple', 'orange'])
plt.xlabel('Metrics')
plt.ylabel('Values')
plt.title('Overall Model Performance')
plt.ylim([0, 1])
plt.show()

# Markdown table for performance metrics per epoch.
# Every row is assembled from a full list of cells so that it has exactly as
# many columns as the header (the first cell is the epoch number / "Overall").
metric_columns = ['loss', 'accuracy', 'precision', 'recall', 'f1']
markdown_table = "| Epoch | Loss | Accuracy | Precision | Recall | F1 |\n"
markdown_table += "|-------|------|----------|-----------|--------|----|\n"
for i in range(num_epochs):
    row_cells = [str(i + 1)] + [f"{epoch_metrics[metric][i]:.4f}" for metric in metric_columns]
    markdown_table += "| " + " | ".join(row_cells) + " |\n"

# Last row for overall metrics
overall_cells = ["Overall"] + [f"{overall_metrics[metric]:.4f}" for metric in metric_columns]
markdown_table += "| " + " | ".join(overall_cells) + " |\n"

display(Markdown(markdown_table))

## PhoBERT + CNN (Convolutional Neural Network)

In [None]:
# Mount Google Drive at /content/drive; the CSV files read later in this cell
# live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

# Import necessary libraries and modules
import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModel, AutoTokenizer, AdamW
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Markdown
import torch.nn.functional as F

# Load the pretrained tokenizer published as "vinai/phobert-base-v2"; it is used
# further down to encode the 'review' column of the train and test frames.
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")

# Custom CNN model that sits on top of PhoBERT
class PhoBertCNN(torch.nn.Module):
    """Convolutional text classifier on top of a frozen encoder.

    The encoder's hidden states are fed to parallel 1-D convolutions (one per
    kernel size), max-pooled over the sequence, concatenated and projected to
    ``num_classes`` logits.

    The layers are referenced through ``torch.nn`` because this notebook only
    imports ``torch`` and ``torch.nn.functional as F`` — a bare ``nn`` is not
    defined.

    Args:
        phobert_model: Encoder exposing ``config.hidden_size``; its output is
            indexed with ``[0]`` to obtain the hidden states.
        num_filters: Output channels of every convolution.
        filter_sizes: Kernel sizes, one convolution per entry.
        num_classes: Number of output classes.
    """

    def __init__(self, phobert_model, num_filters, filter_sizes, num_classes):
        super().__init__()
        self.phobert = phobert_model
        hidden_size = phobert_model.config.hidden_size
        self.convs = torch.nn.ModuleList(
            [torch.nn.Conv1d(hidden_size, num_filters, fs) for fs in filter_sizes]
        )
        self.dropout = torch.nn.Dropout(0.5)
        self.fc = torch.nn.Linear(num_filters * len(filter_sizes), num_classes)

    def forward(self, input_ids, attention_mask):
        """Return logits of shape (batch, num_classes)."""
        # Don't compute gradient for the backbone PhoBERT model
        with torch.no_grad():
            outputs = self.phobert(input_ids=input_ids, attention_mask=attention_mask)

        # We take only the hidden states
        x = outputs[0]  # (B, L, D) - Batch, Length, Hidden Size
        x = x.permute(0, 2, 1)  # Switch to (B, D, L) for Conv1D
        x = [F.relu(conv(x)) for conv in self.convs]  # Apply each convolution layer
        x = [F.max_pool1d(c, c.shape[2]).squeeze(2) for c in x]  # Max pooling over time
        x = torch.cat(x, 1)  # Concatenate the feature maps
        x = self.dropout(x)  # Apply dropout
        x = self.fc(x)  # Final fully connected layer
        return x

# Pretrained PhoBERT encoder used as the backbone
phobert = AutoModel.from_pretrained("vinai/phobert-base-v2")

# Hyper-parameters of the CNN head
num_classes = 2           # number of output labels
num_filters = 100         # convolutional filters per kernel size
filter_sizes = [2, 3, 4]  # convolution kernel sizes

# Assemble the classifier: PhoBERT backbone + CNN head
model = PhoBertCNN(phobert, num_filters, filter_sizes, num_classes)

# Prepare the dataset and dataloader
class SentimentDataset(Dataset):
    """Map-style dataset that converts encodings and labels to tensors on access."""

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # One sample per label
        return len(self.labels)

    def __getitem__(self, idx):
        # Build a tensor for every encoded field of row `idx`, then add the label
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Read the training and testing CSVs from the mounted Drive folder
train_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_train_1.csv')
test_df = pd.read_csv('/content/drive/MyDrive/NLP/NTC_SV_test_1.csv')

# Tokenize the 'review' column of both frames with identical settings
# (truncate to 256 tokens, pad); the dataset turns the results into tensors
tokenize_kwargs = dict(truncation=True, padding=True, max_length=256)
train_encodings = tokenizer(train_df['review'].tolist(), **tokenize_kwargs)
test_encodings = tokenizer(test_df['review'].tolist(), **tokenize_kwargs)

# Datasets and loaders: batches of 16, only the training loader is shuffled
train_dataset = SentimentDataset(train_encodings, train_df['label'].tolist())
test_dataset = SentimentDataset(test_encodings, test_df['label'].tolist())
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Optimizer over all model parameters
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-6)

# Per-epoch history: one list per tracked metric
epoch_metrics = {name: [] for name in ('loss', 'accuracy', 'precision', 'recall', 'f1')}

for epoch in range(3):
    # ---- training pass ----
    model.train()
    total_loss = 0
    for batch in tqdm(train_loader):
        input_ids, attention_mask, labels = (
            batch[field].to(torch.long) for field in ('input_ids', 'attention_mask', 'labels')
        )
        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Mean of the per-batch training losses
    avg_loss = total_loss / len(train_loader)
    epoch_metrics['loss'].append(avg_loss)

    # ---- evaluation pass on the test loader ----
    model.eval()
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in tqdm(test_loader):
            input_ids, attention_mask, labels = (
                batch[field].to(torch.long) for field in ('input_ids', 'attention_mask', 'labels')
            )
            outputs = model(input_ids, attention_mask)
            preds = torch.argmax(outputs, dim=1)
            predictions += preds.tolist()
            true_labels += labels.tolist()

    # Test-set scores for this epoch
    acc = accuracy_score(true_labels, predictions)
    prec = precision_score(true_labels, predictions, zero_division=0)
    rec = recall_score(true_labels, predictions, zero_division=0)
    f1 = f1_score(true_labels, predictions, zero_division=0)

    # Append each score to its history list
    for metric_name, score in (('accuracy', acc), ('precision', prec), ('recall', rec), ('f1', f1)):
        epoch_metrics[metric_name].append(score)

    print(f"Epoch {epoch} - Loss: {avg_loss:.4f}, Acc: {acc:.4f}, Prec: {prec:.4f}, Rec: {rec:.4f}, F1: {f1:.4f}")

# Plot metrics over epochs
plt.figure(figsize=(10, 5))
# One x-position per recorded epoch (derived from the history rather than hard-coded)
num_epochs = len(epoch_metrics['loss'])
epochs = range(1, num_epochs + 1)

for metric in ['accuracy', 'precision', 'recall', 'f1']:
    if epoch_metrics[metric]:  # Only plot if there are values in the list
        plt.plot(epochs, epoch_metrics[metric], label=metric.capitalize())

plt.xlabel('Epoch')
plt.ylabel('Score')
plt.title('Metric Scores Over Epochs')
plt.legend()
plt.show()


# Bar chart summarizing overall performance (mean of each metric across epochs)
overall_metrics = {metric: sum(epoch_metrics[metric]) / len(epoch_metrics[metric]) for metric in epoch_metrics}
plt.bar(overall_metrics.keys(), overall_metrics.values(), color=['blue', 'green', 'red', 'purple', 'orange'])
plt.xlabel('Metrics')
plt.ylabel('Values')
plt.title('Overall Model Performance')
plt.ylim([0, 1])
plt.show()

# Markdown table for performance metrics per epoch.
# Every row is assembled from a full list of cells so that it has exactly as
# many columns as the header (the first cell is the epoch number / "Overall").
metric_columns = ['loss', 'accuracy', 'precision', 'recall', 'f1']
markdown_table = "| Epoch | Loss | Accuracy | Precision | Recall | F1 |\n"
markdown_table += "|-------|------|----------|-----------|--------|----|\n"
for i in range(num_epochs):
    row_cells = [str(i + 1)] + [f"{epoch_metrics[metric][i]:.4f}" for metric in metric_columns]
    markdown_table += "| " + " | ".join(row_cells) + " |\n"

# Last row for overall metrics
overall_cells = ["Overall"] + [f"{overall_metrics[metric]:.4f}" for metric in metric_columns]
markdown_table += "| " + " | ".join(overall_cells) + " |\n"

display(Markdown(markdown_table))

# Generate confusion matrix from the predictions of the last evaluated epoch.
# labels=[0, 1] pins the matrix to 2x2 even when only one class occurs in the
# labels and predictions, so the cell lookups below cannot go out of range.
cm = confusion_matrix(true_labels, predictions, labels=[0, 1])
TP = cm[1, 1]
FP = cm[0, 1]
TN = cm[0, 0]
FN = cm[1, 0]

print(f"True Positives (TP): {TP}")
print(f"False Positives (FP): {FP}")
print(f"True Negatives (TN): {TN}")
print(f"False Negatives (FN): {FN}")

# Plot the confusion matrix
fig, ax = plt.subplots()
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)

# Labels, title and ticks
label_font = {'size':'16'}
ax.set_xlabel('Predicted labels', fontdict=label_font)
ax.set_ylabel('True labels', fontdict=label_font)
ax.set_title('Confusion Matrix', fontdict=label_font)
ax.tick_params(axis='both', which='major', labelsize=14)

plt.show()
