# Experiment 2: Fine-Tuning with QLoRA

**Objective:** Fine-tune LLaMA-3.1-8B-Instruct on Solana smart contract vulnerability detection.

**Method:** QLoRA (Quantized Low-Rank Adaptation) with SFTTrainer for efficient fine-tuning.

**Key Innovation:** Using DataCollatorForCompletionOnlyLM to train only on classification output.

**References:**
- Dettmers et al. (2023). QLoRA: Efficient Finetuning of Quantized LLMs. arXiv:2305.14314
- Boi & Esposito (2025). Prompt Engineering vs. Fine-Tuning for LLM-Based Vulnerability Detection.

---

## 1. Environment Setup

In [None]:
import torch
import os
import warnings

# Silence Python warnings and framework log chatter so the cell output stays short
warnings.filterwarnings('ignore')
os.environ.update({
    'TF_CPP_MIN_LOG_LEVEL': '3',
    'TRANSFORMERS_VERBOSITY': 'error',
})

env_rule = "=" * 50
print(env_rule)
print("ENVIRONMENT CHECK")
print(env_rule)

if not torch.cuda.is_available():
    # No CUDA device visible: tell the user how to enable one, but do not abort
    print("ERROR: GPU not detected!")
    print("Go to: Settings -> Accelerator -> GPU T4 x2")
else:
    GPU_NAME = torch.cuda.get_device_name(0)
    # total_memory is divided by 1e9 so the value is displayed in GB
    GPU_MEMORY = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"GPU: {GPU_NAME}")
    print(f"Memory: {GPU_MEMORY:.1f} GB")
    print("Status: Ready")

In [None]:
%%capture
# Install packages silently
# peft and trl are pinned to exact versions; bitsandbytes and accelerate are
# installed at whatever version pip resolves.
!pip install -q bitsandbytes accelerate
!pip install -q peft==0.9.0
!pip install -q trl==0.12.0

In [None]:
# Printed from its own cell, separate from the install cell above
print("Packages installed successfully.")

## 2. Imports & Authentication

In [None]:
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict, Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from tqdm import tqdm
from datasets import Dataset
from huggingface_hub import login

# Only let error-level records through from the training stack's loggers
for quiet_logger in ("transformers", "accelerate", "peft", "trl"):
    logging.getLogger(quiet_logger).setLevel(logging.ERROR)

# Authentication: read the Hugging Face token from Kaggle secrets and log in
from kaggle_secrets import UserSecretsClient
secrets = UserSecretsClient()
HF_TOKEN = secrets.get_secret("HF_TOKEN")
login(
    token=HF_TOKEN,
    add_to_git_credential=False,
)

print("Authentication successful.")

## 3. Load Dataset

In [None]:
# Candidate locations for the dataset file, checked in order
POSSIBLE_PATHS = [
    "/kaggle/input/solana-dataset/solana_140s_final.json",
    "/kaggle/input/solana_140s_final.json",
    "/kaggle/working/solana_140s_final.json"
]

# Use the first candidate that exists on disk (None when nothing matches)
DATASET_PATH = next(
    (candidate for candidate in POSSIBLE_PATHS if os.path.exists(candidate)),
    None,
)

if DATASET_PATH is None:
    raise FileNotFoundError("Dataset not found. Please upload solana_140s_final.json")

# Read with an explicit encoding instead of the platform/locale default, so
# non-ASCII characters in the samples decode the same way everywhere.
with open(DATASET_PATH, 'r', encoding='utf-8') as f:
    dataset = json.load(f)

# One pass per field; Counter returns 0 for a label that never occurs
type_counts = Counter(s['vulnerability_type'] for s in dataset)
label_counts = Counter(s['label'] for s in dataset)

print(f"Dataset: {len(dataset)} samples")
print(f"Path: {DATASET_PATH}")
print("\nVulnerability Types:")
for vtype, count in sorted(type_counts.items()):
    print(f"  {vtype}: {count}")
print(f"\nLabels: VULNERABLE={label_counts['VULNERABLE']}, SAFE={label_counts['SAFE']}")

## 4. Data Split

In [None]:
# Split each vulnerability type separately, stratifying on the label inside
# each type. Target shares: 80% train, 10% val, 10% test.
by_vuln_type = defaultdict(list)
for sample in dataset:
    by_vuln_type[sample['vulnerability_type']].append(sample)

train_data, val_data, test_data = [], [], []

for vtype, samples in by_vuln_type.items():
    labels = [s['label'] for s in samples]
    # First cut: 80% train vs. 20% held out
    train_samples, temp_samples = train_test_split(samples, test_size=0.2, stratify=labels, random_state=42)
    temp_labels = [s['label'] for s in temp_samples]
    # Second cut: held-out part split evenly into validation and test
    val_samples, test_samples = train_test_split(temp_samples, test_size=0.5, stratify=temp_labels, random_state=42)
    train_data.extend(train_samples)
    val_data.extend(val_samples)
    test_data.extend(test_samples)

# Report the shares actually obtained: per-type rounding inside
# train_test_split can move them away from the 80/10/10 target.
split_total = len(train_data) + len(val_data) + len(test_data)
print("Data Split:")
for split_label, split_rows in (("Train:", train_data), ("Val:", val_data), ("Test:", test_data)):
    share = len(split_rows) / split_total if split_total else 0.0
    print(f"  {split_label:<6} {len(split_rows)} ({share:.0%})")

In [None]:
# Wrap the train and validation splits as HuggingFace datasets, keeping only
# the "text" field of every sample
train_dataset, val_dataset = (
    Dataset.from_list([{"text": row["text"]} for row in split_rows])
    for split_rows in (train_data, val_data)
)

for split_title, hf_split in (("Train", train_dataset), ("Val", val_dataset)):
    print(f"{split_title} dataset: {len(hf_split)} samples")

## 5. Load Model

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"

# 4-bit quantization (QLoRA)
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True
)

print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, token=HF_TOKEN)

# Language-modeling collators mask every position equal to the pad token id in
# the labels. Padding with EOS therefore also masks the real end-of-turn token,
# so the model is never trained to stop after the label. Prefer a dedicated
# padding token when the vocabulary has one; otherwise keep the EOS fallback.
DEDICATED_PAD_TOKEN = "<|finetune_right_pad_id|>"
if DEDICATED_PAD_TOKEN in tokenizer.get_vocab():
    tokenizer.pad_token = DEDICATED_PAD_TOKEN
else:
    tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

print("Loading model...")
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=quant_config,
    device_map="auto",
    token=HF_TOKEN,
    low_cpu_mem_usage=True,
    use_cache=False  # Disable cache for training
)

print(f"\nModel: {MODEL_ID}")
print(f"Parameters: {model.num_parameters():,}")
print(f"Quantization: 4-bit NF4")

## 6. LoRA Configuration

LoRA (Low-Rank Adaptation) adds small trainable matrices to the model while keeping the original weights frozen.

In [None]:
from peft import LoraConfig, prepare_model_for_kbit_training

# Prepare the quantized model for training before adapters are attached
model = prepare_model_for_kbit_training(model)

# Modules that receive LoRA adapters: attention projections, then MLP projections
ATTENTION_PROJECTIONS = ["q_proj", "k_proj", "v_proj", "o_proj"]
MLP_PROJECTIONS = ["gate_proj", "up_proj", "down_proj"]

LORA_CONFIG = LoraConfig(
    task_type="CAUSAL_LM",
    bias="none",
    r=64,               # Rank
    lora_alpha=16,      # Scaling factor
    lora_dropout=0.1,   # Dropout
    target_modules=ATTENTION_PROJECTIONS + MLP_PROJECTIONS,
)

lora_rule = "=" * 50
print(lora_rule)
print("LoRA Configuration")
print(lora_rule)
for lora_field, lora_value in (
    ("Rank (r)", LORA_CONFIG.r),
    ("Alpha", LORA_CONFIG.lora_alpha),
    ("Dropout", LORA_CONFIG.lora_dropout),
):
    print(f"  {lora_field}: {lora_value}")
print(f"  Target: Attention + MLP layers")

## 7. Training Configuration

### Why SFTTrainer with DataCollatorForCompletionOnlyLM?

Standard training computes loss on the entire text (prompt + response), which can cause:
- Pattern memorization instead of learning
- Mode collapse (always predicting one class)

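With `DataCollatorForCompletionOnlyLM`, the loss is computed **only on the response tokens** that follow the assistant header (the VULNERABLE/SAFE label). Prompt tokens are excluded from the loss, so the model is trained to classify rather than to reproduce the prompt.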

In [None]:
from trl import SFTTrainer, SFTConfig, DataCollatorForCompletionOnlyLM

# Header that opens the assistant turn; handed to the collator as the marker
# after which loss is computed
RESPONSE_TEMPLATE = "<|start_header_id|>assistant<|end_header_id|>"

# Collator for completion-only training
data_collator = DataCollatorForCompletionOnlyLM(
    tokenizer=tokenizer,
    response_template=RESPONSE_TEMPLATE,
)

print(
    "Data Collator: DataCollatorForCompletionOnlyLM",
    f"Response Template: {RESPONSE_TEMPLATE}",
    "\nTraining will focus ONLY on classification output.",
    sep="\n",
)

In [None]:
# Hyperparameters and run settings for supervised fine-tuning
TRAINING_CONFIG = SFTConfig(
    # Checkpoint location
    output_dir="/kaggle/working/checkpoints",

    # Epochs and batching
    num_train_epochs=3,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    gradient_accumulation_steps=4,

    # Optimizer and learning-rate schedule
    optim="paged_adamw_32bit",
    learning_rate=2e-4,
    weight_decay=0.01,
    warmup_ratio=0.03,
    lr_scheduler_type="cosine",

    # Precision
    fp16=True,

    # Logging, evaluation and checkpointing (evaluate and save once per epoch)
    logging_steps=10,
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    load_best_model_at_end=True,

    # SFT specific
    max_seq_length=1024,
    packing=False,

    # Reporting and gradient checkpointing
    report_to="none",
    gradient_checkpointing=True,
    gradient_checkpointing_kwargs={"use_reentrant": False},
)

per_device_batch = TRAINING_CONFIG.per_device_train_batch_size
accumulation_steps = TRAINING_CONFIG.gradient_accumulation_steps

config_rule = "=" * 50
print(config_rule)
print("Training Configuration")
print(config_rule)
for config_field, config_value in (
    ("Epochs", TRAINING_CONFIG.num_train_epochs),
    ("Batch size", per_device_batch),
    ("Gradient accumulation", accumulation_steps),
    # Effective batch size as reported here: per-device batch x accumulation steps
    ("Effective batch size", per_device_batch * accumulation_steps),
    ("Learning rate", TRAINING_CONFIG.learning_rate),
    ("Max sequence length", TRAINING_CONFIG.max_seq_length),
):
    print(f"  {config_field}: {config_value}")

In [None]:
# Build the trainer from the model, data splits, LoRA settings and collator
# defined in the cells above
trainer = SFTTrainer(
    args=TRAINING_CONFIG,
    model=model,
    tokenizer=tokenizer,
    peft_config=LORA_CONFIG,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

print("SFTTrainer initialized.")

## 8. Training

In [None]:
train_rule = "=" * 50

# Opening banner
print(train_rule, "STARTING TRAINING", "Estimated time: 20-30 minutes", train_rule, sep="\n")

# Run the fine-tuning loop
trainer.train()

# Closing banner, preceded by a blank line
print("\n" + train_rule, "TRAINING COMPLETE", train_rule, sep="\n")

In [None]:
# Write the trained model and the tokenizer into the same output directory
MODEL_OUTPUT = "/kaggle/working/solana-vuln-model"
for save_to in (trainer.save_model, tokenizer.save_pretrained):
    save_to(MODEL_OUTPUT)
print(f"Model saved to: {MODEL_OUTPUT}")

## 9. Evaluation

In [None]:
def extract_code(sample):
    """Extract the user-turn content (the code) from a chat-formatted sample.

    Args:
        sample: Mapping with a 'text' entry holding the formatted conversation.

    Returns:
        The stripped text between the user header and the start of the
        assistant turn. When either marker cannot be located, the first
        1000 characters of the text are returned instead.
    """
    text = sample['text']
    start_marker = '<|start_header_id|>user<|end_header_id|>'
    end_marker = '<|eot_id|><|start_header_id|>assistant'

    start_idx = text.find(start_marker)
    if start_idx == -1:
        return text[:1000]

    content_start = start_idx + len(start_marker)
    # Look for the end marker only after the user header; an occurrence earlier
    # in the text would otherwise give an inverted slice and a silently empty
    # result.
    end_idx = text.find(end_marker, content_start)
    if end_idx == -1:
        return text[:1000]

    return text[content_start:end_idx].strip()

# Fixed chat-template text placed before and after the code under analysis
_PROMPT_HEAD = (
    "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n"
    "You are a smart contract security analyzer.\n"
    "You analyze Solana smart contracts written in Rust and identify vulnerabilities.\n"
    "Classify the code as either VULNERABLE or SAFE.\n"
    "Respond with only one word: VULNERABLE or SAFE.<|eot_id|><|start_header_id|>user<|end_header_id|>\n\n"
)
_PROMPT_TAIL = "<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"

# Upper bound on the prompt length, in tokens
_MAX_PROMPT_TOKENS = 1024


def predict_fine_tuned(sample):
    """Prediction using fine-tuned model.

    Builds the classification prompt around the sample's code, generates up to
    10 new tokens greedily, and maps the first generated word to a label.

    Args:
        sample: Mapping with a 'text' entry, as accepted by `extract_code`.

    Returns:
        'VULNERABLE' when the first generated word contains 'VULN', otherwise
        'SAFE'. Empty or unrecognized output therefore also maps to 'SAFE'.
    """
    code = extract_code(sample)

    # Common case: the whole prompt fits, so it is tokenized in one piece.
    # NOTE(review): the prompt text already starts with <|begin_of_text|> and
    # the tokenizer call uses its default special-token handling - confirm
    # this matches how the training text was tokenized.
    inputs = tokenizer(_PROMPT_HEAD + code + _PROMPT_TAIL, return_tensors="pt")
    input_ids = inputs["input_ids"]
    attention_mask = inputs["attention_mask"]

    if input_ids.shape[1] > _MAX_PROMPT_TOKENS:
        # Truncating the full prompt from the right would cut off the assistant
        # header, so the model would no longer be asked for a label. Shorten
        # only the code segment and keep the head and tail intact.
        head_ids = tokenizer(_PROMPT_HEAD)["input_ids"]
        tail_ids = tokenizer(_PROMPT_TAIL, add_special_tokens=False)["input_ids"]
        code_ids = tokenizer(code, add_special_tokens=False)["input_ids"]
        code_budget = max(_MAX_PROMPT_TOKENS - len(head_ids) - len(tail_ids), 0)
        input_ids = torch.tensor([head_ids + code_ids[:code_budget] + tail_ids])
        attention_mask = torch.ones_like(input_ids)

    input_ids = input_ids.to(model.device)
    attention_mask = attention_mask.to(model.device)
    prompt_length = input_ids.shape[1]

    with torch.no_grad():
        outputs = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=10,
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id
        )

    # Decode only the newly generated tokens, not the echoed prompt
    response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip().upper()
    words = response.split()
    first_word = words[0] if words else ""

    return 'VULNERABLE' if 'VULN' in first_word else 'SAFE'

# Printed once the helper functions in this cell have been defined
print("Prediction function ready.")

In [None]:
print("=" * 50)
print("RUNNING EVALUATION")
print("=" * 50)

# Switch to inference mode explicitly so dropout layers are inactive during
# generation, whatever mode the trainer left the model in.
model.eval()

# One record per test sample: its type, the true label, the predicted label,
# and whether the two agree
results = []
for sample in tqdm(test_data, desc="Evaluating"):
    pred = predict_fine_tuned(sample)
    results.append({
        'vulnerability_type': sample['vulnerability_type'],
        'ground_truth': sample['label'],
        'prediction': pred,
        'correct': sample['label'] == pred
    })

print("\nEvaluation complete.")

## 10. Results

In [None]:
# Metric columns, in display order
METRIC_NAMES = ('Accuracy', 'Precision', 'Recall', 'F1-score')

# Per-type metrics, each rounded to two decimals; VULNERABLE is the positive class
vuln_types = sorted({row['vulnerability_type'] for row in results})
metrics_by_type = {}

for vtype in vuln_types:
    type_rows = [row for row in results if row['vulnerability_type'] == vtype]
    y_true = [row['ground_truth'] for row in type_rows]
    y_pred = [row['prediction'] for row in type_rows]
    binary_kwargs = dict(pos_label='VULNERABLE', zero_division=0)

    metrics_by_type[vtype] = {
        'Accuracy': round(accuracy_score(y_true, y_pred), 2),
        'Precision': round(precision_score(y_true, y_pred, **binary_kwargs), 2),
        'Recall': round(recall_score(y_true, y_pred, **binary_kwargs), 2),
        'F1-score': round(f1_score(y_true, y_pred, **binary_kwargs), 2),
    }

# Unweighted mean of the (already rounded) per-type values, rounded again
type_count = len(metrics_by_type)
avg_metrics = {
    metric: round(sum(scores[metric] for scores in metrics_by_type.values()) / type_count, 2)
    for metric in METRIC_NAMES
}

# Display: one left-aligned row per vulnerability type, then the average row
row_format = "{:<20} {:<12} {:<12} {:<12} {:<12}"
heavy_rule = "=" * 70
light_rule = "-" * 70

print(heavy_rule)
print("RESULTS: Experiment 2 - Fine-Tuning with QLoRA")
print(heavy_rule)
print(row_format.format('Vulnerability', *METRIC_NAMES))
print(light_rule)
for vtype in vuln_types:
    print(row_format.format(vtype, *(metrics_by_type[vtype][metric] for metric in METRIC_NAMES)))
print(light_rule)
print(row_format.format('Average', *(avg_metrics[metric] for metric in METRIC_NAMES)))
print(heavy_rule)

## 11. Confusion Matrix

In [None]:
# Row/column order of the matrix: VULNERABLE first, then SAFE
CLASS_ORDER = ['VULNERABLE', 'SAFE']

all_gt = [row['ground_truth'] for row in results]
all_pred = [row['prediction'] for row in results]
cm = confusion_matrix(all_gt, all_pred, labels=CLASS_ORDER)

# Heatmap with actual labels on the rows and predicted labels on the columns
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Greens',
    xticklabels=CLASS_ORDER,
    yticklabels=CLASS_ORDER,
)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Experiment 2: Fine-Tuning (QLoRA) - Confusion Matrix')
plt.tight_layout()
plt.savefig('/kaggle/working/cm_fine_tuning.png', dpi=150)
plt.show()

# With VULNERABLE first in CLASS_ORDER, row 0 holds the actual-vulnerable
# counts and row 1 the actual-safe counts
(true_pos, false_neg), (false_pos, true_neg) = cm

print(f"\nConfusion Matrix:")
print(f"  TP (detected vulnerabilities): {true_pos}")
print(f"  FN (missed vulnerabilities):   {false_neg}")
print(f"  FP (false alarms):             {false_pos}")
print(f"  TN (correct safe):             {true_neg}")

## 12. Save Results

In [None]:
# Save the per-sample predictions as CSV
results_df = pd.DataFrame(results)
results_df.to_csv('/kaggle/working/results_fine_tuning.csv', index=False)

# Save summary JSON: model, method, hyperparameters, dataset sizes and metrics
summary = {
    'experiment': 'Fine-Tuning with QLoRA',
    'experiment_id': 2,
    'model': {
        'name': 'Llama-3.1-8B-Instruct',
        'quantization': '4-bit NF4',
        'parameters': '8B'
    },
    'method': {
        'type': 'Fine-Tuning',
        'technique': 'QLoRA + SFTTrainer',
        'description': 'Low-Rank Adaptation with completion-only training',
        'training_required': True
    },
    'lora_config': {
        'r': LORA_CONFIG.r,
        'alpha': LORA_CONFIG.lora_alpha,
        'dropout': LORA_CONFIG.lora_dropout,
        # Converted to a list so it can be serialized as JSON
        'target_modules': list(LORA_CONFIG.target_modules)
    },
    'training_config': {
        'epochs': TRAINING_CONFIG.num_train_epochs,
        'learning_rate': TRAINING_CONFIG.learning_rate,
        'batch_size': TRAINING_CONFIG.per_device_train_batch_size,
        'gradient_accumulation': TRAINING_CONFIG.gradient_accumulation_steps,
        'effective_batch_size': TRAINING_CONFIG.per_device_train_batch_size * TRAINING_CONFIG.gradient_accumulation_steps,
        'max_seq_length': TRAINING_CONFIG.max_seq_length,
        'warmup_ratio': TRAINING_CONFIG.warmup_ratio
    },
    'dataset': {
        'total': len(dataset),
        'train': len(train_data),
        'val': len(val_data),
        'test': len(test_data),
        # Counted from the loaded data rather than hard-coded, so the summary
        # stays correct if the dataset changes
        'vulnerability_types': len({s['vulnerability_type'] for s in dataset})
    },
    'results': {
        'overall_accuracy': round(accuracy_score(all_gt, all_pred), 4),
        'per_vulnerability': metrics_by_type,
        'average': avg_metrics
    },
    # Matrix rows/columns are ordered VULNERABLE, SAFE; int() converts the
    # matrix entries to plain ints for JSON
    'confusion_matrix': {
        'TP': int(cm[0,0]),
        'FN': int(cm[0,1]),
        'FP': int(cm[1,0]),
        'TN': int(cm[1,1])
    },
    'references': [
        'Dettmers et al. (2023). QLoRA: Efficient Finetuning of Quantized LLMs. arXiv:2305.14314'
    ]
}

with open('/kaggle/working/summary_fine_tuning.json', 'w') as f:
    json.dump(summary, f, indent=2)

print("Files saved:")
print("  - results_fine_tuning.csv")
print("  - summary_fine_tuning.json")
print("  - cm_fine_tuning.png")
print("  - solana-vuln-model/ (adapter files)")

In [None]:
# Closing banner followed by the headline numbers of this experiment
final_rule = "=" * 50
overall_accuracy = summary['results']['overall_accuracy']
average_f1 = avg_metrics['F1-score']

print("\n" + final_rule)
print("EXPERIMENT 2 COMPLETE")
print(final_rule)
print(f"\nOverall Accuracy: {overall_accuracy:.2%}")
print(f"Average F1-Score: {average_f1}")
print(f"\nMethod: QLoRA + SFTTrainer")
print(f"Model: LLaMA-3.1-8B-Instruct (fine-tuned)")
print(f"\nIMPORTANT: Download solana-vuln-model/ for Experiment 3")