# üèãÔ∏è NSSS Security Auditor - Colab Trainer (Complete Workflow)

This notebook provides a robust, automated workflow to fine-tune the Qwen2.5-Coder model for security auditing.

**Workflow Steps:**
1.  **Environment Setup:** Auto-repairing installation of Unsloth and GPU drivers.
2.  **Data Preparation:** Downloads real CVE fixes and generates training data.
3.  **Baseline Evaluation:** Assess model performance before training.
4.  **Fine-tuning:** Trains the model using QLoRA on T4 GPU.
5.  **Final Evaluation:** Verifies improvements and generates a comparison report.

**Usage:** Simply select **Runtime -> Run all**.

In [None]:
%%bash
# @title 1. Setup Repository

# Clone or update the repository
if [ ! -d "/content/app" ]; then
    echo "üì• Cloning repository..."
    git clone https://github.com/TCTri205/Neuro-Symbolic_Software_Security.git /content/app
else
    echo "üîÑ Updating repository..."
    cd /content/app && git pull origin main
fi

cd /content/app
echo "‚úÖ Repository ready at: $(pwd)"
echo "üìç Current branch: $(git branch --show-current)"
echo "üìå Latest commit: $(git log -1 --oneline)"

In [None]:
# @title 1.1 Mount Drive & Configure Smart Cache
from google.colab import drive
import os
import shutil

print("üîó Configuring Persistence...")

# 1. Mount Drive
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

# 2. Define Paths
DRIVE_ROOT = "/content/drive/MyDrive/NSSS_Project"
APP_ROOT = "/content/app"
HF_CACHE_DRIVE = os.path.join(DRIVE_ROOT, "hf_cache")
HF_CACHE_LOCAL = "/root/.cache/huggingface"

# 3. Create Directories on Drive
os.makedirs(os.path.join(DRIVE_ROOT, "data"), exist_ok=True)
os.makedirs(os.path.join(DRIVE_ROOT, "outputs"), exist_ok=True)
os.makedirs(HF_CACHE_DRIVE, exist_ok=True)

# 4. Symlink Data & Outputs (App <-> Drive)
for folder in ["data", "outputs"]:
    local_path = os.path.join(APP_ROOT, folder)
    drive_path = os.path.join(DRIVE_ROOT, folder)
    
    # Remove local folder if it exists (empty from git clone)
    if os.path.exists(local_path) and not os.path.islink(local_path):
        shutil.rmtree(local_path)
        
    # Link to Drive
    if not os.path.exists(local_path):
        os.symlink(drive_path, local_path)
        print(f"   ‚úÖ Linked {folder}: {local_path} -> {drive_path}")

# 5. Symlink HF Cache (Avoid re-downloading Base Model)
if os.path.exists(HF_CACHE_LOCAL) and not os.path.islink(HF_CACHE_LOCAL):
    shutil.rmtree(HF_CACHE_LOCAL)

if not os.path.exists(HF_CACHE_LOCAL):
    os.makedirs(os.path.dirname(HF_CACHE_LOCAL), exist_ok=True)
    os.symlink(HF_CACHE_DRIVE, HF_CACHE_LOCAL)
    print(f"   ‚úÖ Linked HF Cache: {HF_CACHE_LOCAL} -> {HF_CACHE_DRIVE}")

print("üöÄ Persistence Configured! Data and Models will be saved to Drive.")

In [None]:
%%bash
# @title 2. Fix Unsloth Installation
echo "üîß Cleaning Unsloth stack..."

# 1. Uninstall existing packages to prevent conflicts
pip uninstall -y unsloth unsloth_zoo 2>/dev/null || true

# 2. Remove cached files that might be corrupt
rm -rf /usr/local/lib/python3.12/dist-packages/unsloth* 2>/dev/null || true
rm -rf /usr/local/lib/python3.12/dist-packages/__pycache__/unsloth* 2>/dev/null || true

# 3. Clean project cache
cd /content/app
find . -name "*.pyc" -delete 2>/dev/null || true
find . -name "__pycache__" -type d -exec rm -rf {} + 2>/dev/null || true

echo "‚úÖ Cleanup complete!"

In [None]:
%%bash
# @title 3. Install System Dependencies
cd /content/app
echo "üì¶ Installing system dependencies..."
pip install -q -r requirements.txt
echo "‚úÖ System dependencies installed!"

In [None]:
%%bash
# @title 4. Install Unsloth & GPU Optimizations

# 1. Install dependencies first to ensure stability
echo "üì¶ Installing core dependencies..."
pip install -q transformers>=4.51.3 datasets>=3.4.1 bitsandbytes>=0.45.5 peft>=0.18.0 accelerate>=0.34.1

# 2. Install TRL (specific version range)
pip install -q "trl>=0.18.2,<0.25.0"

# 3. Install Unsloth without auto-dependencies (prevents xformers build errors)
echo "üì¶ Installing Unsloth..."
pip install -q "unsloth @ git+https://github.com/unslothai/unsloth.git" --no-deps

# 4. Install unsloth_zoo explicitly
pip install -q "unsloth_zoo>=2026.1.4"

echo "üì¶ Verifying Unsloth stack..."
pip list | grep -E "(unsloth|transformers|bitsandbytes|trl|torch)" || true
echo "‚úÖ Unsloth stack installed!"

In [None]:
# @title 5. Environment Verification
import sys
import torch
import os

print("="*60)
print("üîç ENVIRONMENT VERIFICATION")
print("="*60)

# 1. GPU Check
print(f"\n1Ô∏è‚É£ GPU Status:")
if torch.cuda.is_available():
    print(f"   ‚úÖ CUDA Available: {torch.cuda.is_available()}")
    print(f"   üéÆ Device: {torch.cuda.get_device_name(0)}")
    print(f"   üíæ Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
else:
    print("   ‚ùå GPU NOT AVAILABLE!")
    print("   üëâ Go to Runtime ‚Üí Change runtime type ‚Üí Select T4 GPU")
    sys.exit(1)

# 2. Unsloth Import Check
print(f"\n2Ô∏è‚É£ Unsloth Status:")
try:
    from unsloth import FastLanguageModel
    print("   ‚úÖ Unsloth imported successfully!")
    os.environ['INFERENCE_PROVIDER'] = 'local'
    os.environ['INFERENCE_MODEL'] = 'unsloth/Qwen2.5-Coder-7B-Instruct-bnb-4bit'
    print("   üéØ Provider: LOCAL (GPU-accelerated)")
except Exception as e:
    print(f"   ‚ö†Ô∏è Unsloth import failed: {e}")
    print("   üîÑ Will use fallback provider (Gemini)")
    os.environ['INFERENCE_PROVIDER'] = 'gemini'
    os.environ['INFERENCE_MODEL'] = 'gemini-1.5-flash'

# 3. Project Path Check
print(f"\n3Ô∏è‚É£ Project Setup:")
print(f"   üìÇ Working Directory: {os.getcwd()}")
if os.path.exists('/content/app/src'):
    print("   ‚úÖ Project structure valid")
    sys.path.insert(0, '/content/app')
else:
    print("   ‚ùå Project structure invalid!")
    sys.exit(1)

print("\n" + "="*60)
print(f"üöÄ Ready to proceed with provider: {os.environ['INFERENCE_PROVIDER'].upper()}")
print("="*60 + "\n")

In [None]:
# @title 6. Configure LLM Provider (with Fallback)
import os
from google.colab import userdata

print("üîë Configuring LLM Provider...")

provider = os.environ.get('INFERENCE_PROVIDER', 'local')

if provider == 'local':
    print("‚úÖ Using LOCAL inference (GPU-accelerated)")
    print("   No API keys needed for inference.")
else:
    # Fallback to Gemini
    print(f"‚ö†Ô∏è Using FALLBACK provider: {provider.upper()}")
    try:
        gemini_key = userdata.get('GEMINI_API_KEY')
        if gemini_key:
            os.environ['GEMINI_API_KEY'] = gemini_key
            os.environ['LLM_PROVIDER'] = 'gemini'
            # Force provider to Gemini if we have a key and local failed
            os.environ['INFERENCE_PROVIDER'] = 'gemini'
            print("   ‚úÖ Gemini API key configured from Secrets")
        else:
            raise ValueError("GEMINI_API_KEY is empty")
    except Exception as e:
        print(f"   ‚ùå Failed to get GEMINI_API_KEY: {e}")
        print("   üëâ Check Colab Secrets (Key icon on left). Ensure 'GEMINI_API_KEY' exists and 'Notebook access' is ON.")
        print("   üîÑ Falling back to MOCK provider for testing")
        os.environ['INFERENCE_PROVIDER'] = 'mock'

# Optional: Configure HF_TOKEN for Gate Datasets
try:
    hf_token = userdata.get('HF_TOKEN')
    if hf_token:
        os.environ['HF_TOKEN'] = hf_token
        print("\n   ‚úÖ HF_TOKEN configured (Access to gated datasets enabled)")
except Exception:
    print("\n   ‚ÑπÔ∏è HF_TOKEN not found in Secrets (Using public datasets only)")

In [None]:
%%bash
# @title 7. Prepare Training Dataset
cd /content/app

echo "üìä Preparing training dataset..."

# Check if registry already exists with enough data
if [ -f "data/few_shot_registry.json" ]; then
    EXAMPLE_COUNT=$(python -c "import json; data=json.load(open('data/few_shot_registry.json')); print(len(data.get('examples', [])))")
    echo "   üìÇ Found existing registry on Drive with $EXAMPLE_COUNT examples"
    
    if [ "$EXAMPLE_COUNT" -ge 2000 ]; then
        echo "   ‚úÖ Dataset ready! Skipping download."
        exit 0
    else
        echo "   ‚ö†Ô∏è Insufficient data ($EXAMPLE_COUNT < 2000), regenerating..."
    fi
fi

# Generate dataset from HuggingFace
echo "   üîÑ Downloading from HuggingFace & extracting vulnerable patterns..."
python scripts/prepare_cve_data.py --limit 2000

# Verify output
if [ -f "data/few_shot_registry.json" ]; then
    FINAL_COUNT=$(python -c "import json; data=json.load(open('data/few_shot_registry.json')); print(len(data.get('examples', [])))")
    echo "   ‚úÖ Dataset prepared: $FINAL_COUNT examples"
else
    echo "   ‚ùå Dataset preparation failed!"
    exit 1
fi

In [None]:
%%bash
# @title 8. Run Baseline Evaluation
cd /content/app

echo "üìä Running Baseline Evaluation..."

# Get provider from environment
PROVIDER="${INFERENCE_PROVIDER:-local}"
MODEL="${INFERENCE_MODEL:-unsloth/Qwen2.5-Coder-7B-Instruct-bnb-4bit}"

# STRICT MODE: Stop if using Mock
if [ "$PROVIDER" == "mock" ]; then
    echo ""
    echo "=============================================================="
    echo "‚ùå CRITICAL ERROR: System is using MOCK provider."
    echo "=============================================================="
    echo "REASON: Neither Local GPU nor Gemini API Key is available."
    echo ""
    echo "üëá TROUBLESHOOTING GUIDE:"
    echo "1. GPU Check: Go to 'Runtime' -> 'Change runtime type'. Ensure 'T4 GPU' is selected."
    echo "2. Unsloth Check: Look at Cell 5 output. Did Unsloth import fail?"
    echo "3. API Key Check: Look at Cell 6 output. Did it fail to get GEMINI_API_KEY?"
    echo "   -> Click the Key icon üîë on the left sidebar."
    echo "   -> Ensure 'GEMINI_API_KEY' is listed."
    echo "   -> IMPORTANT: Toggle the 'Notebook access' switch to ON."
    echo ""
    echo "Execution stopped to prevent invalid evaluation on fake data."
    exit 1
fi

echo "   üéØ Provider: $PROVIDER"
echo "   ü§ñ Model: $MODEL"

python scripts/evaluate_model.py \
    --provider "$PROVIDER" \
    --model "$MODEL" \
    --registry data/few_shot_registry.json

# Save baseline report
if [ -f "outputs/evaluation_report.json" ]; then
    mv outputs/evaluation_report.json outputs/report_baseline.json
    echo "   ‚úÖ Baseline saved to outputs/report_baseline.json"
else
    echo "   ‚ö†Ô∏è Baseline evaluation produced no output"
fi

In [None]:
# @title 9. Fine-tune Security Model
import os

ENABLE_TRAINING = False  # @param {type:"boolean"}

if ENABLE_TRAINING:
    print("üéì Starting Fine-tuning...")
    print("   üìä Dataset: 2000 examples")
    print("   ü§ñ Base Model: Qwen/Qwen2.5-Coder-7B-Instruct")
    print("   üíæ Output: outputs/qwen-security-model")
    print("   ‚è±Ô∏è Estimated Time: 15-20 minutes\n")

    !python scripts/train_model.py \
        --registry data/few_shot_registry.json \
        --output outputs/qwen-security-model \
        --model Qwen/Qwen2.5-Coder-7B-Instruct
    
    if os.path.exists("outputs/qwen-security-model"):
        print("\n‚úÖ Fine-tuning completed successfully!")
    else:
        print("\n‚ùå Fine-tuning failed!")
else:
    print("‚è≠Ô∏è Training skipped (ENABLE_TRAINING = False).")
    print("   Enable the checkbox and rerun this cell to fine-tune.")

In [None]:
# @title 10. Run Final Evaluation
import os

# Determine if we should run evaluation
should_run = False
if 'ENABLE_TRAINING' in locals() and ENABLE_TRAINING:
    should_run = True
elif os.path.exists("outputs/qwen-security-model/adapter_config.json"):
    print("üìÇ Found existing fine-tuned model.")
    should_run = True

if should_run:
    print("üìä Running Final Evaluation...")
    # Always use LOCAL provider for post-training eval
    !python scripts/evaluate_model.py \
        --provider local \
        --model outputs/qwen-security-model \
        --registry data/few_shot_registry.json

    if os.path.exists("outputs/evaluation_report.json"):
        !mv outputs/evaluation_report.json outputs/report_final.json
        print("‚úÖ Final evaluation saved to outputs/report_final.json")
    else:
        print("‚ö†Ô∏è Final evaluation produced no output")
else:
    print("‚è≠Ô∏è Skipping Final Evaluation (No trained model found).")

In [None]:
# @title 11. Compare Results & Generate Report
import json
import os
from datetime import datetime

print("="*60)
print("üìà TRAINING RESULTS COMPARISON")
print("="*60)

# Load reports
baseline_path = '/content/app/outputs/report_baseline.json'
final_path = '/content/app/outputs/report_final.json'

if not os.path.exists(baseline_path):
    print("‚ùå Baseline report not found!")
elif not os.path.exists(final_path):
    print("‚ö†Ô∏è Final report not found (Training skipped?).")
    print("üìä Showing Baseline Results Only:\n")
    
    with open(baseline_path) as f:
        baseline = json.load(f)
        metrics = baseline['metrics']
        print(f"Accuracy:        {metrics['accuracy']:.2%}")
        print(f"JSON Validity:   {metrics['json_validity_rate']:.2%}")
        print(f"False Positive:  {metrics['fpr']:.2%}")
        print(f"False Negative:  {metrics['fnr']:.2%}")
else:
    with open(baseline_path) as f:
        baseline = json.load(f)
    with open(final_path) as f:
        final = json.load(f)
    
    b_metrics = baseline['metrics']
    f_metrics = final['metrics']
    
    # Calculate improvements
    def delta(metric_name):
        b = b_metrics.get(metric_name, 0)
        f = f_metrics.get(metric_name, 0)
        diff = f - b
        pct = (diff / b * 100) if b > 0 else 0
        return f, diff, pct
    
    print("\nüìä Metric Comparison:\n")
    print(f"{'Metric':<20} {'Baseline':<12} {'Fine-tuned':<12} {'Change':<15}")
    print("-" * 60)
    
    acc_f, acc_d, acc_p = delta('accuracy')
    print(f"{'Accuracy':<20} {b_metrics.get('accuracy',0):<12.2%} {acc_f:<12.2%} {acc_d:+.2%} ({acc_p:+.1f}%)")
    
    json_f, json_d, json_p = delta('json_validity_rate')
    print(f"{'JSON Validity':<20} {b_metrics.get('json_validity_rate',0):<12.2%} {json_f:<12.2%} {json_d:+.2%} ({json_p:+.1f}%)")
    
    fpr_f, fpr_d, fpr_p = delta('fpr')
    print(f"{'False Positive':<20} {b_metrics.get('fpr',0):<12.2%} {fpr_f:<12.2%} {fpr_d:+.2%} ({fpr_p:+.1f}%)")
    
    fnr_f, fnr_d, fnr_p = delta('fnr')
    print(f"{'False Negative':<20} {b_metrics.get('fnr',0):<12.2%} {fnr_f:<12.2%} {fnr_d:+.2%} ({fnr_p:+.1f}%)")
    
    print("\n" + "="*60)
    
    # Overall assessment
    if acc_d > 0:
        print("üéâ SUCCESS! Fine-tuning improved model accuracy!")
    elif acc_d == 0:
        print("‚ö†Ô∏è No significant change in accuracy")
    else:
        print("‚ö†Ô∏è Accuracy decreased - may need more training data or epochs")
    
    print("="*60)
    
    # Save comparison report
    comparison = {
        'timestamp': datetime.now().isoformat(),
        'baseline': baseline,
        'final': final,
        'improvements': {
            'accuracy': {'absolute': acc_d, 'relative_pct': acc_p},
            'json_validity': {'absolute': json_d, 'relative_pct': json_p},
            'fpr': {'absolute': fpr_d, 'relative_pct': fpr_p},
            'fnr': {'absolute': fnr_d, 'relative_pct': fnr_p}
        }
    }
    
    with open('/content/app/outputs/comparison_report.json', 'w') as f:
        json.dump(comparison, f, indent=2)
    
    print("\nüíæ Detailed comparison saved to: outputs/comparison_report.json")