In [None]:
import torch
import time
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from unsloth import FastLanguageModel
from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer
from peft import PeftModel
from datasets import load_dataset
import Levenshtein
from difflib import SequenceMatcher
# Checkpoint variants to benchmark, keyed by the label used in the report.
#   path      - model id or local directory handed to the loader
#   type      - "base" loads `path` as-is; "adapter" also applies `adapter` on top
#   quantized - selects the 4-bit loading path instead of the fp16 one
#   adapter   - (adapter entries only) directory of the PEFT adapter weights
MODEL_CONFIGS = {
    "Base Model": dict(
        path="Qwen/Qwen2.5-Coder-0.5B-Instruct",
        type="base",
        quantized=False,
    ),
    "SFT Merged": dict(
        path="./final_model",
        type="base",
        quantized=False,
    ),
    "SFT Quantized": dict(
        path="./final_model",
        type="base",
        quantized=True,
    ),
    "DPO Adapter": dict(
        path="Qwen/Qwen2.5-Coder-0.5B-Instruct",
        adapter="./dpo_model",
        type="adapter",
        quantized=False,
    ),
    "DPO Merged": dict(
        path="./dpo_merged_model",
        type="base",
        quantized=False,
    ),
    "DPO Quantized": dict(
        path="./dpo_merged_model",
        type="base",
        quantized=True,
    ),
}

# Input: JSONL file with the evaluation samples.
TEST_DATA_PATH = "./test.jsonl"
# Output: per-sample metric rows for every model.
OUTPUT_FILE = "model_comparison_results.csv"

In [None]:
def calculate_metrics(prediction, reference, start_time, end_time):
    """Score one generated completion against its reference.

    Both strings are stripped of leading/trailing whitespace before comparison.

    Args:
        prediction: Text produced by the model.
        reference: Ground-truth text.
        start_time: Timestamp (seconds) taken before generation.
        end_time: Timestamp (seconds) taken after generation.

    Returns:
        dict with float values:
            "EM"   - 1.0 if the stripped strings are identical, else 0.0.
            "ES"   - 1 - Levenshtein distance / length of the longer string.
            "PL"   - share of reference lines whose same-index prediction
                     line matches after stripping.
            "MR"   - difflib.SequenceMatcher ratio.
            "RoCC" - share of reference characters matched at the same index.
            "TTS"  - elapsed wall time between the two timestamps, in ms.
        When both stripped strings are empty, all five quality metrics are 1.0.
    """
    pred = prediction.strip()
    ref = reference.strip()

    # The key is named "TTS", but the value is the whole start->end interval
    # in milliseconds, not a time-to-first-token.
    tts = (end_time - start_time) * 1000

    # EM (Exact Match)
    em = 1.0 if pred == ref else 0.0

    longest = max(len(pred), len(ref))
    both_empty = longest == 0

    # ES (Edit Similarity)
    if both_empty:
        es = 1.0
    else:
        es = 1.0 - (Levenshtein.distance(pred, ref) / longest)

    # PL (Perfect Lines). str.split always yields at least one element, so
    # len(ref_lines) >= 1 and the division is safe.
    pred_lines = pred.split('\n')
    ref_lines = ref.split('\n')
    matching_lines = sum(
        1 for p, r in zip(pred_lines, ref_lines) if p.strip() == r.strip()
    )
    pl = matching_lines / len(ref_lines)

    # MR (Matched Ratio). autojunk must be off: with the default heuristic any
    # character making up more than ~1% of a 200+ character reference is
    # ignored, which can drive the ratio to 0 for near-identical code.
    mr = SequenceMatcher(None, pred, ref, autojunk=False).ratio()

    # RoCC (Ratio of Correct Characters). Two empty strings are a perfect
    # match, consistent with EM/ES/PL/MR above.
    if both_empty:
        rocc = 1.0
    else:
        correct_chars = sum(1 for p, r in zip(pred, ref) if p == r)
        rocc = correct_chars / max(len(ref), 1)

    return {
        "EM": em,
        "ES": es,
        "PL": pl,
        "MR": mr,
        "RoCC": rocc,
        "TTS": tts
    }

In [None]:
def load_test_data(path, num_samples=100):
    """Read a JSON(L) dataset and turn it into (prompt, reference) pairs.

    Only the first ``num_samples`` records are considered. A record is kept
    only if its 'text' field contains '<fim_middle>'; records without the
    marker are dropped, so fewer than ``num_samples`` pairs may come back.

    Args:
        path: Data file passed to ``load_dataset('json', ...)``.
        num_samples: Maximum number of records to read from the start.

    Returns:
        list of (prompt, reference) tuples. The prompt is everything before
        the first '<fim_middle>' plus the marker itself. The reference is the
        text between that marker and the next '<fim_middle>' or '<file_sep>',
        whichever comes first (or the rest of the record if neither follows).
    """
    dataset = load_dataset('json', data_files=path, split='train')
    if num_samples < len(dataset):
        dataset = dataset.select(range(num_samples))

    pairs = []
    for record in dataset:
        raw = record['text']
        if '<fim_middle>' not in raw:
            continue
        pieces = raw.split('<fim_middle>')
        # split() returns the whole string as element 0 when '<file_sep>' is
        # absent, so no separate membership check is needed.
        completion = pieces[1].split('<file_sep>')[0]
        pairs.append((pieces[0] + '<fim_middle>', completion))
    return pairs
# Build the shared evaluation set once from the first 200 records of the test file.
test_samples = load_test_data(path=TEST_DATA_PATH, num_samples=200)
print(f"Loaded {len(test_samples)} samples")

In [None]:
def evaluate_model(config_name, config, samples=None, max_new_tokens=128):
    """Load one model configuration, generate a completion per sample and score it.

    Args:
        config_name: Label written to the "Model" field of every result row.
        config: Mapping with "path", "type" and "quantized"; entries whose
            "type" is "adapter" must also provide "adapter".
        samples: Iterable of (prompt, reference) pairs. Defaults to the
            notebook-level ``test_samples``.
        max_new_tokens: Upper bound on generated tokens per sample.

    Returns:
        list of metric dicts (the output of ``calculate_metrics`` plus a
        "Model" key), one per sample. If loading or generation raises, the
        error is printed and an empty list is returned (best effort, so the
        remaining configurations can still run).
    """
    import gc  # local import: only needed for the cleanup in `finally`

    print(f"Loading {config_name}...")
    if samples is None:
        samples = test_samples
    results = []
    model = None
    tokenizer = None
    # torch.cuda.synchronize() raises on hosts without CUDA, so every CUDA
    # call below is guarded.
    use_cuda = torch.cuda.is_available()

    try:
        if config["quantized"]:
            model, tokenizer = FastLanguageModel.from_pretrained(
                model_name=config["path"],
                max_seq_length=2048,
                dtype=None,
                load_in_4bit=True
            )
        else:
            model = AutoModelForCausalLM.from_pretrained(
                config["path"],
                torch_dtype=torch.float16,
                device_map="auto"
            )
            tokenizer = AutoTokenizer.from_pretrained(config["path"])

        if config["type"] == "adapter":
            model = PeftModel.from_pretrained(model, config["adapter"])

        # Switch Unsloth-loaded models to inference mode. The previous guard
        # (`not hasattr(model, "generate")`) was never true, so this call
        # never ran.
        if config["quantized"]:
            FastLanguageModel.for_inference(model)

        tokenizer.padding_side = "left"
        if not tokenizer.pad_token:
            tokenizer.pad_token = tokenizer.eos_token

        # NOTE(review): the decoding strategy comes from the model's own
        # generation config; if that enables sampling, scores will vary
        # between runs — consider passing do_sample=False.
        print(f"Evaluating {config_name}...")
        for prompt, reference in tqdm(samples):
            inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
            prompt_len = inputs["input_ids"].shape[1]

            # Synchronise around the call so queued GPU work is included in
            # the measured interval.
            if use_cuda:
                torch.cuda.synchronize()
            start = time.perf_counter()

            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    pad_token_id=tokenizer.pad_token_id,
                    use_cache=True
                )

            if use_cuda:
                torch.cuda.synchronize()
            end = time.perf_counter()

            # The returned sequence starts with the prompt tokens. Decode only
            # what was generated instead of string-matching the prompt in the
            # decoded text, which fails when decoding does not reproduce the
            # prompt exactly.
            prediction = tokenizer.decode(
                outputs[0][prompt_len:], skip_special_tokens=True
            )

            metrics = calculate_metrics(prediction, reference, start, end)
            metrics["Model"] = config_name
            results.append(metrics)

        return results

    except Exception as e:
        print(f"Error evaluating {config_name}: {e}")
        return []

    finally:
        # Release the weights on failure too, so the next configuration does
        # not start with this model's memory still allocated.
        model = None
        tokenizer = None
        gc.collect()
        if use_cuda:
            torch.cuda.empty_cache()

In [None]:
# Evaluate each configured model in turn and pool the per-sample metric rows.
all_results = []
for name, config in MODEL_CONFIGS.items():
    all_results += evaluate_model(name, config)

# One row per (model, sample); saved so the comparison can be redone without
# rerunning generation.
df = pd.DataFrame(all_results)
df.to_csv(OUTPUT_FILE, index=False)

In [None]:
# Mean of every metric per model; as_index=False keeps "Model" as a regular column.
summary = df.groupby("Model", as_index=False).mean()

print("\nFinal Comparison:")
print(summary.to_markdown(index=False))

# Row of the model with the highest mean edit similarity.
best_model = summary.loc[summary["ES"].idxmax()]

print("\nBest Model by Edit Similarity:")
print(best_model)