# Bangla Sentiment Analysis using Bangla-BERT-base

This notebook implements the best performing model from the paper "Bangla Sentiment Analysis On Highly Imbalanced Data Using Hybrid CNN-LSTM & Bangla BERT".

The Bangla-BERT-base model achieved 96% accuracy with 10-fold cross-validation.

## 1. Install Required Libraries

In [None]:
!pip install transformers torch scikit-learn pandas numpy tqdm matplotlib seaborn imbalanced-learn

## 2. Import Libraries

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer, 
    AutoModelForSequenceClassification,
    AdamW,
    get_linear_schedule_with_warmup
)
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import (
    accuracy_score, 
    f1_score, 
    precision_score, 
    recall_score,
    confusion_matrix,
    roc_auc_score
)
from imblearn.over_sampling import SMOTE
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import re
import warnings
warnings.filterwarnings('ignore')

# Select the compute device once for the whole notebook: the GPU when CUDA is
# available, otherwise the CPU. Later cells read this module-level `device`.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

## 3. Data Preprocessing Functions

In [None]:
def preprocess_for_bert(text):
    """
    Apply the minimal text cleaning used before BERT tokenization.

    According to the paper, only URLs were removed for the BERT model; in
    addition every run of whitespace is collapsed to a single space.

    Args:
        text: Raw input string.

    Returns:
        The cleaned string, without leading or trailing whitespace.
    """
    # The dot after "www" is escaped so that only genuine "www." prefixes are
    # treated as URLs; an unescaped dot would also delete ordinary tokens that
    # merely contain "www" followed by any character.
    text = re.sub(r'http\S+|www\.\S+', '', text)

    # split() + join collapses all whitespace runs and trims both ends
    return ' '.join(text.split())

def load_and_preprocess_data(file_path):
    """
    Load the sentiment CSV and turn it into a clean binary-label dataframe.

    Expected format: CSV with 'Text' and 'Polarity' columns, where Polarity
    is 'positive', 'negative' or 'neutral'. Neutral rows are dropped. Rows
    with any other polarity value, a missing text, or a text that is empty
    after cleaning are discarded as well.

    Args:
        file_path: Path of the CSV file (anything ``pd.read_csv`` accepts).

    Returns:
        DataFrame whose 'text' column holds the cleaned strings and whose
        'label' column holds ints (1 = positive, 0 = negative). The index is
        reset to 0..n-1; any other CSV columns are kept unchanged.

    Raises:
        ValueError: If the required columns are missing, or if no usable
            sample remains after filtering.
    """
    df = pd.read_csv(file_path)

    # Ensure we have the required columns
    if 'Text' not in df.columns or 'Polarity' not in df.columns:
        raise ValueError("Dataset must have 'Text' and 'Polarity' columns")

    # Rename columns to the names used by the rest of the notebook
    df = df.rename(columns={'Text': 'text', 'Polarity': 'label'})

    print("Original dataset distribution:")
    print(df['label'].value_counts())
    print()

    # .copy() gives an independent frame, so the column assignments below do
    # not operate on a slice of the original dataframe
    df = df[df['label'] != 'neutral'].copy()
    print(f"After dropping neutral samples: {len(df)} samples remaining")

    # positive -> 1, negative -> 0; anything else becomes NaN
    df['label'] = df['label'].map({'positive': 1, 'negative': 0})

    # Drop unexpected polarity values and missing texts (a NaN text would
    # make the regex-based cleaning raise)
    df = df.dropna(subset=['label', 'text']).copy()
    df['label'] = df['label'].astype(int)

    # Apply preprocessing
    df['text'] = df['text'].astype(str).apply(preprocess_for_bert)

    # Remove texts that became empty and renumber the rows
    df = df[df['text'].str.len() > 0].reset_index(drop=True)

    # Fail with a clear message instead of a ZeroDivisionError in the
    # percentage computation below
    if df.empty:
        raise ValueError(
            "No positive/negative samples with non-empty text were found in the dataset"
        )

    total = len(df)
    positives = int((df['label'] == 1).sum())
    negatives = int((df['label'] == 0).sum())

    print(f"\nFinal dataset statistics:")
    print(f"Total samples: {total}")
    print(f"Positive samples: {positives} ({positives/total*100:.1f}%)")
    print(f"Negative samples: {negatives} ({negatives/total*100:.1f}%)")

    return df

## 4. Dataset Class for BERT

In [None]:
class BanglaSentimentDataset(Dataset):
    """
    Map-style dataset that tokenizes one text per item on access.

    Each item is a dict with 'input_ids' and 'attention_mask' (both flattened
    to 1-D and padded/truncated to ``max_length``) plus a 'labels' tensor of
    dtype long.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        # Indexable containers of equal length; texts[i] belongs to labels[i]
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        sample_text = str(self.texts[idx])
        sample_label = self.labels[idx]

        encoded = self.tokenizer(
            sample_text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )

        # The tokenizer output carries a leading batch dimension of one;
        # flattening leaves a plain 1-D sequence per sample
        item = {
            field: torch.flatten(encoded[field])
            for field in ('input_ids', 'attention_mask')
        }
        item['labels'] = torch.tensor(sample_label, dtype=torch.long)
        return item

## 5. Model Training and Evaluation Functions

In [None]:
def train_epoch(model, dataloader, optimizer, scheduler, device):
    """
    Run one optimisation pass over ``dataloader``.

    For every batch the gradients are reset, the model is called with the
    labels so that it returns a loss, gradients are clipped to a norm of 1.0,
    and both the optimizer and the scheduler advance by one step.

    Returns:
        Tuple ``(average loss per batch, accuracy over all seen samples)``.
    """
    model.train()
    running_loss = 0
    epoch_preds, epoch_targets = [], []

    for batch in tqdm(dataloader, desc='Training'):
        optimizer.zero_grad()

        # Only these three entries of the batch are forwarded to the model
        inputs = {
            key: batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'labels')
        }
        outputs = model(**inputs)

        batch_loss = outputs.loss
        running_loss += batch_loss.item()

        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

        batch_preds = torch.argmax(outputs.logits, dim=-1)
        epoch_preds.extend(batch_preds.cpu().numpy())
        epoch_targets.extend(inputs['labels'].cpu().numpy())

    mean_loss = running_loss / len(dataloader)
    return mean_loss, accuracy_score(epoch_targets, epoch_preds)

def evaluate(model, dataloader, device):
    """
    Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Sequence-classification model; called with ``labels`` so that
            its output exposes both ``loss`` and ``logits``.
        dataloader: Yields dict batches with 'input_ids', 'attention_mask'
            and 'labels'.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(metrics, predictions, true_labels)``. ``metrics`` is a dict
        with the keys 'loss', 'accuracy', 'f1', 'f1_negative', 'f1_positive',
        'precision', 'recall' and 'roc_auc'. 'f1', 'precision' and 'recall'
        are weighted averages; 'roc_auc' is NaN when the evaluated labels
        contain a single class. ``predictions`` and ``true_labels`` are flat
        lists with one entry per sample.
    """
    model.eval()
    total_loss = 0
    predictions = []
    true_labels = []
    probabilities = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc='Evaluating'):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )

            logits = outputs.logits
            total_loss += outputs.loss.item()

            probs = torch.softmax(logits, dim=-1)
            preds = torch.argmax(logits, dim=-1)

            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
            probabilities.extend(probs.cpu().numpy())

    avg_loss = total_loss / len(dataloader)
    accuracy = accuracy_score(true_labels, predictions)
    f1 = f1_score(true_labels, predictions, average='weighted')
    f1_neg = f1_score(true_labels, predictions, pos_label=0, average='binary')
    f1_pos = f1_score(true_labels, predictions, pos_label=1, average='binary')
    precision = precision_score(true_labels, predictions, average='weighted')
    recall = recall_score(true_labels, predictions, average='weighted')

    # ROC AUC uses the probability of class 1. It is undefined when only one
    # class occurs in the ground truth, so report NaN in that case rather
    # than letting the whole evaluation fail.
    probabilities = np.array(probabilities)
    if len(np.unique(true_labels)) < 2:
        roc_auc = float('nan')
    else:
        roc_auc = roc_auc_score(true_labels, probabilities[:, 1])

    metrics = {
        'loss': avg_loss,
        'accuracy': accuracy,
        'f1': f1,
        'f1_negative': f1_neg,
        'f1_positive': f1_pos,
        'precision': precision,
        'recall': recall,
        'roc_auc': roc_auc
    }

    return metrics, predictions, true_labels

## 6. K-Fold Cross Validation Training

In [None]:
def _oversample_minority_texts(texts, labels, seed=42):
    """
    Balance the classes by repeating randomly chosen minority samples.

    Args:
        texts: 1-D numpy array of raw texts.
        labels: 1-D numpy array of class labels aligned with ``texts``.
        seed: Seed of the random draw, so repeated runs pick the same rows.

    Returns:
        Tuple ``(texts, labels)`` holding every original sample followed by
        the extra draws; each class ends up as large as the largest one.
    """
    rng = np.random.default_rng(seed)
    classes, counts = np.unique(labels, return_counts=True)
    target = counts.max()

    selected = [np.arange(len(labels))]
    for cls, count in zip(classes, counts):
        if count < target:
            pool = np.flatnonzero(labels == cls)
            selected.append(rng.choice(pool, size=target - count, replace=True))

    order = np.concatenate(selected)
    return texts[order], labels[order]


def train_with_kfold(df, n_splits=10, batch_size=16, num_epochs=10, use_smote=True):
    """
    Train Bangla-BERT-base using stratified K-fold cross validation.

    A fresh model is fine-tuned for every fold. After each epoch the model is
    evaluated on the fold's validation split, and the weights of the epoch
    with the best weighted F1 are restored before the fold's final metrics
    are computed. Note that the same validation split is therefore used both
    for choosing the epoch and for reporting.

    Args:
        df: DataFrame with a 'text' column and a binary 'label' column.
        n_splits: Number of folds.
        batch_size: Training batch size (validation uses twice this value).
        num_epochs: Epochs per fold.
        use_smote: When True, the training split of each fold is balanced by
            oversampling the minority class. The name is kept for
            compatibility; SMOTE itself interpolates numeric features and
            cannot be applied to raw text (running it on row indices pairs
            the minority label with unrelated texts), so existing minority
            texts are re-drawn at random instead.

    Returns:
        Tuple ``(fold_metrics, all_confusion_matrices)``: one metrics dict
        and one 2x2 confusion matrix per fold.
    """
    # Initialize tokenizer and load model name
    model_name = 'sagorsarker/bangla-bert-base'
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Prepare data
    texts = df['text'].values
    labels = df['label'].values

    # K-fold cross validation
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

    fold_metrics = []
    all_confusion_matrices = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(texts, labels)):
        print(f"\n{'='*50}")
        print(f"Fold {fold + 1}/{n_splits}")
        print(f"{'='*50}")

        # Split data
        X_train, X_val = texts[train_idx], texts[val_idx]
        y_train, y_val = labels[train_idx], labels[val_idx]

        # Balance only the training split; the validation split keeps the
        # natural class distribution
        if use_smote:
            print("Applying random oversampling of the minority class...")
            X_train, y_train = _oversample_minority_texts(X_train, y_train, seed=42)
            print(f"After oversampling - Train samples: {len(X_train)}")

        # Create datasets
        train_dataset = BanglaSentimentDataset(X_train, y_train, tokenizer)
        val_dataset = BanglaSentimentDataset(X_val, y_val, tokenizer)

        # Create dataloaders
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size*2, shuffle=False)

        # Initialize a fresh model for this fold
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=2
        ).to(device)

        # torch.optim.AdamW replaces the deprecated transformers.AdamW;
        # weight_decay=0.0 reproduces the default of the old class
        optimizer = torch.optim.AdamW(
            model.parameters(), lr=1e-5, eps=1e-8, weight_decay=0.0
        )

        num_training_steps = len(train_loader) * num_epochs
        num_warmup_steps = int(0.1 * num_training_steps)  # 10% warmup

        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_training_steps
        )

        # Start below any reachable F1 so the first epoch is always kept,
        # even when its F1 is exactly 0
        best_val_f1 = float('-inf')
        best_model_state = None

        for epoch in range(num_epochs):
            print(f"\nEpoch {epoch + 1}/{num_epochs}")

            # Train
            train_loss, train_acc = train_epoch(model, train_loader, optimizer, scheduler, device)

            # Evaluate
            val_metrics, val_preds, val_labels = evaluate(model, val_loader, device)

            print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
            print(f"Val Loss: {val_metrics['loss']:.4f}, Val Acc: {val_metrics['accuracy']:.4f}")
            print(f"Val F1: {val_metrics['f1']:.4f} (Neg: {val_metrics['f1_negative']:.4f}, Pos: {val_metrics['f1_positive']:.4f})")

            if val_metrics['f1'] > best_val_f1:
                best_val_f1 = val_metrics['f1']
                # state_dict() hands out references to the live parameters,
                # which later epochs keep modifying; clone them (on the CPU,
                # to spare GPU memory) to get a real snapshot
                best_model_state = {
                    name: tensor.detach().cpu().clone()
                    for name, tensor in model.state_dict().items()
                }
                print(f"✓ New best model! F1: {best_val_f1:.4f}")

        # Restore the best epoch (nothing to restore when num_epochs == 0)
        if best_model_state is not None:
            model.load_state_dict(best_model_state)
        final_metrics, final_preds, final_labels = evaluate(model, val_loader, device)

        # Store metrics
        fold_metrics.append(final_metrics)

        # labels=[0, 1] pins the matrix to 2x2 so the folds can be averaged
        cm = confusion_matrix(final_labels, final_preds, labels=[0, 1])
        all_confusion_matrices.append(cm)

        print(f"\nFold {fold + 1} Final Metrics:")
        for key, value in final_metrics.items():
            print(f"{key}: {value:.4f}")

        # Release this fold's model before the next one is created
        del model, optimizer, scheduler, best_model_state
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return fold_metrics, all_confusion_matrices

## 7. Results Visualization Functions

In [None]:
def plot_average_confusion_matrix(confusion_matrices, save_path=None):
    """
    Draw the element-wise mean of the per-fold confusion matrices as a heatmap.

    The figure is written to ``save_path`` when one is given, and is shown in
    either case.
    """
    mean_matrix = np.mean(confusion_matrices, axis=0)
    class_names = ['Negative', 'Positive']

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        mean_matrix,
        annot=True,
        fmt='.2f',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title('Average Confusion Matrix', fontsize=16)
    plt.ylabel('True Labels', fontsize=12)
    plt.xlabel('Predicted Labels', fontsize=12)

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()

def print_final_results(fold_metrics):
    """
    Summarise per-fold metrics as mean ± standard deviation.

    Prints one line per metric and returns a dataframe with one row per fold:
    a column for every metric plus a 1-based 'fold' column.
    """
    # Regroup the list of per-fold dicts into one list of values per metric;
    # the metric names are taken from the first fold
    per_metric = {name: [] for name in fold_metrics[0]}
    for single_fold in fold_metrics:
        for name in single_fold:
            per_metric[name].append(single_fold[name])

    banner = "=" * 60
    print("\n" + banner)
    print("FINAL RESULTS - Mean ± Standard Deviation")
    print(banner)

    for name in per_metric:
        scores = per_metric[name]
        print(f"{name.capitalize():15s}: {np.mean(scores):.4f} ± {np.std(scores):.4f}")

    results_df = pd.DataFrame(per_metric)
    results_df['fold'] = range(1, len(fold_metrics) + 1)

    return results_df

## 8. Main Training Pipeline

In [None]:
# Load your dataset
# Replace the path below with the location of your own CSV file.
# The CSV must have 'Text' and 'Polarity' columns; load_and_preprocess_data
# renames them to 'text' and 'label'.
df = load_and_preprocess_data('/kaggle/input/final-dataset/final-dataset.csv')

# Train with 10-fold cross validation (as per the paper)
fold_metrics, confusion_matrices = train_with_kfold(
    df, 
    n_splits=10,
    batch_size=16,
    num_epochs=10,
    use_smote=True  # Paper mentions using SMOTE
)

# Print final results (mean ± std per metric) and keep the per-fold table
results_df = print_final_results(fold_metrics)

# Plot average confusion matrix
plot_average_confusion_matrix(confusion_matrices, save_path='average_confusion_matrix.png')

# Save results to CSV
results_df.to_csv('bangla_bert_results.csv', index=False)
print("\nResults saved to 'bangla_bert_results.csv'")

## 9. Single Model Training (Hold-out Method)

In [None]:
def train_single_model(df, test_size=0.2, batch_size=16, num_epochs=10, use_smote=True):
    """
    Train a single model with a stratified train-test split (hold-out method).

    The model is evaluated on the test split after every epoch, and the
    weights of the epoch with the best weighted F1 are restored at the end.
    NOTE: the test split is therefore also used for choosing the epoch, so
    the reported test metrics are optimistic; hold out a separate validation
    split if an unbiased estimate is required.

    Args:
        df: DataFrame with a 'text' column and a binary 'label' column.
        test_size: Fraction of the data reserved for testing.
        batch_size: Training batch size (testing uses twice this value).
        num_epochs: Number of training epochs.
        use_smote: When True, the training split is balanced by oversampling
            the minority class. The name is kept for compatibility; SMOTE
            itself interpolates numeric features and cannot be applied to raw
            text (running it on row indices pairs the minority label with
            unrelated texts), so existing minority texts are re-drawn at
            random instead.

    Returns:
        Tuple ``(model, tokenizer, final_metrics)``. Two figures are written
        to 'confusion_matrix_single_model.png' and 'training_history.png'.
    """
    # Split data
    X = df['text'].values
    y = df['label'].values

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=y, random_state=42
    )

    print(f"Train samples: {len(X_train)}")
    print(f"Test samples: {len(X_test)}")

    # Balance only the training split; the test split keeps the natural
    # class distribution
    if use_smote:
        print("\nApplying random oversampling of the minority class...")
        rng = np.random.default_rng(42)
        classes, counts = np.unique(y_train, return_counts=True)
        target = counts.max()
        # For every under-represented class, draw (with replacement) as many
        # of its existing rows as are missing to reach the largest class
        extra = [
            rng.choice(np.flatnonzero(y_train == cls), size=target - count, replace=True)
            for cls, count in zip(classes, counts)
            if count < target
        ]
        order = np.concatenate([np.arange(len(y_train)), *extra])
        X_train, y_train = X_train[order], y_train[order]
        print(f"After oversampling - Train samples: {len(X_train)}")

    # Initialize tokenizer and model
    model_name = 'sagorsarker/bangla-bert-base'
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=2
    ).to(device)

    # Create datasets and dataloaders
    train_dataset = BanglaSentimentDataset(X_train, y_train, tokenizer)
    test_dataset = BanglaSentimentDataset(X_test, y_test, tokenizer)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size*2, shuffle=False)

    # torch.optim.AdamW replaces the deprecated transformers.AdamW;
    # weight_decay=0.0 reproduces the default of the old class
    optimizer = torch.optim.AdamW(
        model.parameters(), lr=1e-5, eps=1e-8, weight_decay=0.0
    )

    num_training_steps = len(train_loader) * num_epochs
    num_warmup_steps = int(0.1 * num_training_steps)

    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=num_warmup_steps,
        num_training_steps=num_training_steps
    )

    # Start below any reachable F1 so the first epoch is always kept, even
    # when its F1 is exactly 0
    best_val_f1 = float('-inf')
    best_model_state = None
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch + 1}/{num_epochs}")

        # Train
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, scheduler, device)
        train_losses.append(train_loss)

        # Evaluate
        val_metrics, _, _ = evaluate(model, test_loader, device)
        val_losses.append(val_metrics['loss'])

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Test Loss: {val_metrics['loss']:.4f}, Test Acc: {val_metrics['accuracy']:.4f}")
        print(f"Test F1: {val_metrics['f1']:.4f} (Neg: {val_metrics['f1_negative']:.4f}, Pos: {val_metrics['f1_positive']:.4f})")

        if val_metrics['f1'] > best_val_f1:
            best_val_f1 = val_metrics['f1']
            # state_dict() hands out references to the live parameters, which
            # later epochs keep modifying; clone them (on the CPU, to spare
            # GPU memory) to get a real snapshot
            best_model_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
            print(f"✓ New best model! F1: {best_val_f1:.4f}")

    # Restore the best epoch (nothing to restore when num_epochs == 0)
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    # Final evaluation
    final_metrics, predictions, true_labels = evaluate(model, test_loader, device)

    print("\n" + "="*50)
    print("FINAL TEST RESULTS")
    print("="*50)
    for key, value in final_metrics.items():
        print(f"{key.capitalize():15s}: {value:.4f}")

    # Plot confusion matrix; labels=[0, 1] keeps it 2x2 to match the tick labels
    cm = confusion_matrix(true_labels, predictions, labels=[0, 1])
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Negative', 'Positive'],
                yticklabels=['Negative', 'Positive'])
    plt.title('Confusion Matrix', fontsize=16)
    plt.ylabel('True Labels', fontsize=12)
    plt.xlabel('Predicted Labels', fontsize=12)
    plt.savefig('confusion_matrix_single_model.png', dpi=300, bbox_inches='tight')
    plt.show()

    # Plot training history
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(1, num_epochs + 1), train_losses, label='Train Loss')
    plt.plot(range(1, num_epochs + 1), val_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training History')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
    plt.show()

    return model, tokenizer, final_metrics

# Uncomment to train a single model
# model, tokenizer, metrics = train_single_model(df, test_size=0.2, use_smote=True)

## 10. Save and Load Trained Model

In [None]:
def save_model(model, tokenizer, save_path='bangla_bert_sentiment_model'):
    """
    Write the model and then the tokenizer to ``save_path`` via their
    ``save_pretrained`` methods.
    """
    for artifact in (model, tokenizer):
        artifact.save_pretrained(save_path)
    print(f"Model saved to {save_path}")

def load_model(model_path='bangla_bert_sentiment_model'):
    """
    Restore a tokenizer and a sequence-classification model from
    ``model_path`` and move the model to the module-level ``device``.

    Returns:
        Tuple ``(model, tokenizer)``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_path)

    restored = AutoModelForSequenceClassification.from_pretrained(model_path)
    model = restored.to(device)

    print(f"Model loaded from {model_path}")
    return model, tokenizer

# Example usage:
# save_model(model, tokenizer)
# loaded_model, loaded_tokenizer = load_model()

## 11. Inference Function for New Texts

In [None]:
def predict_sentiment(text, model, tokenizer, max_length=128):
    """
    Predict the sentiment of a single text.

    Args:
        text: Raw input string; it is cleaned with ``preprocess_for_bert``.
        model: Sequence-classification model with two output classes, where
            class 1 is read as positive and class 0 as negative.
        tokenizer: Tokenizer matching ``model``.
        max_length: Sequence length the text is padded/truncated to.

    Returns:
        Dict with 'sentiment' ('Positive' or 'Negative'), 'confidence' (the
        probability of the predicted class) and 'probabilities' (a dict with
        the 'negative' and 'positive' probabilities), all as Python floats.
    """
    # Preprocess text
    text = preprocess_for_bert(text)

    # Tokenize
    encoding = tokenizer(
        text,
        truncation=True,
        padding='max_length',
        max_length=max_length,
        return_tensors='pt'
    )

    # Send the inputs to wherever the model's weights live, so a model kept
    # on another device than the module-level default still works; fall back
    # to that default for a model without parameters
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device
    input_ids = encoding['input_ids'].to(target_device)
    attention_mask = encoding['attention_mask'].to(target_device)

    # Get prediction
    model.eval()
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        probs = torch.softmax(logits, dim=-1)
        prediction = torch.argmax(logits, dim=-1)

    # Get sentiment and confidence
    predicted_class = prediction.item()
    sentiment = 'Positive' if predicted_class == 1 else 'Negative'
    confidence = probs[0][predicted_class].item()

    return {
        'sentiment': sentiment,
        'confidence': confidence,
        'probabilities': {
            'negative': probs[0][0].item(),
            'positive': probs[0][1].item()
        }
    }

# Example usage:
# result = predict_sentiment("আমি খুব খুশি", model, tokenizer)
# print(result)

## 12. Batch Prediction Function

In [None]:
def predict_batch(texts, model, tokenizer, batch_size=32, max_length=128):
    """
    Predict sentiments for multiple texts.

    Args:
        texts: Iterable of raw strings; each is cleaned with
            ``preprocess_for_bert`` before tokenization.
        model: Sequence-classification model with two output classes, where
            class 1 is read as positive and class 0 as negative.
        tokenizer: Tokenizer matching ``model``.
        batch_size: Number of texts per forward pass.
        max_length: Sequence length every text is padded/truncated to.

    Returns:
        List with one dict per input text, in input order. Each dict holds
        the original 'text', the 'sentiment' ('Positive' or 'Negative'), the
        'confidence' of the predicted class and the 'probabilities' dict with
        'negative' and 'positive'; all numbers are plain Python floats.
    """
    # Materialise once so generators and other one-shot iterables work too
    texts = list(texts)
    if not texts:
        return []

    # Preprocess texts
    processed_texts = [preprocess_for_bert(text) for text in texts]

    # The dataset requires labels; they are never read during prediction
    dummy_labels = [0] * len(processed_texts)

    # Create dataset and dataloader
    dataset = BanglaSentimentDataset(
        processed_texts, dummy_labels, tokenizer, max_length=max_length
    )
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    # Send the inputs to wherever the model's weights live; fall back to the
    # module-level default for a model without parameters
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    all_predictions = []
    all_probabilities = []

    model.eval()
    with torch.no_grad():
        for batch in tqdm(dataloader, desc='Predicting'):
            input_ids = batch['input_ids'].to(target_device)
            attention_mask = batch['attention_mask'].to(target_device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            probs = torch.softmax(logits, dim=-1)
            preds = torch.argmax(logits, dim=-1)

            all_predictions.extend(preds.cpu().numpy())
            all_probabilities.extend(probs.cpu().numpy())

    # Format results; numpy scalars are converted to built-in types so the
    # output matches predict_sentiment and can be serialised (e.g. to JSON)
    results = []
    for text, pred, prob in zip(texts, all_predictions, all_probabilities):
        pred = int(pred)
        results.append({
            'text': text,
            'sentiment': 'Positive' if pred == 1 else 'Negative',
            'confidence': float(prob[pred]),
            'probabilities': {
                'negative': float(prob[0]),
                'positive': float(prob[1])
            }
        })

    return results

# Example usage:
# texts = ["আমি খুব খুশি", "এটা খুব খারাপ", "মোটামুটি ভালো"]
# results = predict_batch(texts, model, tokenizer)
# for result in results:
#     print(f"Text: {result['text']}")
#     print(f"Sentiment: {result['sentiment']} (Confidence: {result['confidence']:.4f})")
#     print()