## 0) Environment Setup (Nightly Unsloth + Qwen3-VL-2B)

**Why:** Nightly Unsloth builds ship the latest kernels, RLHF utilities, and Qwen3 fixes that we rely on for reproducible LoRA fine-tuning.

**Data posture:** All training samples are streamed from Hugging Face (`jmazz/sys-scan-linux-synthetic`) to avoid copying the 600k-example corpus into VS Code.

**Safety:** This notebook keeps the workflow purely defensive—reasoning about synthetic detections and correlations while never generating exploit guidance.
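
Because nightly builds change from day to day, reproducing a run also means knowing which versions were actually installed. The sketch below records them with the standard library's `importlib.metadata`. The package list is an assumption that mirrors the install cell, and the helper name `installed_versions` is hypothetical.

```python
from importlib import metadata


def installed_versions(packages):
    """Return {package: version} for each name, or "not installed" if it is absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = "not installed"
    return versions


# Assumed package list, mirroring the install cell; adjust it if your environment differs.
tracked = ["unsloth", "unsloth_zoo", "transformers", "trl", "datasets", "vllm"]
for name, version in installed_versions(tracked).items():
    print(f"{name}: {version}")
```

Saving this output next to the run artifacts makes it possible to pin the same versions later.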

In [None]:
# Run configuration and deterministic defaults
from __future__ import annotations

import os
import random
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path


@dataclass
class RunConfig:
    base_model_id: str = "Qwen/Qwen3-VL-2B-Thinking"
    quantized_repo_hint: str = "unsloth/Qwen3-VL-2B-Thinking-GGUF"
    dataset_repo_id: str = "jmazz/sys-scan-linux-synthetic"
    dataset_split: str = "train"
    max_seq_length: int = 2048
    lora_rank: int = 16
    load_in_4bit: bool = True
    seed: int = 424242
    shuffle_buffer: int = 8192
    max_train_steps: int = 1200
    per_device_batch_size: int = 1
    grad_accum_steps: int = 8
    eval_sample_count: int = 256
    max_findings_in_prompt: int = 3
    max_correlations_in_prompt: int = 2
    grpo_max_steps: int = 400
    grpo_per_device_batch_size: int = 1
    grpo_grad_accum_steps: int = 4
    grpo_num_generations: int = 4
    grpo_beta: float = 0.04
    grpo_learning_rate: float = 5e-6
    artifact_root: Path = Path("outputs/qwen3_vl_finetune")
    timestamp: str = field(default_factory=lambda: datetime.utcnow().strftime("%Y%m%d_%H%M%S"))

    @property
    def sft_dir(self) -> Path:
        return self.artifact_root / "sft"

    @property
    def grpo_dir(self) -> Path:
        return self.artifact_root / "grpo"

    @property
    def log_dir(self) -> Path:
        return self.artifact_root / "logs"

    def ensure_dirs(self) -> None:
        for path in (self.artifact_root, self.sft_dir, self.grpo_dir, self.log_dir):
            path.mkdir(parents=True, exist_ok=True)


config = RunConfig()
config.ensure_dirs()

random.seed(config.seed)
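# Note: setting PYTHONHASHSEED here only reaches subprocesses; the running
# interpreter fixed its hash seed at startup.
os.environ["PYTHONHASHSEED"] = str(config.seed)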
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

system_prompt = (
    "You are a senior security analytics engineer. Reason step-by-step about synthetic "
    "host and network telemetry, explain causal links, and only emit JSON that conforms "
    "to the sys-scan ground_truth schema (version ground_truth_v1). Always prefer "
    "defensive mitigations, never offensive guidance."
)

print(f"Base model: {config.base_model_id}")
print(f"Dataset: {config.dataset_repo_id}::{config.dataset_split} (streaming)")
print(f"Artifacts: {config.artifact_root.resolve()}")
print(f"Quantized deployment hint: {config.quantized_repo_hint}")

In [None]:
%%capture
!pip uninstall -y unsloth unsloth_zoo > /dev/null 2>&1 || true
!pip install --upgrade pip
!pip install --no-cache-dir git+https://github.com/unslothai/unsloth.git
!pip install --no-cache-dir git+https://github.com/unslothai/unsloth-zoo.git
!pip install --upgrade datasets accelerate bitsandbytes huggingface_hub transformers trl vllm

## 1) Model and Precision Setup

**Why:** Qwen3-VL-2B-Thinking pairs well with a LoRA adapter for text-only security reasoning while keeping the door open for future multimodal telemetry.

**Precision:** Load in 4-bit (QLoRA) to stay within 16–24 GiB VRAM and compute in bf16 when available.

**Reproducibility:** Seed PyTorch/CUDA before instantiating the model so we can replay the run deterministically.
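
As a rough check on the precision choice, the arithmetic below estimates the memory taken by the weights alone at two precisions. It assumes about 2e9 parameters (read off the "2B" in the model name) and ignores quantization metadata, activations, optimizer state, and any inference-engine cache, so treat it as a lower bound rather than a measurement.

```python
def weight_memory_gib(num_params: float, bits_per_param: float) -> float:
    """Approximate memory for the weights alone, in GiB."""
    return num_params * bits_per_param / 8 / 1024**3


# Assumption: roughly 2e9 parameters, taken from the "2B" in the model name.
approx_params = 2e9
for label, bits in (("bf16", 16), ("4-bit", 4)):
    print(f"{label}: ~{weight_memory_gib(approx_params, bits):.2f} GiB of weights")
```

The 4-bit figure is a quarter of the bf16 one, which leaves most of the VRAM budget for activations and generation during GRPO.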

In [None]:
import itertools
import json
import textwrap
from typing import Any, Dict, Iterable, List

import torch
from datasets import Dataset, load_dataset
from unsloth import FastLanguageModel


def set_torch_seed(seed: int) -> None:
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


if not torch.cuda.is_available():
    raise EnvironmentError("CUDA device not detected — attach an NVIDIA GPU before continuing.")
device_props = torch.cuda.get_device_properties(0)
device_name = torch.cuda.get_device_name(0)
total_vram_gib = device_props.total_memory / 1024**3

if "L4" in device_name:
    gpu_memory_utilization = 0.70
elif "T4" in device_name:
    gpu_memory_utilization = 0.55
else:
    gpu_memory_utilization = 0.60

print(
    f"Detected GPU: {device_name} ({total_vram_gib:.1f} GiB) — targeting {gpu_memory_utilization*100:.0f}% VRAM",
)

set_torch_seed(config.seed)

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=config.base_model_id,
    max_seq_length=config.max_seq_length,
    dtype=None,  # auto-select
    load_in_4bit=config.load_in_4bit,
    fast_inference=True,
    max_lora_rank=config.lora_rank,
    gpu_memory_utilization=gpu_memory_utilization,
)

model = FastLanguageModel.get_peft_model(
    model,
    r=config.lora_rank,
    target_modules=[
        "q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"
    ],
    lora_alpha=config.lora_rank * 2,
    lora_dropout=0.0,
    bias="none",
    use_gradient_checkpointing="unsloth",
    random_state=config.seed,
    use_rslora=False,
    loftq_config=None,
)

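if tokenizer.pad_token is None:
    # Fall back to EOS for padding only when the tokenizer ships without a pad token
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id
print(f"Tokenizer configured (PAD token = {tokenizer.pad_token!r}).")
# chat_template can be None, so guard before slicing
print("Chat template:", (tokenizer.chat_template or "(none)")[:120], "...")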

## 3) Dataset Preparation (Streaming)

**Why:** Stream directly from Hugging Face so we stay within memory bounds while respecting the repository’s guidance against copying the entire 600k corpus into the editor.

**Contract:** Every assistant turn must serialize back to the ground-truth schema; we normalize required keys before templating.

**Safety:** Synthetic data only—no secrets or production telemetry.
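
Streaming means the corpus is never fully in memory, so shuffling has to work on a bounded window. The sketch below illustrates the general buffered-shuffle technique that a `buffer_size` setting controls. It is a conceptual illustration in plain Python, not the `datasets` library's implementation, and `buffered_shuffle` is a hypothetical name.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def buffered_shuffle(stream: Iterable[T], buffer_size: int, seed: int) -> Iterator[T]:
    """Yield items in approximately random order using a fixed-size buffer."""
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in stream:
        if len(buffer) < buffer_size:
            buffer.append(item)  # fill phase: nothing is emitted yet
            continue
        # Emit a random buffered item and put the new item in its slot
        index = rng.randrange(buffer_size)
        yield buffer[index]
        buffer[index] = item
    rng.shuffle(buffer)  # drain whatever is left once the stream ends
    yield from buffer


shuffled = list(buffered_shuffle(range(20), buffer_size=5, seed=424242))
print(shuffled)
```

An item can only move forward by less than `buffer_size` positions, so a larger buffer mixes the stream more thoroughly at the cost of more memory.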

In [None]:
# Streaming dataset helpers
GROUND_TRUTH_DEFAULTS = {
    "version": "ground_truth_v1",
    "enriched_findings": [],
    "correlations": [],
    "reductions": {},
    "summaries": {},
    "actions": [],
}


def _clone_default(value):
    if isinstance(value, dict):
        return dict(value)
    if isinstance(value, list):
        return list(value)
    return value


def canonicalize_ground_truth(record: Dict[str, Any]) -> Dict[str, Any]:
    payload = record.get("ground_truth") or record.get("data") or record
    canonical = dict(payload)
    for key, default in GROUND_TRUTH_DEFAULTS.items():
        current = canonical.get(key)
        if current is None:
            canonical[key] = _clone_default(default)
        elif key == "version" and not isinstance(current, str):
            canonical[key] = str(current)
    return canonical


def summarize_findings(payload: Dict[str, Any], limit: int) -> str:
    rows = []
    for finding in (payload.get("enriched_findings") or [])[:limit]:
        title = finding.get("title", "(untitled)")
        severity = finding.get("severity", "unknown")
        risk = finding.get("risk_score", "?")
        rows.append(f"- [{severity}] {title} (risk_score={risk})")
    if not rows:
        rows.append("- No enriched findings present in this slice.")
    return "\n".join(rows)


def summarize_correlations(payload: Dict[str, Any], limit: int) -> str:
    rows = []
    for corr in (payload.get("correlations") or [])[:limit]:
        title = corr.get("title", "(untitled)")
        related = ", ".join((corr.get("related_finding_ids") or [])[:3]) or "n/a"
        rows.append(f"- {title} → related: {related}")
    if not rows:
        rows.append("- No correlations linked in this slice.")
    return "\n".join(rows)


def build_messages(record: Dict[str, Any]) -> List[Dict[str, str]]:
    payload = canonicalize_ground_truth(record)
    summaries = payload.get("summaries") or {}
    exec_summary = summaries.get("executive_summary") or "No executive summary provided."
    triage_summary = summaries.get("triage_summary") or "No triage summary provided."
    # Assemble the prompt from explicit parts: dedenting an f-string stops working
    # once the multi-line summaries are interpolated, and literal "\n" escapes on
    # top of real line breaks produce extra blank lines.
    user_prompt = "\n\n".join(
        [
            "Review the following synthetic security telemetry and produce a final ground truth JSON.",
            f"Top findings (capped at {config.max_findings_in_prompt}):\n"
            f"{summarize_findings(payload, config.max_findings_in_prompt)}",
            f"Correlations (capped at {config.max_correlations_in_prompt}):\n"
            f"{summarize_correlations(payload, config.max_correlations_in_prompt)}",
            f"Executive summary: {exec_summary}\nTriage summary: {triage_summary}",
            "Respond with JSON that exactly matches the sys-scan ground_truth schema.",
        ]
    )
    assistant_json = json.dumps(payload, ensure_ascii=False, sort_keys=True)
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
        {"role": "assistant", "content": assistant_json},
    ]


def to_sft_format(example: Dict[str, Any], tok=None) -> Dict[str, str]:
    # Use the tokenizer passed in; fall back to the notebook-level one for ad hoc calls
    tok = tok if tok is not None else tokenizer
    chat = build_messages(example)
    rendered = tok.apply_chat_template(chat, tokenize=False, add_generation_prompt=False)
    return {"text": rendered}


def create_sft_dataset(tokenizer, *, max_samples: int | None = None, shuffle: bool = True):
    stream = load_dataset(
        config.dataset_repo_id,
        split=config.dataset_split,
        streaming=True,
    )
    # Skip the leading records that create_eval_dataset materializes so eval stays held out
    stream = stream.skip(config.eval_sample_count)
    if shuffle:
        stream = stream.shuffle(seed=config.seed, buffer_size=config.shuffle_buffer)
    if max_samples is not None:
        stream = stream.take(max_samples)
    return stream.map(lambda example: to_sft_format(example, tokenizer))


def create_eval_dataset(tokenizer, sample_count: int) -> Dataset:
    eval_stream = load_dataset(
        config.dataset_repo_id,
        split=config.dataset_split,
        streaming=True,
    )
    # Deterministic slice: the first sample_count records, in stream order
    materialized = [
        to_sft_format(raw, tokenizer) for raw in itertools.islice(eval_stream, sample_count)
    ]
    return Dataset.from_list(materialized)


train_dataset = create_sft_dataset(tokenizer)
eval_dataset = create_eval_dataset(tokenizer, config.eval_sample_count)

print("Train dataset: streaming iterable (length unknown; training is bounded by max_steps)")
print(f"Eval dataset materialized: {len(eval_dataset)} examples")

try:
    sample_preview = next(iter(load_dataset(config.dataset_repo_id, split=config.dataset_split, streaming=True).take(1)))
    preview_text = to_sft_format(sample_preview)["text"]
    print("Preview prompt snippet:\n", preview_text[:500], "...")
except Exception as exc:
    print("Preview unavailable (likely because of offline mode):", exc)

In [None]:
# Offline sanity check using the bundled synthetic example
example_path = (Path("../synthetic_data/synthetic_dataset_example.json")).resolve()
if example_path.exists():
    with open(example_path, "r", encoding="utf-8") as fh:
        local_blob = json.load(fh)
    local_payload = local_blob.get("data") or local_blob
    rendered = to_sft_format(local_payload)["text"]
    print("Local example rendered characters:", len(rendered))
    print("Assistant JSON keys:", list(canonicalize_ground_truth(local_payload).keys())[:10])
else:
    print("Local synthetic example not found; skipping offline sanity check.")

## 4) Supervised Fine-Tuning (SFT)

**Why:** Establish a strong JSON-grounded baseline before any preference optimization so downstream rewards stay well-behaved.

**Bounds:** We train from a streaming dataset with small batches to limit VRAM use, and set an explicit `max_steps` because a streaming dataset has no known length.

**Telemetry:** Evaluation runs on a deterministic, materialized slice for consistent validation metrics.
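
The bounds above translate into a concrete training budget. The arithmetic below uses the defaults from `RunConfig` and the roughly 600k corpus size quoted in this notebook, and assumes a single GPU.

```python
per_device_batch_size = 1
grad_accum_steps = 8
max_train_steps = 1200
corpus_size = 600_000  # approximate corpus size quoted in this notebook

# One optimizer step consumes batch_size * accumulation_steps sequences (single GPU assumed)
sequences_per_step = per_device_batch_size * grad_accum_steps
sequences_seen = sequences_per_step * max_train_steps
coverage = sequences_seen / corpus_size

print(f"Sequences per optimizer step: {sequences_per_step}")
print(f"Sequences seen in {max_train_steps} steps: {sequences_seen}")
print(f"Share of corpus covered: {coverage:.1%}")
```

With these defaults the run sees 9,600 sequences, or about 1.6% of the corpus, so raising `max_train_steps` is the main lever if the baseline underfits.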

In [None]:
from unsloth import is_bfloat16_supported
from trl import SFTConfig, SFTTrainer

sft_args = SFTConfig(
    output_dir=str(config.sft_dir),
    max_steps=config.max_train_steps,
    per_device_train_batch_size=config.per_device_batch_size,
    gradient_accumulation_steps=config.grad_accum_steps,
    optim="adamw_8bit",
    save_steps=200,
    logging_steps=10,
    eval_strategy="steps",  # named evaluation_strategy in older transformers releases
    eval_steps=200,
    learning_rate=2e-5,
    weight_decay=0.01,
    warmup_ratio=0.03,
    bf16=is_bfloat16_supported(),
    max_grad_norm=1.0,
    report_to="none",
    seed=config.seed,
    remove_unused_columns=False,
)

sft_trainer = SFTTrainer(
    model=model,
    processing_class=tokenizer,  # current TRL name for the tokenizer argument
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    args=sft_args,
)

sft_metrics = sft_trainer.train()
print("SFT training metrics:", sft_metrics)

sft_trainer.save_model(str(config.sft_dir))
tokenizer.save_pretrained(str(config.sft_dir))
print(f"SFT artifacts saved to {config.sft_dir}")

## 5) GRPO Training Configuration (Optional)

**Why:** Once the SFT adapter is stable we can encourage structured reasoning with reward shaping.

**Bounds:** Streaming prompts keep host memory bounded, while VRAM is governed mainly by batch size, `num_generations`, and completion length; feel free to skip this section if resources are tight.

**Prerequisite:** Confirm the SFT checkpoint finished successfully before starting GRPO.
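
One way to confirm the prerequisite is to check that the adapter files exist and are non-empty before reloading. The sketch below assumes the adapter was saved with PEFT's default file names (`adapter_config.json` and `adapter_model.safetensors`); older setups may write a `.bin` file instead. The helper name `missing_adapter_files` is hypothetical.

```python
from pathlib import Path
from typing import List

# Assumed file names: PEFT's defaults for a LoRA adapter saved as safetensors.
EXPECTED_ADAPTER_FILES = ("adapter_config.json", "adapter_model.safetensors")


def missing_adapter_files(checkpoint_dir: Path) -> List[str]:
    """Return the expected adapter files that are absent or empty in checkpoint_dir."""
    missing = []
    for name in EXPECTED_ADAPTER_FILES:
        candidate = Path(checkpoint_dir) / name
        if not candidate.is_file() or candidate.stat().st_size == 0:
            missing.append(name)
    return missing


sft_checkpoint = Path("outputs/qwen3_vl_finetune/sft")  # default config.sft_dir
problems = missing_adapter_files(sft_checkpoint)
print("SFT checkpoint looks complete" if not problems else f"Missing: {problems}")
```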

In [None]:
# Reload the SFT adapter as the starting point for GRPO
grpo_model, grpo_tokenizer = FastLanguageModel.from_pretrained(
    model_name=str(config.sft_dir),
    max_seq_length=config.max_seq_length,
    dtype=None,
    load_in_4bit=config.load_in_4bit,
    fast_inference=True,
    max_lora_rank=config.lora_rank,
    gpu_memory_utilization=gpu_memory_utilization,
)

if grpo_tokenizer.pad_token is None:
    grpo_tokenizer.pad_token = grpo_tokenizer.eos_token

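# GRPOTrainer reads a "prompt" column, so drop the assistant turn and let the policy generate it.
# The first eval_sample_count records are skipped to keep them out of training.
grpo_train_dataset = (
    load_dataset(config.dataset_repo_id, split=config.dataset_split, streaming=True)
    .skip(config.eval_sample_count)
    .shuffle(seed=config.seed, buffer_size=config.shuffle_buffer)
    .map(lambda record: {"prompt": build_messages(record)[:-1]})
)
print("GRPO dataset ready (streaming iterable of chat prompts)")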

## 6) GRPO Training Execution

**Why:** Reinforce JSON validity and schema compliance using lightweight reward functions.

**Safety:** Rewards focus on structure only; no offensive content is generated or rewarded.

**Resource Guard:** We check VRAM reservations before starting generation-heavy steps.
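
To see why structure-only rewards are enough to steer the policy, it helps to look at how GRPO compares completions within a group sampled for the same prompt. The sketch below is a simplified illustration of group-relative advantages; TRL's exact normalization (choice of standard deviation, epsilon) may differ by version, and the reward values are hypothetical sums of the two reward functions in this section.

```python
from statistics import mean, pstdev
from typing import List


def group_relative_advantages(rewards: List[float], eps: float = 1e-4) -> List[float]:
    """Center and scale rewards within one group of completions for the same prompt."""
    center = mean(rewards)
    spread = pstdev(rewards)
    return [(reward - center) / (spread + eps) for reward in rewards]


# Hypothetical summed rewards for four completions of one prompt:
# valid JSON with all keys (1.0 + 1.0), valid JSON missing keys (1.0 + 0.2),
# and two unparseable completions (0.0 + 0.0).
group_rewards = [2.0, 1.2, 0.0, 0.0]
for reward, advantage in zip(group_rewards, group_relative_advantages(group_rewards)):
    print(f"reward={reward:.1f} -> advantage={advantage:+.2f}")
```

Completions above the group mean get a positive advantage and those below get a negative one, and a group whose completions all score the same contributes no learning signal.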

In [None]:
import gc
from trl import GRPOConfig, GRPOTrainer

REQUIRED_TOP_LEVEL_KEYS = tuple(GROUND_TRUTH_DEFAULTS.keys())


def _extract_json(completion: str) -> Dict[str, Any]:
    start = completion.find("{")
    end = completion.rfind("}")
    if start == -1 or end == -1 or end <= start:
        raise ValueError("No JSON object detected")
    snippet = completion[start : end + 1]
    return json.loads(snippet)


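def _completion_text(completion: Any) -> str:
    # Chat-format datasets yield a list of message dicts; plain-text datasets yield strings
    if isinstance(completion, str):
        return completion
    return completion[0]["content"]


def reward_valid_json(completions: List[Any], **_: Any) -> List[float]:
    # TRL passes the whole batch as `completions` and expects one score per completion
    scores = []
    for completion in completions:
        try:
            _extract_json(_completion_text(completion))
            scores.append(1.0)
        except Exception:
            scores.append(0.0)
    return scores


def reward_has_required_keys(completions: List[Any], **_: Any) -> List[float]:
    scores = []
    for completion in completions:
        try:
            data = _extract_json(_completion_text(completion))
        except Exception:
            scores.append(0.0)
            continue
        missing = [key for key in REQUIRED_TOP_LEVEL_KEYS if key not in data]
        # Partial credit (0.2) for parseable JSON that lacks some required keys
        scores.append(1.0 if not missing else 0.2)
    return scores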


grpo_args = GRPOConfig(
    output_dir=str(config.grpo_dir),
    max_steps=config.grpo_max_steps,
    per_device_train_batch_size=config.grpo_per_device_batch_size,
    gradient_accumulation_steps=config.grpo_grad_accum_steps,
    optim="adamw_8bit",
    save_steps=200,
    logging_steps=10,
    learning_rate=config.grpo_learning_rate,
    weight_decay=0.01,
    max_grad_norm=1.0,
    seed=config.seed,
    report_to="none",
    max_completion_length=1024,
    num_generations=config.grpo_num_generations,
    beta=config.grpo_beta,
)

grpo_trainer = GRPOTrainer(
    model=grpo_model,
    processing_class=grpo_tokenizer,
    reward_funcs=[reward_valid_json, reward_has_required_keys],
    args=grpo_args,
    train_dataset=grpo_train_dataset,
)

# Release cached blocks first so the guard measures memory that is actually still held
gc.collect()
torch.cuda.empty_cache()

reserved_vram = torch.cuda.memory_reserved(0) / 1024**3
if reserved_vram > total_vram_gib * 0.85:
    raise RuntimeError(
        f"VRAM too high before GRPO ({reserved_vram:.2f} GiB of {total_vram_gib:.1f} GiB). Lower batch size or num_generations."
    )

grpo_metrics = grpo_trainer.train()
print("GRPO training metrics:", grpo_metrics)

grpo_trainer.save_model(str(config.grpo_dir))
grpo_tokenizer.save_pretrained(str(config.grpo_dir))
print(f"GRPO artifacts saved to {config.grpo_dir}")

## 7) Evaluation and Inference

**Why:** Sanity-check the latest adapter on a streamed hold-out example and confirm that the response parses as JSON.

**Auditability:** We log both the rendered completion and the JSON validation result.

**Tip:** If you skip GRPO, point `model_path` at `config.sft_dir` instead.
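
For the audit trail, printing to the cell output is easy to lose. A minimal sketch of persisting each result as a JSON Lines record follows. The file name `inference_audit.jsonl`, the record fields, and the helper `append_audit_record` are all hypothetical, and the path assumes the default `config.log_dir`.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def append_audit_record(log_path: Path, completion: str, parsed_ok: bool, error: str = "") -> dict:
    """Append one generation result to a JSON Lines file and return the record."""
    record = {
        "logged_at": datetime.now(timezone.utc).isoformat(),
        "completion": completion,
        "json_valid": parsed_ok,
        "error": error,
    }
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with open(log_path, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(record, ensure_ascii=False) + "\n")
    return record


# Hypothetical file name under the default config.log_dir
audit_path = Path("outputs/qwen3_vl_finetune/logs/inference_audit.jsonl")
append_audit_record(audit_path, '{"version": "ground_truth_v1"}', parsed_ok=True)
```

Appending one line per generation keeps earlier results intact, so successive runs can be compared later.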

In [None]:
from pathlib import Path
from vllm import SamplingParams

def build_generation_messages(record: Dict[str, Any]) -> List[Dict[str, str]]:
    chat = build_messages(record)
    return chat[:-1]  # drop assistant turn so the model must regenerate it

candidate_paths = [config.grpo_dir, config.sft_dir]
model_path = next((path for path in candidate_paths if Path(path).exists() and any(Path(path).iterdir())), None)
if model_path is None:
    raise FileNotFoundError("Neither GRPO nor SFT checkpoints were found. Train before running inference.")

eval_model, eval_tokenizer = FastLanguageModel.from_pretrained(
    model_name=str(model_path),
    max_seq_length=config.max_seq_length,
    dtype=None,
    load_in_4bit=config.load_in_4bit,
    fast_inference=True,
    max_lora_rank=config.lora_rank,
    gpu_memory_utilization=gpu_memory_utilization,
 )
FastLanguageModel.for_inference(eval_model)

eval_source = load_dataset(config.dataset_repo_id, split=config.dataset_split, streaming=True)
sample = next(iter(eval_source.take(1)))
messages = build_generation_messages(sample)
prompt_text = eval_tokenizer.apply_chat_template(
    messages, tokenize=False, add_generation_prompt=True
)

sampling_params = SamplingParams(
    temperature=0.1,
    top_p=0.9,
    top_k=40,
    max_tokens=1024,
)

outputs = eval_model.fast_generate(
    prompt_text,
    sampling_params=sampling_params,
    lora_request=None,
)

completion = outputs[0].outputs[0].text.strip()
print("Generated response:\n", completion)

try:
    parsed = _extract_json(completion)
    print("✅ Parsed JSON with keys:", list(parsed.keys())[:10])
except Exception as exc:
    print("⚠️ JSON validation failed:", exc)

## 8) Artifact Export

**Why:** Persist LoRA weights for deployment (and optional conversion to GGUF for lightweight inference).

**Verification:** Assert tensors are non-zero before pushing anywhere public.

**Next step:** Use Unsloth Dynamic to package adapters into GGUF if you plan to mirror `unsloth/Qwen3-VL-2B-Thinking-GGUF`.

In [None]:
from safetensors import safe_open

model_for_export = globals().get("eval_model") or globals().get("grpo_model") or model
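# ensure_dirs() always creates grpo_dir, so test for saved files rather than mere existence
export_base = next(
    (path for path in (config.grpo_dir, config.sft_dir) if path.exists() and any(path.iterdir())),
    config.sft_dir,
)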
lora_dir = export_base / "lora_adapter"
lora_dir.mkdir(parents=True, exist_ok=True)

model_for_export.save_lora(str(lora_dir))

with safe_open(lora_dir / "adapter_model.safetensors", framework="pt") as handle:
    for tensor_name in handle.keys():
        tensor = handle.get_tensor(tensor_name)
        if torch.count_nonzero(tensor) == 0:
            raise ValueError(f"Tensor {tensor_name} appears to be all zeros")

print(f"GRPO/SFT LoRA adapters saved and verified in {lora_dir}")
print("Ready for optional GGUF export with Unsloth Dynamic.")

## Notes
- **Streaming first:** Training pulls samples on-demand, so we never materialize the 600k-record corpus locally.
- **Determinism:** Seeds propagate through Python, CUDA, and HF Trainer to keep runs reproducible.
- **Deployment:** For quantized inference, start from `config.quantized_repo_hint` after exporting LoRA weights.
- **Safety:** The assistant is confined to defensive reasoning with schema compliance checks baked into rewards.
- **Next:** Integrate the exported adapters into the DAG or convert to GGUF via Unsloth Dynamic tooling.