In [None]:
# Import Libraries
import pandas as pd
import numpy as np
import os
import gc
from ast import literal_eval
import warnings
warnings.filterwarnings('ignore')

import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import log_loss, accuracy_score
from transformers import (
    AutoTokenizer, 
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding
)
from datasets import Dataset as HFDataset
from tqdm.auto import tqdm

# Detect the Kaggle runtime by looking for its input mount point
IS_KAGGLE = os.path.exists('/kaggle/input')
print(f"Running on Kaggle: {IS_KAGGLE}")

# Select the compute device: CUDA when torch can see a GPU, otherwise CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")
# The device is 'cuda' exactly when CUDA was reported available above
if device.type == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# Configuration
class CFG:
    """Class-level configuration constants read by the rest of the notebook.

    The class is used as a plain namespace (never instantiated); attributes are
    accessed as ``CFG.<name>``.
    """

    # Data paths
    # Directory holding train.csv / test.csv; the Kaggle mount is used when
    # IS_KAGGLE is true, otherwise a relative local folder. Both end with "/".
    data_path = "/kaggle/input/llm-classification-finetuning/" if IS_KAGGLE else "llm-classification-finetuning/"
    
    # Training config
    # Token limit passed to the tokenizer (longer inputs are truncated)
    max_length = 512
    # Per-device training batch size (evaluation uses twice this value)
    batch_size = 8
    # Number of training epochs
    epochs = 3
    # Learning rate handed to TrainingArguments
    learning_rate = 2e-5
    # Weight decay handed to TrainingArguments
    weight_decay = 0.01
    # Seed used for RNG seeding, the train/val split and the Trainer
    seed = 42
    # Number of output classes (0 = model A wins, 1 = model B wins, 2 = tie)
    num_labels = 3
    
    # Model paths - will be auto-detected on Kaggle
    # Defaults are Hugging Face hub ids; both are overwritten after the class
    # definition with the result of find_model_paths().
    model_path = "microsoft/deberta-v3-small"
    tokenizer_path = "microsoft/deberta-v3-small"

# Auto-detect model path on Kaggle
def find_model_paths():
    """Resolve the directories to load the DeBERTa model and tokenizer from.

    Outside Kaggle the current ``CFG.model_path`` / ``CFG.tokenizer_path`` values
    are returned untouched. On Kaggle the lookup order is:

    1. ``/kaggle/working/model`` and ``/kaggle/working/tokenizer`` — used only
       when *both* contain their marker file (``config.json`` and
       ``tokenizer_config.json`` respectively).
    2. Directories under ``/kaggle/input`` whose dataset name contains
       "deberta" (plus their immediate children), most recently discovered first.
    3. A fixed list of commonly used dataset locations.

    For steps 2 and 3 the first directory containing ``config.json`` is used for
    both the model and the tokenizer.

    Returns:
        tuple: ``(model_path, tokenizer_path)``.

    Raises:
        FileNotFoundError: if no candidate directory qualifies. The contents of
            ``/kaggle/input`` and ``/kaggle/working`` are printed first to help
            diagnose the problem.
    """
    if not IS_KAGGLE:
        return CFG.model_path, CFG.tokenizer_path

    def _has_file(directory, filename):
        # True when the directory exists and holds the given marker file
        return os.path.exists(directory) and os.path.exists(os.path.join(directory, filename))

    # Step 1: artefacts saved earlier into the working directory
    working_model = "/kaggle/working/model"
    working_tokenizer = "/kaggle/working/tokenizer"

    model_found = _has_file(working_model, 'config.json')
    if model_found:
        print(f"✓ Found model at: {working_model}")

    tokenizer_found = _has_file(working_tokenizer, 'tokenizer_config.json')
    if tokenizer_found:
        print(f"✓ Found tokenizer at: {working_tokenizer}")

    # A partial match is not enough; both pieces must be present to stop here
    if model_found and tokenizer_found:
        return working_model, working_tokenizer

    # Step 3 candidates: well-known dataset layouts under /kaggle/input
    input_root = '/kaggle/input'
    known_locations = [
        "/kaggle/input/deberta-v3-small",
        "/kaggle/input/deberta-v3-small/deberta-v3-small",
        "/kaggle/input/huggingface-deberta-v3-variants/deberta-v3-small",
        "/kaggle/input/deberta-v-3-small/deberta-v3-small",
        "/kaggle/input/deberta-v3-small/pytorch/small/1",
        "/kaggle/input/deberta-v3-small/transformers/small/1",
    ]

    # Step 2 candidates: any attached dataset whose name mentions "deberta",
    # together with its first-level sub-directories
    discovered = []
    if os.path.exists(input_root):
        for entry in os.listdir(input_root):
            if 'deberta' not in entry.lower():
                continue
            dataset_dir = f'{input_root}/{entry}'
            discovered.append(dataset_dir)
            if os.path.isdir(dataset_dir):
                discovered.extend(f'{dataset_dir}/{child}' for child in os.listdir(dataset_dir))

    # Latest discoveries take priority, followed by the fixed list
    candidates = discovered[::-1] + known_locations

    # A directory counts as a model directory when it contains config.json
    for candidate in candidates:
        if os.path.isdir(candidate) and os.path.exists(os.path.join(candidate, 'config.json')):
            print(f"✓ Found model at: {candidate}")
            return candidate, candidate  # one directory serves model and tokenizer

    # Nothing matched: list what is actually available before failing
    print("Searching for model files...")
    listings = (
        ("Available in /kaggle/input:", input_root),
        ("\nAvailable in /kaggle/working:", '/kaggle/working'),
    )
    for heading, folder in listings:
        print(heading)
        if os.path.exists(folder):
            for item in os.listdir(folder):
                print(f"  - {item}")

    raise FileNotFoundError(
        "DeBERTa model not found! Please add a DeBERTa model dataset:\n"
        "1. Click '+ Add Data' in your Kaggle notebook\n"
        "2. Search for 'deberta-v3-small'\n"
        "3. Add one of the model datasets"
    )

# Update model paths
# Replace the CFG defaults with whatever find_model_paths() resolves for this
# environment; cells that read CFG.model_path / CFG.tokenizer_path must run after this.
CFG.model_path, CFG.tokenizer_path = find_model_paths()

# Set seed
def set_seed(seed):
    """Seed every random number generator the notebook may draw from.

    Seeds Python's built-in ``random`` module, NumPy's legacy global generator
    and PyTorch (CPU, plus every CUDA device when CUDA is available), so that
    repeated runs with the same seed produce the same random draws.

    Args:
        seed (int): Seed value applied to all generators.
    """
    # Imported locally because the notebook's import cell does not pull in `random`
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# Seed the RNGs before any stochastic step runs
set_seed(CFG.seed)

# Echo the resolved configuration, one entry per line
print(
    f"Model path: {CFG.model_path}",
    f"Tokenizer path: {CFG.tokenizer_path}",
    f"Data path: {CFG.data_path}",
    sep="\n",
)

In [None]:
# Load data
# Read the train and test CSVs from CFG.data_path. os.path.join is used instead
# of string concatenation so the paths stay valid whether or not
# CFG.data_path ends with a path separator.
train_df = pd.read_csv(os.path.join(CFG.data_path, "train.csv"))
test_df = pd.read_csv(os.path.join(CFG.data_path, "test.csv"))

print(f"Training data: {train_df.shape}")
print(f"Test data: {test_df.shape}")

In [None]:
# Helper functions
def safe_literal_eval(x):
    """Parse ``x`` as a Python literal, falling back to ``x`` itself on failure.

    Args:
        x: Usually a string holding a serialized Python literal (e.g. a list of
            strings). Non-string values such as ``None`` or NaN are accepted and
            returned unchanged.

    Returns:
        The evaluated literal when parsing succeeds, otherwise the original ``x``.

    Note:
        Only the exceptions ``ast.literal_eval`` raises for malformed input are
        caught, so ``KeyboardInterrupt`` / ``SystemExit`` are no longer swallowed.
        NOTE(review): tokens that are valid JSON but not Python literals (such as
        ``null``) make the whole string fall through unparsed — confirm this is
        acceptable for the dataset.
    """
    try:
        return literal_eval(x)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return x

def clean_text(text):
    """Replace characters that cannot be represented as valid UTF-8.

    Strings are round-tripped through UTF-8: lone surrogates are allowed through
    the encode step and then substituted by the decoder's replacement character.
    Non-string values are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    raw_bytes = text.encode('utf-8', errors='surrogatepass')
    return raw_bytes.decode('utf-8', errors='replace')

def _flatten_turns(value):
    """Join list items with the turn separator; stringify any other value."""
    if not isinstance(value, list):
        return str(value)
    return " [TURN] ".join(map(str, value))


def create_input_text(row):
    """Build the single model input string for one example.

    Args:
        row: Mapping-like record providing ``prompt_parsed``,
            ``response_a_parsed`` and ``response_b_parsed``. Each value may be a
            list of turns (joined with " [TURN] ") or any other object (converted
            with ``str``).

    Returns:
        str: ``"[PROMPT] ... [RESPONSE_A] ... [RESPONSE_B] ..."``.
    """
    prompt_text, response_a_text, response_b_text = (
        _flatten_turns(row[field])
        for field in ('prompt_parsed', 'response_a_parsed', 'response_b_parsed')
    )
    return f"[PROMPT] {prompt_text} [RESPONSE_A] {response_a_text} [RESPONSE_B] {response_b_text}"

print("Helper functions defined.")

In [None]:
# Preprocess training data
# Decode each serialized text column into its "<column>_parsed" counterpart
for _source_col in ('prompt', 'response_a', 'response_b'):
    train_df[f'{_source_col}_parsed'] = train_df[_source_col].apply(safe_literal_eval)

# Assemble the combined model input per row, then scrub invalid Unicode from it
_combined_text = train_df.apply(create_input_text, axis=1)
train_df['input_text'] = _combined_text.apply(clean_text)


# Create labels
def _winner_label(row):
    """Map the winner columns to a class id: 0 = model A, 1 = model B, 2 = otherwise."""
    if row['winner_model_a'] == 1:
        return 0
    if row['winner_model_b'] == 1:
        return 1
    return 2


train_df['label'] = train_df.apply(_winner_label, axis=1)

print("Label distribution:")
print(train_df['label'].value_counts().sort_index())
print("\n0=Model A wins, 1=Model B wins, 2=Tie")

In [None]:
# Load tokenizer (using separate tokenizer path)
print(f"Loading tokenizer from: {CFG.tokenizer_path}")
tokenizer = AutoTokenizer.from_pretrained(CFG.tokenizer_path)


def tokenize_function(examples):
    """Tokenize the ``input_text`` field of a batch of examples.

    Inputs are truncated to ``CFG.max_length`` tokens and left unpadded; padding
    is applied later, per batch.

    Args:
        examples: Batch mapping that provides an ``input_text`` entry.

    Returns:
        The tokenizer's encoding for the batch.
    """
    encode_options = {
        'truncation': True,
        'max_length': CFG.max_length,
        'padding': False,
    }
    return tokenizer(examples['input_text'], **encode_options)


print("Tokenizer loaded successfully!")

In [None]:
# Train/Val split
# Hold out 15% for validation, stratified on the label so class ratios match
_model_columns = train_df[['input_text', 'label']]
train_data, val_data = train_test_split(
    _model_columns,
    test_size=0.15,
    stratify=train_df['label'],
    random_state=CFG.seed,
)


def _to_tokenized_dataset(frame):
    """Convert a pandas split into a tokenized HuggingFace dataset.

    The index is reset before conversion, the raw text column is dropped once
    tokenized, and ``label`` is renamed to ``labels``.
    """
    # Create HuggingFace dataset
    dataset = HFDataset.from_pandas(frame.reset_index(drop=True))
    # Tokenize
    dataset = dataset.map(tokenize_function, batched=True, remove_columns=['input_text'])
    # Rename label column
    return dataset.rename_column('label', 'labels')


train_dataset = _to_tokenized_dataset(train_data)
val_dataset = _to_tokenized_dataset(val_data)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")

In [None]:
# Load model
# Build a sequence-classification model from the resolved checkpoint, with a
# CFG.num_labels-way head configured for single-label classification.
print(f"Loading model from: {CFG.model_path}")
model = AutoModelForSequenceClassification.from_pretrained(
    CFG.model_path,
    num_labels=CFG.num_labels,
    problem_type="single_label_classification"
)
# Move the model to the device selected in the first cell
model = model.to(device)

# Report where the parameters live and how many there are
print(f"Model loaded! Device: {next(model.parameters()).device}")
print(f"Parameters: {model.num_parameters():,}")

In [None]:
# Data collator and metrics
# Pads each batch with the tokenizer (tokenization itself leaves inputs unpadded)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)


def compute_metrics(eval_pred):
    """Compute accuracy and multi-class log loss for an evaluation pass.

    Args:
        eval_pred: Pair ``(predictions, labels)`` where predictions are raw
            logits with one column per class.

    Returns:
        dict: ``{"accuracy": ..., "log_loss": ...}``.
    """
    logits, true_labels = eval_pred
    # Turn logits into probabilities for the log-loss computation
    logits_tensor = torch.tensor(logits)
    class_probs = torch.softmax(logits_tensor, dim=-1).numpy()
    # Hard predictions come from the largest logit
    predicted = np.argmax(logits, axis=-1)
    return {
        "accuracy": accuracy_score(true_labels, predicted),
        # The class list is passed explicitly so a class absent from the labels still works
        "log_loss": log_loss(true_labels, class_probs, labels=[0, 1, 2]),
    }

In [None]:
# Training arguments optimized for Kaggle GPU
# NOTE(review): the `eval_strategy` keyword here and the `tokenizer=` keyword on
# Trainer below are presumably tied to the installed transformers version —
# confirm against the environment's release.
training_args = TrainingArguments(
    output_dir="./results",
    # Evaluate and checkpoint once per epoch so the best epoch can be reloaded
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=CFG.learning_rate,
    per_device_train_batch_size=CFG.batch_size,
    # Evaluation runs with twice the training batch size
    per_device_eval_batch_size=CFG.batch_size * 2,
    num_train_epochs=CFG.epochs,
    weight_decay=CFG.weight_decay,
    warmup_ratio=0.1,
    # Keep the checkpoint with the lowest "log_loss"; presumably this is matched
    # against the key returned by compute_metrics — verify
    load_best_model_at_end=True,
    metric_for_best_model="log_loss",
    greater_is_better=False,
    logging_steps=100,
    # Mixed precision is requested only when CUDA is available
    fp16=torch.cuda.is_available(),
    dataloader_num_workers=2,
    dataloader_pin_memory=True,
    # With CFG.batch_size this accumulates 2 batches per optimizer step
    gradient_accumulation_steps=2,
    # Disable external experiment-tracking integrations
    report_to="none",
    seed=CFG.seed,
)

# Wire together the model, datasets, padding collator and metric function
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# Echo the effective device and precision settings
print(f"Training on: {training_args.device}")
print(f"FP16: {training_args.fp16}")

In [None]:
# Train the model
# Release unreferenced Python objects and cached CUDA memory before the
# memory-heavy training run starts.
gc.collect()
torch.cuda.empty_cache()

# Run the full training loop configured by `training_args`
print("Starting training...")
trainer.train()
print("Training complete!")

In [None]:
# Prepare test data
# Same pipeline as the training data: decode the serialized columns...
for _source_col in ('prompt', 'response_a', 'response_b'):
    test_df[f'{_source_col}_parsed'] = test_df[_source_col].apply(safe_literal_eval)

# ...then build the combined input string and scrub invalid Unicode from it
_combined_test_text = test_df.apply(create_input_text, axis=1)
test_df['input_text'] = _combined_test_text.apply(clean_text)

print(f"Test samples: {len(test_df)}")

In [None]:
# Generate predictions
def generate_predictions(model, tokenizer, texts, batch_size=16):
    """Run batched inference and return per-class probabilities.

    The model is switched to eval mode and moved to the module-level ``device``.
    Texts are tokenized batch by batch (truncated to ``CFG.max_length``, padded
    to the longest item in the batch) and the logits are converted to
    probabilities with a softmax.

    Args:
        model: Classification model whose call returns an object with ``.logits``.
        tokenizer: Callable tokenizer returning tensors with a ``.to(device)`` method.
        texts: Sequence of input strings; any iterable is accepted and
            materialised into a list.
        batch_size (int): Number of texts per forward pass.

    Returns:
        np.ndarray: Array of shape ``(len(texts), num_classes)``. For empty input
        an empty float32 array with ``model.config.num_labels`` columns is
        returned (0 columns if the model exposes no such attribute).
    """
    model.eval()
    model.to(device)

    # Accept any sequence type (e.g. a pandas Series), not only lists
    texts = list(texts)

    if not texts:
        # np.vstack([]) raises ValueError, so return a well-shaped empty result
        num_classes = getattr(getattr(model, 'config', None), 'num_labels', 0)
        return np.empty((0, num_classes), dtype=np.float32)

    all_probs = []

    for start in tqdm(range(0, len(texts), batch_size), desc="Predicting"):
        batch_texts = texts[start:start + batch_size]

        inputs = tokenizer(
            batch_texts,
            truncation=True,
            max_length=CFG.max_length,
            padding=True,
            return_tensors="pt"
        ).to(device)

        # Inference only: no autograd graph needed
        with torch.no_grad():
            logits = model(**inputs).logits

        # Softmax in float32 so probabilities stay accurate for half-precision logits
        probs = torch.softmax(logits.float(), dim=-1).cpu().numpy()
        all_probs.append(probs)

    return np.vstack(all_probs)

# Generate test predictions
# Uses the trained `model` with the default batch size; each row of the result
# holds the class probabilities for one test example.
test_predictions = generate_predictions(model, tokenizer, test_df['input_text'].tolist())
print(f"Predictions shape: {test_predictions.shape}")

In [None]:
# Create submission
# Probability columns in class-id order: 0 = model A, 1 = model B, 2 = tie
_prob_columns = ('winner_model_a', 'winner_model_b', 'winner_tie')

_frame_data = {'id': test_df['id']}
for _class_idx, _col_name in enumerate(_prob_columns):
    _frame_data[_col_name] = test_predictions[:, _class_idx]
submission = pd.DataFrame(_frame_data)

# Write the file without the index column, then preview it
submission.to_csv('submission.csv', index=False)
print("Submission saved!")
print(submission.head())

In [None]:
# Verify submission
# Compute per-column extremes and per-row sums once, then report them
_prob_frame = submission[['winner_model_a', 'winner_model_b', 'winner_tie']]
_col_min = _prob_frame.min()
_col_max = _prob_frame.max()
_mean_row_sum = _prob_frame.sum(axis=1).mean()

print("\nSubmission Statistics:")
print(f"Shape: {submission.shape}")
print("\nProbability ranges:")
print(f"  winner_model_a: [{_col_min['winner_model_a']:.4f}, {_col_max['winner_model_a']:.4f}]")
print(f"  winner_model_b: [{_col_min['winner_model_b']:.4f}, {_col_max['winner_model_b']:.4f}]")
print(f"  winner_tie:     [{_col_min['winner_tie']:.4f}, {_col_max['winner_tie']:.4f}]")
# Softmax outputs should sum to one per row
print(f"\nRow sums (should be ~1.0): {_mean_row_sum:.4f}")