In [None]:
"""
Declaration
"""
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, TensorDataset
from torch.optim import AdamW 
from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix
import os
from tqdm import tqdm
from datetime import datetime
import random
import json
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [None]:
"""
system settings
"""
def set_overall_seed(seed=16):
    """Seed every random number generator this notebook relies on.

    Applies the same seed, in order, to Python's ``random`` module, NumPy's
    global generator, torch's CPU generator and torch's CUDA generators.

    Args:
        seed: Integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


# Seed once at notebook start so later cells run from a known RNG state.
set_overall_seed(16)

In [None]:
"""
Define a class for importing Dataset
"""
class ProcessedIMDbDataset(Dataset):
    """Dataset wrapper around already-processed sequences, masks, labels and lengths.

    Each item is a dict with the keys ``input_ids``, ``attention_mask``,
    ``labels`` and ``lengths``, taken from the same index of the four
    containers handed to the constructor.
    """

    def __init__(self, sequences, attention_masks, labels, lengths=None):
        """Store the per-sample containers.

        Args:
            sequences: Indexable container of token-id sequences.
            attention_masks: Indexable container of attention masks.
            labels: Tensor of labels.
            lengths: Optional per-sample lengths; when omitted, a tensor of
                ones shaped like ``labels`` is used instead.
        """
        if lengths is None:
            lengths = torch.ones_like(labels)

        self.sequences, self.attention_masks = sequences, attention_masks
        self.labels, self.lengths = labels, lengths

    def __len__(self):
        """Return the number of samples (the length of ``sequences``)."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return the sample at ``idx`` as a dict of its four fields."""
        columns = (
            ('input_ids', self.sequences),
            ('attention_mask', self.attention_masks),
            ('labels', self.labels),
            ('lengths', self.lengths),
        )
        return {field: values[idx] for field, values in columns}



In [None]:
"""
Model Trainer based on Bert Finetuning
"""
class BertSentimentTrainer:
    """Fine-tune, evaluate and run a pretrained sequence-classification model.

    The trainer owns the tokenizer, the model, the target device and three
    DataLoaders (train / validation / test) that are populated by
    ``load_data``. During ``train`` the checkpoint with the best validation
    accuracy is written to the ``best_bert_model`` directory and reloaded
    once training finishes.
    """

    def __init__(self, model_name='bert-base-uncased', num_labels=2, max_length=512):
        """Load the pretrained tokenizer and model and place the model on the device.

        Args:
            model_name: Name or path passed to ``from_pretrained``.
            num_labels: Number of output classes of the classification head.
            max_length: Padding / truncation length used by ``predict``.
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f'using device {self.device}')

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name, num_labels=num_labels)
        # Every batch is moved to self.device before the forward pass, so the
        # model has to live on that device as well.
        self.model.to(self.device)

        self.max_length = max_length
        self.train_loader = None
        self.val_loader = None
        self.test_loader = None

    def load_data(self, file_path='./all_data.pt', batch_size=20):
        """Load the processed splits from ``file_path`` and build the DataLoaders.

        The file is read with ``torch.load`` and must provide, for each of the
        ``train`` / ``val`` / ``test`` prefixes, the keys ``<prefix>_sequences``,
        ``<prefix>_masks``, ``<prefix>_labels`` and ``<prefix>_lengths``.

        Args:
            file_path: Path of the saved data file.
            batch_size: Batch size used for all three loaders.

        Returns:
            Tuple ``(train_dataset, val_dataset, test_dataset)``.
        """
        print(f"loading dataset")

        data = torch.load(file_path, weights_only=True)

        def build_split(prefix):
            # One dataset per split, all following the same key naming scheme.
            return ProcessedIMDbDataset(
                data[f'{prefix}_sequences'],
                data[f'{prefix}_masks'],
                data[f'{prefix}_labels'],
                data[f'{prefix}_lengths']
            )

        train_dataset = build_split('train')
        val_dataset = build_split('val')
        test_dataset = build_split('test')

        # Only the training split is shuffled; evaluation order stays fixed.
        self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        self.val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        self.test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        print(f"loaded done, train dataset:{len(train_dataset)}, val_dataset:{len(val_dataset)}, test_dataset:{len(test_dataset)}")
        return train_dataset, val_dataset, test_dataset

    def train(self, epochs=4, learning_rate=2e-5, warmup_steps=0, logging_steps=50):
        """Fine-tune the model with AdamW, linear warmup/decay and early stopping.

        After each epoch the model is evaluated once on the validation loader.
        An improvement in validation accuracy saves a checkpoint to
        ``best_bert_model`` and resets the patience counter; training stops
        early after 2 consecutive epochs without improvement. The best
        checkpoint is reloaded before returning.

        Args:
            epochs: Maximum number of passes over the training loader.
            learning_rate: Learning rate given to AdamW.
            warmup_steps: Number of scheduler warmup steps.
            logging_steps: Refresh the progress-bar metrics every this many
                batches; 0 disables the refresh.

        Returns:
            List with one dict per completed epoch, holding ``epoch``,
            ``train_loss``, ``train_accuracy``, ``val_loss`` and ``val_accuracy``.

        Raises:
            ValueError: If ``load_data`` has not populated the loaders.
        """
        if self.train_loader is None or self.val_loader is None:
            raise ValueError("datasets not ready!")

        # use AdamW
        optimizer = AdamW(self.model.parameters(), lr=learning_rate, weight_decay=0.01)
        total_steps = len(self.train_loader) * epochs

        # scheduler for learning rates, with warmup
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=warmup_steps,
            num_training_steps=total_steps
        )

        training_stats = []
        # Start below any reachable accuracy so the first epoch always yields a checkpoint.
        best_val_accuracy = float('-inf')
        patience = 2
        patience_counter = 0

        for epoch in range(epochs):
            print(f"Epoch {epoch+1}/{epochs}")
            print("=" * 80)

            avg_train_loss, train_accuracy = self._train_one_epoch(
                optimizer, scheduler, logging_steps)

            # Validation runs once per epoch, after all training batches.
            val_accuracy, val_loss = self.evaluate(self.val_loader)
            print(f"training loss: {avg_train_loss:.4f}, training accuracy:{train_accuracy:.4f}")
            print(f"validation loss: {val_loss:.4f}, validation accuracy:{val_accuracy:.4f}")

            # Record the epoch before the early-stop check so the last epoch is kept.
            training_stats.append(
                {
                    'epoch': epoch + 1,
                    'train_loss': avg_train_loss,
                    'train_accuracy': train_accuracy,
                    'val_loss': val_loss,
                    'val_accuracy': val_accuracy
                }
            )

            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                self.save_model('best_bert_model')
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"early stop")
                    break

        # at last, we load the best performance model, for further process
        self.load_model('best_bert_model')
        return training_stats

    def _train_one_epoch(self, optimizer, scheduler, logging_steps):
        """Run one optimisation pass over the training loader.

        Returns:
            Tuple ``(average_loss, accuracy)`` for the epoch.
        """
        self.model.train()
        total_train_loss = 0.0
        train_correct = 0
        train_total = 0

        batch_progress = tqdm(self.train_loader, desc='training')
        for step, batch in enumerate(batch_progress):
            input_ids = batch['input_ids'].to(self.device)
            attention_mask = batch['attention_mask'].to(self.device)
            labels = batch['labels'].to(self.device)

            outputs = self.model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            loss = outputs.loss
            logits = outputs.logits

            optimizer.zero_grad()
            loss.backward()

            # Clip the gradient norm to 1.0 before the optimizer step.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()

            total_train_loss += loss.item()
            predictions = torch.argmax(logits, dim=1)
            train_correct += (predictions == labels).sum().item()
            train_total += labels.size(0)

            # logging_steps == 0 means "never refresh" rather than a modulo-by-zero.
            if logging_steps and step % logging_steps == 0:
                batch_progress.set_postfix(
                    {
                        'loss': f"{loss.item():.4f}",
                        'acc': f"{train_correct/train_total:.4f}"
                    }
                )

        num_batches = len(self.train_loader)
        avg_train_loss = total_train_loss / num_batches if num_batches else 0.0
        train_accuracy = train_correct / train_total if train_total else 0.0
        return avg_train_loss, train_accuracy

    def evaluate(self, dataloader):
        """Compute accuracy and mean batch loss of the model over ``dataloader``.

        The model is switched to eval mode and left there; ``train`` switches
        it back at the start of every epoch.

        Args:
            dataloader: Iterable of batches with ``input_ids``,
                ``attention_mask`` and ``labels`` tensors.

        Returns:
            Tuple ``(accuracy, average_loss)``; ``(0.0, 0.0)`` when the loader
            yields no samples.
        """
        self.model.eval()
        total_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch in tqdm(dataloader, desc="evaluating"):
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                labels = batch['labels'].to(self.device)

                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    labels=labels
                )

                total_loss += outputs.loss.item()
                predictions = torch.argmax(outputs.logits, dim=1)
                correct += (predictions == labels).sum().item()
                total += labels.size(0)

        # An empty loader would otherwise divide by zero.
        if total == 0:
            return 0.0, 0.0

        accuracy = correct / total
        avg_loss = total_loss / len(dataloader)
        return accuracy, avg_loss

    def predict(self, texts):
        """Classify raw texts one at a time.

        Args:
            texts: Iterable of strings; a single string is treated as one text.

        Returns:
            Tuple ``(predictions, probabilities)``. ``predictions`` is a list
            of predicted class indices. ``probabilities`` holds one NumPy array
            per text with the softmax output of that single-text batch (a
            leading batch axis of size 1 is kept).
        """
        # Iterating a bare string would classify it character by character.
        if isinstance(texts, str):
            texts = [texts]

        self.model.eval()
        predictions = []
        probabilities = []

        with torch.no_grad():
            for text in texts:
                encoding = self.tokenizer(
                    text,
                    truncation=True,
                    padding='max_length',
                    max_length=self.max_length,
                    return_tensors='pt'
                )

                input_ids = encoding['input_ids'].to(self.device)
                attention_mask = encoding['attention_mask'].to(self.device)

                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
                logits = outputs.logits
                probs = torch.softmax(logits, dim=1)
                pred = torch.argmax(logits, dim=1)

                predictions.append(pred.cpu().item())
                probabilities.append(probs.cpu().numpy())

        return predictions, probabilities

    def overall_evaluation(self):
        """Evaluate the model on the test loader and print a full report.

        Returns:
            Dict with ``accuracy``, ``f1_score`` (weighted),
            ``classification_report`` (text), ``confusion_matrix``,
            ``predictions`` and ``probabilities``.

        Raises:
            ValueError: If the test loader has not been created by ``load_data``.
        """
        if self.test_loader is None:
            raise ValueError("test datasets not ready")

        self.model.eval()
        all_predictions = []
        all_labels = []
        all_probabilities = []

        with torch.no_grad():
            for batch in tqdm(self.test_loader, desc="testing"):
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)

                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
                logits = outputs.logits
                probabilities = torch.softmax(logits, dim=1)
                predictions = torch.argmax(logits, dim=1)

                all_predictions.extend(predictions.cpu().numpy())
                # Labels are only compared on the host, so they never need the device.
                all_labels.extend(batch['labels'].cpu().numpy())
                all_probabilities.extend(probabilities.cpu().numpy())

        accuracy = accuracy_score(all_labels, all_predictions)
        f1 = f1_score(all_labels, all_predictions, average='weighted')
        class_report = classification_report(all_labels, all_predictions, target_names=['negtive', 'positive'])
        conf_matrix = confusion_matrix(all_labels, all_predictions)

        results = {
            'accuracy': accuracy,
            'f1_score': f1,
            'classification_report': class_report,
            'confusion_matrix': conf_matrix,
            'predictions': all_predictions,
            'probabilities': all_probabilities
        }

        print(f"test accuracy: {accuracy:.4f}")
        print(f"F1 score: {f1:.4f}")
        print("\n class report:")
        print(class_report)
        print("\n confusion matrix")
        print(conf_matrix)

        return results

    def save_model(self, path):
        """Write the model and tokenizer to directory ``path``, creating it if needed."""
        os.makedirs(path, exist_ok=True)

        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)
        print(f"save model to: {path}")

    def load_model(self, path):
        """Replace the model and tokenizer with the ones stored under ``path``.

        When ``path`` does not exist a message is printed and the current
        model is kept unchanged (best effort, no exception).
        """
        if not os.path.exists(path):
            print(f"model file not exist!")
            return

        self.model = AutoModelForSequenceClassification.from_pretrained(path)
        self.tokenizer = AutoTokenizer.from_pretrained(path)
        self.model.to(self.device)
        print(f"loaded model from {path}")

In [None]:
"""
Visualization Part
"""

def plt_training_history_data(training_stats):
    """Plot per-epoch training and validation loss / accuracy curves.

    The figure is saved to ``bert_training_history.png`` and then shown.

    Args:
        training_stats: Sequence of per-epoch dicts with the keys ``epoch``,
            ``train_loss``, ``val_loss``, ``train_accuracy`` and
            ``val_accuracy``. When it is empty nothing is plotted.
    """
    # An empty history has no 'epoch' column; skip plotting instead of failing on the lookup.
    if len(training_stats) == 0:
        print("no training stats to plot")
        return

    df_stats = pd.DataFrame(training_stats)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    # loss
    ax1.plot(df_stats['epoch'], df_stats['train_loss'], 'b-', label='training loss')
    ax1.plot(df_stats['epoch'], df_stats['val_loss'], 'r-', label='validation loss')
    ax1.set_title('training and validation loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax1.grid(True)

    # accuracy
    ax2.plot(df_stats['epoch'], df_stats['train_accuracy'], 'b-', label='training accuracy')
    ax2.plot(df_stats['epoch'], df_stats['val_accuracy'], 'r-', label='val accuracy')
    ax2.set_title('training and validation accuracy')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy')
    ax2.legend()
    ax2.grid(True)

    plt.tight_layout()
    plt.savefig('bert_training_history.png', dpi=300, bbox_inches='tight')
    plt.show()

def plot_confusion_matrix(conf_matrix, class_names=['negtive', 'positive']):
    """Draw the confusion matrix as an annotated heatmap, save it and show it.

    Args:
        conf_matrix: Matrix of integer counts (cells are formatted with ``'d'``).
        class_names: Tick labels used on both axes.

    The figure is written to ``bert_confusion_matrix.png``.
    """
    fig, ax = plt.subplots(figsize=(6, 5))
    heatmap_options = dict(
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    sns.heatmap(conf_matrix, ax=ax, **heatmap_options)

    ax.set_title('BERT Finetuned Confusion')
    ax.set_ylabel('Ground Truth')
    ax.set_xlabel('Prediction')

    fig.savefig('bert_confusion_matrix.png', dpi=300, bbox_inches='tight')
    plt.show()


In [None]:
"""
Compare Performance with previous LSTM Model
"""
def compare_with_lstm(bert_results, lstm_results):
    """Print BERT and LSTM metrics side by side with the relative improvement.

    Args:
        bert_results: Dict with ``accuracy`` and ``f1_score`` of the BERT model.
        lstm_results: Dict with the LSTM ``accuracy`` / ``f1_score``, or
            ``None`` / empty when no LSTM results could be loaded; in that
            case only the BERT metrics are printed.
    """
    def relative_gain(new_value, baseline):
        # A zero (or missing) baseline has no meaningful relative improvement.
        if not baseline:
            return "n/a"
        return f"{(new_value - baseline) / baseline * 100:.4f}%"

    bert_accuracy = bert_results['accuracy']
    bert_f1 = bert_results['f1_score']
    print(f"BERT Model:")
    print(f"  Accuracy: {bert_accuracy:.4f}")
    print(f"  F1 Score: {bert_f1:.4f}")

    # load_lstm_results returns None on failure, so there may be nothing to compare with.
    if not lstm_results:
        print("LSTM results not available, skip comparison")
        return

    lstm_accuracy = lstm_results.get('accuracy', 0)
    lstm_f1 = lstm_results.get('f1_score', 0)
    print(f"LSTM Model:")
    print(f"  Accuracy: {lstm_accuracy:.4f}")
    print(f"  F1 Score: {lstm_f1:.4f}")

    print(f"\nPerformance Improve:")
    print(f"  accuracy_improve: {relative_gain(bert_accuracy, lstm_accuracy)}")
    print(f"  f1_improve: {relative_gain(bert_f1, lstm_f1)}")

def load_lstm_results(path):
    """Read previously saved LSTM results from a JSON file.

    Args:
        path: Path of the JSON file.

    Returns:
        The decoded JSON content, or ``None`` when the file cannot be opened
        or does not contain valid JSON (the reason is printed).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    # OSError covers missing/unreadable files; ValueError covers
    # json.JSONDecodeError and text decoding errors.
    except (OSError, ValueError) as e:
        print(f"load lstm results failed: {str(e)}")
        return None


In [None]:
"""
Program Main Process
"""
def perform_training():
    """Fine-tune BERT on the processed data, evaluate it and save the final model.

    The data file is expected at ``<cwd>/processed_data/all_data.pt``.

    Returns:
        Tuple ``(trainer, results, training_stats)``: the trainer instance,
        the dict returned by ``overall_evaluation`` and the per-epoch stats
        returned by ``train``.
    """
    work_path = os.getcwd()
    data_file = 'all_data.pt'
    # os.path.join picks the right separator for the current platform
    # (a hard-coded "\\" only resolves on Windows).
    data_file_path = os.path.join(work_path, "processed_data", data_file)

    trainer = BertSentimentTrainer(
        model_name='bert-base-uncased',
        num_labels=2,
        max_length=512
    )

    trainer.load_data(data_file_path, batch_size=8)

    training_stats = trainer.train(
        epochs=4,
        learning_rate=2e-5,
        warmup_steps=100,
        logging_steps=10
    )

    results = trainer.overall_evaluation()

    # Keep a timestamped copy so repeated runs do not overwrite each other.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_save_path = f"sentiment_bert_finetune_model_{timestamp}"
    trainer.save_model(model_save_path)

    return trainer, results, training_stats


def test_texts(trainer):
    test_samples = [
        "quite a good movie, but the ending could be better!",
        "it's a fantastic film I've ever enjoyed!"
    ]

    predictions, probabilities = trainer.predict(test_samples)

    sentiment_mapping = {0 : 'negtive', 1 : 'positive'}

    for text, pred, prob in zip(test_samples, predictions, probabilities):
        print(f"testing text: {text}")
        print(f"prediction: {sentiment_mapping[pred]}, {max(prob):.4f}")

def main(train=True):
    """Entry point: optionally fine-tune and report, then classify the sample texts.

    Args:
        train: When True, run the full training / evaluation / plotting /
            LSTM-comparison pipeline. When False, build a trainer and try to
            load the checkpoint from ``best_bert_model`` instead, so the
            sample predictions still have a trainer to run on.
    """
    if train:
        trainer, bert_results, training_stats = perform_training()
        plt_training_history_data(training_stats)
        plot_confusion_matrix(bert_results['confusion_matrix'])

        lstm_results = load_lstm_results("./lstm_results.json")
        # load_lstm_results yields None on failure; only compare when there is a baseline.
        if lstm_results is not None:
            compare_with_lstm(bert_results, lstm_results)
        else:
            print("lstm results not available, skip comparison")
    else:
        trainer = BertSentimentTrainer(
            model_name='bert-base-uncased',
            num_labels=2,
            max_length=512
        )
        trainer.load_model('best_bert_model')

    test_texts(trainer)


if __name__ == "__main__":
    main()