In [None]:
! pip install -q torch tqdm datasets peft transformers bert-score
! pip install -U bitsandbytes


In [None]:
import os
import yaml
import json
import torch
from tqdm import tqdm
from datasets import load_dataset, load_from_disk
from peft import PeftModel
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
from bert_score import score as bert_score


In [None]:
# =============================================================================
# Configuration
# =============================================================================
# Directory used by load_and_prepare_dataset() as the on-disk dataset cache.
DATASETS_DIR = "./datasets"

# Fine-tuned model from HuggingFace Hub
# ADAPTER_ID: Hub repo id of the LoRA adapter (matches the `adapter_id` argument
# of setup_finetuned_model_and_tokenizer()).
# BASE_MODEL: Hub repo id of the base model; also the fallback prompt-format
# lookup key in generate_predictions() when no cfg is passed.
ADAPTER_ID = "Daniel-Krasik/Qwen2.5-1.5B-QLoRA-Recipe"
BASE_MODEL = "Qwen/Qwen2.5-1.5B-Instruct"


In [None]:
# Model configuration registry
#
# Maps a model-family key to the prompt settings used when building chat messages:
#   path                  - Hugging Face Hub repo id of the family's instruct model
#   supports_system       - True if the prompt builder should emit a separate "system"
#                           message; False makes it fold the system text into the user turn
#   system_message        - instruction text placed in (or folded out of) the system turn
#   user_message_template - user-turn template; "{ner}" is replaced with the sample's
#                           ingredient field
#   include_title_in_user - NOTE(review): not read anywhere in this notebook; kept for
#                           compatibility with other scripts that may share this registry
MODEL_CONFIGS = {
    "llama": {
        "path": "meta-llama/Llama-3.2-1B-Instruct",
        "supports_system": True,
        "system_message": "You will generate one cooking recipe. List all necessary ingredients and give detailed steps.",
        "user_message_template": "Include ingredients: {ner}",
        "include_title_in_user": False,
    },
    "mistral": {
        "path": "mistralai/Mistral-7B-Instruct-v0.3",
        "supports_system": False,
        "system_message": "You will generate one cooking recipe. List all necessary ingredients and give detailed steps.",
        "user_message_template": "Include ingredients: {ner}",
        "include_title_in_user": False,
    },
    "gemma": {
        # Fixed typo: the organisation is "google", not "ggoogle".
        "path": "google/gemma-2-2b-it",
        "supports_system": False,
        "system_message": "You will generate one cooking recipe. List all necessary ingredients and give detailed steps.",
        "user_message_template": "Include ingredients: {ner}",
        "include_title_in_user": False,
    },
    "qwen": {
        "path": "Qwen/Qwen2.5-1.5B-Instruct",
        "supports_system": True,
        "system_message": "You will generate one cooking recipe. List all necessary ingredients and give detailed steps.",
        "user_message_template": "Include ingredients: {ner}",
        "include_title_in_user": False,
    },
    "olmo": {
        "path": "allenai/OLMoE-1B-7B-0924-Instruct",
        "supports_system": False,
        "system_message": "You will generate one cooking recipe. List all necessary ingredients and give detailed steps.",
        "user_message_template": "Include ingredients: {ner}",
        "include_title_in_user": False,
    },
}

def get_model_config_from_path(model_path: str):
    """Return a copy of the MODEL_CONFIGS entry whose family name occurs in ``model_path``.

    Matching is a case-insensitive substring test, tried in the fixed order
    llama, mistral, gemma, qwen, olmo; the first hit wins. When nothing matches,
    a warning is printed and a copy of the "llama" entry is returned.

    Args:
        model_path (str): Model identifier or path (e.g. a Hugging Face repo id).

    Returns:
        dict: Shallow copy of the selected configuration, so callers may modify
        top-level keys without touching the registry.
    """
    lowered = model_path.lower()

    # Precedence matters when a path mentions more than one family name.
    for family in ("llama", "mistral", "gemma", "qwen", "olmo"):
        if family in lowered:
            return MODEL_CONFIGS[family].copy()

    print(f"[WARNING] Unknown model path: {model_path}. Using Llama format as default.")
    return MODEL_CONFIGS["llama"].copy()


In [None]:
def select_subset(dataset, n_samples, seed=42):
    """Return a shuffled subset of ``n_samples`` rows, or the dataset itself.

    Args:
        dataset: Object supporting ``len()``, ``shuffle(seed=...)`` and ``select(indices)``.
        n_samples: Number of rows to keep; ``"all"`` or ``None`` keeps everything.
        seed (int): Seed forwarded to ``dataset.shuffle``.

    Returns:
        The unmodified ``dataset`` when everything is requested or when more rows
        are requested than exist (a warning is printed in that case); otherwise
        the first ``n_samples`` rows of the seeded shuffle.
    """
    wants_everything = n_samples is None or n_samples == "all"
    if wants_everything:
        return dataset

    available = len(dataset)
    if n_samples > available:
        print(f"[WARNING] Requested {n_samples} samples but only {available} available. Using all samples.")
        return dataset

    shuffled = dataset.shuffle(seed=seed)
    return shuffled.select(range(n_samples))


def load_and_prepare_dataset(cfg):
    """Load the configured dataset, drop invalid rows and return train/val/test subsets.

    Two configuration layouts are accepted:

    * ``cfg["dataset"]``: a dict providing ``name``, plus optional ``splits``
      (``train`` / ``validation`` / ``test`` sizes), ``seed`` and ``val_size``.
    * ``cfg["datasets"]``: a list whose first entry provides ``path`` (and optional
      ``val_size``); sizes and seed come from the top-level keys ``train_samples``,
      ``val_samples``, ``test_samples`` and ``seed``.

    Missing sizes default to ``"all"``, a missing seed to 42 and a missing
    ``val_size`` to 0.05.

    The dataset is read from ``DATASETS_DIR`` when a local copy exists there;
    otherwise it is fetched with ``load_dataset`` and saved to that location.
    Rows are then filtered, a validation split is carved out of ``train`` if the
    dataset has neither a ``validation`` nor a ``val`` split, and each split is
    reduced with ``select_subset``.

    Args:
        cfg (dict): Parsed configuration.

    Returns:
        tuple: ``(train, val, test)`` subsets.

    Raises:
        KeyError: If neither configuration layout is present.
    """
    # ---- Resolve dataset name, subset sizes and seed -------------------------
    if "dataset" in cfg:
        cfg_dataset = cfg["dataset"]
        dataset_name = cfg_dataset["name"]
        split_sizes = cfg_dataset.get("splits", {})
        n_train = split_sizes.get("train", "all")
        n_val = split_sizes.get("validation", "all")
        n_test = split_sizes.get("test", "all")
        seed = cfg_dataset.get("seed", 42)
    elif "datasets" in cfg and isinstance(cfg["datasets"], list):
        cfg_dataset = cfg["datasets"][0]
        dataset_name = cfg_dataset["path"]
        n_train = cfg.get("train_samples", "all")
        n_val = cfg.get("val_samples", "all")
        n_test = cfg.get("test_samples", "all")
        seed = cfg.get("seed", 42)
    else:
        raise KeyError("Dataset configuration not found.")

    # ---- Load from the local cache, or download and cache --------------------
    os.makedirs(DATASETS_DIR, exist_ok=True)
    cache_name = dataset_name.replace("/", "_")
    local_path = os.path.join(DATASETS_DIR, cache_name)

    if not os.path.exists(local_path):
        print(f"[INFO] Downloading dataset from Hugging Face: {dataset_name}")
        dataset = load_dataset(dataset_name)
        # The unfiltered dataset is what gets cached.
        dataset.save_to_disk(local_path)
        print(f"[INFO] Full dataset saved locally to: {local_path}")
    else:
        print(f"[INFO] Loading dataset from local cache: {local_path}")
        dataset = load_from_disk(local_path)

    # ---- Drop rows with missing/blank fields or a prompt lacking INST tags ---
    required_fields = ("title", "ingredients", "directions", "prompt")

    def is_valid(sample):
        for field in required_fields:
            value = sample.get(field)
            if value is None or not str(value).strip():
                return False
        prompt_text = str(sample.get('prompt', ''))
        return '[INST]' in prompt_text and '[/INST]' in prompt_text

    print("\n[INFO] Filtering Invalid Samples:")
    for split_name in dataset.keys():
        size_before = len(dataset[split_name])
        dataset[split_name] = dataset[split_name].filter(is_valid)
        size_after = len(dataset[split_name])
        dropped = size_before - size_after
        print(f"  {split_name}: kept {size_after:,} / {size_before:,} (removed {dropped:,})")

    # ---- Carve a validation split out of train when none is shipped ----------
    has_validation = "validation" in dataset or "val" in dataset
    if not has_validation:
        val_size = cfg_dataset.get("val_size", 0.05)
        print(f"\n[INFO] Creating Validation Split ({val_size*100:.1f}% of train)")
        parts = dataset['train'].train_test_split(test_size=val_size, seed=seed)
        dataset['train'] = parts['train']
        dataset['validation'] = parts['test']
        print(f"[INFO] Created validation split: {len(dataset['validation']):,} samples")

    # ---- Subsample each split -------------------------------------------------
    val_key = "validation" if "validation" in dataset else "val"
    train = select_subset(dataset["train"], n_train, seed=seed)
    val = select_subset(dataset[val_key], n_val, seed=seed)
    test = select_subset(dataset["test"], n_test, seed=seed)

    print(f"\n[INFO] Loaded {len(train)} train / {len(val)} val / {len(test)} test samples.")
    return train, val, test


In [None]:
def setup_finetuned_model_and_tokenizer(base_model: str, adapter_id: str, use_4bit: bool = True):
    """
    Build the fine-tuned model: load the base checkpoint (optionally 4-bit quantized),
    attach the LoRA adapter on top of it, and return it together with its tokenizer.

    The tokenizer is configured for batched generation: its pad token is set to the
    EOS token and padding is applied on the left.

    Args:
        base_model (str): Base model path (e.g., "Qwen/Qwen2.5-1.5B-Instruct")
        adapter_id (str): HuggingFace adapter ID (e.g., "Daniel-Krasik/Qwen2.5-1.5B-QLoRA-Recipe")
        use_4bit (bool): Whether to use 4-bit (NF4, double-quantized) loading (default: True)

    Returns:
        tuple: (model, tokenizer), with the model switched to eval mode
    """
    print(f"\n[INFO] Loading base model: {base_model}")

    # Tokenizer: pad with EOS, pad on the left.
    tokenizer = AutoTokenizer.from_pretrained(base_model)
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    # Optional bitsandbytes quantization config.
    if not use_4bit:
        print("[INFO] Loading model in full precision.")
        quant_cfg = None
    else:
        print("[INFO] Enabling 4-bit quantization...")
        quant_cfg = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
        )

    # bfloat16 when CUDA is available, float32 otherwise.
    weights_dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32
    backbone = AutoModelForCausalLM.from_pretrained(
        base_model,
        quantization_config=quant_cfg,
        device_map="auto",
        torch_dtype=weights_dtype,
    )

    # Attach the adapter weights to the base model.
    print(f"[INFO] Loading fine-tuned LoRA adapters from HuggingFace: {adapter_id}")
    finetuned = PeftModel.from_pretrained(backbone, adapter_id)
    finetuned.eval()

    print("[INFO] Fine-tuned model loaded successfully!")
    return finetuned, tokenizer


In [None]:
# Default location of the YAML configuration file.
CONFIG_FILE_PATH = "/content/master_config.yaml"

def load_config(config_path: str = CONFIG_FILE_PATH):
    """Read a YAML file and return its parsed contents.

    Args:
        config_path (str): Path of the YAML file; defaults to CONFIG_FILE_PATH.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file's contents.
    """
    with open(config_path, mode="r", encoding="utf-8") as config_file:
        return yaml.safe_load(config_file)


In [None]:
# Parse the YAML config and echo it so the run's settings are visible in the notebook.
# NOTE(review): this prints the whole config; confirm it holds no tokens or other
# secrets before sharing the notebook with outputs.
cfg = load_config(CONFIG_FILE_PATH)
print(cfg)


In [None]:
def _build_chat_messages(model_config, ner):
    """Build the chat message list for one sample from a model-family config."""
    user_content = model_config['user_message_template'].format(ner=ner)

    if model_config['supports_system']:
        return [
            {"role": "system", "content": model_config['system_message']},
            {"role": "user", "content": user_content},
        ]

    # No separate system turn: fold the system text into the user turn. The
    # separator layout (empty middle element joined with blank lines) is kept
    # exactly as before so prompts stay identical to earlier runs.
    user_lines = [model_config['system_message'], "", user_content]
    return [{"role": "user", "content": "\n\n".join(user_lines)}]


def generate_predictions(
    model,
    tokenizer,
    dataset,
    task_instruction,
    cfg=None,
    num_samples=None,
    batch_size=8,
    max_new_tokens=1000,
):
    """Generate one recipe per dataset row with a text-generation pipeline.

    Each row's ingredient field is inserted into the model family's prompt
    template, rendered through the tokenizer's chat template, and decoded without
    sampling. Only the newly generated text is returned.

    Args:
        model: Loaded causal LM handed to ``transformers.pipeline``.
        tokenizer: Matching tokenizer; must provide ``apply_chat_template``.
        dataset: Iterable of dict-like rows. Must also support ``len()`` and
            ``select()`` when ``num_samples`` is given.
        task_instruction: Accepted for interface compatibility; not used when
            building prompts (the system message comes from the model config).
        cfg (dict, optional): Configuration. ``cfg["dataset"]["field_map"]["input"]``
            names the ingredient field (default "NER") and ``cfg["base_model"]``
            selects the prompt format (default: ``BASE_MODEL``).
        num_samples (int, optional): Use only the first ``num_samples`` rows.
        batch_size (int): Number of prompts generated per pipeline call.
        max_new_tokens (int): Generation length limit per prompt.

    Returns:
        list[str]: Stripped generated text, one entry per processed row.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    if num_samples is not None and num_samples < len(dataset):
        dataset = dataset.select(range(num_samples))

    # Resolve the input field and the model family used for prompt formatting.
    input_field = "NER"
    base_model = BASE_MODEL
    if cfg is not None:
        # `or {}` tolerates keys that are present but empty (None) in the YAML.
        field_map = (cfg.get("dataset") or {}).get("field_map") or {}
        input_field = field_map.get("input", "NER")
        # A missing/empty base_model falls back to BASE_MODEL rather than to an
        # empty path, which would only trigger the "unknown model" warning.
        base_model = cfg.get("base_model") or BASE_MODEL

    model_config = get_model_config_from_path(base_model)

    # Render every sample into a chat-formatted prompt string.
    prompts = [
        tokenizer.apply_chat_template(
            _build_chat_messages(model_config, sample.get(input_field, '')),
            tokenize=False,
            add_generation_prompt=True,
        )
        for sample in dataset
    ]

    # Initialize pipeline
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        torch_dtype="auto",
        do_sample=False,
    )

    # Generate in chunks so tqdm can report progress. `batch_size` has to be
    # forwarded explicitly: without it the pipeline processes the prompts of a
    # chunk one at a time.
    preds = []
    for start in tqdm(range(0, len(prompts), batch_size), desc="Generating recipes"):
        batch = prompts[start : start + batch_size]
        outputs = pipe(
            batch,
            max_new_tokens=max_new_tokens,
            return_full_text=False,
            batch_size=batch_size,
        )
        preds.extend(output[0]["generated_text"].strip() for output in outputs)

    return preds


In [None]:
def compute_bert_score(predictions, samples, cfg=None, lang="en", model_type=None, batch_size=32):
    """
    Score predictions with BERTScore against references rebuilt from the dataset rows.

    Every reference is the row's title, ingredients and directions laid out in the
    fixed "[ title ] / [ INGREDIENTS ] / [ DIRECTIONS ]" recipe layout.

    Args:
        predictions (list[str]): Model-generated outputs.
        samples: Iterable of dict-like rows with 'title', 'ingredients', 'directions'.
        cfg (dict, optional): Accepted for interface compatibility; not used here.
        lang (str): Language passed to BERTScore (default: "en").
        model_type (str, optional): Specific scoring model; None uses the library default.
        batch_size (int): Batch size for BERTScore computation.

    Returns:
        dict: Mean precision/recall/F1 under 'bert_precision', 'bert_recall', 'bert_f1',
        plus the per-sample lists under the same keys suffixed with '_per_sample'.
    """
    # One reference string per row, in dataset order.
    references = [
        "Certainly! Here's a delicious recipe for:\n"
        f"[ {row.get('title', 'Recipe')} ]\n\n"
        f"[ INGREDIENTS ]\n{row.get('ingredients', '')}\n\n"
        f"[ DIRECTIONS ]\n{row.get('directions', '')}"
        for row in samples
    ]

    model_label = model_type or 'default (roberta-large)'
    print(f"\n[INFO] Computing BERTScore with lang='{lang}', model_type='{model_label}', batch_size={batch_size}")
    precision, recall, f1 = bert_score(
        cands=predictions,
        refs=references,
        lang=lang,
        model_type=model_type,
        verbose=True,
        batch_size=batch_size,
    )

    named_scores = (("precision", precision), ("recall", recall), ("f1", f1))

    # Aggregate means first, then per-sample lists, preserving the key order.
    summary = {}
    for label, values in named_scores:
        summary[f"bert_{label}"] = values.mean().item()
    for label, values in named_scores:
        summary[f"bert_{label}_per_sample"] = values.tolist()
    return summary


In [None]:
"""
evaluate_finetuned_bert.ipynb
Evaluate the fine-tuned model (from HuggingFace) on the recipe generation dataset using BERT Score.
"""

# Module-level configuration read by evaluate_finetuned_bert(); loaded from
# load_config's default path (CONFIG_FILE_PATH).
cfg = load_config()

def evaluate_finetuned_bert():
    """Evaluate the fine-tuned model on the validation split with BERTScore.

    Reads the module-level ``cfg``, loads the validation data, loads ``BASE_MODEL``
    with the ``ADAPTER_ID`` LoRA adapter attached, generates one recipe per
    validation row and scores the generations against the reference recipes.

    Two files are written to the current working directory:
      * ``eval_results_finetuned_bert.json`` - aggregate scores and run metadata;
      * ``predictions_finetuned_bert.jsonl`` - one JSON object per sample with the
        source fields, the reference text, the prediction and its per-sample scores.

    Returns:
        tuple: ``(bert_scores, preds)`` - the dict returned by ``compute_bert_score``
        and the list of generated recipes.
    """

    # Load validation data
    _, val_data, _ = load_and_prepare_dataset(cfg)
    print(f"[INFO] Loaded {len(val_data)} validation samples.")

    # Load the base model and attach the fine-tuned LoRA adapter from the Hub.
    # (The loader takes `base_model` and `adapter_id`; there is no `model_id`.)
    model, tokenizer = setup_finetuned_model_and_tokenizer(
        base_model=BASE_MODEL,
        adapter_id=ADAPTER_ID,
        use_4bit=True,
    )

    # Generate predictions
    print("\n[INFO] Generating recipes with fine-tuned model...")
    preds = generate_predictions(
        model=model,
        tokenizer=tokenizer,
        dataset=val_data,
        # generate_predictions does not use this value when building prompts, so
        # a config without the key must not abort the run.
        task_instruction=cfg.get("task_instruction", ""),
        cfg=cfg,
        batch_size=4,
    )

    # Compute BERT scores
    print("\n[INFO] Computing BERT scores...")
    bert_scores = compute_bert_score(preds, val_data, cfg=cfg)

    # -----------------------------------------------------------------------
    # Save outputs
    # -----------------------------------------------------------------------
    results = {
        # Key name kept for compatibility with existing result files.
        "finetuned_model_id": ADAPTER_ID,
        "base_model": BASE_MODEL,
        "num_samples": len(val_data),
        "bert_precision": bert_scores["bert_precision"],
        "bert_recall": bert_scores["bert_recall"],
        "bert_f1": bert_scores["bert_f1"],
    }

    results_path = "eval_results_finetuned_bert.json"
    preds_path = "predictions_finetuned_bert.jsonl"

    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    with open(preds_path, "w", encoding="utf-8") as f:
        for i, pred in enumerate(preds):
            # Fetch the row once instead of re-indexing the dataset per field.
            sample = val_data[i]

            # Same reference layout that compute_bert_score scores against.
            full_reference = (
                f"Certainly! Here's a delicious recipe for:\n"
                f"[ {sample.get('title', 'Recipe')} ]\n\n"
                f"[ INGREDIENTS ]\n{sample.get('ingredients', '')}\n\n"
                f"[ DIRECTIONS ]\n{sample.get('directions', '')}"
            )

            record = {
                "title": sample.get("title", ""),
                "NER": sample.get("NER", ""),
                "ingredients": sample.get("ingredients", ""),
                "directions": sample.get("directions", ""),
                "reference_full": full_reference,
                "prediction": pred,
                "bert_precision": bert_scores["bert_precision_per_sample"][i],
                "bert_recall": bert_scores["bert_recall_per_sample"][i],
                "bert_f1": bert_scores["bert_f1_per_sample"][i],
            }
            json.dump(record, f)
            f.write("\n")

    print(f"\n[INFO] Saved results to: {results_path}")
    print(f"[INFO] Saved predictions to: {preds_path}")

    return bert_scores, preds


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Banner identifying the run. ADAPTER_ID is the fine-tuned (LoRA) model id
    # defined in the configuration cell.
    print("="*70)
    print("FINE-TUNED MODEL BERT SCORE EVALUATION")
    print(f"   Fine-tuned Model: {ADAPTER_ID}")
    print(f"   Base Model: {BASE_MODEL}")
    print("="*70)
    
    bert_scores, predictions = evaluate_finetuned_bert()
    print("\n[INFO] Evaluation complete.")

    print("\n" + "="*60)
    print("FINE-TUNED MODEL BERT SCORE RESULTS")
    print("="*60)
    print(f"  BERT Precision: {bert_scores['bert_precision']:.4f}")
    print(f"  BERT Recall:    {bert_scores['bert_recall']:.4f}")
    print(f"  BERT F1:        {bert_scores['bert_f1']:.4f}")
    print("="*60)

    print("\n[INFO] Example prediction:\n")
    # Guard against an empty validation subset instead of raising IndexError.
    if predictions:
        print(predictions[0])
    else:
        print("[WARNING] No predictions were generated.")

    # Aggregate scores only; the per-sample lists are omitted to keep output short.
    print("\n[INFO] Full BERT scores dict:")
    print({k: v for k, v in bert_scores.items() if not k.endswith('_per_sample')})
