In [1]:
pip install bitsandbytes

In [2]:
import json
import os
from typing import Dict, List

import numpy as np
import pandas as pd
import torch
from datasets import Dataset
from huggingface_hub import login
from peft import LoraConfig, get_peft_model, TaskType
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
    DataCollatorForSeq2Seq,
)

In [3]:
# Authenticate with Hugging Face
# Authenticate with Hugging Face using the HF_TOKEN environment variable.
# The token is checked first: calling login(token=None) falls back to an
# interactive prompt, after which the success message below would be printed
# even though no token was ever supplied.
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
    print("HF_TOKEN environment variable is not set; skipping programmatic login")
    print("Please run: huggingface-cli login")
else:
    try:
        login(token=hf_token)
        print("Successfully authenticated with Hugging Face")
    except Exception as e:
        # Best effort: report the problem and let later cells fail loudly if
        # the model download actually needs credentials.
        print(f"Authentication failed: {e}")
        print("Please run: huggingface-cli login")

In [4]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
# NOTE(review): `device` is not referenced by the later cells visible here
# (the model is placed via device_map="auto") — confirm it is still needed.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [5]:
# Directory that receives training checkpoints, the saved model/tokenizer and
# the before/after comparison report.
OUTPUT_DIR = "./results"

# Hugging Face Hub id of the base model that gets fine-tuned.
MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.2"

# Create the output directory up front; exist_ok keeps re-running this cell
# harmless.
os.makedirs(name=OUTPUT_DIR, exist_ok=True)

In [6]:
def load_insurance_faq_dataset(file_path="data/faq_sample.json"):
    """Load the insurance FAQ dataset from an external JSON file.

    Args:
        file_path: Path to a UTF-8 encoded JSON file whose top-level value is
            a list of FAQ entries.

    Returns:
        The parsed list of entries. An empty list is returned (and the reason
        printed) when the file is missing, is not valid JSON, or does not
        contain a top-level list; no exception is raised for those cases.
    """

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            faq_data = json.load(f)

    except FileNotFoundError:
        print(f"Error: Dataset file '{file_path}' not found.")
        # Name the path that was actually requested, not the default one.
        print(f"Please make sure you have created the '{file_path}' file with the FAQ dataset.")
        return []

    except json.JSONDecodeError as e:
        print(f"Error decoding JSON file: {e}")
        return []

    # Downstream code iterates over the entries, so anything other than a list
    # (a scalar, a single object, ...) is reported here instead of failing
    # later with a confusing TypeError/KeyError.
    if not isinstance(faq_data, list):
        print(
            f"Error: Expected a JSON list of FAQ entries in '{file_path}', "
            f"got {type(faq_data).__name__}."
        )
        return []

    print(f"Successfully loaded {len(faq_data)} FAQ entries from {file_path}")
    return faq_data

In [7]:
def create_training_dataset_from_faq(faq_data):
    """Turn FAQ question/answer pairs into supervised fine-tuning records.

    Args:
        faq_data: Sequence of mappings, each providing a 'question' and an
            'answer' entry.

    Returns:
        A list with one dict per FAQ entry holding the full
        "[INST] ... [/INST] answer" training string under "text" together
        with the original "question" and "answer". An empty list is returned
        (after printing a notice) when faq_data is empty or None.
    """

    # Guard clause: nothing to convert.
    if not faq_data:
        print("No FAQ data provided")
        return []

    def _to_record(entry):
        # One training example: instruction + user question inside the
        # [INST] tags, followed by the expected answer and the end marker.
        question = entry['question']
        answer = entry['answer']
        prompt = (
            "<s>[INST] You are a helpful insurance expert assistant. "
            "Answer the following question about insurance clearly and accurately. "
            f"{question} [/INST] {answer}</s>"
        )
        return {"text": prompt, "question": question, "answer": answer}

    records = [_to_record(entry) for entry in faq_data]

    print(f"Created {len(records)} training examples")
    return records

In [8]:
def setup_model_and_tokenizer():
    """Load the base causal language model and its tokenizer for MODEL_NAME.

    Returns:
        A (model, tokenizer) tuple. The model is loaded with float16 weights
        and placed by `device_map="auto"`; the tokenizer is guaranteed to
        have a pad token (falling back to the EOS token when none is defined).
    """

    print("Loading tokenizer and model...")

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    # Padding needs a pad token; reuse EOS when the tokenizer defines none.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Load model. trust_remote_code is deliberately NOT enabled: it allows the
    # Hub repository to execute arbitrary Python at load time, and the Mistral
    # architecture ships with transformers itself. Only opt back in if
    # MODEL_NAME is changed to a model that genuinely requires custom code.
    # NOTE(review): weights are loaded in float16 while training uses fp16
    # mixed precision — confirm the installed peft version keeps the trainable
    # adapter weights in float32.
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        torch_dtype=torch.float16,
        device_map="auto",
    )

    print("Model loaded successfully")
    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

    return model, tokenizer

In [9]:
def setup_lora_config():
    """Build the LoRA (Low-Rank Adaptation) configuration used for fine-tuning.

    Returns:
        A LoraConfig for causal language modeling with rank 16, alpha 32,
        dropout 0.1, no bias training, and adapters on the seven projection
        modules listed below.
    """

    # Names of the sub-modules that receive adapters.
    # NOTE(review): presumably the attention and feed-forward projections of
    # the base model — confirm against the model's module names.
    adapted_modules = ["q_proj", "k_proj", "v_proj", "o_proj"]
    adapted_modules += ["gate_proj", "up_proj", "down_proj"]

    settings = {
        "task_type": TaskType.CAUSAL_LM,
        "inference_mode": False,
        "r": 16,
        "lora_alpha": 32,
        "lora_dropout": 0.1,
        "target_modules": adapted_modules,
        "bias": "none",
    }
    lora_config = LoraConfig(**settings)

    print("LoRA configuration created")
    return lora_config

In [10]:
def apply_lora_to_model(model, lora_config):
    """Wrap the model with LoRA adapters and report the trainable share.

    Args:
        model: Base model to adapt.
        lora_config: Configuration passed to get_peft_model.

    Returns:
        The model returned by get_peft_model.
    """

    model = get_peft_model(model, lora_config)

    # Single pass over the parameters, tallying both totals at once.
    trainable_params = 0
    total_params = 0
    for parameter in model.parameters():
        size = parameter.numel()
        total_params += size
        if parameter.requires_grad:
            trainable_params += size

    trainable_percent = 100 * trainable_params / total_params

    print("LoRA applied successfully")
    print(f"Trainable parameters: {trainable_params:,} ({trainable_percent:.2f}%)")

    return model

In [11]:
def tokenize_function(examples, tokenizer, max_length=512):
    """Tokenize training examples and attach causal-LM labels.

    Args:
        examples: Mapping with a "text" entry: a single string, or a list of
            strings when called in batched mode. The text is expected to
            already contain its own "<s>" / "</s>" markers, as produced by
            create_training_dataset_from_faq.
        tokenizer: Callable tokenizer returning a mapping with "input_ids".
        max_length: Maximum number of tokens kept per example (longer inputs
            are truncated).

    Returns:
        The tokenizer output with an added "labels" entry that mirrors
        "input_ids" (no padding is applied here).
    """

    # add_special_tokens=False: the training text already spells out <s> and
    # </s>, so letting the tokenizer add its own specials would duplicate the
    # beginning-of-sequence token.
    tokenized = tokenizer(
        examples["text"],
        truncation=True,
        padding=False,
        max_length=max_length,
        return_tensors=None,
        add_special_tokens=False,
    )

    # For causal language modeling, labels are the same as input_ids. Copy
    # each sequence so labels never alias the input_ids lists.
    input_ids = tokenized["input_ids"]
    if isinstance(examples["text"], str):
        tokenized["labels"] = list(input_ids)
    else:
        tokenized["labels"] = [list(ids) for ids in input_ids]

    return tokenized

In [12]:
def prepare_dataset(training_data, tokenizer):
    """Convert the training records into a tokenized Hugging Face Dataset.

    Args:
        training_data: List of record dicts (each with a "text" entry).
        tokenizer: Tokenizer forwarded to tokenize_function.

    Returns:
        The dataset produced by mapping tokenize_function over the records in
        batched mode, with the original columns removed.
    """

    raw_dataset = Dataset.from_list(training_data)

    def _tokenize_batch(batch):
        # Bind the tokenizer so Dataset.map only has to supply the batch.
        return tokenize_function(batch, tokenizer)

    # Drop the source columns so only the tokenizer's outputs remain.
    original_columns = raw_dataset.column_names
    tokenized_dataset = raw_dataset.map(
        _tokenize_batch,
        batched=True,
        remove_columns=original_columns,
        desc="Tokenizing dataset",
    )

    print(f"Dataset prepared: {len(tokenized_dataset)} examples")
    return tokenized_dataset

In [13]:
def setup_training_arguments():
    """Build the TrainingArguments for the LoRA fine-tuning run.

    Returns:
        TrainingArguments writing to OUTPUT_DIR: 5 epochs, batch size 1 with
        4 gradient-accumulation steps, cosine schedule from a 2e-4 learning
        rate after 10 warmup steps, fp16 mixed precision, a checkpoint at the
        end of every epoch, and all experiment-tracking integrations disabled.
    """

    training_args = TrainingArguments(
        output_dir=OUTPUT_DIR,
        num_train_epochs=5,
        per_device_train_batch_size=1,
        gradient_accumulation_steps=4,
        warmup_steps=10,
        logging_steps=5,
        # Checkpoints are written once per epoch; a save_steps value would be
        # ignored under this strategy, so none is given.
        save_strategy="epoch",
        load_best_model_at_end=False,
        push_to_hub=False,
        # The string "none" is what disables wandb/tensorboard reporting;
        # passing None instead selects the library default, which can enable
        # every installed integration.
        report_to="none",
        learning_rate=2e-4,
        fp16=True,  # Use mixed precision
        optim="adamw_torch",
        lr_scheduler_type="cosine",
        # Keep every dataset column: the dataset is already reduced to the
        # tokenizer outputs plus labels.
        remove_unused_columns=False,
        dataloader_pin_memory=False,
    )

    print("Training arguments configured")
    return training_args

def setup_trainer(model, tokenizer, tokenized_dataset, training_args):
    """Set up the Hugging Face Trainer for causal-LM fine-tuning.

    Args:
        model: Model to train.
        tokenizer: Tokenizer used for padding batches.
        tokenized_dataset: Dataset whose rows carry "input_ids",
            "attention_mask" and "labels" (as produced by tokenize_function).
        training_args: TrainingArguments for the run.

    Returns:
        A Trainer wired to the model, dataset and a padding data collator.
    """

    # Pad input_ids/attention_mask with the tokenizer and pad the existing
    # "labels" column with -100 so padded positions are ignored by the loss.
    #
    # DataCollatorForLanguageModeling(mlm=False) is not used here because it
    # rebuilds labels from input_ids and masks every position equal to
    # pad_token_id. When the pad token is the EOS token (the fallback in
    # setup_model_and_tokenizer), the closing </s> of each example would be
    # dropped from the loss and the model would never be trained to stop. It
    # also leaves a pre-computed "labels" column unpadded, which breaks
    # batching for batch sizes above 1.
    data_collator = DataCollatorForSeq2Seq(
        tokenizer=tokenizer,
        padding=True,
        pad_to_multiple_of=8,
        label_pad_token_id=-100,
    )

    # Initialize trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        data_collator=data_collator,
        tokenizer=tokenizer,
    )

    print("Trainer initialized")
    return trainer

In [14]:
def generate_response(model, tokenizer, question, max_length=200):
    """Generate the model's answer to a single insurance question.

    Args:
        model: Causal language model exposing `generate`, `device`, `eval`
            and `train`.
        tokenizer: Tokenizer matching the model.
        question: The user's question as plain text.
        max_length: Maximum number of NEW tokens to generate (passed as
            `max_new_tokens`).

    Returns:
        The generated continuation only (prompt excluded), decoded without
        special tokens and stripped of surrounding whitespace. Sampling is
        enabled, so repeated calls may return different text.
    """

    # Same template as the training text built by
    # create_training_dataset_from_faq (single space before the question), so
    # the fine-tuned model sees the prompt format it was trained on.
    input_text = f"<s>[INST] You are a helpful insurance expert assistant. Answer the following question about insurance clearly and accurately. {question} [/INST]"

    # The prompt already starts with <s>; do not let the tokenizer prepend a
    # second beginning-of-sequence token.
    inputs = tokenizer(input_text, return_tensors="pt", add_special_tokens=False).to(model.device)
    prompt_length = inputs["input_ids"].shape[1]

    # Generate in eval mode so dropout (including LoRA dropout after training)
    # is inactive, then restore whatever mode the caller had.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=max_length,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                pad_token_id=tokenizer.eos_token_id,
            )
    finally:
        model.train(was_training)

    # Keep only the newly generated tokens. Removing the prompt by string
    # replacement does not work: decoding with skip_special_tokens drops the
    # leading "<s>", so the decoded text never contains input_text verbatim.
    generated_ids = outputs[0][prompt_length:]
    response = tokenizer.decode(generated_ids, skip_special_tokens=True).strip()

    return response

def evaluate_model(model, tokenizer, test_questions):
    """Print the model's generated answer for each sample question.

    Args:
        model: Model passed through to generate_response.
        tokenizer: Tokenizer passed through to generate_response.
        test_questions: Sized sequence of question strings.
    """

    heavy_rule = "=" * 80
    light_rule = "-" * 60

    print("\n" + heavy_rule)
    print("MODEL EVALUATION")
    print(heavy_rule)

    for number, question in enumerate(test_questions, start=1):
        print(f"\nQuestion {number}: {question}")
        print(light_rule)

        answer = generate_response(model, tokenizer, question)
        print(f"Response: {answer}")

        # Blank separator line after every entry except the final one.
        is_last = number >= len(test_questions)
        if not is_last:
            print()

def compare_before_after(original_model, finetuned_model, tokenizer, test_questions):
    """Print each question with the answers of the original and fine-tuned models.

    Args:
        original_model: Model queried for the "before" answer.
        finetuned_model: Model queried for the "after" answer.
        tokenizer: Tokenizer shared by both generate_response calls.
        test_questions: Sized sequence of question strings.
    """

    heavy_rule = "=" * 80
    light_rule = "-" * 60

    print("\n" + heavy_rule)
    print("BEFORE vs AFTER COMPARISON")
    print(heavy_rule)

    # (label, model) pairs in the order their answers are generated and shown.
    contenders = (
        ("Before Fine-tuning", original_model),
        ("After Fine-tuning", finetuned_model),
    )

    for number, question in enumerate(test_questions, start=1):
        print(f"\nQuestion {number}: {question}")
        print(light_rule)

        for label, candidate in contenders:
            answer = generate_response(candidate, tokenizer, question)
            print(f"{label}: {answer}")

        # Blank separator line after every entry except the final one.
        is_last = number >= len(test_questions)
        if not is_last:
            print()

In [15]:
def main_training_pipeline():
    """Execute the complete fine-tuning pipeline with before/after comparison logging.

    The steps run strictly in order: load the FAQ data, load the base model,
    record baseline answers to three fixed test questions, wrap the model
    with LoRA, tokenize the data, train, save the model and tokenizer to
    OUTPUT_DIR, then regenerate the answers and write a before/after report
    to OUTPUT_DIR/comparison_results.txt (also echoed to the console).

    Returns:
        (model, tokenizer) after training, or (None, None) when the dataset
        could not be loaded (or the model-is-None guard triggers).
    """

    print("Starting LLM Fine-tuning Pipeline")
    print("=" * 80)

    # Step 1: Load dataset
    print("\nStep 1: Loading Dataset")
    faq_data = load_insurance_faq_dataset("data/faq_sample.json")

    # The loader returns an empty list on any failure, so an empty result
    # aborts the run here.
    if not faq_data:
        print("Cannot proceed without dataset. Please create the data/faq_sample.json file.")
        return None, None

    training_data = create_training_dataset_from_faq(faq_data)

    # Step 2: Load model and tokenizer
    print("\nStep 2: Loading Model and Tokenizer")
    model, tokenizer = setup_model_and_tokenizer()

    # NOTE(review): setup_model_and_tokenizer has no code path that returns
    # None (load failures raise instead), so this guard looks unreachable.
    if model is None:
        print("Model failed to load.")
        return None, None

    # Step 3: Save responses before fine-tuning
    # These must be generated before Step 4 rebinds `model` to the
    # LoRA-wrapped model. generate_response samples (do_sample=True), so the
    # before/after texts also differ by sampling noise, not only by training.
    print("\nStep 3: Generating Responses Before Fine-tuning")
    test_questions = [
        "What does comprehensive coverage include?",
        "How do I start a property damage claim?",
        "What affects life insurance costs?",
    ]

    # Kept in the same order as test_questions; indexed by position in Step 9.
    before_responses = []
    for question in test_questions:
        response = generate_response(model, tokenizer, question)
        before_responses.append(response)

    # Step 4: Apply LoRA
    print("\nStep 4: Applying LoRA Configuration")
    lora_config = setup_lora_config()
    model = apply_lora_to_model(model, lora_config)

    # Step 5: Prepare dataset
    print("\nStep 5: Preparing Dataset")
    tokenized_dataset = prepare_dataset(training_data, tokenizer)

    # Step 6: Setup training
    print("\nStep 6: Setting up Training")
    training_args = setup_training_arguments()
    trainer = setup_trainer(model, tokenizer, tokenized_dataset, training_args)

    # Step 7: Train the model
    print("\nStep 7: Starting Training")
    trainer.train()
    print("✅ Training completed!")

    # Step 8: Save model
    # save_model() is called without a path; presumably it writes to the
    # output_dir configured in training_args (OUTPUT_DIR) — confirm.
    print("\nStep 8: Saving Fine-tuned Model")
    trainer.save_model()
    tokenizer.save_pretrained(OUTPUT_DIR)
    print(f"✅ Model saved to {OUTPUT_DIR}")

    # Step 9: Evaluate after fine-tuning and write to file
    print("\nStep 9: Comparing Responses Before and After Fine-tuning")

    comparison_path = os.path.join(OUTPUT_DIR, "comparison_results.txt")
    with open(comparison_path, "w", encoding="utf-8") as f:
        f.write("LLM Fine-tuning Evaluation Report\n")
        f.write("=" * 80 + "\n\n")

        for i, question in enumerate(test_questions):
            # `model` is now the LoRA-wrapped model that was just trained.
            after_response = generate_response(model, tokenizer, question)

            f.write(f"Question {i + 1}:\n")
            f.write(f"{question}\n")
            f.write("-" * 80 + "\n")
            f.write(f"Before Fine-tuning:\n{before_responses[i]}\n\n")
            f.write(f"After Fine-tuning:\n{after_response}\n")
            f.write("=" * 80 + "\n\n")

            # Also print to console
            print(f"\nQuestion {i + 1}: {question}")
            print(f"Before Fine-tuning: {before_responses[i]}")
            print(f"After Fine-tuning: {after_response}")

    print(f"\n✅ Comparison results saved to: {comparison_path}")

    return model, tokenizer

In [18]:
def run_evaluation():
    """Run the training pipeline and report when it did not produce a model.

    Returns nothing; the before/after comparison itself is produced inside
    main_training_pipeline.
    """
    trained_model, _unused_tokenizer = main_training_pipeline()

    # The pipeline signals an aborted run by returning None for the model.
    pipeline_failed = trained_model is None
    if pipeline_failed:
        print("❌ Pipeline failed.")

In [19]:
# Entry point of the notebook: runs the whole pipeline (data loading, LoRA
# fine-tuning, saving, before/after report). This is the long-running cell.
run_evaluation()