In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModel
from torch.optim import AdamW
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from sklearn.preprocessing import LabelEncoder
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json


In [None]:
# Work from the dataset directory so the relative CSV paths used below resolve.
# NOTE(review): "/news" is a hard-coded absolute path; adjust it for your machine.
# The explicit line magic is used so the cell does not depend on IPython automagic.
%cd /news

In [None]:
# Load the dataset (one CSV file per news category)

In [None]:
# Category name -> CSV file holding the articles of that category.
files = dict(
    Economy="Economy_40k.csv",
    National="National_40k.csv",
    Education="Education_40k.csv",
    ScienceTechnology="ScienceTechnology_40k.csv",
    Entertainment="Entertainment_40k.csv",
    Sports="Sports_40k.csv",
    International="International_40k.csv",
    Politics="politics_40k.csv",
)

# Read every CSV, tag its rows with the category name, and stack them all.
dataframes = [
    pd.read_csv(csv_name).assign(label=category)
    for category, csv_name in files.items()
]
combined_df = pd.concat(dataframes, ignore_index=True)

# Replace the string categories with integer class ids and remember the mapping.
label_encoder = LabelEncoder()
combined_df["label"] = label_encoder.fit_transform(combined_df["label"])
label_mapping = dict(
    zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_))
)

# 80/20 train/test split of the "article" column with a fixed seed.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    combined_df["article"],
    combined_df["label"],
    test_size=0.2,
    random_state=42,
)

# Keep only the first 2000 rows of each split to shorten training.
train_texts, test_texts, train_labels, test_labels = (
    subset[:2000]
    for subset in (train_texts, test_texts, train_labels, test_labels)
)


In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Supervised Contrastive-Masked Pretraining (SCMP)

In [None]:
 
class Pretraining_model(nn.Module):
    """Masked-LM backbone plus a square linear projection of the first token.

    Args:
        model_name: Identifier or path passed to
            ``AutoModelForMaskedLM.from_pretrained``.
    """

    def __init__(self, model_name):
        super().__init__()
        self.bert = AutoModelForMaskedLM.from_pretrained(model_name)
        width = self.bert.config.hidden_size
        # Projection keeps the dimensionality: hidden_size -> hidden_size.
        self.head = nn.Linear(width, width)

    def forward(self, input_ids, attention_mask, labels):
        """Run the backbone once and project the first-token representation.

        Args:
            input_ids: Token ids fed to the backbone.
            attention_mask: Attention mask fed to the backbone.
            labels: Masked-LM targets forwarded to the backbone.

        Returns:
            Tuple ``(projected_first_token, mlm_loss)`` where the first item
            is ``head`` applied to position 0 of the last hidden state and the
            second is the ``loss`` reported by the backbone.
        """
        mlm_out = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels,
            output_hidden_states=True,
        )
        last_layer = mlm_out.hidden_states[-1]
        first_token = last_layer[:, 0, :]
        return self.head(first_token), mlm_out.loss
    
    
def torch_kron(a, b):
    """Kronecker product of two 2-D tensors via broadcasting.

    Args:
        a: Tensor of shape ``(m, n)``.
        b: Tensor of shape ``(p, q)``.

    Returns:
        Tensor of shape ``(m * p, n * q)`` whose ``(i * p + k, j * q + l)``
        entry is ``a[i, j] * b[k, l]``.
    """
    rows_a, cols_a = a.size(0), a.size(1)
    rows_b, cols_b = b.size(0), b.size(1)
    # Broadcast to (m, p, n, q) so every element of `a` meets every element of `b`.
    left = torch.reshape(a, [rows_a, 1, cols_a, 1])
    right = torch.reshape(b, [1, rows_b, 1, cols_b])
    outer = left * right
    return torch.reshape(outer, [rows_a * rows_b, cols_a * cols_b])

 
# Supervised contrastive + masked-LM pretraining of the backbone.
model_name = "distilbert-base-multilingual-cased"
model = Pretraining_model(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)

train_text_list = train_texts.tolist()
train_labels_array = np.array(train_labels)

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=5e-5)

# Derive the sizes instead of hard-coding 8 / 768 so they cannot drift from
# the label encoder or from the backbone configuration.
num_classes = len(label_encoder.classes_)
feature_dim = model.bert.config.hidden_size
target_labels = torch.arange(num_classes, dtype=torch.int64, device=device)
ones_column = torch.ones((num_classes, 1), dtype=torch.float32, device=device)
ones_row = torch.ones((1, num_classes), dtype=torch.float32, device=device)

# Block-summing matrix of shape (num_classes * feature_dim, num_classes):
# column j sums the j-th feature_dim-wide block of a row vector.
identity_matrix = np.identity(num_classes)
L = np.kron(identity_matrix, np.ones((feature_dim, 1)))
kronecker_tensor = torch.tensor(L, dtype=torch.float32, device=device)

# Positions of every class inside the training subset, computed once rather
# than on every inner iteration.
class_positions = [
    np.flatnonzero(train_labels_array == class_id) for class_id in range(num_classes)
]
too_small = [c for c, positions in enumerate(class_positions) if positions.size < 2]
if too_small:
    raise ValueError(
        f"Each class needs at least 2 training samples to form a pair; "
        f"classes {too_small} do not."
    )

num_epochs = 2000
num_inner_iterations = 4

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    optimizer.zero_grad()

    for _ in range(num_inner_iterations):
        # Draw two distinct articles per class, visiting classes in random order.
        class_order = np.random.choice(num_classes, num_classes, replace=False)
        first_view, second_view = [], []
        for class_id in class_order:
            candidates = class_positions[class_id]
            pick = np.random.choice(candidates.size, 2, replace=False)
            first_view.append(int(candidates[pick[0]]))
            second_view.append(int(candidates[pick[1]]))
        # Batch layout: rows [0, num_classes) are the first views and rows
        # [num_classes, 2 * num_classes) are the matching second views.
        train_text_subset = [train_text_list[i] for i in first_view + second_view]

        encoding = tokenizer(
            train_text_subset,
            max_length=512,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # Masked-LM corruption: each non-padding token is masked with p = 0.3.
        # NOTE(review): special tokens such as [CLS]/[SEP] are not excluded
        # from masking, and the contrastive embedding is read from position 0
        # of this masked input - confirm this is intended.
        input_ids = encoding.input_ids.clone()
        labels = input_ids.clone()
        probability_matrix = torch.full(labels.shape, 0.3)
        masked_indices = torch.bernoulli(probability_matrix).bool() & (input_ids != tokenizer.pad_token_id)
        labels[~masked_indices] = -100  # -100 is the "not a target" label value
        input_ids[masked_indices] = tokenizer.mask_token_id
        attention_mask = encoding["attention_mask"]

        model_outputs, mlm_loss = model(
            input_ids=input_ids.to(device),
            attention_mask=attention_mask.to(device),
            labels=labels.to(device)
        )

        t1 = model_outputs[0:num_classes]
        t2 = model_outputs[num_classes:2 * num_classes]

        # logits[i, j] = -||t1[j] - t2[i]||^2, so the correct "class" for row i
        # is column i (the pair drawn from the same category).
        t1_reshaped = t1.reshape(1, num_classes * feature_dim)
        kron_result = torch.kron(ones_row, t2)
        z = torch.matmul(ones_column, t1_reshaped) - kron_result
        logits = -torch.matmul(torch.square(z), kronecker_tensor)

        iteration_loss = loss_fn(logits, target_labels) + mlm_loss
        # Back-propagate immediately: gradients accumulate in .grad exactly as
        # a single backward over the summed loss would, but each iteration's
        # autograd graph is released instead of holding all of them in memory.
        iteration_loss.backward()
        total_loss += iteration_loss.item()

    optimizer.step()

    if epoch % 50 == 0:
        print(f"Epoch {epoch} - Loss: {total_loss}")

# Save the pretrained backbone (the projection head is not saved).
model.bert.save_pretrained("pretrained_distilbert_scmp")


In [None]:
# Fine-tuning the Supervised Contrastive-Masked Pretrained model with a joint (classification + masked-LM) loss function

In [None]:

class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: Indexable collection of raw texts.
        labels: Indexable collection of integer class ids, aligned with ``texts``.
        tokenizer: Callable invoked as
            ``tokenizer(text, return_tensors=..., max_length=..., truncation=..., padding=...)``
            returning a mapping with ``"input_ids"`` and ``"attention_mask"``.
        max_length: Length every sample is padded / truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``label`` for item ``idx``."""
        encoded = self.tokenizer(
            self.texts[idx],
            return_tensors="pt",
            max_length=self.max_length,
            truncation=True,
            padding="max_length",
        )
        # The tokenizer output carries a leading batch axis of size 1; drop it.
        sample = {
            key: encoded[key].squeeze(0) for key in ("input_ids", "attention_mask")
        }
        sample["label"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

    
    
    
class ClassifierModel(nn.Module):
    """Masked-LM backbone with a linear classification head on the first token.

    Args:
        pretrained_model_name: Identifier or directory passed to
            ``AutoModelForMaskedLM.from_pretrained``.
        num_labels: Number of output classes of the linear head.
    """

    def __init__(self, pretrained_model_name, num_labels):
        super().__init__()
        # Load the checkpoint the caller asked for. Previously the global
        # `model_name` was used here, so the argument was silently ignored.
        self.bert = AutoModelForMaskedLM.from_pretrained(pretrained_model_name)
        self.head = nn.Linear(self.bert.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels, input_ids_unmasked):
        """Compute classification logits and, optionally, the masked-LM loss.

        Args:
            input_ids: Masked token ids used for the masked-LM objective.
            attention_mask: Attention mask shared by both passes.
            labels: Masked-LM targets, or ``None`` to skip the masked pass.
            input_ids_unmasked: Uncorrupted token ids used for classification.

        Returns:
            Tuple ``(classification_logits, mlm_loss)``; ``mlm_loss`` is
            ``None`` when ``labels`` is ``None``.
        """
        mlm_loss = None
        if labels is not None:
            # Masked pass: only its loss is needed, so no hidden states are requested.
            mlm_loss = self.bert(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels,
            ).loss

        # Classification reads position 0 of the last hidden state of the clean input.
        unmasked_outputs = self.bert(
            input_ids=input_ids_unmasked,
            attention_mask=attention_mask,
            output_hidden_states=True,
        )
        cls_output = unmasked_outputs.hidden_states[-1][:, 0, :]
        classification_logits = self.head(cls_output)

        return classification_logits, mlm_loss
    
    
    
def train_model(model, dataloader, optimizer, loss_fn, device, mlm_tokenizer=None, mask_prob=0.3):
    """Train for one epoch on the joint classification + masked-LM objective.

    Args:
        model: Module called as ``model(input_ids=, attention_mask=, labels=,
            input_ids_unmasked=)`` and returning ``(logits, mlm_loss)``.
        dataloader: Iterable of batches with ``"input_ids"``,
            ``"attention_mask"`` and ``"label"`` tensors; must support ``len``.
        optimizer: Optimizer stepping the model parameters.
        loss_fn: Classification loss ``loss_fn(logits, labels)``.
        device: Device the tensors are moved to.
        mlm_tokenizer: Object providing ``pad_token_id`` and ``mask_token_id``.
            Defaults to the module-level ``tokenizer``.
        mask_prob: Probability of masking each non-padding token.

    Returns:
        Tuple ``(mean_batch_loss, predictions, true_labels)``.
    """
    tok = tokenizer if mlm_tokenizer is None else mlm_tokenizer
    model.train()
    total_loss = 0.0
    predictions = []
    true_labels = []
    for train_batch in dataloader:
        optimizer.zero_grad()
        clean_input_ids = train_batch["input_ids"]
        attention_mask = train_batch["attention_mask"]

        # Corrupt a *copy*: masking the batch tensor in place would also mask
        # the ids later passed as `input_ids_unmasked`.
        masked_input_ids = clean_input_ids.clone()
        masked_labels = clean_input_ids.clone()
        probability_matrix = torch.full(masked_labels.shape, float(mask_prob))
        masked_indices = torch.bernoulli(probability_matrix).bool() & (clean_input_ids != tok.pad_token_id)
        masked_labels[~masked_indices] = -100  # positions that are not MLM targets
        masked_input_ids[masked_indices] = tok.mask_token_id

        labels = train_batch["label"].to(device)
        classification_logits, mlm_loss = model(
            input_ids=masked_input_ids.to(device),
            attention_mask=attention_mask.to(device),
            labels=masked_labels.to(device),
            input_ids_unmasked=clean_input_ids.to(device)
        )
        loss = loss_fn(classification_logits, labels) + mlm_loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        batch_preds = torch.argmax(classification_logits, dim=1)
        predictions.extend(batch_preds.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

    return total_loss / len(dataloader), predictions, true_labels


# Classification loss used to report the validation loss.
criterion = nn.CrossEntropyLoss()


def evaluate_model(model, dataloader, device, return_loss):
    """Evaluate the joint model on a dataloader without gradient tracking.

    Args:
        model: Module called as ``model(input_ids=, attention_mask=, labels=,
            input_ids_unmasked=)``; only its first output (logits) is used.
        dataloader: Iterable of batches with ``"input_ids"``,
            ``"attention_mask"`` and ``"label"`` tensors.
        device: Device the tensors are moved to.
        return_loss: Accepted for interface compatibility; not read here.

    Returns:
        Tuple ``(mean_batch_loss, predictions, true_labels)``.
    """
    model.eval()
    batch_losses = []
    predictions = []
    true_labels = []
    with torch.no_grad():
        for eval_batch in dataloader:
            token_ids = eval_batch["input_ids"].to(device)
            mask = eval_batch["attention_mask"].to(device)
            targets = eval_batch["label"].to(device)

            # No masking at evaluation time: the same clean ids feed both inputs.
            logits, _ = model(
                input_ids=token_ids,
                attention_mask=mask,
                labels=None,
                input_ids_unmasked=token_ids,
            )
            batch_losses.append(criterion(logits, targets).item())
            predictions.extend(torch.argmax(logits, dim=1).cpu().numpy())
            true_labels.extend(targets.cpu().numpy())
    return np.mean(batch_losses), predictions, true_labels




  

# Joint fine-tuning run: data pipeline, model, optimizer and training loop.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-multilingual-cased")
train_texts_, val_texts_, train_labelss, val_labels = (
    train_texts.tolist(),
    test_texts.tolist(),
    np.array(train_labels),
    np.array(test_labels),
)

train_dataset = TextDataset(train_texts_, train_labelss, tokenizer)
val_dataset = TextDataset(val_texts_, val_labels, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=16)

# Directory written by the pretraining cell via `model.bert.save_pretrained`.
model_path = "pretrained_distilbert_scmp"
model = ClassifierModel(model_path, num_labels=8).to(device)

optimizer = AdamW(model.parameters(), lr=5e-5)
loss_fn = nn.CrossEntropyLoss()

epochs = 400
metrics_history = []

for epoch in range(epochs):
    # One pass over the training data.
    train_loss, train_predictions, train_true_labels = train_model(
        model, train_dataloader, optimizer, loss_fn, device
    )

    # Training accuracy from the predictions made during the epoch
    # (computed once; it was previously recomputed inside the block below).
    train_accuracy = accuracy_score(train_true_labels, train_predictions)
    print(f"Epoch {epoch + 1}/{epochs}, Training Loss: {train_loss:.4f}")

    # Validate on the first epoch and every 10th epoch afterwards.
    if (epoch + 1) % 10 == 0 or epoch == 0:
        val_loss, val_predictions, val_true_labels = evaluate_model(
            model, val_dataloader, device, return_loss=True
        )
        val_accuracy = accuracy_score(val_true_labels, val_predictions)
        class_report = classification_report(val_true_labels, val_predictions, output_dict=True)

        metrics_history.append({
            "epoch": epoch + 1,
            "train_loss": train_loss,
            "train_accuracy": train_accuracy,
            "val_loss": val_loss,
            "val_accuracy": val_accuracy,
            "classification_report": class_report
        })

        print(f"Epoch {epoch + 1}: Validation Loss = {val_loss:.4f}, Validation Accuracy = {val_accuracy:.4f}")
        print(classification_report(val_true_labels, val_predictions))

# Scalar metrics go to CSV; the nested classification reports go to JSON.
df = pd.DataFrame([{k: v for k, v in entry.items() if k != "classification_report"} for entry in metrics_history])
df.to_csv("metrics_history_joint_finetuning.csv", index=False)

# Experiment-specific file name (matching the CSV above) so the direct
# fine-tuning run, which also dumps its reports, cannot overwrite this file.
with open("classification_reports_joint_finetuning.json", "w") as f:
    json.dump([entry["classification_report"] for entry in metrics_history], f, indent=4)

    
    
   
 
 

In [None]:
# Plotting training and validation accuracy (joint fine-tuning)

In [None]:


# Load the metrics history CSV file written by the joint fine-tuning run.
df = pd.read_csv("metrics_history_joint_finetuning.csv")

# Compact, serif styling shared by both figures.
plt.rcParams.update({
    "font.family": "serif",
    "font.size": 10,
    "axes.labelsize": 10,
    "axes.titlesize": 11,
    "xtick.labelsize": 9,
    "ytick.labelsize": 9,
    "legend.fontsize": 9,
    "lines.linewidth": 1.5,
    "lines.markersize": 5
})

# One figure per metric. The output names now describe what is plotted and
# carry the experiment suffix: the training-accuracy figure was previously
# saved as "validation_loss_curve.pdf", the validation one as
# "avalidation_accuracy_curve.pdf", and both names were reused by the direct
# fine-tuning plots.
for column, marker, label, title, out_file in [
    ("train_accuracy", "o", "Train accuracy", "Train accuracy Curve",
     "train_accuracy_curve_joint_finetuning.pdf"),
    ("val_accuracy", "s", "Validation Accuracy", "Validation Accuracy Curve",
     "validation_accuracy_curve_joint_finetuning.pdf"),
]:
    fig, ax = plt.subplots(figsize=(4, 2.5))
    ax.plot(df["epoch"], df[column], marker=marker, linestyle="-", label=label)
    ax.set_xlabel("Epochs")
    ax.set_ylabel("Accuracy")
    ax.set_title(title)
    ax.legend()
    ax.grid(True, linestyle="--", linewidth=0.5)
    fig.tight_layout()
    fig.savefig(out_file, bbox_inches="tight")
    plt.show()


In [None]:
# Direct fine-tuning (classification loss only, starting from the base checkpoint)

In [None]:

class TextDataset(Dataset):
    """Dataset yielding one tokenized text and its label per index.

    Same definition as the ``TextDataset`` of the joint fine-tuning cell,
    repeated so this cell can run without it.

    Args:
        texts: Indexable collection of raw texts.
        labels: Indexable collection of integer class ids, aligned with ``texts``.
        tokenizer: Callable producing ``"input_ids"`` and ``"attention_mask"``
            tensors for a single text.
        max_length: Length every sample is padded / truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize ``texts[idx]`` and pair it with ``labels[idx]``."""
        tokenizer_options = dict(
            return_tensors="pt",
            max_length=self.max_length,
            truncation=True,
            padding="max_length",
        )
        encoded = self.tokenizer(self.texts[idx], **tokenizer_options)
        # Strip the size-1 batch axis the tokenizer adds for a single text.
        token_ids = encoded["input_ids"].squeeze(0)
        mask = encoded["attention_mask"].squeeze(0)
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return {"input_ids": token_ids, "attention_mask": mask, "label": target}

    
    
 
    
class ClassifierModel(nn.Module):
    """Masked-LM backbone with a linear classification head (no MLM objective).

    Args:
        pretrained_model_name: Identifier or directory passed to
            ``AutoModelForMaskedLM.from_pretrained``.
        num_labels: Number of output classes of the linear head.
    """

    def __init__(self, pretrained_model_name, num_labels):
        super().__init__()
        # Honor the constructor argument instead of the global `model_name`.
        self.bert = AutoModelForMaskedLM.from_pretrained(pretrained_model_name)
        self.head = nn.Linear(self.bert.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return ``(classification_logits, None)``.

        The second element keeps the two-item return shape the training and
        evaluation loops unpack. It used to be the name ``_``, which is not
        defined in this scope and therefore depended on leftover kernel state.
        """
        unmasked_outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )
        # Classification reads position 0 of the last hidden state.
        cls_output = unmasked_outputs.hidden_states[-1][:, 0, :]
        classification_logits = self.head(cls_output)

        return classification_logits, None
    
    
    
def train_model(model, dataloader, optimizer, loss_fn, device):
    """Train for one epoch on the classification objective only.

    Args:
        model: Module called as ``model(input_ids=, attention_mask=)`` whose
            first output is the classification logits.
        dataloader: Iterable of batches with ``"input_ids"``,
            ``"attention_mask"`` and ``"label"`` tensors; must support ``len``.
        optimizer: Optimizer stepping the model parameters.
        loss_fn: Classification loss ``loss_fn(logits, labels)``.
        device: Device the tensors are moved to.

    Returns:
        Tuple ``(mean_batch_loss, predictions, true_labels)``.
    """
    model.train()
    running_loss = 0
    predictions, true_labels = [], []

    for train_batch in dataloader:
        optimizer.zero_grad()

        targets = train_batch["label"].to(device)
        logits, _ = model(
            input_ids=train_batch["input_ids"].to(device),
            attention_mask=train_batch["attention_mask"].to(device),
        )

        batch_loss = loss_fn(logits, targets)
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()

        # Predictions are taken from the logits of this training step.
        predictions.extend(torch.argmax(logits, dim=1).cpu().numpy())
        true_labels.extend(targets.cpu().numpy())

    return running_loss / len(dataloader), predictions, true_labels


# Classification loss used to report the validation loss.
criterion = nn.CrossEntropyLoss()


def evaluate_model(model, dataloader, device, return_loss):
    """Evaluate the classifier on a dataloader without gradient tracking.

    Args:
        model: Module called as ``model(input_ids=, attention_mask=)`` whose
            first output is the classification logits.
        dataloader: Iterable of batches with ``"input_ids"``,
            ``"attention_mask"`` and ``"label"`` tensors.
        device: Device the tensors are moved to.
        return_loss: Accepted for interface compatibility; not read here.

    Returns:
        Tuple ``(mean_batch_loss, predictions, true_labels)``.
    """
    model.eval()
    batch_losses = []
    predictions = []
    true_labels = []
    with torch.no_grad():
        for eval_batch in dataloader:
            targets = eval_batch["label"].to(device)
            logits, _ = model(
                input_ids=eval_batch["input_ids"].to(device),
                attention_mask=eval_batch["attention_mask"].to(device),
            )
            batch_losses.append(criterion(logits, targets).item())
            predictions.extend(torch.argmax(logits, dim=1).cpu().numpy())
            true_labels.extend(targets.cpu().numpy())
    return np.mean(batch_losses), predictions, true_labels




 

# Direct fine-tuning run: data pipeline, model, optimizer and training loop.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-multilingual-cased")
train_texts_, val_texts_, train_labelss, val_labels = (
    train_texts.tolist(),
    test_texts.tolist(),
    np.array(train_labels),
    np.array(test_labels),
)
train_dataset = TextDataset(train_texts_, train_labelss, tokenizer)
val_dataset = TextDataset(val_texts_, val_labels, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=16)

# Start from the base checkpoint.
model_name = "distilbert-base-multilingual-cased"
model = ClassifierModel(model_name, num_labels=8).to(device)

optimizer = AdamW(model.parameters(), lr=5e-5)
loss_fn = nn.CrossEntropyLoss()

epochs = 400
metrics_history = []

for epoch in range(epochs):
    # One pass over the training data.
    train_loss, train_predictions, train_true_labels = train_model(
        model, train_dataloader, optimizer, loss_fn, device
    )

    # Training accuracy from the predictions made during the epoch
    # (computed once; it was previously recomputed inside the block below).
    train_accuracy = accuracy_score(train_true_labels, train_predictions)
    print(f"Epoch {epoch + 1}/{epochs}, Training Loss: {train_loss:.4f}")

    # Validate on the first epoch and every 10th epoch afterwards.
    if (epoch + 1) % 10 == 0 or epoch == 0:
        val_loss, val_predictions, val_true_labels = evaluate_model(
            model, val_dataloader, device, return_loss=True
        )
        val_accuracy = accuracy_score(val_true_labels, val_predictions)
        class_report = classification_report(val_true_labels, val_predictions, output_dict=True)

        metrics_history.append({
            "epoch": epoch + 1,
            "train_loss": train_loss,
            "train_accuracy": train_accuracy,
            "val_loss": val_loss,
            "val_accuracy": val_accuracy,
            "classification_report": class_report
        })

        print(f"Epoch {epoch + 1}: Validation Loss = {val_loss:.4f}, Validation Accuracy = {val_accuracy:.4f}")
        print(classification_report(val_true_labels, val_predictions))

# Scalar metrics go to CSV; the nested classification reports go to JSON.
df = pd.DataFrame([{k: v for k, v in entry.items() if k != "classification_report"} for entry in metrics_history])
df.to_csv("metrics_history_direct_finetuning.csv", index=False)

# Experiment-specific file name (matching the CSV above) so this dump does not
# overwrite the reports written by the joint fine-tuning run.
with open("classification_reports_direct_finetuning.json", "w") as f:
    json.dump([entry["classification_report"] for entry in metrics_history], f, indent=4)

 
 

In [None]:
# Plotting training and validation accuracy (direct fine-tuning)

In [None]:


# Load the metrics history CSV file written by the direct fine-tuning run.
df = pd.read_csv("metrics_history_direct_finetuning.csv")

# Compact, serif styling shared by both figures.
plt.rcParams.update({
    "font.family": "serif",
    "font.size": 10,
    "axes.labelsize": 10,
    "axes.titlesize": 11,
    "xtick.labelsize": 9,
    "ytick.labelsize": 9,
    "legend.fontsize": 9,
    "lines.linewidth": 1.5,
    "lines.markersize": 5
})

# One figure per metric. The output names now describe what is plotted and
# carry the experiment suffix: the training-accuracy figure was previously
# saved as "validation_loss_curve.pdf", the validation one as
# "avalidation_accuracy_curve.pdf", and both names were shared with the joint
# fine-tuning plots, so one run's figures replaced the other's.
for column, marker, label, title, out_file in [
    ("train_accuracy", "o", "Train accuracy", "Train accuracy Curve",
     "train_accuracy_curve_direct_finetuning.pdf"),
    ("val_accuracy", "s", "Validation Accuracy", "Validation Accuracy Curve",
     "validation_accuracy_curve_direct_finetuning.pdf"),
]:
    fig, ax = plt.subplots(figsize=(4, 2.5))
    ax.plot(df["epoch"], df[column], marker=marker, linestyle="-", label=label)
    ax.set_xlabel("Epochs")
    ax.set_ylabel("Accuracy")
    ax.set_title(title)
    ax.legend()
    ax.grid(True, linestyle="--", linewidth=0.5)
    fig.tight_layout()
    fig.savefig(out_file, bbox_inches="tight")
    plt.show()
