In [1]:
import ast
import json
import os
from dataclasses import dataclass

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig
from transformers import TrainingArguments, Trainer

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
@dataclass
class TrainCfg:
    """
    Central configuration for training and inference.

    Every setting is an annotated dataclass field with a default, so the class
    can be used in two ways:

    * read straight off the class (``TrainCfg.max_length``), as the rest of the
      notebook does, or
    * instantiated with overrides (``TrainCfg(epochs=1)``) for quick experiments.

    Without the annotations ``@dataclass`` would generate no fields at all and
    the second form would raise ``TypeError``.
    """
    # Use 'microsoft/deberta-v3-large' for better results, 'small' for speed/debugging
    model_name: str = 'microsoft/deberta-v3-small'
    max_length: int = 1024
    train_batch_size: int = 4
    valid_batch_size: int = 8
    learning_rate: float = 2e-5
    epochs: int = 10
    n_folds: int = 5
    # Kept as a plain default (not a default_factory) so that `TrainCfg.device`
    # still resolves as a class attribute; evaluated once when the class is defined.
    device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    seed: int = 42

    train_path: str = 'data/train.csv'
    test_path: str = 'data/test.csv'
    sub_path: str = 'data/ours_submission.csv'

In [None]:
# =========================================================================================
# DATA PREPROCESSING
# =========================================================================================
def process_text(text):
    """
    Flatten a stringified list of conversation turns into plain text.

    The dataset sometimes stores conversation history as a string
    representation of a list, either JSON (``'["hello", null]'``) or a Python
    literal (``"['hello', 'hi']"``). Such values are parsed and their turns
    joined with single spaces; ``None``/``null`` turns are skipped.

    Args:
        text: The raw cell value. Non-string values are returned untouched.

    Returns:
        The joined turns when ``text`` parses to a list whose items are all
        strings or ``None``; otherwise ``text`` unchanged.
    """
    if not isinstance(text, str):
        return text
    if not (text.startswith("[") and text.endswith("]")):
        return text

    # JSON first: it understands `null` and merges \uXXXX surrogate pairs,
    # neither of which ast.literal_eval handles the same way.
    try:
        turns = json.loads(text)
    except (ValueError, RecursionError):
        # Fall back to Python-literal syntax (single-quoted strings, None).
        try:
            turns = ast.literal_eval(text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return text

    if not isinstance(turns, list):
        return text
    # Only flatten genuine lists of text turns; anything else (numbers, nested
    # containers) is left exactly as it was given.
    if not all(turn is None or isinstance(turn, str) for turn in turns):
        return text

    return " ".join(turn for turn in turns if turn is not None)

def prepare_input(cfg, text, tokenizer):
    """
    Encode ``text`` with ``tokenizer`` and keep only the model inputs.

    The tokenizer is asked to add special tokens, truncate, and pad to
    ``cfg.max_length``. ``return_tensors=None`` requests the tokenizer's plain
    (non-tensor) output.

    Args:
        cfg: Any object exposing a ``max_length`` attribute.
        text: The string to encode.
        tokenizer: Callable tokenizer accepting the keyword arguments below.

    Returns:
        A dict with exactly the ``"input_ids"`` and ``"attention_mask"``
        entries of the tokenizer output.
    """
    encoded = tokenizer(
        text,
        add_special_tokens=True,
        truncation=True,
        padding="max_length",
        max_length=cfg.max_length,
        return_tensors=None,
    )

    wanted_keys = ("input_ids", "attention_mask")
    return {key: encoded[key] for key in wanted_keys}

class LMSYSDataset(Dataset):
    """
    Torch dataset turning (prompt, response_a, response_b) rows into model inputs.

    Each item is a dict with ``input_ids`` and ``attention_mask`` (long tensors)
    and, unless ``is_test`` is set, a ``labels`` float tensor holding a one-hot
    target in the order ``[A wins, B wins, Tie]``.

    Args:
        df: DataFrame with ``prompt``, ``response_a`` and ``response_b`` columns,
            plus ``winner_model_a`` / ``winner_model_b`` when ``is_test`` is False.
        tokenizer: Tokenizer forwarded to ``prepare_input``.
        max_length: Padded / truncated sequence length used for this dataset.
        is_test: When True, no ``labels`` entry is produced.
    """

    def __init__(self, df, tokenizer, max_length, is_test=False):
        self.df = df
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.is_test = is_test

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Build the tokenized (and, for training, labelled) item at position ``idx``."""
        row = self.df.iloc[idx]

        # 1. Extract and Clean Text
        prompt = process_text(row['prompt'])
        resp_a = process_text(row['response_a'])
        resp_b = process_text(row['response_b'])

        # 2. Construct Input String
        # Structure: [CLS] Prompt [SEP] Response A [SEP] Response B [SEP]
        # Note: DeBERTa v3 uses [SEP] to separate segments naturally
        text = f"{prompt} [SEP] {resp_a} [SEP] {resp_b}"

        # 3. Tokenize
        # prepare_input only reads `.max_length` from its config argument, so the
        # dataset itself is passed: this makes the `max_length` given to the
        # constructor take effect instead of silently using the global TrainCfg.
        inputs = prepare_input(self, text, self.tokenizer)

        item = {
            "input_ids": torch.tensor(inputs["input_ids"], dtype=torch.long),
            "attention_mask": torch.tensor(inputs["attention_mask"], dtype=torch.long),
        }

        # 4. Handle Labels (Only for Training)
        if not self.is_test:
            # One-hot target over [A wins, B wins, Tie]; any row that is neither
            # an A win nor a B win is treated as a tie.
            if row['winner_model_a'] == 1:
                winner_idx = 0
            elif row['winner_model_b'] == 1:
                winner_idx = 1
            else:
                winner_idx = 2

            label_vec = np.zeros(3, dtype=np.float32)
            label_vec[winner_idx] = 1.0
            item["labels"] = torch.tensor(label_vec, dtype=torch.float)

        return item

def compute_metrics(eval_pred):
    """
    Score an evaluation pass with multi-class log loss.

    Args:
        eval_pred: A ``(predictions, labels)`` pair; ``predictions`` are treated
            as raw logits and normalized with a softmax over the last axis.

    Returns:
        ``{"log_loss": value}`` where ``value`` is ``log_loss(labels, probs)``.
    """
    logits, targets = eval_pred
    # Logits -> probabilities along the class axis.
    probabilities = torch.softmax(torch.tensor(logits), dim=-1).numpy()
    return {"log_loss": log_loss(targets, probabilities)}

In [None]:

print(f"Using Device: {TrainCfg.device}")

# 1. Load Data
# Paths come from TrainCfg. There is no dummy-data fallback: the cell fails
# fast when the training file is missing.
if os.path.exists(TrainCfg.train_path):
    train_df = pd.read_csv(TrainCfg.train_path)
    test_df = pd.read_csv(TrainCfg.test_path)
else:
    raise ValueError("No data found")

print(f"Train Shape: {train_df.shape}")

# 2. Tokenizer
tokenizer = AutoTokenizer.from_pretrained(TrainCfg.model_name)

# 3. Train-Validation Split
# We use a simple split for the starter.
# For a winning solution, loop over k-folds.
train_split, val_split = train_test_split(train_df, test_size=0.1, random_state=TrainCfg.seed)

train_dataset = LMSYSDataset(train_split, tokenizer, TrainCfg.max_length)
valid_dataset = LMSYSDataset(val_split, tokenizer, TrainCfg.max_length)

# 4. Model Setup
# The loss is computed by SoftmaxCrossEntropyTrainer below, so no `problem_type`
# is set: labels never reach the model's built-in loss.
model = AutoModelForSequenceClassification.from_pretrained(
    TrainCfg.model_name,
    num_labels=3,
)


class SoftmaxCrossEntropyTrainer(Trainer):
    """
    Trainer that optimizes softmax cross-entropy against one-hot float labels.

    The validation metric and the inference cell both turn logits into
    probabilities with a softmax. Training with the default BCE-with-logits
    loss (independent sigmoids) would optimize a different quantity than the
    one being scored, so the loss here matches the softmax used downstream.
    """

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        # **kwargs absorbs extra arguments newer Trainer versions may pass.
        labels = inputs["labels"]
        # Keep labels away from the model so it does not compute its own loss.
        model_inputs = {key: value for key, value in inputs.items() if key != "labels"}
        outputs = model(**model_inputs)
        # Upcast before log-softmax: logits may be half precision under fp16.
        log_probs = torch.log_softmax(outputs.logits.float(), dim=-1)
        loss = -(labels.float() * log_probs).sum(dim=-1).mean()
        return (loss, outputs) if return_outputs else loss


# 5. Training Arguments
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy`; left as-is because the installed version is not visible here.
args = TrainingArguments(
    output_dir="output_lmsys",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=TrainCfg.learning_rate,
    per_device_train_batch_size=TrainCfg.train_batch_size,
    per_device_eval_batch_size=TrainCfg.valid_batch_size,
    num_train_epochs=TrainCfg.epochs,
    weight_decay=0.01,
    metric_for_best_model="log_loss",
    greater_is_better=False,
    # Mixed precision only when CUDA is present; TrainCfg.device allows a CPU
    # fallback, and fp16 is not usable there.
    fp16=torch.cuda.is_available(),
    seed=TrainCfg.seed,
    report_to="none",
    load_best_model_at_end=True,
)

trainer = SoftmaxCrossEntropyTrainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
)

# 6. Train
print("Starting Training...")
trainer.train()

In [None]:
print("Starting Inference with TTA...")

# Test-time augmentation by swapping response positions:
#   pass 1 scores (prompt, response_a, response_b)
#   pass 2 scores (prompt, response_b, response_a)
# Pass 2's A/B probabilities are swapped back, then both passes are averaged.

# Dataset in the original column order.
test_ds_normal = LMSYSDataset(test_df, tokenizer, TrainCfg.max_length, is_test=True)

# Same rows with the two response columns exchanging names; `rename` returns
# a new frame, so `test_df` itself is left untouched.
test_df_swapped = test_df.rename(
    columns={'response_a': 'response_b', 'response_b': 'response_a'}
)
test_ds_swapped = LMSYSDataset(test_df_swapped, tokenizer, TrainCfg.max_length, is_test=True)

# Raw logits for both orderings.
preds_normal = trainer.predict(test_ds_normal).predictions
preds_swapped = trainer.predict(test_ds_swapped).predictions

# Logits -> probabilities over the class axis.
probs_normal = torch.softmax(torch.tensor(preds_normal), dim=-1).numpy()
probs_swapped = torch.softmax(torch.tensor(preds_swapped), dim=-1).numpy()

# Column order is 0=A, 1=B, 2=Tie. In the swapped pass "A" is really B and
# vice versa, so reorder the columns as [1, 0, 2] to undo the swap.
probs_swapped_fixed = probs_swapped[:, [1, 0, 2]]

# Average the two views.
final_probs = (probs_normal + probs_swapped_fixed) / 2

prob_columns = ('winner_model_a', 'winner_model_b', 'winner_tie')
submission = pd.DataFrame({
    'id': test_df['id'],
    **{name: final_probs[:, col] for col, name in enumerate(prob_columns)},
})


In [None]:
# Write the averaged predictions to the configured submission path (no index
# column), then preview the first rows as the cell output.
submission.to_csv(path_or_buf=TrainCfg.sub_path, index=False)
print("Submission saved successfully!")
submission.head()