# DeBERTa-v3 

### Import necessary packages

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.transformers import log_model
import logging 
from mlflow.sklearn import save_model

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from mlflow.models.signature import infer_signature
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelBinarizer
from scipy.special import softmax
import numpy as np

from torch import nn
import mlflow.pytorch

import sentencepiece
import os

# os.environ["TOKENIZERS_PARALLELISM"] = "false"  # This tells Hugging Face: “Don’t use parallel tokenization — avoid possible deadlocks.”

from torch.utils.data import Dataset, DataLoader
import torch

from transformers import TrainingArguments, Trainer, AutoModelForSequenceClassification, AutoModel, AutoTokenizer, AutoConfig

import config 

  from .autonotebook import tqdm as notebook_tqdm


### MLFlow setup


In [2]:
# Short identifier for this model variant.
MODEL_NAME = "deberta_v3"

# Read the MLflow tracking URI from a local file. The context manager closes
# the file handle right after reading instead of leaving it open.
with open("../.mlflow_uri", encoding="utf-8") as uri_file:
    TRACKING_URI = uri_file.read().strip()
EXPERIMENT_NAME = config.EXPERIMENT_NAME

# Configure logging format to show a timestamp before every message.
logging.basicConfig(format="%(asctime)s: %(message)s")

# Root logger: only show logs that are INFO or more important
# (e.g. WARNING, ERROR) and ignore DEBUG.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

### Get data

In [3]:
# CSV file that get_encode_tokenize_data reads the training data from.
DATA_PATH = "../data/data_small.csv"
# Hugging Face checkpoint name that is passed to get_encode_tokenize_data.
MODEL_PATH = "microsoft/deberta-v3-base"
# Hugging Face checkpoint that is loaded as the model to fine-tune.
MODEL_TRAINING_PATH ="microsoft/deberta-v3-small"

In [None]:
logger.info('create tokenizer & load model')
# Tokenization happens after the train/test split to prevent data leakage.
# use_fast=False prevents a tokenization error that might happen with the
# fast tokenizer.
# The checkpoint constant is MODEL_PATH; the lowercase name `model_path` is
# not defined at notebook level and raised a NameError here.
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, use_fast=False)

In [None]:
def tokenize(texts):
    """Tokenize ``texts`` with the notebook-level ``tokenizer``.

    Sequences are padded or truncated to a fixed length of 512 tokens and
    the tokenizer is asked to return PyTorch tensors.
    """
    tokenizer_options = {
        "padding": "max_length",  # pad shorter sequences up to max_length
        "truncation": True,       # cut sequences that exceed max_length
        "max_length": 512,        # common upper limit for transformer models
        "return_tensors": "pt",   # output as PyTorch tensors
    }
    return tokenizer(texts, **tokenizer_options)

#object oriented programming (class is the object), with class you can do different things, such as calling functions
class TextDataset(Dataset):
    """PyTorch ``Dataset`` that pairs tokenized texts with integer labels.

    Args:
        encodings: Mapping with ``'input_ids'`` and ``'attention_mask'``
            entries that can be indexed per sample.
        labels: Integer class ids, one per sample (list, array or tensor).

    Raises:
        ValueError: If the number of labels differs from the number of
            encoded samples.
    """

    def __init__(self, encodings, labels):
        self.input_ids = encodings['input_ids']            # Token IDs from tokenizer
        self.attention_mask = encodings['attention_mask']  # Mask for padding

        # Always store labels as int64: cross-entropy expects long targets,
        # and e.g. int32 label arrays would otherwise keep their dtype.
        if isinstance(labels, torch.Tensor):
            self.labels = labels.detach().clone().to(torch.long)
        else:
            self.labels = torch.tensor(labels, dtype=torch.long)

        # Fail early instead of hitting an index error in the middle of training.
        if len(self.labels) != len(self.input_ids):
            raise ValueError(
                f"Got {len(self.labels)} labels for "
                f"{len(self.input_ids)} encoded samples"
            )

    def __getitem__(self, idx):
        """Return token ids, attention mask and label of sample ``idx``."""
        return {
            'input_ids': self.input_ids[idx],
            'attention_mask': self.attention_mask[idx],
            'labels': self.labels[idx],
        }

    def __len__(self):
        """Return the total number of samples."""
        return len(self.labels)

def get_encode_tokenize_data(path, model_path):
    """Load the CSV, split it, encode the labels and build both datasets.

    Args:
        path: Path of a CSV file with a ``text`` and a
            ``logical_fallacies`` column.
        model_path: Currently unused. Tokenization goes through
            ``tokenize``, which relies on the notebook-level ``tokenizer``.
            The parameter is kept so existing calls keep working.

    Returns:
        Tuple ``(train_dataset, test_dataset, y_train, le)``: the two
        ``TextDataset`` objects, the integer-encoded training labels and the
        ``LabelEncoder`` fitted on the training labels.
    """
    logger.info("Loading data...")
    df = pd.read_csv(path)
    X = df["text"]
    y = df["logical_fallacies"]

    logger.info("Train test split, test-size 0.3")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.30, random_state=42)

    # Fit the encoder on the training labels only and reuse it for the test
    # labels. All messages go through the notebook's ``logger`` for
    # consistency.
    logger.info('encode the label column')
    le = LabelEncoder()
    y_train = le.fit_transform(y_train)
    y_test = le.transform(y_test)

    logger.info('tokenize')
    train_encodings = tokenize(X_train.to_list())
    test_encodings = tokenize(X_test.to_list())

    logger.info('create TextDatasets (train & test)')
    train_dataset = TextDataset(train_encodings, y_train)
    test_dataset = TextDataset(test_encodings, y_test)

    return train_dataset, test_dataset, y_train, le


In [None]:
# Build the train/test datasets, the encoded training labels and the fitted label encoder.
train_dataset, test_dataset, y_train, le = get_encode_tokenize_data(DATA_PATH, MODEL_PATH)

In [None]:
# Display the integer-encoded training labels.
y_train

### done


In [None]:
# df = pd.read_csv('../data/data_small.csv')

# Y = df["logical_fallacies"]
# X = df["text"]

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(
#     X, Y, test_size=0.30, random_state=42)

https://stackoverflow.com/questions/76868251/how-to-load-deberta-v3-properly

In [None]:
# tokenization after train test split to prevent data leakage

#added use_fast=False to prevent tokenization error (might happen when using fast tokenization)
# tokenizer = AutoTokenizer.from_pretrained('microsoft/deberta-v3-base', use_fast=False)

In [None]:
# def tokenize(texts):
#     return tokenizer(
#         texts,
#         padding="max_length", #ensures that all tokenized sequences are padded to the same length, padding adds special tokens to shorter sequeces so they match the maximum length
#         truncation=True, #if sequence exceeds max, it will be trucated
#         max_length=512, #for most transformer models, 512 is a common limit for maximum length
#         return_tensors="pt" #converts the output to pytorch tensors
#     )

In [None]:
# train_encodings = tokenize(X_train.to_list())
# test_encodings = tokenize(X_test.to_list())

### Convert string labels to integers

In [None]:
# le = LabelEncoder()
# y_train = le.fit_transform(y_train) 
# y_test = le.transform(y_test)

### Dataset Preparation for usage in model training

We need to create a PyTorch Dataset object that:
- organizes tokenized text
- pairs them with corresponding labels
- structures everything for batch processing during training

In [None]:
# #object oriented programming (class is the object), with class you can do different things, such as calling functions

# class TextDataset(Dataset):  # Inherits from PyTorch's Dataset class
#     def __init__(self, encodings, labels):
#         self.input_ids = encodings['input_ids']       # Token IDs from tokenizer
#         self.attention_mask = encodings['attention_mask']  # Mask for padding
#         self.labels = torch.tensor(labels)  # Convert labels to tensors
#     def __getitem__(self, idx):
#         return {
#             'input_ids': self.input_ids[idx],       # Token IDs for one sample
#             'attention_mask': self.attention_mask[idx],  # Mask for one sample
#             'labels': self.labels[idx]              # Label for one sample
#         }
#     def __len__(self):
#         return len(self.labels)  # Total number of samples

In [None]:
# train_dataset = TextDataset(train_encodings, y_train)
# test_dataset = TextDataset(test_encodings, y_test)

In [None]:
# Inspect the first training sample (input_ids, attention_mask and labels).
train_dataset[0]

### Zero Shot Inference 

In [None]:
# # disable upper limit for memory
# os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"

# # Allows up to 100% of available memory
# torch.mps.set_per_process_memory_fraction(1.0)  

# torch.mps.empty_cache()  # Clears unused GPU memory

In [None]:
# # Load fresh copy of base model (not train on our data)
# num_classes = len(df["logical_fallacies"].unique())
# base_model = AutoModelForSequenceClassification.from_pretrained(
#     "microsoft/deberta-v3-small",
#     num_labels=num_classes,
#     problem_type="single_label_classification"
# )

In [None]:
# def predict(model, encodings, batch_size=8):
#     # Set the model to evaluation mode
#     model.eval()
    
#     # Use GPU
#     device = torch.device("mps")
#     model.to(device)
    
#     # Perform inference
#     probabilities = []
#     for i in range(0, len(encodings["input_ids"]), batch_size):
#         with torch.no_grad():
#             batch = {
#                 "input_ids": encodings["input_ids"][i:i+batch_size].to(device),
#                 "attention_mask": encodings["attention_mask"][i:i+batch_size].to(device)
#             }
#             outputs = model(**batch)
#             probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()
#             probabilities.extend(probs)
            
#         # Clear GPU memory after each batch
#         torch.mps.empty_cache()
    
#     return np.array(probabilities)

In [None]:
# # Get predictions for test data
# base_probs = predict(base_model, test_encodings, batch_size=8)

In [None]:
# # Get highest probability indices
# predicted_indices = np.argmax(base_probs, axis=1)  

In [None]:
# from sklearn.metrics import classification_report

# # Generate classification report
# report = classification_report(y_test, predicted_indices, target_names=le.classes_)
# print(report)

Note: This DeBERTa model is not designed for zero-shot classification. There is a DeBERTa-v3 variant by Moritz Laurer that can be used without training on our data, but the DeBERTa checkpoint used here is meant for supervised learning, so training on our data is necessary.
Another option for zero-shot classification is BART (the facebook/bart-large-mnli model).

**Zero-Shot Learning** is the concept that a model trained on enough unlabeled data (unsupervised learning) is able to generalize at inference time to data and classes it was never explicitly trained on. This can be used in NLP, image tasks, etc.

### Model Initialization

I had to change the configuration of accelerate, as it might still be configured for fp16 (mixed precision), which doesn't work on an Apple M1 Pro:
- run `accelerate config` in bash
- compute environment: this machine
- no distributed training
- "Do you want to run your training on CPU only?": say "No", as the Apple M1 Pro has a GPU
- "Do you wish to optimize your script with torch dynamo?": say "No" if using an Apple M1 Pro with the MPS backend
- "Do you want to use mixed precision?": say "No"

In [None]:
# Number of distinct classes found in the encoded training labels.
num_classes = np.unique(y_train).size
num_classes

In [None]:
# Load the pretrained checkpoint with a classification head sized to num_classes.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_TRAINING_PATH,
    num_labels=num_classes,
    problem_type="single_label_classification"
)

model.gradient_checkpointing_enable()  # force model to use gradient checkpointing to save memory

### Class imbalance

In [None]:
# Directory handed to the trainer as output_dir.
OUTPUT_DIR = "../models/LLM_deberta_v3_small_class_imbalance/trainer_output"
# Directory the trained model is saved to with mlflow.pytorch.
SAVE_PATH = "../models/LLM_deberta_v3_small_class_imbalance/pytorch_model"

In [None]:
class WeightedLossTrainer(Trainer):
    """``Trainer`` that uses a class-weighted cross-entropy loss.

    Args:
        class_weights: Optional tensor with one weight per class. ``None``
            gives a plain, unweighted cross-entropy loss.
        **kwargs: Forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_weights=None, **kwargs):
        super().__init__(**kwargs)
        self.class_weights = class_weights
        if self.class_weights is not None:
            # Move weights to device after model initialization
            self._move_weights_to_device()

    def _move_weights_to_device(self):
        """Move the class weights to the device the model currently lives on."""
        self.class_weights = self.class_weights.to(self.model.device)

    def compute_loss(
        self,
        model,
        inputs,
        return_outputs=False,
        num_items_in_batch=None
    ):
        """Return the weighted cross-entropy loss for one batch.

        Args:
            model: Model that is called with the batch.
            inputs: Batch dict; its ``"labels"`` entry is removed before the
                forward pass.
            return_outputs: If true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: Accepted so callers may pass it; not used
                by this loss.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        # The weights were placed once in __init__; re-align them if the
        # logits live on another device so the loss cannot fail with a
        # device mismatch.
        weights = self.class_weights
        if weights is not None and weights.device != logits.device:
            weights = weights.to(logits.device)
            self.class_weights = weights

        loss_fct = nn.CrossEntropyLoss(weight=weights)
        loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
        return (loss, outputs) if return_outputs else loss
        

def createTrainer(
    output_dir,
    y_train,
    class_weight=False,
    epochs=3,
    learning_rate=5e-5,
    weight_decay = 0,
    train_batch_size = 4,
    eval_batch_size=8
    ):
    """Build a trainer for the notebook-level ``model`` and datasets.

    Uses the notebook-level ``model``, ``train_dataset`` and
    ``test_dataset``.

    Args:
        output_dir: Directory passed to ``TrainingArguments`` as output_dir.
        y_train: Encoded training labels; only used to derive balanced
            class weights when ``class_weight`` is truthy.
        class_weight: If truthy, return a ``WeightedLossTrainer`` with
            balanced class weights, otherwise a plain ``Trainer``.
        epochs: Number of training epochs.
        learning_rate: Learning rate.
        weight_decay: Weight decay.
        train_batch_size: Per-device batch size for training.
        eval_batch_size: Per-device batch size for evaluation.

    Returns:
        A ``Trainer`` or ``WeightedLossTrainer`` instance.
    """
    logger.info('defining training arguments')
    training_args = TrainingArguments(
            output_dir=output_dir, # to save results
            num_train_epochs=epochs,
            per_device_train_batch_size=train_batch_size, #small to save memory
            per_device_eval_batch_size=eval_batch_size, #small to save memory
            learning_rate=learning_rate, #standard for deberta; maybe try 6e-6
            weight_decay=weight_decay,
            eval_strategy="epoch",
            logging_steps=50,
            save_strategy="epoch",
            load_best_model_at_end=True
        )

    def compute_metrics(p):
        preds = p.predictions.argmax(-1)
        return {'accuracy': accuracy_score(p.label_ids, preds)}

    # Arguments shared by both trainer variants.
    trainer_kwargs = dict(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        compute_metrics=compute_metrics,
    )

    if not class_weight:
        logger.info('get normal trainer')
        return Trainer(**trainer_kwargs)

    logger.info('get weighted loss trainer')
    # Class weights are only computed when they are actually used.
    computed_class_weights = compute_class_weight(
        class_weight='balanced', classes=np.unique(y_train), y=y_train)
    # Convert class weights to tensor
    computed_class_weights_tensor = torch.tensor(computed_class_weights, dtype=torch.float32)
    return WeightedLossTrainer(
        class_weights=computed_class_weights_tensor,
        **trainer_kwargs
    )



In [None]:
# epoch = 3
# learning_rate=2e-5 #standard for deberta; maybe try 6e-6
# weight_decay=0.01
# per_device_train_batch_size=4 #small to save memory
# per_device_eval_batch_size=8 

In [None]:
# Create the trainer with class weighting enabled, one epoch and a learning rate of 2e-5.
trainer = createTrainer(class_weight=True, output_dir= OUTPUT_DIR, y_train=y_train, epochs=1, learning_rate=2e-5, weight_decay = 0.01, train_batch_size=4, eval_batch_size=8 )


#### done

In [None]:
# Class-balanced trainer
# class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)

In [None]:
# # Convert class weights to tensor
# class_weights = torch.tensor(class_weights, dtype=torch.float32)

In [None]:

# class WeightedLossTrainer(Trainer):
#     def __init__(self, class_weights=None, **kwargs):
#         super().__init__(**kwargs)
#         self.class_weights = class_weights
#         if self.class_weights is not None:
#             # Move weights to device after model initialization
#             self._move_weights_to_device()
    
#     def _move_weights_to_device(self):
#         self.class_weights = self.class_weights.to(self.model.device)

#     def compute_loss(
#         self, 
#         model, 
#         inputs, 
#         return_outputs=False, 
#         num_items_in_batch=None  # Add this parameter
#     ):
#         labels = inputs.pop("labels")
#         outputs = model(**inputs)
#         logits = outputs.logits
#         loss_fct = nn.CrossEntropyLoss(weight=self.class_weights)
#         loss = loss_fct(logits.view(-1, self.model.config.num_labels), 
#                        labels.view(-1))
#         return (loss, outputs) if return_outputs else loss

### Training Configuration

In [None]:
# Trainer with class weights

# training_args = TrainingArguments(
#     output_dir='../models/LLM_deberta_v3_small_class_imbalance/trainer_output', # to sve results
#     num_train_epochs=3,
#     per_device_train_batch_size=4, #small to save memory
#     per_device_eval_batch_size=8, #small to save memory
#     learning_rate=2e-5, #standard for deberta; maybe try 6e-6
#     weight_decay=0.01,
#     evaluation_strategy="epoch",
#     logging_steps=50,
#     save_strategy="epoch",
#     load_best_model_at_end=True
# )

# def compute_metrics(p):
#     preds = p.predictions.argmax(-1)
#     return {'accuracy': accuracy_score(p.label_ids, preds)}

# trainer = WeightedLossTrainer(
#     model=model,
#     args=training_args,
#     train_dataset=train_dataset,
#     eval_dataset=test_dataset,
#     compute_metrics=compute_metrics,
#     class_weights=class_weights
# )

In [None]:
# # trainer without class weights

# training_args = TrainingArguments(
#     output_dir='../models/LLM_deberta_v3_small_class_imbalance/trainer_output', # to sve results
#     num_train_epochs=3,
#     per_device_train_batch_size=4, #small to save memory
#     per_device_eval_batch_size=8, #small to save memory
#     learning_rate=2e-5, #standard for deberta; maybe try 6e-6
#     weight_decay=0.01,
#     evaluation_strategy="epoch",
#     logging_steps=50,
#     save_strategy="epoch",
#     load_best_model_at_end=True
# )

# def compute_metrics(p):
#     preds = p.predictions.argmax(-1)
#     return {'accuracy': accuracy_score(p.label_ids, preds)}

# trainer = Trainer(
#     model=model,
#     args=training_args,
#     train_dataset=train_dataset,
#     eval_dataset=test_dataset,
#     compute_metrics=compute_metrics
# )

### Execute Training

In [None]:
# Clears unused GPU memory. Only done when the MPS backend is available, so
# the cell also runs on machines without an Apple GPU.
if torch.backends.mps.is_available():
    torch.mps.empty_cache()

In [None]:
# disable upper limit for memory
# NOTE(review): torch is already imported at this point; confirm that the
# variable is still picked up when it is set this late.
os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"

# Allows up to 100% of available memory. Only applied when the MPS backend is
# available, so the cell also runs on machines without an Apple GPU.
if torch.backends.mps.is_available():
    torch.mps.set_per_process_memory_fraction(1.0)

In [None]:
# Start the fine-tuning run with the trainer created above.
logger.info('training is running')
trainer.train()

### Evaluation

In [None]:
# results = trainer.evaluate()
# print(f"Test Accuracy: {results['eval_accuracy']:.4f}")

In [None]:
# Run the trained model on the test dataset; `output` is passed to show_eval_metrics below.
logger.info('predict on test_dataset')
output = trainer.predict(test_dataset)


In [None]:
def show_eval_metrics(output, le):
    """Print classification report, confusion matrix and multiclass Brier score.

    Args:
        output: Prediction result exposing ``predictions`` (logits, indexed
            as ``(n_samples, n_classes)``) and ``label_ids`` (integer class
            ids).
        le: Fitted label encoder; ``le.classes_`` supplies the class names.
    """
    logger.info('get evaluation metrics')

    logits = output.predictions
    y_true = np.asarray(output.label_ids)
    y_pred = np.argmax(logits, axis=1)
    proba = softmax(logits, axis=1)

    # Pass the full label set explicitly so the report and the matrix keep
    # one row per class even if a class is missing from y_true / y_pred.
    class_ids = np.arange(len(le.classes_))

    logger.info('classification_report')
    print(classification_report(
        y_true, y_pred, labels=class_ids, target_names=le.classes_))

    logger.info('confusion_matrix')
    print(confusion_matrix(y_true, y_pred, labels=class_ids))

    logger.info('brier score')
    # 1. One-hot encode the true labels with exactly one column per logit
    #    column. LabelBinarizer returns a single column for two classes and
    #    drops classes that are absent from y_true, which breaks the
    #    subtraction below (wrong broadcast or shape error).
    y_true_onehot = np.eye(proba.shape[1])[y_true]  # Shape: (n_samples, n_classes)

    # 2. Compute Brier score for multiclass
    brier_score = np.mean(np.sum((proba - y_true_onehot) ** 2, axis=1))
    print("Multiclass Brier score:", brier_score)
  

In [None]:
# Print the evaluation metrics for the test-set predictions.
show_eval_metrics(output, le)

In [None]:
# Generate classification report
# print(classification_report(y_true, predictions, target_names=le.classes_))

# # Generate confusion matrix
# cm = confusion_matrix(y_true, predictions)
# print("Confusion Matrix:")
# print(cm)

### Save model

I saved the model here as a SK model, although it is a Pytorch model. Keep that in mind!

Save model:
import mlflow.pytorch
mlflow.pytorch.save_model(model, "deberta_model")

Load model (correct way):
model = mlflow.pytorch.load_model("deberta_model")

In [None]:
# #save with sklearn
# path_sk = "../models/LLM_deberta_v3_small_class_imbalance/sk_learn_model"
# save_model(sk_model=model, path=path_sk)

In [None]:
# Save the fine-tuned model in MLflow's PyTorch format under SAVE_PATH.

mlflow.pytorch.save_model(model, path=SAVE_PATH)

### Load model

In [None]:
# Load the model through MLflow's sklearn flavor.
# NOTE(review): the cell that would write to this sk_learn_model path is
# commented out in this notebook - confirm the directory exists before running.
import mlflow.sklearn
path_sk = "../models/LLM_deberta_v3_small_class_imbalance/sk_learn_model"
model = mlflow.sklearn.load_model(path_sk)

In [None]:
# Load the model through MLflow's PyTorch flavor (same directory as SAVE_PATH above).
import mlflow.pytorch
path_pt = "../models/LLM_deberta_v3_small_class_imbalance/pytorch_model"
model = mlflow.pytorch.load_model(path_pt)

### Make predictions based on reloaded model

Here the code works even though I loaded the model as a sklearn model, because I manually convert the logits to probabilities with torch.softmax.

mlflow.sklearn.load_model() accidentally worked because MLflow can sometimes load PyTorch models as generic Python objects, but this isn't reliable.

In [None]:
# # batch can be changed both codes, now used the upper one
# # upper one makes it more generalized for dynamic inputs, as the lower one only handles input_ids and attention_mask
# batch = {
#     key: val[i:i+batch_size].to(device) 
#     for key, val in encodings.items()
# }

# batch = {
#                 "input_ids": encodings["input_ids"][i:i+batch_size].to(device),
#                 "attention_mask": encodings["attention_mask"][i:i+batch_size].to(device)
#             }

In [None]:
## Function for prediction

def predict(model, encodings, batch_size=8, device=None):
    """Return class probabilities for ``encodings``, computed in batches.

    Args:
        model: Model whose forward pass accepts the entries of ``encodings``
            as keyword arguments and returns an object with a ``logits``
            attribute.
        encodings: Mapping of tensors (must contain ``"input_ids"``); every
            entry is sliced along its first dimension.
        batch_size: Number of samples per forward pass.
        device: Device to run on (string or ``torch.device``). ``None``
            picks "mps" if available, then "cuda", then "cpu".

    Returns:
        ``np.ndarray`` with one row of softmax probabilities per sample.
    """
    if device is None:
        # Previously hard-coded to "mps"; fall back so the function also
        # works on machines without an Apple GPU.
        if torch.backends.mps.is_available():
            device = "mps"
        elif torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"
    device = torch.device(device)

    # Set the model to evaluation mode and move it to the target device
    model.eval()
    model.to(device)

    # Perform inference
    num_samples = len(encodings["input_ids"])
    probabilities = []
    for start in range(0, num_samples, batch_size):
        with torch.no_grad():
            batch = {
                key: val[start:start + batch_size].to(device)
                for key, val in encodings.items()
            }
            outputs = model(**batch)
            probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()
            probabilities.extend(probs)

        # Clear GPU memory after each batch (only meaningful on MPS)
        if device.type == "mps":
            torch.mps.empty_cache()

    return np.array(probabilities)

In [None]:
#needed to reduce the batch size, otherwise I had an error
# get_encode_tokenize_data does not return the raw encodings, so
# `test_encodings` is not defined at notebook level. Rebuild the mapping
# from the tensors stored on the test dataset.
test_encodings = {
    "input_ids": test_dataset.input_ids,
    "attention_mask": test_dataset.attention_mask,
}
# Get predictions for test data
base_probs = predict(model, test_encodings, batch_size=2)

In [None]:
# Index of the most probable class for every sample
predicted_labels = base_probs.argmax(axis=1)

In [None]:
# Get second highest probability indices
# A stable sort on the negated probabilities ranks tied classes by index, so
# rank 0 always matches np.argmax and rank 1 is guaranteed to be a different
# class (the default unstable sort could return the argmax class again on ties).
second_predicted_labels = np.argsort(-base_probs, axis=1, kind="stable")[:, 1]

In [None]:
# Probability of the top predicted class for every sample
row_idx = np.arange(len(predicted_labels))
predicted_label_probs = base_probs[row_idx, predicted_labels]

In [None]:
# Second-highest probability for every sample
sorted_probs = np.sort(base_probs, axis=1)
second_predicted_label_probs = sorted_probs[:, -2]

In [None]:
# Bundle the top-2 predictions and their probabilities for the backend
result = dict(
    predicted_labels=predicted_labels,
    predicted_label_probs=predicted_label_probs,
    second_predicted_labels=second_predicted_labels,
    second_predicted_label_probs=second_predicted_label_probs,
)

In [None]:
from sklearn.metrics import classification_report

# `y_test` is local to get_encode_tokenize_data and therefore not defined at
# notebook level. The encoded test labels are stored on the test dataset.
y_test = test_dataset.labels.numpy()

# Generate classification report
report = classification_report(y_test, predicted_labels, target_names=le.classes_)
print(report)

# Generate confusion matrix
cm = confusion_matrix(y_test, predicted_labels)
print("Confusion Matrix:")
print(cm)

In [None]:
# from sklearn.preprocessing import LabelBinarizer

# # 1. One-hot encode the true labels (y_test)
# lb = LabelBinarizer()
# y_true_onehot = lb.fit_transform(y_test)  # Shape: (n_samples, n_classes)

# # 2. Compute Brier score for multiclass
# brier_score = np.mean(np.sum((base_probs - y_true_onehot) ** 2, axis=1))
# print("Multiclass Brier score:", brier_score)