In [2]:
# Install the fine-tuning stack. --no-deps tells pip not to install the
# dependencies of these packages (presumably to leave the runtime's
# preinstalled torch/CUDA build untouched — TODO confirm).
!pip install --no-deps unsloth bitsandbytes accelerate xformers peft trl tokenizers
# transformers is installed on its own line so a minimum version can be requested.
!pip install --no-deps "transformers>=4.51.0"
# The remaining packages are installed together with their dependencies.
!pip install sentencepiece protobuf huggingface_hub hf_transfer
!pip install unsloth_zoo


Collecting unsloth
  Downloading unsloth-2026.2.1-py3-none-any.whl.metadata (69 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/69.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting bitsandbytes
  Downloading bitsandbytes-0.49.2-py3-none-manylinux_2_24_x86_64.whl.metadata (10 kB)
Collecting xformers
  Downloading xformers-0.0.34-cp39-abi3-manylinux_2_28_x86_64.whl.metadata (1.2 kB)
Collecting trl
  Downloading trl-0.28.0-py3-none-any.whl.metadata (11 kB)
Downloading unsloth-2026.2.1-py3-none-any.whl (432 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m432.3/432.3 kB[0m [31m47.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading bitsandbytes-0.49.2-py3-none-manylinux_2_24_x86_64.whl (60.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.7/60.7 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hDown

Zero Shot Inference on base models

Environment Setup: Mounts Google Drive and defines paths for datasets and inference results.

Model Loading: Loads one base model (Gemma, Qwen, or Llama), selected via `MODEL_ID`.


In [None]:
import torch
import gc
import os
import json
from tqdm import tqdm
from unsloth import FastLanguageModel


#  MODEL SELECTION
# Exactly one MODEL_ID is active per run; switch models by (un)commenting.
# MODEL_ID = "unsloth/Llama-3.2-1B-Instruct-bnb-4bit"
# MODEL_ID = "unsloth/gemma-2-2b-it-bnb-4bit"
MODEL_ID = "Qwen/Qwen3-0.6B"


# Mount Google Drive before touching any Drive path. Without the mount,
# os.makedirs below would silently create a *local* /content/drive/... tree:
# results would vanish with the runtime and a later mount may fail because
# the mount point is no longer empty.
from google.colab import drive
drive.mount('/content/drive')

# Path
BASE_DIR = "/content/drive/MyDrive/DnD_Project_Data"
TEST_DATA_PATH = os.path.join(BASE_DIR, "processed_dataset", "dnd_test.json")
BASE_RESULTS_DIR = os.path.join(BASE_DIR, "inference_results_baseline")

# Directory-friendly model name: drop the hub namespace and the "-bnb-4bit" suffix.
safe_model_name = MODEL_ID.split("/")[-1].replace("-bnb-4bit", "")
MODEL_SPECIFIC_DIR = os.path.join(BASE_RESULTS_DIR, safe_model_name)

# Create directory
os.makedirs(MODEL_SPECIFIC_DIR, exist_ok=True)

# Prompt Template
# Three positional slots: instruction, input, response (left empty at inference).
alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

# MEMORY CLEANUP
# Release whatever a previous run of this cell left on the GPU.

print(f"Cleaning GPU memory before loading {safe_model_name}...")

# Remove stale notebook-level references (no-op when a name is not defined)
globals().pop('model', None)
globals().pop('tokenizer', None)

# Collect unreachable objects, then hand cached CUDA blocks back to the driver
gc.collect()
torch.cuda.empty_cache()
print("[INFO] Memory cleaned.")


# LOAD BASE MODEL

print(f"[INFO] Loading Model: {MODEL_ID}...")

try:
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name=MODEL_ID,
        max_seq_length=2048,
        dtype=None,
        load_in_4bit=True,
    )
except Exception as load_error:
    # Report which model failed, then let the error stop the cell.
    print(f"Failed to load model {MODEL_ID}. Error: {load_error}")
    raise load_error

FastLanguageModel.for_inference(model)

# Tokenizer Configuration
# Fall back to EOS as the padding token when the tokenizer defines none.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# Left padding keeps every prompt flush against the generated tokens in a batch.
tokenizer.padding_side = "left"

# Inference
# Batched zero-shot generation over the test set, with a checkpoint written to
# Drive after every batch so an interrupted run can resume where it stopped.
output_file = os.path.join(MODEL_SPECIFIC_DIR, f"predictions_{safe_model_name}_BASELINE.json")
print(f"[INFO] Output file set to: {output_file}")

# Load Test Dataset
with open(TEST_DATA_PATH, 'r') as f:
    test_data = json.load(f)

BATCH_SIZE = 16
results = []

# Resume Capability: results are appended in test-set order, so the number of
# stored results is the index of the next example to process.
if os.path.exists(output_file):
    try:
        with open(output_file, 'r') as f:
            results = json.load(f)
        if not isinstance(results, list):
            raise ValueError("expected a JSON list of results")
        print(f"Resuming from example {len(results)}")
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        print("Output file corrupted or empty. Starting from scratch.")
        results = []

start_index = len(results)
data_slice = test_data[start_index:]

# Generation Parameters
# NOTE(review): temperature / min_p are sampling parameters and no do_sample
# flag is passed here, so whether they take effect presumably depends on the
# loaded model's generation config — confirm.
gen_kwargs = {
    "max_new_tokens": 1024,
    "use_cache": True,
    "temperature": 0.2,
    "min_p": 0.1,
    "pad_token_id": tokenizer.pad_token_id,
    "eos_token_id": tokenizer.eos_token_id
}

# Checkpoints are written to a sibling temp file and then renamed over the real
# file, so an interruption mid-write cannot corrupt the last good checkpoint.
checkpoint_tmp = output_file + ".tmp"

# Main Loop
if len(data_slice) > 0:
    for i in tqdm(range(0, len(data_slice), BATCH_SIZE), desc=f"Testing {safe_model_name}"):

        # Prepare Batch
        batch_samples = data_slice[i : i + BATCH_SIZE]
        batch_prompts = []

        for sample in batch_samples:
            raw_prompt = sample['llm_prompt']

            # Parse Instruction vs Input: split at the FIRST marker only, so a
            # marker repeated inside the input data does not truncate it.
            for marker in ("Input Data:", "Character Sheet:"):
                if marker in raw_prompt:
                    head, _sep, tail = raw_prompt.partition(marker)
                    instr, inp_data = head.strip(), tail.strip()
                    break
            else:
                instr = raw_prompt
                inp_data = ""

            # Format using Alpaca Template (response slot left empty)
            full_prompt = alpaca_prompt.format(instr, inp_data, "")
            batch_prompts.append(full_prompt)

        # Tokenize
        inputs = tokenizer(
            batch_prompts,
            return_tensors="pt",
            padding=True,
            truncation=True
        ).to("cuda")

        # Generate
        with torch.no_grad():
            outputs = model.generate(**inputs, **gen_kwargs)

        # Decode: with left padding every prompt occupies the first input_len
        # positions, so everything after that is newly generated text.
        input_len = inputs.input_ids.shape[1]
        generated_tokens = outputs[:, input_len:]
        decoded_responses = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)

        # Results
        for idx, resp in enumerate(decoded_responses):
            results.append({
                "task_type": batch_samples[idx].get("task_type"),
                "input_prompt": batch_prompts[idx],
                "generated_response": resp.strip(),
                "expected_output": batch_samples[idx].get("expected_output")
            })

        # Save Checkpoint (write-then-rename)
        with open(checkpoint_tmp, 'w') as f:
            json.dump(results, f, indent=2)
        os.replace(checkpoint_tmp, output_file)

    print(f"Baseline inference completed for {safe_model_name}.")
else:
    print(f"Inference already completed for {safe_model_name}.")

Fine-Tuning Pipeline

Environment Setup: Mounts Google Drive and defines paths for datasets, model storage, and inference results.

Model Loading: Supports loading different base models (e.g., Gemma 2, Qwen, Llama 3) with 4-bit quantization so that they fit within GPU memory limits.

LoRA Configuration: Applies Low-Rank Adaptation (LoRA) to efficiently fine-tune the model parameters

Training Loop:
* Checks if a fine-tuned adapter already exists
* If not, initiates the trainer on the processed training dataset
* Uses the Alpaca prompt format (Instruction/Input/Response)

Batch Inference: Runs optimized inference on the test set


In [None]:
import torch
import gc
import os
import json
import random
import re
from tqdm import tqdm
from google.colab import drive, userdata
from unsloth import FastLanguageModel, is_bfloat16_supported
from trl import SFTTrainer
from transformers import TrainingArguments, TextStreamer
from datasets import load_dataset

# Mount Drive
# Every path below lives on Google Drive, so the mount has to come first.
drive.mount('/content/drive')

# PATH
BASE_DIR = "/content/drive/MyDrive/DnD_Project_Data"
TRAIN_DATA_PATH = os.path.join(BASE_DIR, "processed_dataset", "dnd_train.json")
TEST_DATA_PATH = os.path.join(BASE_DIR, "processed_dataset", "dnd_test.json")
# Adapters are saved here as "<safe_name>_DnD_Adapter" by the pipeline loop below.
OUTPUT_MODELS_DIR = os.path.join(BASE_DIR, "fine_tuned_models_unsloth")
# Prediction files "predictions_<safe_name>.json" are written here.
OUTPUT_RESULTS_DIR = os.path.join(BASE_DIR, "inference_results")

# Create directories
os.makedirs(OUTPUT_MODELS_DIR, exist_ok=True)
os.makedirs(OUTPUT_RESULTS_DIR, exist_ok=True)

# MODEL SELECTION
# The pipeline loop runs once per entry; uncomment a line to include that model.
MODELS_TO_TRAIN = [
    "Qwen/Qwen3-0.6B",
    #"unsloth/Llama-3.2-1B-Instruct-bnb-4bit",
    #"unsloth/gemma-2-2b-it-bnb-4bit"
]

# Training Configuration
# Passed to FastLanguageModel.from_pretrained (and MAX_SEQ_LENGTH also to SFTTrainer).
MAX_SEQ_LENGTH = 2048
# NOTE(review): None presumably lets the loader choose the dtype — confirm in the Unsloth docs.
DTYPE = None
LOAD_IN_4BIT = True

# Prompt Template
# Three positional slots: instruction, input, response. Training fills all
# three; inference leaves the response slot empty.
alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

# UTILITY FUNCTIONS


def clean_memory():
    """Drop the notebook-level model objects and release cached GPU memory.

    Removes the globals named 'model', 'tokenizer' and 'trainer' when they
    exist, runs the garbage collector, and empties the CUDA allocator cache.
    """
    import gc
    import torch
    print("Cleaning memory")

    namespace = globals()
    for stale_name in ('model', 'tokenizer', 'trainer'):
        # pop() with a default is a no-op for names that were never defined
        namespace.pop(stale_name, None)

    gc.collect()
    torch.cuda.empty_cache()
    print("Memoria pulita.\n")

def format_prompts(examples):
    """Render a batch of dataset rows as Alpaca-style training texts.

    Args:
        examples: batched mapping with parallel "instruction", "input" and
            "output" sequences.

    Returns:
        {"text": [...]} with one prompt per row, each ending in the
        tokenizer's EOS token. Reads the notebook-level `tokenizer` and
        `alpaca_prompt`.
    """
    eos = tokenizer.eos_token

    def _as_text(value):
        # dicts/lists are serialised as indented JSON; anything else via str()
        if isinstance(value, (dict, list)):
            return json.dumps(value, indent=2)
        return str(value)

    rows = zip(examples["instruction"], examples["input"], examples["output"])
    rendered = [
        alpaca_prompt.format(instruction, _as_text(inp), _as_text(out)) + eos
        for instruction, inp, out in rows
    ]
    return {"text": rendered}

def _split_llm_prompt(raw_prompt):
    """Split a raw test prompt into (instruction, input) at the first known marker."""
    for marker in ("Input Data:", "Character Sheet:"):
        if marker in raw_prompt:
            # partition() cuts at the FIRST occurrence only, so a marker that
            # reappears inside the input data does not truncate it.
            head, _sep, tail = raw_prompt.partition(marker)
            return head.strip(), tail.strip()
    return raw_prompt, ""


def run_full_inference_benchmark(model, tokenizer, test_path, model_name):
    """
    Batch Inference with periodic saving to Drive

    Args:
        model: loaded model; switched to inference mode here.
        tokenizer: matching tokenizer; set to left padding, and its pad token
            falls back to EOS when none is defined.
        test_path: path to a JSON list of samples, each with an 'llm_prompt'
            key and optional 'task_type' / 'expected_output' keys.
        model_name: model id; used for the output file name and for
            model-specific generation settings.

    Returns:
        None. Results are written to
        OUTPUT_RESULTS_DIR/predictions_<safe_name>.json after every batch; an
        existing file is treated as a partial run and resumed.
    """
    print(f"\nSTARTING FULL INFERENCE (BATCH MODE) FOR {model_name}...")

    # 1. SETUP
    FastLanguageModel.for_inference(model)
    tokenizer.padding_side = "left"
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    BATCH_SIZE = 16

    with open(test_path, 'r') as f:
        test_data = json.load(f)

    safe_name = model_name.split("/")[-1].replace("-bnb-4bit", "")
    output_file = os.path.join(OUTPUT_RESULTS_DIR, f"predictions_{safe_name}.json")
    # Checkpoints go to a temp file first and are renamed into place, so an
    # interruption mid-write cannot corrupt the last good checkpoint.
    checkpoint_tmp = output_file + ".tmp"

    # RESUME LOGIC
    # Results are appended in test-set order, so len(results) is the index of
    # the next example to process.
    results = []
    start_index = 0

    if os.path.exists(output_file):
        print(f"Found partial results file: {output_file}")
        try:
            with open(output_file, 'r') as f:
                results = json.load(f)
            if not isinstance(results, list):
                raise ValueError("expected a JSON list of results")
            start_index = len(results)
            print(f"Resuming from example {start_index} of {len(test_data)}")
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            print("Partial file corrupted. Restarting from zero.")
            results = []

    if start_index >= len(test_data):
        print("Inference already completed for this model!")
        return

    # BATCH LOOP
    data_slice = test_data[start_index:]

    # Configure generation parameters based on model type
    # NOTE(review): temperature / min_p are sampling parameters and no
    # do_sample flag is passed, so whether they take effect presumably
    # depends on the model's generation config — confirm.
    gen_kwargs = {
        "max_new_tokens": 1024,
        "use_cache": True,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "temperature": 0.2, # Low temperature for more deterministic results
        "min_p": 0.1
    }

    if "qwen" in model_name.lower():
        gen_kwargs["repetition_penalty"] = 1.1

    # Process data in chunks
    for i in tqdm(range(0, len(data_slice), BATCH_SIZE), desc=f"Testing {safe_name}"):

        # Prepare Batch
        batch_samples = data_slice[i : i + BATCH_SIZE]
        batch_prompts = []

        for sample in batch_samples:
            instr, inp_data = _split_llm_prompt(sample['llm_prompt'])
            # Response slot is left empty for the model to fill in
            batch_prompts.append(alpaca_prompt.format(instr, inp_data, ""))

        # Tokenize Batch
        inputs = tokenizer(
            batch_prompts,
            return_tensors="pt",
            padding=True,
            truncation=True
        ).to("cuda")

        # Generate
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                **gen_kwargs
            )

        # Decode: with left padding every prompt occupies the first input_len
        # positions, so everything after that is newly generated text.
        input_len = inputs.input_ids.shape[1]
        generated_tokens = outputs[:, input_len:]
        decoded_responses = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)

        # Store
        for original_sample, prompt, resp in zip(batch_samples, batch_prompts, decoded_responses):
            results.append({
                "task_type": original_sample.get("task_type"),
                "input_prompt": prompt,
                "generated_response": resp.strip(),
                "expected_output": original_sample.get("expected_output")
            })

        # SAVE CHECKPOINT (write, force to disk, then rename into place)
        with open(checkpoint_tmp, 'w') as f:
            json.dump(results, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(checkpoint_tmp, output_file)

    print(f"Results saved to: {output_file}")


# MAIN PIPELINE - TRAIN & INFERENCE


import traceback

# For every selected model: reuse a saved adapter if one exists, otherwise
# fine-tune with LoRA and save the adapter; then run the test-set benchmark.
# A failure on one model is reported and the loop moves on to the next.
for model_name in MODELS_TO_TRAIN:
    print(f"\n{'='*60}")
    print(f"PIPELINE STARTED FOR: {model_name}")
    print(f"{'='*60}\n")

    safe_name = model_name.split("/")[-1].replace("-bnb-4bit", "")
    save_path = os.path.join(OUTPUT_MODELS_DIR, f"{safe_name}_DnD_Adapter")

    try:
        # CHECK IF MODEL EXISTS
        # NOTE(review): only the directory's existence is checked, so a
        # partially written adapter directory would also be picked up here.
        if os.path.exists(save_path):
            print(f"FOUND EXISTING ADAPTER AT: {save_path}")
            print("Skipping training phase. Loading adapter from Drive")

            model, tokenizer = FastLanguageModel.from_pretrained(
                model_name = save_path,
                max_seq_length = MAX_SEQ_LENGTH,
                dtype = DTYPE,
                load_in_4bit = LOAD_IN_4BIT,
            )
            should_train = False

        else:
            print(f"No existing adapter found, starting training ")

            # Load base model
            model, tokenizer = FastLanguageModel.from_pretrained(
                model_name = model_name,
                max_seq_length = MAX_SEQ_LENGTH,
                dtype = DTYPE,
                load_in_4bit = LOAD_IN_4BIT,
            )

            # Configure LoRA
            model = FastLanguageModel.get_peft_model(
                model,
                r = 16,
                target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
                                  "gate_proj", "up_proj", "down_proj"],
                lora_alpha = 16,
                lora_dropout = 0,
                bias = "none",
                use_gradient_checkpointing = "unsloth",
                random_state = 3407,
                use_rslora = False,
                loftq_config = None,
            )
            should_train = True

        # TRAINING
        if should_train:
            dataset = load_dataset("json", data_files=TRAIN_DATA_PATH, split="train")
            # format_prompts reads the `tokenizer` assigned above for its EOS token
            dataset = dataset.map(format_prompts, batched = True)

            os.environ["WANDB_DISABLED"] = "true"

            print("Starting SFT Training...")
            trainer = SFTTrainer(
                model = model,
                tokenizer = tokenizer,
                train_dataset = dataset,
                dataset_text_field = "text",
                max_seq_length = MAX_SEQ_LENGTH,
                dataset_num_proc = 2,
                packing = False,
                args = TrainingArguments(
                    per_device_train_batch_size = 2,
                    gradient_accumulation_steps = 4,
                    warmup_steps = 10,
                    num_train_epochs = 1,
                    learning_rate = 2e-4,
                    # Use bf16 when the GPU supports it, otherwise fp16
                    fp16 = not is_bfloat16_supported(),
                    bf16 = is_bfloat16_supported(),
                    logging_steps = 50,
                    optim = "adamw_8bit",
                    weight_decay = 0.01,
                    lr_scheduler_type = "linear",
                    seed = 3407,
                    output_dir = "outputs",
                    report_to = "none",
                ),
            )
            trainer.train()

            print(f"Saving Adapter to: {save_path}")
            model.save_pretrained(save_path)
            tokenizer.save_pretrained(save_path)

        # INFERENCE
        run_full_inference_benchmark(model, tokenizer, TEST_DATA_PATH, model_name)

        # CLEANUP
        clean_memory()

    except Exception as e:
        # Best-effort: report the failure and continue with the next model.
        # The full traceback is printed because str(e) alone rarely says
        # where a training or loading failure came from.
        print(f"error with {model_name}: {e}")
        traceback.print_exc()
        clean_memory()
        continue

ONE SHOT INFERENCE - on fine-tuned model

In [None]:
import torch
import os
import json
import gc
from unsloth import FastLanguageModel
from tqdm import tqdm

# Mount Drive first: every path below lives on Google Drive, and creating the
# output directory on an unmounted runtime would produce a local folder instead.
from google.colab import drive
drive.mount('/content/drive')

# PATH
BASE_DIR = "/content/drive/MyDrive/DnD_Project_Data"
ADAPTER_PATH = os.path.join(BASE_DIR, "fine_tuned_models_unsloth", "Llama-3.2-1B-Instruct_DnD_Adapter")
TEST_DATA_PATH = os.path.join(BASE_DIR, "processed_dataset", "dnd_test.json")
OUTPUT_FILE = os.path.join(BASE_DIR, "inference_results", "predictions_Llama_1B_OneShot_Smart.json")
# Make sure the results directory exists before the first checkpoint is written.
os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)

# LOAD
# Drop any model/tokenizer left over from an earlier cell. Each name is removed
# independently, so a missing `model` no longer prevents `tokenizer` from being
# released (a chained `del` inside one try block stops at the first NameError).
globals().pop("model", None)
globals().pop("tokenizer", None)
gc.collect()
torch.cuda.empty_cache()

print(f"loading model: {ADAPTER_PATH}...")
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = ADAPTER_PATH,
    max_seq_length = 2048,
    dtype = None,
    load_in_4bit = True,
)
FastLanguageModel.for_inference(model)

# EXAMPLES
# One worked (instruction, input, output) example per task type. The inference
# loop renders the matching example with this template and places it in front
# of the real query as the one-shot demonstration.
# The template has three positional slots: instruction, input, response.
alpaca_template = "Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.\n\n### Instruction:\n{}\n\n### Input:\n{}\n\n### Response:\n{}"

# GENERATION
# Partial attributes in, full character sheet out (JSON inside a ```json fence).
ex_gen_instr = "Generate a complete D&D 5e character sheet based on the provided attributes."
ex_gen_input = '{"race": "Hill Dwarf", "class": "Cleric", "level": 1, "background": "Acolyte", "stats": {"str": 14, "dex": 8, "con": 15, "int": 10, "wis": 16, "cha": 12}}'
ex_gen_output = """```json
{
  "race": "Hill Dwarf",
  "subrace": null,
  "class": "Cleric",
  "subclass": "Life Domain",
  "level": 1,
  "background": "Acolyte",
  "stats": {"str": 14, "dex": 8, "con": 15, "int": 10, "wis": 16, "cha": 12},
  "hp": 11,
  "ac": 16,
  "alignment": "Lawful Good",
  "skills": ["Insight", "Religion", "Medicine"],
  "weapons": ["Warhammer", "Light Crossbow"],
  "spells": ["Bless", "Cure Wounds", "Sacred Flame"],
  "feats": []
}
```"""

# COMPLETION
# Sheet with null fields in, completed sheet out.
# NOTE(review): the expected answer keeps "subclass" null although the
# instruction asks to replace NULL values — presumably because a level-1 Rogue
# has no subclass yet; confirm this is intended.
ex_fill_instr = "Complete this D&D 5e character sheet. Replace the NULL value(s) with correct value(s) consistent with the rules."
ex_fill_input = '{"race": "Lightfoot Halfling", "class": "Rogue", "level": 1, "subclass": null, "alignment": null, "stats": {"str": 8, "dex": 17, "con": 14, "int": 13, "wis": 12, "cha": 11}}'
ex_fill_output = """```json
{
  "race": "Lightfoot Halfling",
  "class": "Rogue",
  "level": 1,
  "subclass": null,
  "alignment": "Chaotic Neutral",
  "stats": {"str": 8, "dex": 17, "con": 14, "int": 13, "wis": 12, "cha": 11},
  "hp": 10,
  "ac": 14,
  "skills": ["Stealth", "Acrobatics", "Sleight of Hand", "Deception"],
  "weapons": ["Dagger", "Shortbow"]
}
```"""

# REFUSAL
# Same instruction as the generation example, but with invalid input data; the
# expected answer is a JSON object carrying only a "message".
ex_ref_instr = "Generate a complete D&D 5e character sheet based on the provided attributes."
ex_ref_input = '{"race": "Saiyan", "class": "Super Warrior", "level": 9000, "stats": {"str": 100}}'
ex_ref_output = """```json
{
  "message": "The provided character data contains invalid, homebrew, or rule-breaking content inconsistent with D&D 5e rules. Cannot generate a character sheet."
}
```"""

# map task -> example
# Keys are looked up with each test item's "task_type"; the inference loop
# falls back to the "generation" example for any other value.
TASK_EXAMPLES = {
    "generation": (ex_gen_instr, ex_gen_input, ex_gen_output),
    "completion": (ex_fill_instr, ex_fill_input, ex_fill_output),
    "refusal":    (ex_ref_instr, ex_ref_input, ex_ref_output)
}

# INFERENCE
# One-shot generation over the test set, one item at a time: the example that
# matches the item's task type is placed in front of the real query.
print(f"\nStart inference on: {TEST_DATA_PATH}")

with open(TEST_DATA_PATH, 'r') as f:
    test_data = json.load(f)

results = []
gen_kwargs = {
    "max_new_tokens": 1024,
    "use_cache": True,
    "temperature": 0.1,
    "min_p": 0.05,
    "repetition_penalty": 1.1,
    # Passed explicitly, matching the base-model one-shot cell
    "eos_token_id": tokenizer.eos_token_id,
    "pad_token_id": tokenizer.pad_token_id,
}

# Checkpoints are written to a temp file and renamed into place, so an
# interruption mid-write cannot corrupt the previous checkpoint.
checkpoint_tmp = OUTPUT_FILE + ".tmp"

for item in tqdm(test_data, desc="Elaborazione"):
    task_type = item.get("task_type", "generation")
    raw_prompt = item['llm_prompt']

    # Parsing Input: split at the FIRST marker only, so a marker repeated
    # inside the input data does not truncate it.
    for marker in ("Input Data:", "Character Sheet:"):
        if marker in raw_prompt:
            head, _sep, tail = raw_prompt.partition(marker)
            instr, inp_data = head.strip(), tail.strip()
            break
    else:
        # No marker: treat the whole prompt as input under a generic instruction
        instr, inp_data = "Generate a D&D 5e character sheet.", raw_prompt

    # Unknown task types fall back to the generation example
    example_tuple = TASK_EXAMPLES.get(task_type, TASK_EXAMPLES["generation"])
    one_shot_block = alpaca_template.format(*example_tuple)

    # final prompt: solved example, blank line, then the query with an empty response slot
    current_query = alpaca_template.format(instr, inp_data, "")
    full_prompt = one_shot_block + "\n\n" + current_query

    # Tokenization
    inputs = tokenizer([full_prompt], return_tensors="pt").to("cuda")

    # Generation
    with torch.no_grad():
        outputs = model.generate(**inputs, **gen_kwargs)

    # Keep only the tokens generated after the prompt
    input_len = inputs.input_ids.shape[1]
    decoded = tokenizer.decode(outputs[0][input_len:], skip_special_tokens=True)

    results.append({
        "task_type": task_type,
        "input_prompt": raw_prompt,
        "generated_response": decoded.strip(),
        "expected_output": item.get("expected_output"),
        "mode": "one-shot-smart"
    })

    # Save during inference (every 20 items)
    if len(results) % 20 == 0:
        with open(checkpoint_tmp, 'w') as f:
            json.dump(results, f, indent=2)
        os.replace(checkpoint_tmp, OUTPUT_FILE)

# final save
with open(checkpoint_tmp, 'w') as f:
    json.dump(results, f, indent=2)
os.replace(checkpoint_tmp, OUTPUT_FILE)

print(f"\nDone, check {OUTPUT_FILE}")

One Shot Inference on base model

In [2]:
import torch
import os
import json
import gc
from unsloth import FastLanguageModel
from tqdm import tqdm
from google.colab import drive
# Every path below lives on Google Drive, so mount it first.
drive.mount('/content/drive')


# PATH
BASE_DIR = "/content/drive/MyDrive/DnD_Project_Data"
MODEL_NAME = "unsloth/Llama-3.2-1B-Instruct"
TEST_DATA_PATH = os.path.join(BASE_DIR, "processed_dataset", "dnd_test.json")
OUTPUT_FILE = os.path.join(BASE_DIR, "inference_results_base_one_shot", "predictions_Llama_1B_BASE_OneShot.json")
os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)

# clean memory
# Drop any model/tokenizer left over from an earlier cell. Each name is removed
# independently, so a missing `model` no longer prevents `tokenizer` from being
# released (a chained `del` inside one try block stops at the first NameError).
globals().pop("model", None)
globals().pop("tokenizer", None)
gc.collect()
torch.cuda.empty_cache()

# Load model
print(f"Loading Base Model: {MODEL_NAME}...")
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = MODEL_NAME,
    max_seq_length = 2048,
    dtype = None,
    load_in_4bit = True,
)
FastLanguageModel.for_inference(model)

# One worked (instruction, input, output) example per task type. The inference
# loop renders the matching example with this template and places it in front
# of the real query as the one-shot demonstration.
# The template has three positional slots: instruction, input, response.
alpaca_template = "Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.\n\n### Instruction:\n{}\n\n### Input:\n{}\n\n### Response:\n{}"

# Generation Example
# Partial attributes in, full character sheet out (JSON inside a ```json fence).
ex_gen_instr = "Generate a complete D&D 5e character sheet based on the provided attributes."
ex_gen_input = '{"race": "Hill Dwarf", "class": "Cleric", "level": 1, "background": "Acolyte", "stats": {"str": 14, "dex": 8, "con": 15, "int": 10, "wis": 16, "cha": 12}}'
ex_gen_output = """```json
{
  "race": "Hill Dwarf",
  "subrace": null,
  "class": "Cleric",
  "subclass": "Life Domain",
  "level": 1,
  "background": "Acolyte",
  "stats": {"str": 14, "dex": 8, "con": 15, "int": 10, "wis": 16, "cha": 12},
  "hp": 11,
  "ac": 16,
  "alignment": "Lawful Good",
  "skills": ["Insight", "Religion", "Medicine"],
  "weapons": ["Warhammer", "Light Crossbow"],
  "spells": ["Bless", "Cure Wounds", "Sacred Flame"],
  "feats": []
}
```"""

# Completion Example
# Sheet with null fields in, completed sheet out.
# NOTE(review): the expected answer keeps "subclass" null although the
# instruction asks to replace NULL values — presumably because a level-1 Rogue
# has no subclass yet; confirm this is intended.
ex_fill_instr = "Complete this D&D 5e character sheet. Replace the NULL value(s) with correct value(s) consistent with the rules."
ex_fill_input = '{"race": "Lightfoot Halfling", "class": "Rogue", "level": 1, "subclass": null, "alignment": null, "stats": {"str": 8, "dex": 17, "con": 14, "int": 13, "wis": 12, "cha": 11}}'
ex_fill_output = """```json
{
  "race": "Lightfoot Halfling",
  "class": "Rogue",
  "level": 1,
  "subclass": null,
  "alignment": "Chaotic Neutral",
  "stats": {"str": 8, "dex": 17, "con": 14, "int": 13, "wis": 12, "cha": 11},
  "hp": 10,
  "ac": 14,
  "skills": ["Stealth", "Acrobatics", "Sleight of Hand", "Deception"],
  "weapons": ["Dagger", "Shortbow"]
}
```"""

# Refusal Example
# Same instruction as the generation example, but with invalid input data; the
# expected answer is a JSON object carrying only a "message".
ex_ref_instr = "Generate a complete D&D 5e character sheet based on the provided attributes."
ex_ref_input = '{"race": "Saiyan", "class": "Super Warrior", "level": 9000, "stats": {"str": 100}}'
ex_ref_output = """```json
{
  "message": "The provided character data contains invalid, homebrew, or rule-breaking content inconsistent with D&D 5e rules. Cannot generate a character sheet."
}
```"""

# Keys are looked up with each test item's "task_type"; the inference loop
# falls back to the "generation" example for any other value.
TASK_EXAMPLES = {
    "generation": (ex_gen_instr, ex_gen_input, ex_gen_output),
    "completion": (ex_fill_instr, ex_fill_input, ex_fill_output),
    "refusal":    (ex_ref_instr, ex_ref_input, ex_ref_output)
}

# INFERENCE
# One-shot generation with the base model, one item at a time: the example
# that matches the item's task type is placed in front of the real query.
print(f"\nStart inference on: {TEST_DATA_PATH}")

with open(TEST_DATA_PATH, 'r') as f:
    test_data = json.load(f)

results = []

gen_kwargs = {
    "max_new_tokens": 1024,
    "use_cache": True,
    "temperature": 0.1,
    "min_p": 0.05,
    "repetition_penalty": 1.1,
    "eos_token_id": tokenizer.eos_token_id,
    "pad_token_id": tokenizer.pad_token_id,
}

# Checkpoints are written to a temp file and renamed into place, so an
# interruption mid-write cannot corrupt the previous checkpoint.
checkpoint_tmp = OUTPUT_FILE + ".tmp"

for item in tqdm(test_data, desc="Processing Base Model"):
    task_type = item.get("task_type", "generation")
    raw_prompt = item['llm_prompt']

    # Parsing Input: split at the FIRST marker only, so a marker repeated
    # inside the input data does not truncate it.
    for marker in ("Input Data:", "Character Sheet:"):
        if marker in raw_prompt:
            head, _sep, tail = raw_prompt.partition(marker)
            instr, inp_data = head.strip(), tail.strip()
            break
    else:
        # No marker: treat the whole prompt as input under a generic instruction
        instr, inp_data = "Generate a D&D 5e character sheet.", raw_prompt

    # Prompt One-Shot (unknown task types fall back to the generation example)
    example_tuple = TASK_EXAMPLES.get(task_type, TASK_EXAMPLES["generation"])
    one_shot_block = alpaca_template.format(*example_tuple)

    # Solved example, blank line, then the query with an empty response slot
    current_query = alpaca_template.format(instr, inp_data, "")
    full_prompt = one_shot_block + "\n\n" + current_query

    # Tokenization
    inputs = tokenizer([full_prompt], return_tensors="pt").to("cuda")

    # Generation
    with torch.no_grad():
        outputs = model.generate(**inputs, **gen_kwargs)

    # Keep only the tokens generated after the prompt
    input_len = inputs.input_ids.shape[1]
    decoded = tokenizer.decode(outputs[0][input_len:], skip_special_tokens=True)

    results.append({
        "task_type": task_type,
        "input_prompt": raw_prompt,
        "generated_response": decoded.strip(),
        "expected_output": item.get("expected_output"),
        "mode": "base-model-oneshot"
    })

    # Save checkpoint (every 20 items)
    if len(results) % 20 == 0:
        with open(checkpoint_tmp, 'w') as f:
            json.dump(results, f, indent=2)
        os.replace(checkpoint_tmp, OUTPUT_FILE)

# Final save
with open(checkpoint_tmp, 'w') as f:
    json.dump(results, f, indent=2)
os.replace(checkpoint_tmp, OUTPUT_FILE)

print(f"\nDone. Results saved to {OUTPUT_FILE}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Loading Base Model: unsloth/Llama-3.2-1B-Instruct...
==((====))==  Unsloth 2026.2.1: Fast Llama patching. Transformers: 4.57.6.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.563 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.9.0+cu128. CUDA: 7.5. CUDA Toolkit: 12.8. Triton: 3.5.0
\        /    Bfloat16 = FALSE. FA [Xformers = None. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!

Start inference on: /content/drive/MyDrive/DnD_Project_Data/processed_dataset/dnd_test.json


Processing Base Model: 100%|██████████| 636/636 [54:34<00:00,  5.15s/it]


Done. Results saved to /content/drive/MyDrive/DnD_Project_Data/inference_results/predictions_Llama_1B_BASE_OneShot.json





In [16]:
import json
import os
import re
import collections
from glob import glob
from google.colab import drive

# PATHS
BASE_DIR = "/content/drive/MyDrive/DnD_Project_Data"
# Reference dataset of known-good characters; the whitelists below are built from it.
INPUT_POS_INT = os.path.join(BASE_DIR, "dataset_integrated", "dataset_integrated_positive.json")

# Lowercase class name -> level, with "default" for every class not listed.
# NOTE(review): presumably the level at which a class gains its subclass; the
# code that reads this table is outside this section — confirm.
SUBCLASS_UNLOCK_LEVELS = {
    "cleric": 1, "sorcerer": 1, "warlock": 1,
    "druid": 2, "wizard": 2,
    "default": 3
}

# KNOWLEDGE BASE
print("Building Knowledge Base...")
# One set of normalized names per category, filled from the reference dataset
# by the loading step that follows.
whitelists = {
    "races": set(), "classes": set(), "subclasses": set(),
    "backgrounds": set(), "feats": set(), "skills": set(), "spells": set()
}

def super_normalize(text):
    """Reduce a name to a comparison key: lowercase ASCII letters and digits only.

    Non-string input yields the empty string.
    """
    if isinstance(text, str):
        lowered = text.lower()
        # Strip everything that is not a-z or 0-9 (spaces, punctuation, accents, ...)
        return re.sub(r'[^a-z0-9]', '', lowered)
    return ""

# Fill the whitelists from the reference dataset. A missing file is tolerated:
# the whitelists then simply stay empty.
try:
    with open(INPUT_POS_INT, 'r') as kb_file:
        clean_data = json.load(kb_file)
    for char in clean_data:
        # Scalar fields: skipped when absent, null or empty
        if char.get('race'): whitelists['races'].add(super_normalize(char['race']))
        if char.get('class'): whitelists['classes'].add(super_normalize(char['class']))
        if char.get('subclass'): whitelists['subclasses'].add(super_normalize(char['subclass']))
        if char.get('background'): whitelists['backgrounds'].add(super_normalize(char['background']))
        # List fields: `or []` also covers a key that is present but null, and
        # entries that normalize to "" (null / non-string items) are skipped so
        # the empty key never ends up whitelisted.
        for list_field in ('feats', 'skills', 'spells'):
            for entry in char.get(list_field) or []:
                normalized_entry = super_normalize(entry)
                if normalized_entry:
                    whitelists[list_field].add(normalized_entry)
    print(f"Knowledge Base Ready. Loaded {len(clean_data)} reference characters.")
except FileNotFoundError:
    print("Warning: Reference dataset not found. Validation will be strictly rule-based without whitelist checks.")

# FUNCTIONS

def extract_input_from_prompt(prompt):
    """Recover the JSON input object embedded in a prompt.

    Looks for a JSON object that follows one of the markers "Input Data:",
    "Character Sheet:" or "### Input:" and runs up to the next "###" or the
    end of the text.

    Returns:
        The parsed object, or {} when no marker/object is found, the text is
        not valid JSON, or `prompt` is not a string.
    """
    try:
        match = re.search(r'(?:Input Data:|Character Sheet:|### Input:)\s*(\{.*?\})\s*(?:###|$)', prompt, re.DOTALL)
        if match:
            return json.loads(match.group(1))
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError; TypeError covers non-string
        # prompts. Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        pass
    return {}

def has_meaningful_content(val):
    """Tell whether a field value carries real information.

    Not meaningful: None; blank strings and the placeholders "null", "none"
    and "." (case-insensitive, surrounding whitespace ignored); empty lists
    and dicts; numbers less than or equal to zero. Everything else is.
    """
    if val is None:
        return False
    if isinstance(val, str):
        return val.strip().lower() not in ("", "null", "none", ".")
    if isinstance(val, (list, dict)):
        return len(val) != 0
    if isinstance(val, (int, float)):
        return not (val <= 0)
    return True

# --- PARSING LOGIC ---
def detect_repetition_loop(text, threshold=10):
    """Heuristically flag text that has degenerated into a repetition loop.

    Texts shorter than 100 characters, or without any word longer than two
    characters, are never flagged. Otherwise the text is flagged when either:
      * the most frequent word occurs more than `threshold` times AND more
        than 15 times AND makes up more than 10% of all words, or
      * the text is longer than 150 characters and its final 30 characters
        occur more than 5 times in the text.
    """
    if len(text) < 100:
        return False

    # Words are split on whitespace, commas, semicolons and double quotes;
    # fragments of one or two characters are ignored.
    words = [w for w in re.split(r'\s+|[,;"]', text) if len(w) > 2]
    if not words:
        return False

    _top_word, top_count = collections.Counter(words).most_common(1)[0]
    dominated = top_count > threshold and top_count > 15 and (top_count / len(words) > 0.1)
    if dominated:
        return True

    window = 30
    if len(text) > window * 5:
        tail = text[-window:]
        if text.count(tail) > 5:
            return True
    return False

def extract_json_from_text(text):
    """Pull a JSON value out of a model response.

    Tries, in order: the object inside the first ``` / ```json fence; the span
    from the first '{' to the last '}'; the whole text. Only the first
    candidate that applies is parsed.

    Returns:
        (parsed_value, True) on success, (None, False) when that candidate is
        not valid JSON or `text` is not a string.
    """
    try:
        code_block = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
        if code_block:
            return json.loads(code_block.group(1)), True
        match = re.search(r'\{.*\}', text, re.DOTALL)
        if match:
            return json.loads(match.group(0)), True
        return json.loads(text), True
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError; TypeError covers non-string
        # input. Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return None, False

def parse_text_content(text):
    """Extract character fields from a free-text (non-JSON) model answer.

    Only the part after the last occurrence of the first matching marker
    (``### Response:``, ``Output:``, ``Response:``) is scanned. Labelled
    fields, the six ability scores and a ``Spells`` section are looked up
    with case-insensitive regular expressions.

    Field labels must not be the tail of a longer word, so ``Class`` no
    longer matches inside ``Subclass`` (nor in ``Armor Class``) and ``Race``
    no longer matches inside ``Subrace``.

    Args:
        text: Raw model response.

    Returns:
        ``(extracted, True)`` when a race or class and at least three ability
        scores were found; ``(None, False)`` otherwise. ``extracted`` always
        has a ``"stats"`` dict; numeric fields such as level/hp/ac are kept
        as the matched strings, and spells are stored as a one-element list
        holding the raw section text.
    """
    splitters = ["### Response:", "Output:", "Response:"]
    search_text = text
    for s in splitters:
        if s in text:
            search_text = text.split(s)[-1]
            break
    extracted = {"stats": {}}

    # Guard against matching a label in the middle of a longer word.
    word_start = r'(?<![A-Za-z])'
    text_value = r'[:\*\s\-]*([^\n\*\[]+)'
    int_value = r'[:\*\s\-]*(\d+)'
    patterns = {
        "race":       word_start + r'Race' + text_value,
        "subrace":    word_start + r'Subrace' + text_value,
        # "Armor Class: 15" must not be read as the character class.
        "class":      word_start + r'(?<!Armor\s)Class' + text_value,
        "subclass":   word_start + r'Subclass' + text_value,
        "background": word_start + r'Background' + text_value,
        "level":      word_start + r'Level' + int_value,
        "hp":         word_start + r'(?:HP|Hit Points)' + int_value,
        "ac":         word_start + r'(?:AC|Armor Class)' + int_value,
        "alignment":  word_start + r'Alignment' + text_value
    }
    for key, pat in patterns.items():
        match = re.search(pat, search_text, re.IGNORECASE)
        if match:
            val = match.group(1).strip()
            # Drop leftover markdown emphasis / bracket characters.
            val = re.sub(r'[\*\[\]]', '', val).strip()
            if has_meaningful_content(val):
                extracted[key] = val
            # An explicit "Subrace: None" is kept as the literal "None" even
            # though it is not meaningful content.
            if key == "subrace" and str(val).lower() == "none":
                extracted[key] = "None"

    stat_map = {
        "str": ["strength", "str"], "dex": ["dexterity", "dex"], "con": ["constitution", "con"],
        "int": ["intelligence", "int"], "wis": ["wisdom", "wis"], "cha": ["charisma", "cha"]
    }
    found_stats = 0
    for short_k, aliases in stat_map.items():
        # Long names are tried before abbreviations; the first hit wins.
        for alias in aliases:
            pat = r'(?:^|[\s\*])' + alias + r'[:\*\s\-\(]*(\d+)'
            match = re.search(pat, search_text, re.IGNORECASE)
            if match:
                extracted["stats"][short_k] = int(match.group(1))
                found_stats += 1
                break

    spells_pat = r'(?:^|\n)(?:\*\*|#|\s)*Spells[:\*\s\-]*([\s\S]+?)(?:\n(?:\*\*|#)|$)'
    spells_match = re.search(spells_pat, search_text, re.IGNORECASE)
    if spells_match:
        content = spells_match.group(1).strip()
        if len(content) > 5:
            extracted["spells"] = [content]

    # Minimum evidence that this really is a character sheet.
    if (("race" in extracted or "class" in extracted) and found_stats >= 3):
        return extracted, True
    return None, False

# VALIDATION LOGIC

def check_mutations(input_json, output_data):
    """List input values that the model dropped or altered in its output.

    Every meaningful top-level input field (except ``stats``) must be present
    in the output and equal after ``super_normalize``. When the input holds a
    ``stats`` dict, each meaningful score must be present in the output stats
    with the same string representation.

    Args:
        input_json: Character data that was embedded in the prompt.
        output_data: Character data extracted from the model response.

    Returns:
        A list of human-readable mutation messages; empty when the input was
        preserved.
    """
    mutations = []

    for field, given in input_json.items():
        if field == 'stats' or not has_meaningful_content(given):
            continue

        produced = output_data.get(field)
        if produced is None:
            mutations.append(f"Field '{field}' dropped (Input: {given})")
            continue

        produced_norm = super_normalize(str(produced).lower().strip())
        given_norm = super_normalize(str(given).lower().strip())
        if produced_norm != given_norm:
            mutations.append(f"Field '{field}' mutated (In: '{given}' -> Out: '{produced}')")

    given_stats = input_json.get('stats')
    if isinstance(given_stats, dict):
        produced_stats = output_data.get('stats', {})
        if isinstance(produced_stats, dict):
            for ability, score in given_stats.items():
                if not has_meaningful_content(score):
                    continue
                produced_score = produced_stats.get(ability)
                if produced_score is None:
                    mutations.append(f"Stat '{ability}' dropped")
                elif str(produced_score) != str(score):
                    mutations.append(f"Stat '{ability}' mutated ({score} -> {produced_score})")
        else:
            # e.g. the model returned null, a list or a string for "stats".
            mutations.append("Stats block corrupted")

    return mutations

def check_progress(input_json, output_data):
    """Determine which fields the model filled in beyond the given input.

    A schema field counts as added when it had no meaningful content in the
    input but has meaningful content in the output. ``stats`` counts as added
    when the input stats were empty and the output has at least three
    entries, or when the input had no ``stats`` key at all and the output
    stats are truthy.

    Args:
        input_json: Character data that was embedded in the prompt.
        output_data: Character data extracted from the model response.

    Returns:
        ``(has_progress, added, input_stats_empty)``: whether anything was
        added, the list of added field names, and whether the input lacked
        usable stats.
    """
    schema_keys = ['race', 'class', 'subclass', 'background', 'level', 'hp', 'ac', 'alignment', 'spells', 'subrace', 'skills', 'weapons', 'feats']

    added = []
    for key in schema_keys:
        if has_meaningful_content(input_json.get(key)):
            continue  # already provided, cannot be "new"
        produced = output_data.get(key)
        if not has_meaningful_content(produced):
            continue
        if key == 'subclass' and str(produced) == ".":
            continue
        added.append(key)

    if 'stats' in input_json:
        input_stats_empty = not has_meaningful_content(input_json.get('stats'))
        if input_stats_empty:
            produced_stats = output_data.get('stats', {})
            if produced_stats and len(produced_stats) >= 3:
                added.append('stats')
    else:
        input_stats_empty = True
        if output_data.get('stats'):
            added.append('stats')

    return (len(added) > 0), added, input_stats_empty


def validate_game_rules(char_json, input_json, task_type, input_stats_empty):
    """Check a parsed character sheet against the notebook's rule set.

    Checks performed, in order: subrace presence/validity for the race,
    subclass presence/validity for the class level, presence of hp /
    background / ac / alignment, ability-score range (1-30), and presence of
    spells for caster classes. A field that already had meaningful content in
    ``input_json`` is neither reported as missing nor credited as generated.

    Args:
        char_json: Character data extracted from the model output.
        input_json: Character data that was embedded in the prompt.
        task_type: Task label of the entry; ``'completion'`` makes every
            unfilled generic field an error.
        input_stats_empty: True when the prompt carried no usable stats.

    Returns:
        ``(errors, passed)``: two lists of human-readable messages.
    """
    errors = []
    passed = []

    # COMPLETION task
    is_completion = (task_type == 'completion')

    race_norm = super_normalize(char_json.get('race'))
    class_name = char_json.get('class', 'Unknown')
    class_norm = super_normalize(class_name)
    try:
        lvl = int(char_json.get('level', 1))
    except (TypeError, ValueError, OverflowError):
        # Null or non-numeric level: fall back to level 1.
        lvl = 1


    # SPECIAL CHECKS: SUBRACE & SUBCLASS

    # --- SUBRACE ---
    RACES_NO_SUBRACES = {'human', 'dragonborn', 'tiefling', 'halforc', 'halfelf', 'tabaxi', 'triton'}
    subrace_val = char_json.get('subrace')
    subrace_input = input_json.get('subrace')

    if race_norm not in RACES_NO_SUBRACES and race_norm:
        # Race is expected to have a subrace.
        if not has_meaningful_content(subrace_val):
             if not has_meaningful_content(subrace_input):
                 errors.append(f"Missing Subrace for {char_json.get('race')}")
        else:
            if not has_meaningful_content(subrace_input):
                passed.append(f"Generated 'subrace': '{subrace_val}'")
    else:
        # Race has no subraces (or is unknown): any real subrace is invalid.
        if has_meaningful_content(subrace_val) and str(subrace_val).lower() != "none":
            errors.append(f"Invalid Subrace '{subrace_val}' for {char_json.get('race')}")

    # --- SUBCLASS ---
    unlock_lvl = SUBCLASS_UNLOCK_LEVELS.get(class_norm, SUBCLASS_UNLOCK_LEVELS["default"])
    subclass_val = char_json.get('subclass')
    subclass_input = input_json.get('subclass')

    if lvl >= unlock_lvl:
        if not has_meaningful_content(subclass_val):
            if not has_meaningful_content(subclass_input):
                errors.append(f"Missing Subclass (Required for {class_name} at Lvl {lvl})")
        else:
            if not has_meaningful_content(subclass_input):
                passed.append(f"Generated 'subclass': '{subclass_val}'")
    else:
        if has_meaningful_content(subclass_val) and str(subclass_val).lower() != "none":
            errors.append(f"Invalid Subclass '{subclass_val}' (Unlocks at Lvl {unlock_lvl}, Char is Lvl {lvl})")


    # GENERIC CHECKS: ALIGNMENT, BG, HP, AC
    check_fields = ['hp', 'background', 'ac', 'alignment']

    for field in check_fields:
        out_val = char_json.get(field)
        in_val = input_json.get(field)

        if has_meaningful_content(out_val):
            if not has_meaningful_content(in_val):
                passed.append(f"Generated '{field}'")
        else:
            if not has_meaningful_content(in_val):
                if is_completion:
                    errors.append(f"Failed to complete field '{field}'")
                elif field in ['hp', 'background']:
                    # Outside completion only hp and background are mandatory.
                    errors.append(f"Missing {field.capitalize()}")

    # STATS CHECK
    stats = char_json.get('stats')

    if has_meaningful_content(stats):
        if not isinstance(stats, dict):
            # A list/string/number here would crash on .items(); report it
            # with the same wording check_mutations uses.
            errors.append("Stats block corrupted")
        else:
            stat_valid = True
            for k, v in stats.items():
                try:
                    score = int(v)
                except (TypeError, ValueError, OverflowError):
                    # Best-effort: non-numeric scores are not range-checked.
                    continue
                if score < 1 or score > 30:
                    errors.append(f"Stat '{k}' out of range ({v})")
                    stat_valid = False

            # Stats were absent from the input, so the model filled them in.
            if stat_valid and input_stats_empty:
                passed.append("Generated 'stats' correctly")

    else:
        # Stats missing from the output.
        if input_stats_empty:
            if is_completion:
                # Completion tasks must fill the field.
                errors.append("Failed to complete field 'stats'")
            else:
                errors.append("Missing Stats")
        else:
            errors.append("Stats dropped from input")

    # SPELLS CHECK
    full_casters = {'bard', 'cleric', 'druid', 'sorcerer', 'warlock', 'wizard'}
    half_casters = {'paladin', 'ranger'}
    should_have_magic = (class_norm in full_casters) or (class_norm in half_casters and lvl >= 2)
    spells_val = char_json.get('spells', [])

    if should_have_magic:
        if has_meaningful_content(spells_val):
             if not has_meaningful_content(input_json.get('spells')):
                 passed.append("Generated 'spells'")
        else:
             if not has_meaningful_content(input_json.get('spells')):
                 errors.append(f"Missing Spells (Rule: {class_name} at Lvl {lvl})")

    return errors, passed
def detect_refusal(raw_text, json_obj=None):
    """Decide whether a model response is a refusal.

    A response is a refusal when its normalized text contains one of the
    refusal keywords, or when the parsed JSON object is a
    ``{"data": null, "message": ...}`` envelope, or when its ``message``
    string contains one of the keywords.

    Args:
        raw_text: Raw model response, passed through ``super_normalize``
            (presumably lowercases and strips non-alphanumerics, given the
            keyword spelling; verify against its definition).
        json_obj: JSON parsed from the response, if any. Anything that is
            not a dict is ignored instead of raising.

    Returns:
        True when the response looks like a refusal, False otherwise.
    """
    clean_text = super_normalize(raw_text)
    refusal_keywords = [
        # Keywords
        "cannotgenerate", "unabletofulfill", "cannotcomplete",
        "invalidhomebrew", "rulebreakingcontent", "inconsistentwithdnd",
        "safetyguidelines", "violatespolicy", "cannotcreatecharactersheet",
        "contentpolicy", "cannotcompletethisrequest", "icannot", "sorrybut",
        "notcontainenoughinformation", "insufficientinformation",
        "pleaseprovideallrelevant", "missingrequired", "notenoughinformation",
        "invalidcharactersheet", "invalidformat",
        "providevalidattribute", "providevalidvalue",
        "missingstats", "statscannotbenull"
    ]

    if any(k in clean_text for k in refusal_keywords):
        return True

    # Only JSON objects can carry the refusal envelope; a list or scalar has
    # no keys and previously raised AttributeError here.
    if isinstance(json_obj, dict):
        # Check standard message {data: null, message: ...}
        if "data" in json_obj and json_obj["data"] is None and "message" in json_obj:
            return True

        message = json_obj.get("message")
        if isinstance(message, str):
            clean_msg = super_normalize(message)
            if any(k in clean_msg for k in refusal_keywords):
                return True

    return False

# FILE ANALYSIS

def analyze_predictions(file_path):
    """Score every prediction stored in one ``predictions_*.json`` file.

    Each entry is parsed (JSON first, free-text fallback), classified and
    counted under exactly one status per task category:

    * REF: ``SUCCESS`` / ``FAILURE``
    * GEN and COMP: ``FAIL_REFUSAL``, ``FAIL_LOOP``, ``FAIL_FORMAT_EMPTY``,
      ``FAIL_MUTATION``, ``FAIL_STAGNATION``, ``PARTIAL_SUCCESS``, ``SUCCESS``

    A ``SUCCESS`` whose raw text shows a repetition loop is downgraded to
    ``PARTIAL_SUCCESS``. The status is counted once, after the downgrade, so
    the counters always agree with the status written to the log.

    Args:
        file_path: Path to a JSON file containing a list of prediction
            entries with ``task_type``, ``input_prompt`` and
            ``generated_response`` keys.

    Returns:
        ``(model_name, stats_counter, detail_stats, format_stats, log_lines)``
        or five ``None`` values when the file cannot be read or decoded.
        ``detail_stats`` is returned with empty counters; nothing in this
        function populates it.
    """
    model_name = os.path.basename(file_path).replace("predictions_", "").replace(".json", "")
    print(f"Analyzing {model_name}...")

    try:
        with open(file_path, 'r') as f:
            predictions = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: could not load {file_path} ({exc}); skipping.")
        return None, None, None, None, None

    stats_counter = {"GEN": collections.Counter(), "COMP": collections.Counter(), "REF": collections.Counter()}
    detail_stats = {"missed_fields": collections.Counter(), "valid_fields": collections.Counter()}
    format_stats = {"valid_json": 0, "valid_text_parsed": 0, "garbage": 0, "total": 0}

    log_lines = []

    def pj(d):
        """Pretty-print dicts as indented JSON, everything else via str()."""
        return json.dumps(d, indent=2) if isinstance(d, dict) else str(d)

    for idx, entry in enumerate(predictions):
        task_type = entry.get('task_type', 'generation')
        prompt = entry.get('input_prompt', '')
        response = entry.get('generated_response', '')

        input_json = extract_input_from_prompt(prompt)

        json_parsed, is_json = extract_json_from_text(response)
        if is_json and not isinstance(json_parsed, dict):
            # Valid JSON that is not an object (list, number, null, ...)
            # cannot be a character sheet and would break the dict-based
            # checks below.
            json_parsed, is_json = None, False
        text_parsed, is_text = parse_text_content(response)

        is_loop = detect_repetition_loop(response)

        format_stats["total"] += 1

        if is_json:
            format_stats["valid_json"] += 1
            data_object = json_parsed
            parsing_source = "JSON"
        elif is_text:
            format_stats["valid_text_parsed"] += 1
            data_object = text_parsed
            parsing_source = "TEXT_PARSER"
        else:
            format_stats["garbage"] += 1
            data_object = None
            parsing_source = "NONE"

        status = "UNKNOWN"
        reasoning = []
        passed_checks = []

        is_refusal = detect_refusal(response, json_parsed)

        if task_type == 'refusal':
            cat = "REF"
            if is_refusal:
                status = "SUCCESS"
            else:
                status = "FAILURE"
                reasoning.append("Model failed to refuse.")

        else:
            cat = "GEN" if task_type == "generation" else "COMP"

            if is_refusal:
                status = "FAIL_REFUSAL"
                reasoning.append("Model refused a valid request.")
            elif data_object is None:
                if is_loop:
                    status = "FAIL_LOOP"
                    reasoning.append("Model degenerated into loop (Garbage).")
                else:
                    status = "FAIL_FORMAT_EMPTY"
                    reasoning.append("No valid data found.")
            else:
                mutations = check_mutations(input_json, data_object)

                if mutations:
                    status = "FAIL_MUTATION"
                    reasoning.extend(mutations)
                else:
                    has_progress, added_fields, input_stats_empty = check_progress(input_json, data_object)

                    if not has_progress:
                        status = "FAIL_STAGNATION"
                        reasoning.append("Echoed input. No new fields generated.")
                    else:
                        passed_checks.append(f"Added/Generated fields: {added_fields}")

                        errs, pass_c = validate_game_rules(data_object, input_json, task_type, input_stats_empty)
                        passed_checks.extend(pass_c)

                        if errs:
                            status = "PARTIAL_SUCCESS"
                            reasoning.extend(errs)
                        else:
                            status = "SUCCESS"

                if is_loop and status in ("SUCCESS", "PARTIAL_SUCCESS"):
                    # Parsable data followed by a runaway tail is at best a
                    # partial success.
                    status = "PARTIAL_SUCCESS"
                    reasoning.append("WARNING: Repetition loop detected in output tail.")

        # Count the final status exactly once.
        stats_counter[cat][status] += 1

        log_entry = (
            f"{'='*80}\nENTRY #{idx+1} | TASK: {task_type.upper()} | STATUS: {status} | FORMAT: {parsing_source}\n{'-'*80}\n"
            f"INPUT:\n{pj(input_json)}\n\n"
            f"EXTRACTED DATA ({parsing_source}):\n{pj(data_object) if data_object else 'NONE'}\n\n"
            f"RAW MODEL RESPONSE:\n{response}\n\n"
            f"REASONING:\n" + "\n".join([f"  [!] {r}" for r in reasoning]) + "\n" +
            "\n".join([f"  [OK] {c}" for c in passed_checks]) + "\n\n"
        )
        log_lines.append(log_entry)

    return model_name, stats_counter, detail_stats, format_stats, log_lines

# REPORT

def generate_report(target_dir_path):
    """Validate every ``predictions_*.json`` in a directory and write a report.

    For each prediction file a detailed log is written to
    ``<target_dir_path>/validation_logs/log_<model>.txt`` and a per-model
    summary is appended to ``<target_dir_path>/VALIDATION_REPORT.txt``.
    Files are processed in sorted order so the report is reproducible.

    Args:
        target_dir_path: Directory containing the prediction files.

    Returns:
        None. Returns early (after a warning) when the directory is missing.
    """
    print(f"\nSTARTING VALIDATION FOR DIRECTORY: {target_dir_path}")
    if not os.path.exists(target_dir_path):
        print(f"Warning: directory not found, nothing to validate: {target_dir_path}")
        return
    output_report_file = os.path.join(target_dir_path, "VALIDATION_REPORT.txt")
    log_dir = os.path.join(target_dir_path, "validation_logs")
    os.makedirs(log_dir, exist_ok=True)
    # glob() returns files in arbitrary, filesystem-dependent order.
    files = sorted(glob(os.path.join(target_dir_path, "predictions_*.json")))

    report = ["VALIDATION REPORT (V15 - Full RAW LOGS Included)", f"Directory: {target_dir_path}", "="*80]

    for pred_file in files:
        name, s, d, f_stats, logs = analyze_predictions(pred_file)
        if not name:
            continue
        # Logs embed raw model text, which may contain any Unicode character.
        with open(os.path.join(log_dir, f"log_{name}.txt"), 'w', encoding='utf-8') as logf:
            logf.write("\n".join(logs))

        report.append(f"\nMODEL: {name}\n" + "-"*40)
        report.append("[FORMAT COMPLIANCE]")
        tot = f_stats["total"]
        if tot:
            report.append(f"  Valid JSON: {f_stats['valid_json']} ({f_stats['valid_json']/tot*100:.1f}%)")

        report.append("\n[SUBSTANCE & RULES PERFORMANCE]")
        for t in ["GEN", "COMP", "REF"]:
            tot_task = sum(s[t].values())
            report.append(f"  {t} TASK (Total: {tot_task})")
            for k, v in sorted(s[t].items()):
                report.append(f"    - {k:<20}: {v} ({v/tot_task*100:.1f}%)" if tot_task else f"    - {k}: 0")

    with open(output_report_file, 'w', encoding='utf-8') as report_file:
        report_file.write("\n".join(report))
    print(f"Report generated: {output_report_file}")

Building Knowledge Base...
Knowledge Base Ready. Loaded 2114 reference characters.


In [17]:
# VALIDATION: BASELINE MODELS
# Directory scanned for the base models' predictions_*.json files.
BASELINE_DIR = os.path.join(BASE_DIR, "inference_results_baseline")
# Writes VALIDATION_REPORT.txt and validation_logs/ inside BASELINE_DIR.
generate_report(BASELINE_DIR)


STARTING VALIDATION FOR DIRECTORY: /content/drive/MyDrive/DnD_Project_Data/inference_results_baseline
Analyzing gemma-2-2b-it_BASELINE...
Analyzing Qwen3-0.6B_BASELINE...
Analyzing Llama-3.2-1B-Instruct_BASELINE...
Report generated: /content/drive/MyDrive/DnD_Project_Data/inference_results_baseline/VALIDATION_REPORT.txt


In [18]:
# VALIDATION: FINETUNED (ZERO SHOT)
# Directory scanned for the fine-tuned, zero-shot predictions_*.json files.
FT_ZEROSHOT_DIR = os.path.join(BASE_DIR, "inference_results_finetuned&zero_shot")

# Writes VALIDATION_REPORT.txt and validation_logs/ inside FT_ZEROSHOT_DIR.
generate_report(FT_ZEROSHOT_DIR)


STARTING VALIDATION FOR DIRECTORY: /content/drive/MyDrive/DnD_Project_Data/inference_results_finetuned&zero_shot
Analyzing gemma-2-2b-it...
Analyzing Llama-3.2-1B-Instruct...
Analyzing Qwen3-0.6B...
Report generated: /content/drive/MyDrive/DnD_Project_Data/inference_results_finetuned&zero_shot/VALIDATION_REPORT.txt


In [19]:
# VALIDATION: FINETUNED (ONE SHOT)
# Directory scanned for the fine-tuned, one-shot predictions_*.json files.

ONE_SHOT_DIR = os.path.join(BASE_DIR, "inference_results_finetuned&one_shot")

# Writes VALIDATION_REPORT.txt and validation_logs/ inside ONE_SHOT_DIR.
generate_report(ONE_SHOT_DIR)


STARTING VALIDATION FOR DIRECTORY: /content/drive/MyDrive/DnD_Project_Data/inference_results_finetuned&one_shot
Analyzing Llama_1B_OneShot_Smart...
Report generated: /content/drive/MyDrive/DnD_Project_Data/inference_results_finetuned&one_shot/VALIDATION_REPORT.txt


In [20]:
# VALIDATION: BASE (ONE SHOT)
# Directory scanned for the base model's one-shot predictions_*.json files.

BASE_ONE_SHOT_DIR = os.path.join(BASE_DIR, "inference_results_base_one_shot")

# Writes VALIDATION_REPORT.txt and validation_logs/ inside BASE_ONE_SHOT_DIR.
generate_report(BASE_ONE_SHOT_DIR)


STARTING VALIDATION FOR DIRECTORY: /content/drive/MyDrive/DnD_Project_Data/inference_results_base_one_shot
Analyzing Llama_1B_BASE_OneShot...
Report generated: /content/drive/MyDrive/DnD_Project_Data/inference_results_base_one_shot/VALIDATION_REPORT.txt


In [22]:
import os
import re

# PATHS
BASE_DIR = "/content/drive/MyDrive/DnD_Project_Data"

# One VALIDATION_REPORT.txt per experiment, as written by generate_report().
REPORT_PATHS = {
    "BASELINE": os.path.join(BASE_DIR, "inference_results_baseline", "VALIDATION_REPORT.txt"),
    "BASE_ONE_SHOT": os.path.join(BASE_DIR, "inference_results_base_one_shot", "VALIDATION_REPORT.txt"),
    "FINETUNED_ZERO_SHOT": os.path.join(BASE_DIR, "inference_results_finetuned&zero_shot", "VALIDATION_REPORT.txt"),
    "FINETUNED_ONE_SHOT": os.path.join(BASE_DIR, "inference_results_finetuned&one_shot", "VALIDATION_REPORT.txt")
}

OUTPUT_MASTER_REPORT = os.path.join(BASE_DIR, "PROJECT_FINAL_COMPARISON_REPORT.txt")

# DEFINITIONS
# Legend texts copied verbatim into the final comparison report.
METRICS_DEF = """
[METRIC DEFINITIONS]
- Valid JSON: Percentage of outputs that are syntactically correct JSON.
- GEN Full (Generation Success): Percentage of generated characters that strictly follow all D&D 5e rules (Stats, Spells, Class reqs).
- GEN Part (Generation Partial): Output is valid JSON and has new content, but missed a specific rule (e.g., missing Subclass).
- COMP Full (Completion Success): Percentage of correctly filled NULL fields without dropping existing data.
- COMP Part (Completion Partial): Fields were filled, but with minor errors or inconsistencies (e.g., wrong skill for background).
- REF Full (Refusal Success): Percentage of successful refusals for unsafe/invalid prompts.
"""

# The status names below must match the ones emitted by analyze_predictions():
# FAIL_REFUSAL is a wrongly refused *valid* request (GEN/COMP), while a missed
# refusal on a REF task is counted as FAILURE. Unfilled completion targets are
# reported as PARTIAL_SUCCESS, not as a status of their own.
ERRORS_DEF = """
[ERROR TYPE DEFINITIONS]
- FAIL_FORMAT_EMPTY: The model produced no extractable data or empty JSON.
- FAIL_LOOP: The model entered a repetition loop (repetition penalty failed).
- FAIL_MUTATION: The model hallucinated changes to immutable input data (e.g., changed 'Elf' to 'Human' in completion).
- FAIL_STAGNATION: The model echoed the input without generating new fields (Completion task).
- FAIL_REFUSAL: The model refused a valid generation/completion request.
- FAILURE (REF task): The model failed to refuse an invalid request.
- PARTIAL_SUCCESS ('Failed to complete field'): The model left target fields NULL in a completion task.
"""

# PARSING FUNCTION
def parse_validation_report(file_path, mode_label):
    """Parse one VALIDATION_REPORT.txt into per-model summaries and counters.

    The report is split on ``MODEL: `` headers. For each model section the
    ``Valid JSON`` percentage and the GEN / COMP / REF task blocks are read
    back with regular expressions.

    Args:
        file_path: Path to the report text file.
        mode_label: Experiment label stored under ``"Mode"`` in every record.

    Returns:
        A list of ``(entry, details)`` tuples, one per model section.
        ``entry`` holds the percentage strings for the comparison table
        (defaulting to ``"0.0%"``); ``details`` maps each task to a dict of
        ``{status: count}``. An empty list is returned, after a warning, when
        the file does not exist.
    """
    if not os.path.exists(file_path):
        print(f"Warning: Report not found at {file_path}")
        return []

    with open(file_path, 'r') as report_file:
        content = report_file.read()

    success_re = r"-\s+SUCCESS\s+:\s+\d+\s+\(([\d\.]+%)\)"
    partial_re = r"-\s+PARTIAL_SUCCESS\s+:\s+\d+\s+\(([\d\.]+%)\)"
    counter_re = r"-\s+([A-Z_]+)\s+:\s+(\d+)\s+"

    data = []
    # Everything before the first "MODEL: " header is the report preamble.
    for sec in content.split("MODEL: ")[1:]:
        model_name = sec.split('\n')[0].strip()

        # Row for the summary table.
        entry = {
            "Model": model_name,
            "Mode": mode_label,
            "Valid JSON": "0.0%",
            "GEN Full": "0.0%",  "GEN Part": "0.0%",
            "COMP Full": "0.0%", "COMP Part": "0.0%",
            "REF Full": "0.0%"
        }

        # Raw status counters for the detailed breakdown.
        details = {
            "Model": model_name,
            "Mode": mode_label,
            "GEN": {},
            "COMP": {},
            "REF": {}
        }

        json_match = re.search(r"Valid JSON:\s+\d+\s+\(([\d\.]+%)\)", sec)
        if json_match:
            entry["Valid JSON"] = json_match.group(1)

        # Each block initially runs from its "<TASK> TASK" header to the end
        # of the section ...
        blocks = {}
        for task in ("GEN", "COMP", "REF"):
            start = sec.find(f"{task} TASK")
            blocks[task] = sec[start:] if start != -1 else ""
        # ... and is then trimmed where the next task's header begins.
        blocks["GEN"] = blocks["GEN"].split("COMP TASK")[0]
        blocks["COMP"] = blocks["COMP"].split("REF TASK")[0]

        for task, block in blocks.items():
            full = re.search(success_re, block)
            entry[f"{task} Full"] = full.group(1) if full else "0.0%"
            if task != "REF":
                # The table has no "partial" column for refusals.
                part = re.search(partial_re, block)
                entry[f"{task} Part"] = part.group(1) if part else "0.0%"

            for status, count in re.findall(counter_re, block):
                details[task][status] = int(count)

        data.append((entry, details))

    return data

# AGGREGATION
# (REPORT_PATHS key, mode label) pairs, listed in the order the modes should
# appear for each model.
report_sources = [
    ("BASELINE", "Baseline (Zero-Shot)"),
    ("BASE_ONE_SHOT", "Baseline (One-Shot)"),
    ("FINETUNED_ZERO_SHOT", "Fine-Tuned (Zero-Shot)"),
    ("FINETUNED_ONE_SHOT", "Fine-Tuned (One-Shot)"),
]

# Load all reports with correct labels
all_parsed = []
for report_key, report_mode in report_sources:
    all_parsed.extend(parse_validation_report(REPORT_PATHS[report_key], report_mode))

# Separate table data from details
table_data = [pair[0] for pair in all_parsed]
details_data = [pair[1] for pair in all_parsed]

# Sorting Logic: rank of each mode within a model group
mode_order = {source[1]: rank for rank, source in enumerate(report_sources)}


def _model_then_mode(record):
    """Sort key: model name up to the first underscore, then mode rank."""
    return (record['Model'].split('_')[0], mode_order.get(record['Mode'], 99))


# Sort by Model Name then by Mode
table_data.sort(key=_model_then_mode)
details_data.sort(key=_model_then_mode)

# REPORT GENERATION
# Title banner, then the two legends, then the section heading.
report_lines = [
    "="*100,
    "                                PROJECT FINAL COMPARISON REPORT",
    "="*100,
    "",
    # DEFINITIONS
    METRICS_DEF.strip(),
    "",
    ERRORS_DEF.strip(),
    "",
    "="*100,
    "",
    # DETAILED PERFORMANCE
    "[DETAILED PERFORMANCE BREAKDOWN]",
    "Specific counts of success and failure types for each model configuration.",
    "",
]

for item in details_data:
    report_lines.extend([
        "-" * 80,
        f"MODEL: {item['Model']} | MODE: {item['Mode']}",
        "-" * 80,
    ])

    for task in ["GEN", "COMP", "REF"]:
        task_counts = item[task]
        if task_counts:
            report_lines.append(f"  {task} TASK:")
            # Most frequent status first.
            ranked = sorted(task_counts.items(), key=lambda pair: pair[1], reverse=True)
            report_lines.extend(f"    - {status:<20}: {count}" for status, count in ranked)
        else:
            report_lines.append(f"  {task}: No Data")
    report_lines.append("")

report_lines.extend(["="*100, ""])

# COMPARISON TABLE
report_lines.append("[FINAL COMPARISON TABLE]")
header_fmt = "| {:<42} | {:<8} | {:<10} | {:<10} | {:<10} | {:<10} | {:<8} |"
divider = "-" * 118

header = header_fmt.format("MODEL & MODE", "JSON %", "GEN Full", "GEN Part", "COMP Full", "COMP Part", "REF %")


def _model_group(model_name):
    """Return the family used to draw separator rows between models.

    "Llama_1B" (custom/legacy runs) is told apart from "Llama-3.2" (official
    checkpoints); otherwise both would collapse into "Llama" and no separator
    row would be emitted between them. Any other model falls back to the
    first word of its name.
    """
    if "Llama_1B" in model_name:
        return "Llama_1B_Custom"
    if "Llama-3.2" in model_name:
        return "Llama_3.2_Instruct"
    return model_name.split('_')[0].split('-')[0]


def _short_mode(mode):
    """Abbreviate a mode label so it fits the first table column."""
    return mode.replace('Fine-Tuned', 'FT').replace('Baseline', 'Base').replace(' (Zero-Shot)', ' ZS').replace(' (One-Shot)', ' 1S')


def _report_name(row):
    """First-column text for the report file: full name, cut if too long."""
    name = f"{row['Model']} [{_short_mode(row['Mode'])}]"
    if len(name) > 42:
        name = name[:39] + ".."
    return name


def _preview_name(row):
    """First-column text for the console preview: name cut to 15 characters."""
    return f"{row['Model'][:15]}.. [{_short_mode(row['Mode'])}]"


def _table_rows(rows, make_name):
    """Yield formatted table lines, with a separator row between model groups."""
    previous_group = ""
    for row in rows:
        group = _model_group(row['Model'])
        if group != previous_group and previous_group != "":
            yield f"| {'-'*42} | {'-'*8} | {'-'*10} | {'-'*10} | {'-'*10} | {'-'*10} | {'-'*8} |"
        previous_group = group
        yield header_fmt.format(
            make_name(row),
            row['Valid JSON'],
            row['GEN Full'],
            row['GEN Part'],
            row['COMP Full'],
            row['COMP Part'],
            row['REF Full']
        )


report_lines.extend([divider, header, divider])
report_lines.extend(_table_rows(table_data, _report_name))
report_lines.append(divider)

# WRITE TO FILE
with open(OUTPUT_MASTER_REPORT, 'w') as f:
    f.write("\n".join(report_lines))

print(f"\nDone. Report saved to:\n{OUTPUT_MASTER_REPORT}")

# PRINT PREVIEW TO CONSOLE
print("\n--- FINAL TABLE PREVIEW ---\n")
print(divider)
print(header)
print(divider)

# Same grouping logic as the file version, with shorter display names.
for preview_line in _table_rows(table_data, _preview_name):
    print(preview_line)
print(divider)


Done. Report saved to:
/content/drive/MyDrive/DnD_Project_Data/PROJECT_FINAL_COMPARISON_REPORT.txt

--- FINAL TABLE PREVIEW ---

----------------------------------------------------------------------------------------------------------------------
| MODEL & MODE                               | JSON %   | GEN Full   | GEN Part   | COMP Full  | COMP Part  | REF %    |
----------------------------------------------------------------------------------------------------------------------
| Llama_1B_BASE_O.. [Base 1S]                | 98.6%    | 43.4%      | 56.1%      | 2.4%       | 37.7%      | 90.6%    |
| Llama_1B_OneSho.. [FT 1S]                  | 79.7%    | 59.4%      | 40.6%      | 3.3%       | 41.0%      | 100.0%   |
| ------------------------------------------ | -------- | ---------- | ---------- | ---------- | ---------- | -------- |
| Llama-3.2-1B-In.. [Base ZS]                | 0.0%     | 0.0%       | 11.3%      | 0.0%       | 0.0%       | 0.0%     |
| Llama-3.2-1B-In.. [FT ZS]