In [None]:
# Install required packages (uncomment if needed).
# %pip (rather than !pip) installs into the environment of the running kernel.
# %pip install torch transformers peft bitsandbytes accelerate datasets trl matplotlib pandas scikit-learn

In [None]:
import json
import os
import warnings
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
import torch
from datasets import Dataset, DatasetDict
from peft import (
    LoraConfig,
    PeftModel,
    get_peft_model,
    prepare_model_for_kbit_training,
)
from sklearn.model_selection import train_test_split
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
)
from trl import SFTTrainer

warnings.filterwarnings("ignore")

# Report the runtime environment: PyTorch version, whether CUDA is usable and,
# if so, the name and total memory of device 0.
cuda_ready = torch.cuda.is_available()
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {cuda_ready}")
if cuda_ready:
    gpu_name = torch.cuda.get_device_name(0)
    print(f"GPU: {gpu_name}")
    vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"VRAM: {vram_gb:.2f} GB")

## 1. Configuration

In [None]:
# ============================================
# Configuration
# ============================================
# Tunable settings for the whole notebook. Later cells read these constants.

# Model configuration
# Hugging Face model id used to load both the tokenizer and the base model.
# NOTE: the prompt strings built later in this notebook use the TinyLlama chat
# markers (<|system|>, <|user|>, <|assistant|>, </s>); switching to one of the
# alternative models below presumably requires adapting that format as well.
MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"  # Lightweight, fits in 12GB VRAM
# Alternative models:
# MODEL_NAME = "microsoft/phi-2"  # 2.7B params, good for code
# MODEL_NAME = "codellama/CodeLlama-7b-Instruct-hf"  # Needs more VRAM

# Data paths
DATA_PATH = "../export.json"  # Path to exported data from FixMyCodeDB
# Root directory for training output, logs, the loss plot and the saved adapter.
OUTPUT_DIR = "./lora_finetuned_model"

# Training configuration
TRAIN_TEST_SPLIT = 0.2  # Fraction of rows held out for testing (80% train, 20% test)
MAX_SEQ_LENGTH = 2048  # Passed to the trainer as max_seq_length
BATCH_SIZE = 2  # Per-device batch size for training and evaluation; adjust based on VRAM
GRADIENT_ACCUMULATION_STEPS = 4  # Effective batch size = BATCH_SIZE * this value
NUM_EPOCHS = 3
LEARNING_RATE = 2e-4

# LoRA configuration
LORA_R = 16  # Rank of the low-rank matrices
LORA_ALPHA = 32  # Scaling factor
LORA_DROPOUT = 0.05  # Dropout applied inside the LoRA layers

## 2. Load and Preprocess Data

In [None]:
def load_data(data_path: str) -> pd.DataFrame:
    """
    Load data from a JSON or CSV file exported from FixMyCodeDB.

    The format is chosen from the file extension, compared
    case-insensitively so that e.g. ``export.JSON`` or ``export.CSV`` are
    accepted as well.

    Args:
        data_path: Path to the data file (``.json`` or ``.csv``)

    Returns:
        DataFrame with the loaded data

    Raises:
        ValueError: If the file extension is neither ``.json`` nor ``.csv``
    """
    path = Path(data_path)
    # Normalise the extension once; the original spelling is kept for the
    # error message below.
    suffix = path.suffix.lower()

    if suffix == ".json":
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)
        df = pd.DataFrame(data)
    elif suffix == ".csv":
        df = pd.read_csv(path)
    else:
        raise ValueError(f"Unsupported file format: {path.suffix}")

    print(f"Loaded {len(df)} entries from {data_path}")
    return df


# Load the exported dataset. If the file is missing, fall back to a handful of
# built-in buggy/fixed snippets so the rest of the notebook can still run.
try:
    df = load_data(DATA_PATH)
except FileNotFoundError:
    print(f"Data file not found at {DATA_PATH}. Creating sample data for demonstration.")
    # Each entry: (buggy code, fixed code, cppcheck id, active category group)
    sample_rows = [
        (
            "int* ptr = malloc(sizeof(int));\nfree(ptr);\nfree(ptr);  // Double free!",
            "int* ptr = malloc(sizeof(int));\nfree(ptr);\nptr = NULL;",
            "doubleFree",
            "memory_management",
        ),
        (
            "int arr[10];\nint x = arr[15];  // Out of bounds!",
            "int arr[10];\nint x = arr[5];  // Valid index",
            "arrayIndexOutOfBounds",
            "invalid_access",
        ),
        (
            'int x;\nprintf("%d", x);  // Uninitialized!',
            'int x = 0;\nprintf("%d", x);',
            "uninitvar",
            "uninitialized",
        ),
        (
            "int* ptr = NULL;\n*ptr = 5;  // Null pointer dereference!",
            "int val = 5;\nint* ptr = &val;\n*ptr = 5;",
            "nullPointer",
            "invalid_access",
        ),
        (
            'FILE* f = fopen("test.txt", "r");\n// No fclose - resource leak!',
            'FILE* f = fopen("test.txt", "r");\nif (f) { /* use file */ fclose(f); }',
            "resourceLeak",
            "resource_leak",
        ),
    ]
    # Expand the compact tuples into the same nested record layout as the export.
    sample_data = [
        {
            "code_original": buggy,
            "code_fixed": repaired,
            "labels": {"cppcheck": [check_id], "groups": {group: True}},
        }
        for buggy, repaired, check_id, group in sample_rows
    ]
    df = pd.DataFrame(sample_data)

print(f"\nDataFrame shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

In [None]:
def extract_labels(labels_data) -> str:
    """
    Extract the cppcheck labels from the nested labels structure.

    Args:
        labels_data: Labels dict (with an optional "cppcheck" entry) or an
            already-flattened label string

    Returns:
        Comma-separated string of labels. A string input is returned as-is;
        anything that is neither a string nor a dict, or a dict whose
        "cppcheck" entry is null, yields "unknown".
    """
    if isinstance(labels_data, str):
        return labels_data

    if isinstance(labels_data, dict):
        cppcheck = labels_data.get("cppcheck", [])
        if cppcheck is None:
            # An explicit null would otherwise become the literal label "None".
            return "unknown"
        if isinstance(cppcheck, list):
            # Stringify every entry so numeric ids do not make join() raise,
            # and skip null entries instead of emitting "None".
            return ", ".join(str(item) for item in cppcheck if item is not None)
        return str(cppcheck)

    return "unknown"


def extract_categories(labels_data) -> str:
    """
    Collect the names of the category groups that are switched on.

    Args:
        labels_data: Labels dict; its "groups" entry maps group name -> flag

    Returns:
        Comma-separated group names (underscores shown as spaces) for every
        truthy flag, "none" when no flag is set, or "unknown" when the input
        or its "groups" entry is not a dict
    """
    groups = labels_data.get("groups", {}) if isinstance(labels_data, dict) else None
    if not isinstance(groups, dict):
        return "unknown"

    enabled = []
    for group_name, flag in groups.items():
        if flag:
            enabled.append(group_name.replace("_", " "))

    if not enabled:
        return "none"
    return ", ".join(enabled)


# Flatten the nested "labels" column into two plain-text columns.
for column_name, extractor in (
    ("bug_labels", extract_labels),
    ("bug_categories", extract_categories),
):
    df[column_name] = df["labels"].apply(extractor)

print("Sample processed data:")
print(df[["bug_labels", "bug_categories"]].head())

## 3. Create Prompt Templates

In [None]:
# Prompt templates for bug detection instruction tuning.
# SYSTEM_PROMPT is used verbatim; USER_TEMPLATE takes {code}; ASSISTANT_TEMPLATE
# takes {bug_labels}, {bug_categories} and {fixed_code}.
SYSTEM_PROMPT = """You are an expert C++ code analyzer. Your task is to identify bugs and issues in C++ code.
When analyzing code, identify:
1. The specific bug type (e.g., memory leak, null pointer, buffer overflow)
2. The category of the bug (e.g., memory management, invalid access, uninitialized variable)
3. A brief explanation of why this is a bug
4. A suggested fix
"""

USER_TEMPLATE = """Analyze the following C++ code for bugs:

```cpp
{code}
```

Identify any bugs present in this code."""

ASSISTANT_TEMPLATE = """## Bug Analysis

**Bug Type(s):** {bug_labels}

**Category:** {bug_categories}

**Explanation:** The code contains the following issue(s): {bug_labels}. This falls under the {bug_categories} category.

**Suggested Fix:**
```cpp
{fixed_code}
```
"""


def _text_or_default(value, default: str) -> str:
    """Return ``value`` as text, or ``default`` when it is missing (None/NaN/NA)."""
    if value is None:
        return default
    # Only scalars are checked with isna(); on containers it returns an array.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return default
    return str(value)


def create_conversation(row) -> str:
    """
    Create a conversation-style prompt for fine-tuning.
    Uses TinyLlama chat format.

    Missing cell values (None, or NaN as produced for empty CSV cells) are
    treated as absent rather than being rendered as the text "None"/"nan":
    missing code becomes an empty string, missing labels/categories become
    "unknown", and a missing or empty fix is replaced by a pointer to the
    explanation.

    Args:
        row: DataFrame row (or any mapping with ``.get``) providing
            "code_original", "code_fixed", "bug_labels" and "bug_categories"

    Returns:
        Formatted conversation string with system, user and assistant turns
    """
    code = _text_or_default(row.get("code_original", ""), "")
    fixed = _text_or_default(row.get("code_fixed", ""), "")
    labels = _text_or_default(row.get("bug_labels", "unknown"), "unknown")
    categories = _text_or_default(row.get("bug_categories", "unknown"), "unknown")

    user_turn = USER_TEMPLATE.format(code=code)
    assistant_turn = ASSISTANT_TEMPLATE.format(
        bug_labels=labels,
        bug_categories=categories,
        fixed_code=fixed if fixed else "See explanation for fix suggestions.",
    )

    # TinyLlama chat format: each turn is "<|role|>\n<content></s>".
    conversation = (
        f"<|system|>\n{SYSTEM_PROMPT}</s>\n"
        f"<|user|>\n{user_turn}</s>\n"
        f"<|assistant|>\n{assistant_turn}</s>"
    )

    return conversation


# Render one chat-formatted training example per row into the "text" column.
df["text"] = df.apply(create_conversation, axis=1)

# Show the first rendered example between two rulers.
ruler = "=" * 60
print("Sample conversation prompt:", ruler, df["text"].iloc[0], ruler, sep="\n")

## 4. Train/Test Split

In [None]:
# Split data: TRAIN_TEST_SPLIT is the held-out fraction; the fixed
# random_state keeps the split reproducible across runs.
train_df, test_df = train_test_split(
    df,
    test_size=TRAIN_TEST_SPLIT,
    random_state=42
)

print(f"Training samples: {len(train_df)}")
print(f"Testing samples: {len(test_df)}")

# Convert to Hugging Face datasets.
# preserve_index=False keeps the shuffled pandas index from being stored as an
# extra "__index_level_0__" column next to "text".
train_dataset = Dataset.from_pandas(train_df[["text"]], preserve_index=False)
test_dataset = Dataset.from_pandas(test_df[["text"]], preserve_index=False)

dataset = DatasetDict({
    "train": train_dataset,
    "test": test_dataset
})

print(f"\nDataset: {dataset}")

## 5. Load Model with 4-bit Quantization

In [None]:
# Quantization settings: 4-bit NF4 weights, nested (double) quantization,
# float16 as the compute dtype.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)

print(f"Loading model: {MODEL_NAME}")
print("This may take a few minutes...")

# Tokenizer: reuse the EOS token for padding and pad on the right.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

# Base model, loaded with the quantization config and automatic device placement.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    quantization_config=bnb_config,
    trust_remote_code=True,
)

# Prepare model for k-bit training
model = prepare_model_for_kbit_training(model)

footprint_gb = model.get_memory_footprint() / 1e9
print("\nModel loaded successfully!")
print(f"Model memory footprint: {footprint_gb:.2f} GB")

## 6. Configure LoRA

In [None]:
# Names of the projection layers that receive LoRA adapters.
target_modules = [
    "q_proj", "k_proj", "v_proj", "o_proj",
    "gate_proj", "up_proj", "down_proj",
]

# Adapter hyperparameters come from the configuration constants; no bias
# terms are trained.
lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=LORA_R,
    lora_alpha=LORA_ALPHA,
    lora_dropout=LORA_DROPOUT,
    bias="none",
    target_modules=target_modules,
)

# Wrap the model with the adapters and report how many parameters are trainable.
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

## 7. Training Configuration

In [None]:
# Training arguments, grouped by concern and merged into one TrainingArguments.
optimization_settings = dict(
    num_train_epochs=NUM_EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
    learning_rate=LEARNING_RATE,
    weight_decay=0.01,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    fp16=True,  # Use mixed precision
    optim="paged_adamw_8bit",  # Memory-efficient optimizer
)

# Evaluate every 50 steps, checkpoint every 100, keep at most 3 checkpoints
# and reload the one with the lowest eval_loss when training ends.
checkpoint_settings = dict(
    eval_strategy="steps",
    eval_steps=50,
    save_strategy="steps",
    save_steps=100,
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
)

logging_settings = dict(
    logging_steps=10,
    report_to="none",  # Disable wandb/tensorboard
    logging_dir=f"{OUTPUT_DIR}/logs",
)

training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    **optimization_settings,
    **checkpoint_settings,
    **logging_settings,
)

print("Training configuration:")
for setting_name, setting_value in (
    ("Epochs", NUM_EPOCHS),
    ("Batch size", BATCH_SIZE),
    ("Gradient accumulation", GRADIENT_ACCUMULATION_STEPS),
    ("Effective batch size", BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS),
    ("Learning rate", LEARNING_RATE),
):
    print(f"  - {setting_name}: {setting_value}")

## 8. Train the Model

In [None]:
# Build the supervised fine-tuning trainer: it trains on the "text" field of
# the train split, evaluates on the test split, and does not pack sequences.
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    dataset_text_field="text",
    max_seq_length=MAX_SEQ_LENGTH,
    packing=False,
)

print("Starting training...", "=" * 60, sep="\n")

In [None]:
# Run the fine-tuning loop, then report runtime and loss from the returned metrics.
train_result = trainer.train()
train_metrics = train_result.metrics

print("\nTraining completed!")
print(f"Training time: {train_metrics['train_runtime']:.2f} seconds")
print(f"Final training loss: {train_metrics['train_loss']:.4f}")

## 9. Visualize Training Progress

In [None]:
# Extract training history
history = trainer.state.log_history

# Separate training and eval metrics into (step, loss) pairs.
# Training entries carry "loss"; evaluation entries carry "eval_loss".
train_losses = [(h["step"], h["loss"]) for h in history if "loss" in h and "eval_loss" not in h]
eval_losses = [(h["step"], h["eval_loss"]) for h in history if "eval_loss" in h]

# Create visualization: training loss on the left, evaluation loss on the right
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# One spec per panel: (axis, series, line format, legend label, title, extra plot kwargs)
panels = (
    (axes[0], train_losses, "b-", "Training Loss", "Training Loss Over Time", {}),
    (
        axes[1],
        eval_losses,
        "r-",
        "Evaluation Loss",
        "Evaluation Loss Over Time\n(Lower = Better Performance)",
        {"marker": "o"},
    ),
)
for ax, series, line_format, label, title, extra_kwargs in panels:
    if not series:
        # Nothing was logged for this panel; leave the axis empty.
        continue
    steps, losses = zip(*series)
    ax.plot(steps, losses, line_format, label=label, linewidth=2, **extra_kwargs)
    ax.set_xlabel("Training Steps")
    ax.set_ylabel("Loss")
    ax.set_title(title)
    ax.legend()
    ax.grid(True, alpha=0.3)

# Create the output directory if needed so saving the figure does not depend
# on an earlier step having created it.
os.makedirs(OUTPUT_DIR, exist_ok=True)
plot_path = f"{OUTPUT_DIR}/training_progress.png"

fig.tight_layout()
fig.savefig(plot_path, dpi=150)
plt.show()

print(f"\nTraining visualization saved to {plot_path}")

In [None]:
# Text summary: relative drop between the first and last logged training loss,
# plus the best evaluation loss when any evaluation was logged.
if len(train_losses) > 1:
    initial_loss = train_losses[0][1]
    final_loss = train_losses[-1][1]
    reduction = ((initial_loss - final_loss) / initial_loss) * 100

    rule = "=" * 50
    print(f"\n{rule}")
    print("Training Summary")
    print(rule)
    print(f"Initial training loss: {initial_loss:.4f}")
    print(f"Final training loss:   {final_loss:.4f}")
    print(f"Loss reduction:        {reduction:.1f}%")

    if eval_losses:
        best_eval_loss = min(loss for _, loss in eval_losses)
        print(f"\nBest evaluation loss:  {best_eval_loss:.4f}")

## 10. Save the Fine-tuned Model

In [None]:
# Write the adapter and the tokenizer side by side into one directory.
adapter_path = f"{OUTPUT_DIR}/lora_adapter"
for artifact in (model, tokenizer):
    artifact.save_pretrained(adapter_path)

print(f"LoRA adapter saved to: {adapter_path}")
print("\nFiles saved:")
# List every entry in the adapter directory with its size in megabytes.
for filename in os.listdir(adapter_path):
    size_mb = os.path.getsize(os.path.join(adapter_path, filename)) / 1e6
    print(f"  - {filename}: {size_mb:.2f} MB")

## 11. Inference Example

In [None]:
def analyze_code(code: str, model, tokenizer) -> str:
    """
    Analyze C++ code for bugs using the fine-tuned model.

    The model is switched to evaluation mode for the duration of the call (so
    dropout layers are inactive while sampling) and put back into training
    mode afterwards if that is how it was passed in. Only the newly generated
    tokens are decoded, so marker text inside ``code`` or the prompt cannot
    confuse the extraction of the answer.

    Args:
        code: C++ code to analyze
        model: Fine-tuned model
        tokenizer: Model tokenizer

    Returns:
        Model's analysis of the code: the generated text up to the first
        end-of-sequence marker, stripped of surrounding whitespace
    """
    # Same chat layout as the training examples, ending with an open assistant turn.
    prompt = (
        f"<|system|>\n{SYSTEM_PROMPT}</s>\n"
        f"<|user|>\n{USER_TEMPLATE.format(code=code)}</s>\n"
        "<|assistant|>\n"
    )

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs["input_ids"].shape[1]

    # Remember the current mode so that calling this helper between training
    # runs does not silently leave the model in eval mode.
    was_training = getattr(model, "training", False)
    model.eval()
    try:
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=512,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
            )
    finally:
        if was_training:
            model.train()

    # generate() returns prompt + continuation; keep only the continuation.
    generated_ids = outputs[0][prompt_length:]
    response = tokenizer.decode(generated_ids, skip_special_tokens=False)

    # Cut at the end-of-sequence marker if the model produced one.
    response = response.split("</s>")[0]

    return response.strip()

In [None]:
# Demo 1: a heap array that leaks on the early-return path.
test_code = """
void processData() {
    int* data = new int[100];
    // Process data...
    if (error_condition) {
        return;  // Memory leak - data not deleted!
    }
    delete[] data;
}
"""

# Print the snippet between rulers, then the heading for the model output.
print("Test Code:", "=" * 60, test_code, "=" * 60, "\nModel Analysis:", "-" * 60, sep="\n")

analysis = analyze_code(test_code, model, tokenizer)
print(analysis)

In [None]:
# Demo 2: a function that returns the address of a local buffer.
test_code_2 = """
char* getName() {
    char name[50];
    strcpy(name, "John Doe");
    return name;  // Returning address of local variable!
}
"""

# Print the snippet between rulers, then the heading for the model output.
print("Test Code 2:", "=" * 60, test_code_2, "=" * 60, "\nModel Analysis:", "-" * 60, sep="\n")

analysis_2 = analyze_code(test_code_2, model, tokenizer)
print(analysis_2)

## 12. Load Saved Model (for later use)

In [None]:
def load_finetuned_model(adapter_path: str, base_model_name: str = MODEL_NAME):
    """
    Rebuild the fine-tuned model from a saved LoRA adapter directory.

    The tokenizer is read from the adapter directory, the base model is
    loaded with 4-bit NF4 quantization, and the adapter is attached to it.

    Args:
        adapter_path: Path to saved LoRA adapter
        base_model_name: Name of the base model

    Returns:
        Tuple of (model, tokenizer)
    """
    # Tokenizer first, from the adapter directory.
    saved_tokenizer = AutoTokenizer.from_pretrained(adapter_path)

    # 4-bit NF4 quantization with double quantization and float16 compute dtype.
    quantization = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
    )

    # Quantized base model with automatic device placement.
    quantized_base = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        device_map="auto",
        quantization_config=quantization,
        trust_remote_code=True,
    )

    # Attach the saved LoRA adapter to the base model.
    adapted_model = PeftModel.from_pretrained(quantized_base, adapter_path)
    return adapted_model, saved_tokenizer


# Example: Uncomment to load saved model
# loaded_model, loaded_tokenizer = load_finetuned_model(adapter_path)
# print("Model loaded successfully!")

## Summary

This notebook demonstrated:

1. **Data Loading**: Loading exported data from FixMyCodeDB (JSON/CSV)
2. **Data Preprocessing**: Extracting labels and creating conversation-style prompts
3. **Train/Test Split**: 80/20 split for training and evaluation
4. **Model Loading**: Using 4-bit quantization to fit in 12GB VRAM
5. **LoRA Configuration**: Low-rank adaptation for efficient fine-tuning
6. **Training**: Using SFTTrainer with memory-efficient settings
7. **Visualization**: Plotting training and evaluation loss curves
8. **Inference**: Testing the fine-tuned model on new code samples

### Next Steps

- Collect more training data using the FixMyCodeDB scraper
- Experiment with different base models (Phi-2, CodeLlama)
- Adjust LoRA hyperparameters (r, alpha) for better performance
- Implement evaluation metrics (precision, recall, F1) for bug detection
- Deploy the model for production use