# LLM Code Style Classifier

Train a model to identify which LLM wrote a piece of code based on coding style, patterns, and conventions.

**Purpose**: Given code from different LLMs, the classifier learns to distinguish between them based on:
- Docstring style and formatting
- Type hints usage
- Naming conventions
- Error handling patterns
- Code structure and idioms

**Steps**:
1. Load all code samples from different LLMs in the training directory
2. Fine-tune a model to classify code authorship
3. Use the trained model to identify which LLM wrote specific code


In [None]:
# Install dependencies (optional; uncomment if needed)
# %pip install -q transformers accelerate datasets torch pyyaml tqdm rich

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

import json
import random
from collections import defaultdict

from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments



In [None]:
# Config - edit these
# All notebook-wide settings live in this cell; later cells read these names directly.
INPUT_DATA_DIR = "../data/results/mbpp-sanitized/train"  # directory scanned for *.jsonl files of model-generated code
DATASET_FILTER = "mbpp"  # keep only records whose "benchmark" equals this (case-insensitive); None keeps all

# Model settings
BASE_MODEL = "unsloth/Llama-3.2-1B-Instruct"  # or "meta-llama/Llama-3.2-3B-Instruct", "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
OUTPUT_DIR = "../outputs/code_style_classifier"  # receives the label mapping, training logs and the saved model/tokenizer

# Training hyperparameters
EPOCHS = 3
BATCH_SIZE = 4  # used for both the per-device train and eval batch size
GRAD_ACCUM_STEPS = 4  # passed as gradient_accumulation_steps
LEARNING_RATE = 2e-5
MAX_LENGTH = 1024  # tokenizer truncation limit, applied at training and at inference time
WARMUP_RATIO = 0.1
WEIGHT_DECAY = 0.01

# Data split
SEED = 42  # seeds both the example shuffle and the Trainer
TRAIN_RATIO = 0.8  # first int(n * TRAIN_RATIO) shuffled examples form the train set
EVAL_RATIO = 0.1  # next slice, up to int(n * (TRAIN_RATIO + EVAL_RATIO)); remaining will be test set


In [None]:
# Utilities

def read_jsonl(path: str | Path) -> Iterable[Dict[str, Any]]:
    """Lazily yield one parsed JSON value per non-blank line of a JSONL file.

    Args:
        path: Location of the JSONL file.

    Yields:
        The value decoded from each non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a line is not valid JSON. The message is
            prefixed with ``<path>:<line number>`` so the offending record
            can be located when many files are being loaded.

    A missing file is treated as an empty one: nothing is yielded and no
    error is raised.
    """
    p = Path(path)
    if not p.exists():
        # This function is a generator, so a bare ``return`` already ends the
        # iteration; returning a value here would be silently discarded.
        return
    with p.open("r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as err:
                # Same exception type as before, so existing handlers still
                # match; only the message gains the file/line location.
                raise json.JSONDecodeError(
                    f"{p}:{lineno}: {err.msg}", err.doc, err.pos
                ) from err
            # Yield outside the ``try`` so exceptions thrown into the
            # generator by the consumer are never intercepted here.
            yield record


def format_code_for_classification(task_prompt: str, code: str) -> str:
    """Build the classifier's input text from a task description and a code snippet.

    Both arguments are stripped of surrounding whitespace and placed into a
    fixed layout: an instruction line, a ``Task:`` section, a ``Code:``
    section, and a closing question.
    """
    task_text = task_prompt.strip()
    code_text = code.strip()
    return (
        "Analyze the coding style of the following code snippet.\n\n"
        f"Task: {task_text}\n\n"
        f"Code:\n{code_text}\n\n"
        "Which model wrote this code?"
    )


In [None]:
# Load and prepare dataset

# Cell-local imports, kept here so this cell does not rely on edits elsewhere.
import pickle
from collections import Counter

# Discover all model files
input_dir = Path(INPUT_DATA_DIR)
jsonl_files = sorted(input_dir.glob("*.jsonl"))
print(f"📁 Discovered {len(jsonl_files)} model files in {INPUT_DATA_DIR}:")
for f in jsonl_files:
    print(f"   • {f.name}")

# Load all code samples, keeping only records from the selected benchmark.
samples = []
skipped_unlabeled = 0
norm_filter = DATASET_FILTER.strip().lower() if DATASET_FILTER else None

for jsonl_file in jsonl_files:
    for record in read_jsonl(jsonl_file):
        benchmark = str(record.get("benchmark", "")).strip()
        if norm_filter and benchmark.lower() != norm_filter:
            continue

        # A record without a model name has no label; keeping it would
        # create a phantom "" class that the classifier is trained to predict.
        model_name = str(record.get("model_name") or "")
        if not model_name.strip():
            skipped_unlabeled += 1
            continue

        samples.append({
            "task_prompt": str(record.get("prompt", "")),
            "code": str(record.get("generated_code", "")),
            "model_name": model_name,
            "task_id": str(record.get("task_id", "")),
            "benchmark": benchmark,
        })

print(f"\n📊 Loaded {len(samples)} code samples")
if skipped_unlabeled:
    print(f"   ⚠️ Skipped {skipped_unlabeled} records with no model_name")

# Fail here with an actionable message rather than later, when a classifier
# with zero labels would be requested.
if not samples:
    raise ValueError(
        f"No samples loaded from {INPUT_DATA_DIR!r} "
        f"(DATASET_FILTER={DATASET_FILTER!r}); check the directory and filter."
    )

# Build model vocabulary (label mapping). Sorting makes label ids stable
# across runs regardless of file or record order.
samples_per_model = Counter(s["model_name"] for s in samples)
unique_models = sorted(samples_per_model)
model2id = {model: idx for idx, model in enumerate(unique_models)}
id2model = {idx: model for model, idx in model2id.items()}

print(f"\n🤖 Models to classify ({len(unique_models)}):")
for model, idx in model2id.items():
    print(f"   [{idx}] {model}: {samples_per_model[model]} samples")

# Create training examples: formatted text plus the integer class label.
examples = [
    {
        "text": format_code_for_classification(sample["task_prompt"], sample["code"]),
        "label": model2id[sample["model_name"]],
        "model_name": sample["model_name"],
    }
    for sample in samples
]

# Split into train/eval/test after a seeded shuffle so the split is reproducible.
random.seed(SEED)
random.shuffle(examples)

n = len(examples)
train_end = int(n * TRAIN_RATIO)
eval_end = int(n * (TRAIN_RATIO + EVAL_RATIO))

train_examples = examples[:train_end]
eval_examples = examples[train_end:eval_end]
test_examples = examples[eval_end:]

print(f"\n📈 Dataset split:")
print(f"   • Train: {len(train_examples)} samples")
print(f"   • Eval:  {len(eval_examples)} samples")
print(f"   • Test:  {len(test_examples)} samples")

# Convert to HuggingFace datasets
train_ds = Dataset.from_list(train_examples)
eval_ds = Dataset.from_list(eval_examples)
test_ds = Dataset.from_list(test_examples)

# Save model mapping for later use.
# NOTE(security): this is a pickle file; only ever load it back from a
# location you trust, since unpickling can execute arbitrary code.
output_dir = Path(OUTPUT_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
mapping_path = output_dir / "model_mapping.pkl"
with open(mapping_path, "wb") as f:
    pickle.dump({"model2id": model2id, "id2model": id2model}, f)
print(f"\n💾 Saved model mapping to {mapping_path}")


In [None]:
# Prepare model and tokenizer

from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

print("🔧 Loading tokenizer and model...")
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, use_fast=True)
if tokenizer.pad_token is None:
    # Some causal-LM tokenizers ship without a pad token; reuse EOS so that
    # batches can be padded.
    tokenizer.pad_token = tokenizer.eos_token

# For sequence classification, we need a model with a classification head.
# Passing the label maps stores the human-readable class names in the model
# config, so a saved checkpoint carries them alongside the weights.
num_labels = len(model2id)
model = AutoModelForSequenceClassification.from_pretrained(
    BASE_MODEL,
    num_labels=num_labels,
    problem_type="single_label_classification",
    id2label=id2model,
    label2id=model2id,
)

# The classification head locates the last non-padding token of each sequence
# through config.pad_token_id. Setting the pad token on the tokenizer alone
# does not update the model config, and without it the model rejects batches
# larger than one.
model.config.pad_token_id = tokenizer.pad_token_id

print(f"✅ Loaded {BASE_MODEL}")
print(f"   • Classifier head: {num_labels} classes")
print(f"   • Max sequence length: {MAX_LENGTH}")

# Tokenization function
def tokenize_function(examples):
    """Tokenize the ``"text"`` column of a batch for the classifier.

    Sequences are truncated to ``MAX_LENGTH`` tokens and left unpadded.
    """
    texts = examples["text"]
    encoding_options = {
        "padding": False,
        "truncation": True,
        "max_length": MAX_LENGTH,
    }
    return tokenizer(texts, **encoding_options)

# Tokenize datasets: every split goes through the same mapping, which also
# drops the raw string columns so only model inputs and the label remain.
print("\n🔄 Tokenizing datasets...")
train_tokenized, eval_tokenized, test_tokenized = (
    split.map(
        tokenize_function,
        batched=True,
        remove_columns=["text", "model_name"],
    )
    for split in (train_ds, eval_ds, test_ds)
)

print("✅ Tokenization complete")


In [None]:
# Evaluate on test set
#
# NOTE: this cell uses `trainer`, which is created by the "Train the
# classifier" cell; run that cell before this one.

# Imported here so this cell does not depend on another cell's imports.
import numpy as np
from sklearn.metrics import accuracy_score, classification_report

print("\n📊 Evaluating on test set...")
test_results = trainer.predict(test_tokenized)

# Get predictions: the highest-scoring class per example.
predictions = np.argmax(test_results.predictions, axis=1)
labels = test_results.label_ids

# Overall metrics
print(f"\n✨ Test Set Results:")
print(f"   • Accuracy: {accuracy_score(labels, predictions):.4f}")

# Per-class metrics.
# `labels=` pins the report to the full label set. Without it, a class that
# never appears in the test split makes the number of observed classes differ
# from len(target_names) and classification_report raises.
all_label_ids = list(range(len(id2model)))
print(f"\n📈 Per-Model Classification Report:")
print(classification_report(
    labels,
    predictions,
    labels=all_label_ids,
    target_names=[id2model[i] for i in all_label_ids],
    digits=4,
    zero_division=0,  # same convention as compute_metrics in the training cell
))

# Confusion insights: per true class, how often it was right and which
# classes it was most often mistaken for.
print("\n🔍 Confusion Analysis:")
for true_label in all_label_ids:
    true_model = id2model[true_label]
    mask = labels == true_label
    total = mask.sum()
    if total == 0:
        # Class absent from the test split; nothing to report.
        continue

    preds_for_model = predictions[mask]
    correct = (preds_for_model == true_label).sum()

    print(f"   • {true_model}:")
    print(f"      Correct: {correct}/{total} ({100*correct/total:.1f}%)")

    # Show most common misclassifications (at most two).
    wrong_preds = preds_for_model[preds_for_model != true_label]
    if len(wrong_preds) > 0:
        unique, counts = np.unique(wrong_preds, return_counts=True)
        top_confusion = sorted(zip(unique, counts), key=lambda x: x[1], reverse=True)[:2]
        print(f"      Often confused with: {', '.join([id2model[int(i)] for i, _ in top_confusion])}")


In [None]:
# Inference example: Compare two code snippets

def classify_code(task_prompt: str, code: str, model, tokenizer, model2id, id2model):
    """Classify which LLM likely wrote the given code.

    Args:
        task_prompt: Description of the task the code solves.
        code: The code snippet to attribute.
        model: Sequence-classification model producing one logit per class.
        tokenizer: Tokenizer matching ``model``.
        model2id: Mapping from model name to class index; its size bounds
            how many predictions are returned.
        id2model: Mapping from class index to model name.

    Returns:
        Up to three dicts, most probable first, each with ``"model"`` (name),
        ``"probability"`` (0-1) and ``"confidence"`` (the probability as a
        percentage).
    """
    text = format_code_for_classification(task_prompt, code)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LENGTH)

    # Move to same device as model
    inputs = {k: v.to(model.device) for k, v in inputs.items()}

    # Run in eval mode so dropout and similar layers cannot perturb the
    # prediction, then restore whatever mode the caller had the model in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            logits = model(**inputs).logits
            probs = torch.softmax(logits, dim=-1)[0]
    finally:
        model.train(was_training)

    # Get top predictions
    top_probs, top_indices = torch.topk(probs, k=min(3, len(model2id)))

    # .tolist() yields plain Python floats/ints and, unlike .numpy(), also
    # works for dtypes NumPy cannot represent (e.g. bfloat16).
    return [
        {
            "model": id2model[idx],
            "probability": prob,
            "confidence": prob * 100,
        }
        for prob, idx in zip(top_probs.tolist(), top_indices.tolist())
    ]


def compare_two_codes(task_prompt: str, code1: str, code2: str, model, tokenizer, model2id, id2model):
    """Classify two snippets for the same task and print a side-by-side summary.

    Each snippet is passed to ``classify_code``; its ranked predictions are
    printed, followed by the top guess for each snippet. Returns the two
    ranked prediction lists as ``(results_for_code1, results_for_code2)``.
    """
    print(f"🔍 Analyzing code snippets for task:\n   '{task_prompt[:100]}...'\n")

    ranked = []
    for position, snippet in enumerate((code1, code2), start=1):
        guesses = classify_code(task_prompt, snippet, model, tokenizer, model2id, id2model)
        ranked.append(guesses)

        print(f"📝 Code {position} predictions:")
        for guess in guesses:
            print(f"   • {guess['model']}: {guess['confidence']:.1f}%")
        print()

    print(f"💡 Most likely:")
    for position, guesses in enumerate(ranked, start=1):
        best = guesses[0]
        print(f"   Code {position} → {best['model']} ({best['confidence']:.1f}%)")

    first, second = ranked
    return first, second


# Example usage with test data

def _parse_classifier_text(text):
    """Recover ``(task_prompt, code)`` from text built by format_code_for_classification.

    The rows of ``test_ds`` only carry the formatted ``text`` (plus ``label``
    and ``model_name``), so the task prompt and the code have to be cut back
    out of it. The first ``Task: `` / ``Code:`` markers and the last
    ``Which model`` marker are used, so marker-like text inside the code
    itself does not truncate the snippet.
    """
    _, _, after_task = text.partition("Task: ")
    task_prompt, _, after_code = after_task.partition("\n\nCode:\n")
    code, _, _ = after_code.rpartition("\n\nWhich model")
    return task_prompt, code


if len(test_ds) >= 2:
    print("=" * 80)
    print("🎯 EXAMPLE: Comparing two code snippets")
    print("=" * 80)

    # Get two different samples
    sample1 = test_ds[0]
    sample2 = test_ds[1]

    # Use the same task prompt (the first sample's) for both snippets.
    # test_ds has no "task_prompt" column, so it is parsed from the text.
    task_prompt, code1 = _parse_classifier_text(sample1["text"])
    _, code2 = _parse_classifier_text(sample2["text"])

    results1, results2 = compare_two_codes(
        task_prompt, code1, code2,
        model, tokenizer, model2id, id2model
    )

    print(f"\n✅ Actual labels:")
    print(f"   Code 1 was written by: {sample1['model_name']}")
    print(f"   Code 2 was written by: {sample2['model_name']}")


In [None]:
# Train the classifier

from transformers import Trainer, TrainingArguments, DataCollatorWithPadding
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report

# Data collator for dynamic padding: the datasets were tokenized with
# padding=False, so padding is left to this collator when batches are built.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Evaluation metrics
def compute_metrics(eval_pred):
    """Turn raw evaluation outputs into accuracy and weighted precision/recall/F1.

    ``eval_pred`` unpacks into per-class scores and the true labels; the
    predicted class is the arg-max over axis 1. Precision, recall and F1 are
    weighted averages, with undefined values reported as 0.
    """
    scores, references = eval_pred
    predicted = np.argmax(scores, axis=1)

    precision, recall, f1, _support = precision_recall_fscore_support(
        references, predicted, average='weighted', zero_division=0
    )

    return dict(
        accuracy=accuracy_score(references, predicted),
        precision=precision,
        recall=recall,
        f1=f1,
    )

# Training arguments, grouped by concern and merged into one TrainingArguments.
_optimization_options = dict(
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    gradient_accumulation_steps=GRAD_ACCUM_STEPS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    warmup_ratio=WARMUP_RATIO,
    seed=SEED,
)
# Evaluation and checkpointing share the same 200-step cadence; the checkpoint
# with the best eval accuracy is reloaded when training ends.
_checkpoint_options = dict(
    eval_strategy="steps",
    eval_steps=200,
    save_strategy="steps",
    save_steps=200,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
)
_logging_options = dict(
    logging_dir=f"{OUTPUT_DIR}/logs",
    logging_steps=50,
    report_to="none",
)
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    **_optimization_options,
    **_checkpoint_options,
    **_logging_options,
)

# Initialize trainer
trainer = Trainer(
    model=model,
    tokenizer=tokenizer,
    args=training_args,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    train_dataset=train_tokenized,
    eval_dataset=eval_tokenized,
)

# Summarize the run before starting it.
print("\n🚀 Starting training...")
print(f"   • Epochs: {EPOCHS}")
print(f"   • Batch size: {BATCH_SIZE}")
print(f"   • Learning rate: {LEARNING_RATE}")
print(f"   • Training samples: {len(train_tokenized)}")

# Train
trainer.train()

# Persist the fine-tuned model together with its tokenizer in OUTPUT_DIR.
trainer.save_model(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)

print(f"\n✅ Training complete!")
print(f"💾 Model saved to: {OUTPUT_DIR}")
