# BanglaBERT with attention




In [None]:
import time
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

!pip install transformers datasets

from transformers import (
    AutoTokenizer, AutoModel, Trainer, TrainingArguments,
    DataCollatorWithPadding
)
from datasets import Dataset, DatasetDict

# Seed every random number generator touched by this run (Python, NumPy,
# PyTorch CPU and all CUDA devices) with the same value for reproducibility.
seed = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(seed)

# Checkpoint name shared by the tokenizer and, later, the encoder.
model_name = "csebuetnlp/banglabert"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Load dataset
file_path = "/content/BOLT.xlsx"
df = pd.read_excel(file_path)
print("Unique labels:", df['Final Annotation'].unique())

# Convert labels to numeric format
label_mapping = {"no aggression": 3, "atrocity": 2, "vandalism": 1, "hate": 0}
df['label'] = df['Final Annotation'].map(label_mapping)

# Series.map yields NaN for every annotation that is not a key of
# label_mapping (different casing, stray whitespace, empty cells). Stop here
# with a clear message instead of letting NaN labels reach the split/training.
unmapped_annotations = df.loc[df['label'].isna(), 'Final Annotation'].unique()
if len(unmapped_annotations) > 0:
    raise ValueError(
        f"Annotations missing from label_mapping: {unmapped_annotations.tolist()}"
    )
df['label'] = df['label'].astype(int)

# Split dataset: 80% train, then the remaining 20% is halved into
# validation and test. Both splits are stratified on the label.
train_df, temp_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=seed)
validation_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=seed)

# Convert DataFrame to Hugging Face Datasets
hf_train = Dataset.from_pandas(train_df)
hf_valid = Dataset.from_pandas(validation_df)
hf_test = Dataset.from_pandas(test_df)

# Only train/validation go into the DatasetDict; hf_test is kept separately.
data = DatasetDict({"train": hf_train, "validation": hf_valid})

# Tokenization function
def tokenize(batch):
    """Tokenize the 'Text' column of a batch, truncating to 512 tokens.

    No padding is applied here: padding every example to 512 tokens would
    make the padding collator redundant and force every batch to run at the
    maximum length. Padding is left to the data collator, which pads each
    batch only up to its longest sequence.
    """
    return tokenizer(batch['Text'], truncation=True, max_length=512)

data_encoded = data.map(tokenize, batched=True, batch_size=16)
# Pads each batch dynamically at collation time.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Define the model with attention
class BanglaBERTWithAttention(nn.Module):
    """Pretrained encoder with attention pooling and a linear classifier.

    A single linear layer scores every token, the scores are normalised with
    a softmax along dim 1 (the token axis), and the weighted sum of the token
    states is classified after dropout.

    Args:
        model_name: Checkpoint name or path passed to ``AutoModel.from_pretrained``.
        num_labels: Number of output classes.
        dropout_rate: Dropout probability applied to the pooled vector.
    """

    def __init__(self, model_name, num_labels, dropout_rate=0.2):
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.hidden_size = self.bert.config.hidden_size
        self.attention = nn.Linear(self.hidden_size, 1)
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(self.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Classify a batch of token id sequences.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Optional mask with 1 for real tokens and 0 for
                padding; padded positions are excluded from the pooling.
            labels: Optional class ids; when given, cross-entropy is computed.

        Returns:
            ``{"logits": ...}``, plus a ``"loss"`` entry when ``labels`` is given.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state
        scores = torch.tanh(self.attention(last_hidden_state))
        if attention_mask is not None:
            # Push padded positions to the most negative finite value so the
            # softmax gives them zero weight. A finite fill (rather than -inf)
            # keeps a fully masked row from producing NaN.
            padded = attention_mask.unsqueeze(-1) == 0
            scores = scores.masked_fill(padded, torch.finfo(scores.dtype).min)
        weights = torch.softmax(scores, dim=1)
        context = torch.sum(weights * last_hidden_state, dim=1)
        logits = self.classifier(self.dropout(context))
        if labels is None:
            return {"logits": logits}
        return {"loss": F.cross_entropy(logits, labels), "logits": logits}

# Initialize model
# Use the GPU when one is visible, otherwise fall back to CPU so the cell
# does not crash on a CPU-only runtime.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BanglaBERTWithAttention(model_name=model_name, num_labels=4).to(device)

# Metric computation
def compute_metrics(pred):
    """Score one prediction pass.

    ``pred`` must expose ``predictions`` (per-class scores) and ``label_ids``
    (reference class ids). The predicted class is the arg-max over the last
    axis of ``predictions``.

    Returns a dict with the accuracy under "Accuracy" and the weighted-average
    F1 under "F1 Score".
    """
    predicted_ids = np.argmax(pred.predictions, axis=-1)
    reference_ids = pred.label_ids
    accuracy = accuracy_score(reference_ids, predicted_ids)
    weighted_f1 = f1_score(reference_ids, predicted_ids, average="weighted")
    return {"Accuracy": accuracy, "F1 Score": weighted_f1}


import transformers
!pip install --upgrade transformers
from transformers import TrainingArguments
print(transformers.__version__)

training_args = TrainingArguments(
    output_dir=f"{model_name}-classifier",         # Output directory
    overwrite_output_dir=True,                     # Overwrite if exists
    # Evaluate on the validation set after every epoch so validation loss and
    # metrics appear in the training log. Older transformers releases call
    # this argument `evaluation_strategy`.
    eval_strategy="epoch",
    do_train=True,                                 # Enable training
    do_eval=True,                                  # Enable evaluation
    per_device_train_batch_size=8,                 # Batch size for training
    per_device_eval_batch_size=8,                  # Batch size for evaluation
    num_train_epochs=10,                           # Total number of training epochs
    learning_rate=5e-6,                            # Learning rate
    weight_decay=0.01,                             # Weight decay
    logging_dir=f"{model_name}-logs",              # Directory for logs
    logging_steps=len(data_encoded['train']) // 8, # Log every N steps
    save_steps=len(data_encoded['train']) // 8,    # Save every N steps
    save_total_limit=1,                            # Keep only last checkpoint
    seed=42,                                        # Reproducibility
    report_to="none"
)


# Trainer setup: wires the model, the training arguments, both encoded
# splits, the padding collator and the metric function together.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    tokenizer=tokenizer,
    train_dataset=data_encoded["train"],
    eval_dataset=data_encoded["validation"],
    compute_metrics=compute_metrics,
)

# Train model and report the wall-clock duration of the run.
start_time = time.time()
trainer.train()
elapsed_seconds = time.time() - start_time
print(f"Training time: {elapsed_seconds:.2f} seconds")

# Extract training history
log_history = trainer.state.log_history

# Extract metrics
# Each curve collects its own epochs. An entry that lacks a key is skipped
# rather than stored as None, because None placeholders become gaps that
# break the plotted line into isolated markers.
train_epochs, train_loss = [], []
eval_epochs, eval_loss, eval_acc = [], [], []
for entry in log_history:
    if "epoch" not in entry:
        continue
    if "loss" in entry:
        train_epochs.append(entry["epoch"])
        train_loss.append(entry["loss"])
    if "eval_loss" in entry:
        eval_epochs.append(entry["epoch"])
        eval_loss.append(entry["eval_loss"])
        # compute_metrics reports "Accuracy"; evaluation logs prefix it with "eval_".
        eval_acc.append(entry.get("eval_Accuracy"))

# Plot loss curve
plt.figure(figsize=(8, 5))
plt.plot(train_epochs, train_loss, label="Training Loss", marker="o")
plt.plot(eval_epochs, eval_loss, label="Validation Loss", marker="s")
plt.title("Loss vs Epochs")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()

# Save model weights (state dict only) and the tokenizer files.
trained_weights = model.state_dict()
torch.save(trained_weights, "banglabert_trained_model.pth")
tokenizer.save_pretrained("banglabert_trained_tokenizer")

# Evaluation on the validation split attached to the trainer.
validation_metrics = trainer.evaluate()
print("Validation Results:", validation_metrics)

# Encode the held-out test split, predict on it and score the predictions.
hf_test_encoded = hf_test.map(tokenize, batched=True)
test_results = trainer.predict(hf_test_encoded)
test_metrics = compute_metrics(test_results)
print("Test Results:", test_metrics)

# Save predictions: the true label plus one softmax probability column per class.
y_true, y_logits = test_results.label_ids, test_results.predictions
y_prob = torch.tensor(y_logits).softmax(dim=1).numpy()

probability_columns = {
    f"model_1_prob_class_{class_id}": y_prob[:, class_id]
    for class_id in range(y_prob.shape[1])
}
preds_df = pd.DataFrame({"y_true": y_true, **probability_columns})

preds_df.to_csv("banglabert_attention_predictions.csv", index=False)
print("Predictions saved!")

# XLM-RoBERTa with attention


In [None]:
import time
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer, Trainer, TrainingArguments
import pandas as pd
import numpy as np
import random
!pip install datasets
!pip install transformers
from datasets import Dataset, DatasetDict
from transformers import DataCollatorWithPadding
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
import os


# ✅ Set Seed for Reproducibility
# The same seed goes to PyTorch (CPU), NumPy, Python's random module and all
# CUDA devices, in that order.
seed = 42
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed, torch.cuda.manual_seed_all):
    _seed_fn(seed)

# ✅ Define model and tokenizer
# The checkpoint name is reused later when the encoder is loaded.
model_name = "xlm-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# ✅ Load dataset from single Excel file
file_path = "/content/BOLT.xlsx"
df = pd.read_excel(file_path)

# ✅ Convert labels to numerical values
label_mapping = {"no aggression": 3, "atrocity": 2, "vandalism": 1, "hate": 0}
df['label'] = df['Final Annotation'].map(label_mapping)

# Series.map yields NaN for every annotation that is not a key of
# label_mapping (different casing, stray whitespace, empty cells). Stop here
# with a clear message instead of letting NaN labels reach the split/training.
unmapped_annotations = df.loc[df['label'].isna(), 'Final Annotation'].unique()
if len(unmapped_annotations) > 0:
    raise ValueError(
        f"Annotations missing from label_mapping: {unmapped_annotations.tolist()}"
    )
df['label'] = df['label'].astype(int)

# ✅ Split dataset into Train (80%), Validation (10%), and Test (10%)
# Both splits are stratified on the label.
train_df, temp_df = train_test_split(df, test_size=0.2, random_state=seed, stratify=df['label'])
validation_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=seed, stratify=temp_df['label'])

# ✅ Convert Pandas DataFrame to Hugging Face Dataset format
hf_train_dataset = Dataset.from_pandas(train_df)
hf_validation_dataset = Dataset.from_pandas(validation_df)
hf_test_dataset = Dataset.from_pandas(test_df)

# ✅ Tokenization function
def tokenize(batch):
    """Tokenize the 'Text' column of a batch, truncating to 512 tokens.

    No padding is applied here: padding every example to 512 tokens would
    make the padding collator redundant and force every batch to run at the
    maximum length. Padding is left to the data collator, which pads each
    batch only up to its longest sequence.
    """
    return tokenizer(batch['Text'], max_length=512, truncation=True)

# ✅ Create DatasetDict for train & validation
data = DatasetDict({'train': hf_train_dataset, 'validation': hf_validation_dataset})
# Pads each batch dynamically at collation time.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
data_encoded = data.map(tokenize, batched=True, batch_size=16)

# ✅ Define XLM-RoBERTa model with attention and dropout
class XLMRobertaWithAttention(nn.Module):
    """Pretrained encoder with attention pooling and a linear classifier.

    A single linear layer scores every token, the scores are normalised with
    a softmax along dim 1 (the token axis), and the weighted sum of the token
    states is classified after dropout.

    Args:
        model_name: Checkpoint name or path passed to ``AutoModel.from_pretrained``.
        num_labels: Number of output classes.
        dropout_rate: Dropout probability used for the encoder's hidden and
            attention dropout and for the pooled vector.
    """

    def __init__(self, model_name: str, num_labels: int, dropout_rate = 0.2):
        super(XLMRobertaWithAttention, self).__init__()
        # The dropout rates are passed as config overrides at load time.
        # Changing `config` after the encoder exists does not touch the
        # dropout layers that were already built from it.
        self.base_model = AutoModel.from_pretrained(
            model_name,
            hidden_dropout_prob=dropout_rate,
            attention_probs_dropout_prob=dropout_rate,
        )
        # Replace every parameter's storage with a contiguous copy.
        for param in self.base_model.parameters():
            param.data = param.data.contiguous()
        self.hidden_size = self.base_model.config.hidden_size
        self.attention = nn.Linear(self.hidden_size, 1).to(torch.float32)
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(self.hidden_size, num_labels)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Classify a batch of token id sequences.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Optional mask with 1 for real tokens and 0 for
                padding; padded positions are excluded from the pooling.
            labels: Optional class ids; when given, cross-entropy is computed.

        Returns:
            ``{"logits": ...}``, plus a ``"loss"`` entry when ``labels`` is given.
        """
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state.contiguous()
        attn_scores = torch.tanh(self.attention(hidden_states))
        if attention_mask is not None:
            # Push padded positions to the most negative finite value so the
            # softmax gives them zero weight. A finite fill (rather than -inf)
            # keeps a fully masked row from producing NaN.
            padded = attention_mask.unsqueeze(-1) == 0
            attn_scores = attn_scores.masked_fill(padded, torch.finfo(attn_scores.dtype).min)
        attn_weights = torch.softmax(attn_scores, dim=1)
        weighted_output = torch.sum(attn_weights * hidden_states, dim=1)
        weighted_output = self.dropout(weighted_output.contiguous())
        logits = self.classifier(weighted_output)
        if labels is None:
            return {"logits": logits}
        return {"loss": self.loss_fn(logits, labels), "logits": logits}

# ✅ Define model
num_labels = 4
dropout_rate = 0.2
# Use the GPU when one is visible, otherwise fall back to CPU so the cell
# does not crash on a CPU-only runtime.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = XLMRobertaWithAttention(model_name, num_labels, dropout_rate).to(device)

# ✅ Define function to compute accuracy and F1-score
def compute_metrics(pred):
    """Score one prediction pass.

    ``pred`` must expose ``label_ids`` (reference class ids) and
    ``predictions`` (per-class scores). The predicted class is the arg-max
    over the last axis of ``predictions``.

    Returns a dict with the accuracy under "Accuracy" and the weighted-average
    F1 under "F1 Score".
    """
    reference_ids = pred.label_ids
    predicted_ids = pred.predictions.argmax(-1)
    accuracy = accuracy_score(reference_ids, predicted_ids)
    weighted_f1 = f1_score(reference_ids, predicted_ids, average="weighted")
    return {"Accuracy": accuracy, "F1 Score": weighted_f1}

# ✅ Define training arguments
batch_size = 8
# Integer number of full batches in the training split, used as the
# logging/saving interval.
logging_steps = len(data_encoded["train"]) // batch_size

training_args = TrainingArguments(
    output_dir=f"{model_name}-classifier",         # Output directory
    overwrite_output_dir=True,                     # Overwrite if exists
    # Evaluate on the validation set after every epoch so validation loss and
    # metrics appear in the training log. Older transformers releases call
    # this argument `evaluation_strategy`.
    eval_strategy="epoch",
    do_train=True,                                 # Enable training
    do_eval=True,                                  # Enable evaluation
    per_device_train_batch_size=batch_size,        # Batch size for training
    per_device_eval_batch_size=batch_size,         # Batch size for evaluation
    num_train_epochs=10,                           # Total number of training epochs
    learning_rate=5e-6,                            # Learning rate
    weight_decay=0.01,                             # Weight decay
    logging_dir=f"{model_name}-logs",              # Directory for logs
    logging_steps=logging_steps,                   # Log every N steps
    save_steps=logging_steps,                      # Save every N steps
    save_total_limit=1,                            # Keep only last checkpoint
    seed=42,                                        # Reproducibility
    report_to="none"
)

# ✅ Initialize Trainer
# Wires the model, the training arguments, both encoded splits, the padding
# collator and the metric function together.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=data_encoded['train'],
    eval_dataset=data_encoded['validation'],
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# ✅ Train the model with tracking time
start_time = time.time()
trainer.train()
end_time = time.time()

# ✅ Print overall training time
total_time = end_time - start_time
print(f"✅ Total training time: {total_time:.2f} seconds")

# ✅ Save the trained model
# Only the state dict is written, alongside the tokenizer files.
model_save_path = "xlm_roberta_trained_model.pth"
trained_weights = model.state_dict()
torch.save(trained_weights, model_save_path)
tokenizer.save_pretrained("xlm_roberta_trained_tokenizer")
print("✅ Model saved successfully!")

# ✅ Evaluate on validation set
validation_results = trainer.evaluate()
print("Validation Set Results:", validation_results)

# ✅ Encode the test set before evaluation
hf_test_dataset_encoded = hf_test_dataset.map(tokenize, batched=True, batch_size=16)

# ✅ Evaluate on test set
test_results = trainer.predict(hf_test_dataset_encoded)
test_metrics = compute_metrics(test_results)
print("Test Set Results:", test_metrics)

# ✅ Save True Labels and Predicted Probabilities for Ensemble
y_true, y_logits = test_results.label_ids, test_results.predictions
y_prob = torch.tensor(y_logits).softmax(dim=1).numpy()

# One probability column for each of the four classes.
probability_columns = {
    f"model_1_prob_class_{class_id}": y_prob[:, class_id] for class_id in range(4)
}
results_df = pd.DataFrame({"y_true": y_true, **probability_columns})

results_df.to_csv("xlm_roberta_attention_predictions.csv", index=False)
print("✅ Predictions saved successfully!")


# Weighted Ensemble

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import (
    accuracy_score, f1_score, precision_score, recall_score, confusion_matrix,
    ConfusionMatrixDisplay, classification_report, roc_curve, auc
)
from sklearn.model_selection import train_test_split

# ✅ Load Predictions from Both Models
banglabert_df = pd.read_csv("/content/banglabert_attention_predictions.csv")
xlm_roberta_df = pd.read_csv("/content/xlm_roberta_attention_predictions.csv")

# ✅ Ensure both have the same true labels
# Both files must describe the same test rows in the same order, otherwise
# averaging the probabilities row by row is meaningless.
if not np.array_equal(banglabert_df["y_true"], xlm_roberta_df["y_true"]):
    raise ValueError("Mismatch in true labels!")

# ✅ Extract True Labels and Probabilities
y_true = banglabert_df["y_true"]

# Extract probabilities for all classes from both models
# Column 0 is y_true; every remaining column is one class probability.
banglabert_probs = banglabert_df.iloc[:, 1:].values  # Model 1 probabilities
xlm_roberta_probs = xlm_roberta_df.iloc[:, 1:].values  # Model 2 probabilities

# Ensemble weights: the individual test accuracies of the two models.
# NOTE(review): the weights come from the same test set the ensemble is scored
# on below; validation accuracies would avoid that leakage. Confirm.
w_1 = 86.7
w_2 = 85.3

# ✅ Compute Weighted Ensemble Probabilities
# Dividing by the weight sum keeps each row a convex combination of the two
# models' probabilities.
ensemble_probs = ((banglabert_probs * w_1) + (xlm_roberta_probs * w_2)) / (w_1 + w_2)

# ✅ Compute Final Predictions
ensemble_preds = np.argmax(ensemble_probs, axis=1)

# ✅ Evaluate Performance
weighted_ensemble_acc = accuracy_score(y_true, ensemble_preds)
weighted_ensemble_f1 = f1_score(y_true, ensemble_preds, average="weighted")
print(f"✅ Weighted Ensemble - Accuracy: {weighted_ensemble_acc:.4f}, F1 Score: {weighted_ensemble_f1:.4f}")

# ✅ Define class labels
# Position i is the display name of class id i.
class_labels = ["Hate", "Vandalism", "Atrocity", "No Aggression"]
class_ids = list(range(len(class_labels)))

# ✅ Compute Confusion Matrix
# Passing the class ids explicitly keeps one row/column per entry of
# class_labels even if a class is absent from both y_true and the predictions.
cm = confusion_matrix(y_true, ensemble_preds, labels=class_ids)

# ✅ Plot Confusion Matrix
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap="Greens", xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.title("Confusion Matrix of Weighted Ensemble")
plt.show()

# ✅ Generate Classification Report
# The same explicit ids keep target_names aligned with the reported rows.
report = classification_report(
    y_true, ensemble_preds, labels=class_ids, target_names=class_labels, digits=4
)
print("\nClassification Report:\n", report)

# ✅ Compute Accuracy Per Class
# For each class id, accuracy is measured only on the rows whose true label
# is that class.
accuracy_per_class = [
    accuracy_score(y_true[y_true == class_id], ensemble_preds[y_true == class_id])
    for class_id in range(len(class_labels))
]

# ✅ Print Accuracy per Class
print("\nAccuracy Per Class:")
for class_name, class_score in zip(class_labels, accuracy_per_class):
    print(f"{class_name}: {class_score:.4f}")

# ✅ Compute Overall Performance Metrics
# Precision, recall and F1 are macro-averaged; a weighted F1 is reported too.
error_rate = 1 - weighted_ensemble_acc
precision = precision_score(y_true, ensemble_preds, average='macro')
recall = recall_score(y_true, ensemble_preds, average='macro')
f1 = f1_score(y_true, ensemble_preds, average='macro')
weighted_f1 = f1_score(y_true, ensemble_preds, average='weighted')

summary_lines = (
    f"\n✅ Overall Test Accuracy: {weighted_ensemble_acc:.4f}",
    f"✅ Overall Test Precision: {precision:.4f}",
    f"✅ Overall Test Recall: {recall:.4f}",
    f"✅ Overall Test F1 Score: {f1:.4f}",
    f"✅ Weighted F1 Score: {weighted_f1:.4f}",
    f"✅ Error Rate: {error_rate:.4f}",
)
for summary_line in summary_lines:
    print(summary_line)

# ✅ Plot ROC Curve
# One one-vs-rest curve per class: the positives are the rows whose true label
# equals the class id, scored by that class's ensemble probability.
plt.figure(figsize=(7, 5))
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
for class_id, (class_name, curve_color) in enumerate(zip(class_labels, colors)):
    fpr, tpr, _ = roc_curve(y_true == class_id, ensemble_probs[:, class_id])
    roc_auc = auc(fpr, tpr)
    curve_label = f'Class {class_name} (AUC = {roc_auc:.2f})'
    plt.plot(fpr, tpr, label=curve_label, linestyle='-', linewidth=2, color=curve_color)

# Diagonal reference line.
plt.plot([0, 1], [0, 1], 'k--', linewidth=1.5)
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve for Weighted Ensemble")
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
plt.show()

# ✅ Compute Metrics Per Class
# average=None returns one score per class instead of a single aggregate.
precision_per_class = precision_score(y_true, ensemble_preds, average=None)
recall_per_class = recall_score(y_true, ensemble_preds, average=None)
f1_per_class = f1_score(y_true, ensemble_preds, average=None)

# ✅ Compute Macro and Weighted Averages
# Both summary rows reuse the overall accuracy in the "Accuracy" column.
weighted_precision = precision_score(y_true, ensemble_preds, average='weighted')
weighted_recall = recall_score(y_true, ensemble_preds, average='weighted')
macro_avg = [weighted_ensemble_acc, precision, recall, f1]
weighted_avg = [weighted_ensemble_acc, weighted_precision, weighted_recall, weighted_f1]

# ✅ Create and Display 2D Metrics Matrix
# Rows are the classes followed by the two summary rows; columns are metrics.
metrics_df = pd.DataFrame(
    {
        "Accuracy": accuracy_per_class,
        "Precision": precision_per_class,
        "Recall": recall_per_class,
        "F1 Score": f1_per_class,
    },
    index=class_labels,
)
metrics_df.loc["M. avg"] = macro_avg
metrics_df.loc["W. avg"] = weighted_avg

plt.figure(figsize=(6, 4))
sns.heatmap(metrics_df, annot=True, fmt=".4f", cmap="Greens", linewidths=0.5)
plt.title("Performance Metrics per Class for Weighted Ensemble")
plt.show()