## 1 · Install Dependencies

In [None]:
%%capture
!pip install -q \
    ultralytics>=8.3.0 \
    transformers>=4.45.0 \
    peft>=0.13.0 \
    bitsandbytes>=0.44.0 \
    accelerate>=1.0.0 \
    qwen-vl-utils \
    rasterio \
    geopandas \
    shapely \
    albumentations>=1.4.0 \
    wandb \
    scikit-learn \
    pyyaml \
    tqdm \
    matplotlib \
    pillow \
    opencv-python-headless

print("‚úÖ All dependencies installed.")

## 2 · Configuration (Dynamic Paths, No Hardcoded Keys)

In [None]:
import os, gc, time, json, random, shutil, logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional, Union

import numpy as np
import torch

# ─── Environment Detection ─────────────────────
# True when the Kaggle working directory is present on this machine.
IS_KAGGLE = os.path.exists("/kaggle/working")


def _resolve_root() -> Path:
    """Return the project root: the Kaggle working dir on Kaggle, else the CWD."""
    return Path("/kaggle/working") if IS_KAGGLE else Path.cwd()


PROJECT_ROOT = _resolve_root()

def _resolve_data_root() -> Path:
    """Locate the SpaceNet 7 dataset directory.

    Precedence: a non-empty ``SPACENET7_ROOT`` environment variable, then the
    Kaggle input mount when running on Kaggle, then ``PROJECT_ROOT / "data"``.
    """
    override = os.environ.get("SPACENET7_ROOT")
    if override:
        return Path(override)
    return Path("/kaggle/input/spacenet7") if IS_KAGGLE else PROJECT_ROOT / "data"

DATA_ROOT = _resolve_data_root()

# ─── Output Directories ────────────────────────
# Everything the notebook writes lives under PROJECT_ROOT / "outputs".
OUTPUT_DIR     = PROJECT_ROOT.joinpath("outputs")
YOLO_DATA_DIR  = OUTPUT_DIR.joinpath("yolo_dataset")
VLM_DATA_DIR   = OUTPUT_DIR.joinpath("vlm_dataset")
CHECKPOINT_DIR = OUTPUT_DIR.joinpath("checkpoints")
YOLO_CKPT_DIR  = CHECKPOINT_DIR.joinpath("yolo")
VLM_CKPT_DIR   = CHECKPOINT_DIR.joinpath("vlm")
EVAL_DIR       = OUTPUT_DIR.joinpath("evaluation")
EXPORT_DIR     = OUTPUT_DIR.joinpath("export")

# Create the leaf directories eagerly; parents=True also creates OUTPUT_DIR
# and CHECKPOINT_DIR on the way.
for _d in (YOLO_DATA_DIR, VLM_DATA_DIR, YOLO_CKPT_DIR, VLM_CKPT_DIR, EVAL_DIR, EXPORT_DIR):
    _d.mkdir(parents=True, exist_ok=True)

# ─── W&B Config ────────────────────────────────
@dataclass
class WandbConfig:
    """Weights & Biases settings; credentials are read from the environment.

    ``WANDB_API_KEY`` and ``WANDB_ENTITY`` override the constructor values when
    set. Without an API key, logging is switched off and a notice is printed.
    """

    project: str = "GeoExtract-v2"
    entity: Optional[str] = None
    enabled: bool = True
    # Excluded from repr so the key does not leak into printed configs.
    api_key: Optional[str] = field(default=None, repr=False)

    def __post_init__(self):
        env = os.environ
        self.api_key = env.get("WANDB_API_KEY", self.api_key)
        self.entity = env.get("WANDB_ENTITY", self.entity)
        if self.api_key:
            return
        print("[‚ö† wandb] WANDB_API_KEY not set ‚Äî logging disabled.")
        self.enabled = False

# ─── Dataset Config ────────────────────────────
@dataclass
class DataConfig:
    """Dataset location, train/val split, and augmentation settings."""
    root: Path = DATA_ROOT  # dataset root; defaults to the resolved DATA_ROOT
    images_subdir: str = "train"  # SpaceNet7Parser.discover() scans root/<images_subdir>, falling back to root
    geojson_subdir: str = "train"  # not referenced by the code visible in this part of the notebook
    image_size: int = 640  # square side (pixels) that images are resized to
    val_split: float = 0.15  # fraction of samples held out for validation
    seed: int = 42  # passed to random.seed() before sampling / shuffling
    max_samples: Optional[int] = None  # optional cap; discover() randomly subsamples above it
    augment: bool = True  # enables the albumentations transforms (applied to the train split only)
    rotation_limit: int = 30  # `limit` passed to A.Rotate
    color_jitter: float = 0.3  # base strength passed to A.ColorJitter
    flip_prob: float = 0.5  # HorizontalFlip probability; VerticalFlip uses half of it

# ─── YOLO Config ───────────────────────────────
@dataclass
class YOLOConfig:
    """Settings for the YOLO building detector.

    Only ``model_variant``, ``epochs`` and ``batch_size`` are read by code
    visible in this part of the notebook (config summary / W&B init); the
    remaining fields are presumably forwarded to the YOLO trainer — confirm
    against the training cell.
    """
    model_variant: str = "yolo11n.pt"  # weights file name
    epochs: int = 50
    batch_size: int = 16
    image_size: int = 640  # presumably matches DataConfig.image_size — TODO confirm
    lr0: float = 1e-3
    lrf: float = 0.01
    patience: int = 10
    save_period: int = 5
    workers: int = 2
    device: str = "0"
    project: Path = YOLO_CKPT_DIR  # run output root; InferenceConfig.yolo_weights points below it
    name: str = "building_detector"  # run name; matches the folder used by InferenceConfig.yolo_weights
    resume: bool = True
    checkpoint_every_n_steps: int = 500

# ─── VLM Config ────────────────────────────────
@dataclass
class VLMConfig:
    """Settings for fine-tuning the vision-language model with LoRA.

    The consumer of most fields is not visible in this part of the notebook;
    the per-field notes below only state what the visible code establishes.
    """
    model_id: str = "Qwen/Qwen2-VL-2B-Instruct"  # model identifier, also logged to W&B
    # Quantization settings (names mirror bitsandbytes-style options — usage not visible here).
    load_in_4bit: bool = True
    bnb_4bit_quant_type: str = "nf4"
    bnb_4bit_compute_dtype: str = "float16"
    # LoRA adapter settings.
    lora_r: int = 16
    lora_alpha: int = 32
    lora_dropout: float = 0.05
    lora_target_modules: List[str] = field(
        default_factory=lambda: ["q_proj", "v_proj", "k_proj", "o_proj"]
    )
    # Optimisation schedule.
    epochs: int = 3
    batch_size: int = 2
    gradient_accumulation_steps: int = 8  # summary() reports batch_size * this as the effective batch
    learning_rate: float = 2e-4
    weight_decay: float = 0.01
    warmup_ratio: float = 0.1
    lr_scheduler_type: str = "cosine"
    max_seq_length: int = 1024
    # Checkpointing / logging cadence.
    save_steps: int = 500
    logging_steps: int = 50
    eval_steps: int = 500
    output_dir: Path = VLM_CKPT_DIR
    resume_from_checkpoint: bool = True
    # Precision and memory options.
    fp16: bool = True
    bf16: bool = False
    gradient_checkpointing: bool = True
    device: str = "cuda:0"

# ─── QA Config ─────────────────────────────────
@dataclass
class QAConfig:
    """Thresholds and prompt used when generating synthetic QA conversations."""
    # Inclusive upper bounds on building count per density class
    # (see DensityClassifier.classify); counts above dense_max are "Urban Core".
    sparse_max: int = 10
    moderate_max: int = 30
    dense_max: int = 50
    # Range for the number of question/answer turns drawn per conversation.
    min_turns: int = 2
    max_turns: int = 4
    # Used as the first ("system") message of every generated conversation.
    system_prompt: str = (
        "You are GeoExtract, an expert urban planning AI that analyzes "
        "satellite imagery. You provide detailed assessments of building "
        "density, green space coverage, urban heat island risk, and "
        "construction quality based on visual and spatial data."
    )

# ─── Inference Config ──────────────────────────
@dataclass
class InferenceConfig:
    """Inference-time settings (consumer not visible in this part of the notebook)."""
    # Default path follows YOLOConfig.project / YOLOConfig.name / "weights" / "best.pt".
    yolo_weights: Path = YOLO_CKPT_DIR / "building_detector" / "weights" / "best.pt"
    vlm_adapter_dir: Path = VLM_CKPT_DIR  # same directory as VLMConfig.output_dir
    confidence_threshold: float = 0.25  # presumably the detector score cut-off — TODO confirm
    iou_threshold: float = 0.45  # presumably the NMS IoU threshold — TODO confirm
    max_new_tokens: int = 512
    device: str = "cuda:0"

# ─── Evaluation Config ─────────────────────────
@dataclass
class EvalConfig:
    """Evaluation settings (consumer not visible in this part of the notebook)."""
    iou_threshold: float = 0.5  # presumably the IoU needed to count a detection as a match — TODO confirm
    # Same four labels that DensityClassifier.classify() emits as "density_class".
    density_classes: List[str] = field(
        default_factory=lambda: ["Sparse", "Moderate", "Dense", "Urban Core"]
    )
    output_dir: Path = EVAL_DIR

# ─── Master Config ─────────────────────────────
@dataclass
class GeoExtractConfig:
    """Top-level configuration bundling one instance of every per-stage config."""

    data: DataConfig = field(default_factory=DataConfig)
    yolo: YOLOConfig = field(default_factory=YOLOConfig)
    vlm: VLMConfig = field(default_factory=VLMConfig)
    qa: QAConfig = field(default_factory=QAConfig)
    inference: InferenceConfig = field(default_factory=InferenceConfig)
    evaluation: EvalConfig = field(default_factory=EvalConfig)
    wandb: WandbConfig = field(default_factory=WandbConfig)

    def summary(self) -> str:
        """Return a multi-line, human-readable overview of the key settings."""
        heavy_rule = "‚ïê" * 55
        light_rule = "‚îÄ" * 55
        effective_batch = self.vlm.batch_size * self.vlm.gradient_accumulation_steps

        environment_rows = [
            f"  Environment    : {'Kaggle' if IS_KAGGLE else 'Local'}",
            f"  Project Root   : {PROJECT_ROOT}",
            f"  Data Root      : {DATA_ROOT}",
            f"  Output Dir     : {OUTPUT_DIR}",
            f"  W&B Enabled    : {self.wandb.enabled}",
        ]
        yolo_rows = [
            f"  YOLO model     : {self.yolo.model_variant}",
            f"  YOLO epochs    : {self.yolo.epochs}",
            f"  YOLO batch     : {self.yolo.batch_size}",
        ]
        vlm_rows = [
            f"  VLM model      : {self.vlm.model_id}",
            f"  VLM 4-bit      : {self.vlm.load_in_4bit}",
            f"  LoRA r/alpha   : {self.vlm.lora_r}/{self.vlm.lora_alpha}",
            f"  VLM epochs     : {self.vlm.epochs}",
            f"  VLM eff. batch : {effective_batch}",
        ]

        # Banner, then the three sections separated by light rules.
        report = [heavy_rule, "  GeoExtract v2 ‚Äî Configuration Summary", heavy_rule]
        report.extend(environment_rows)
        report.append(light_rule)
        report.extend(yolo_rows)
        report.append(light_rule)
        report.extend(vlm_rows)
        report.append(heavy_rule)
        return "\n".join(report)

# Notebook-wide configuration instance; later cells use it as their default argument.
CFG = GeoExtractConfig()
print(CFG.summary())

## 3 · Utility Helpers (VRAM, Logging, Checkpoints)

In [None]:
# ──────────────── Logging ──────────────────────
def setup_logger(name: str = "geoextract", level: int = logging.INFO) -> logging.Logger:
    """Return the named logger, attaching a stream handler on first use.

    The level is (re)applied on every call. The handler is only added when
    the logger has none yet, so re-running the cell does not duplicate output.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        return logger

    stream = logging.StreamHandler()
    stream.setFormatter(
        logging.Formatter(
            "[%(asctime)s] %(levelname)s ‚Äî %(name)s ‚Äî %(message)s",
            datefmt="%H:%M:%S",
        )
    )
    logger.addHandler(stream)
    return logger

log = setup_logger()

# ──────────────── VRAM Monitoring ──────────────
def get_vram_usage() -> dict:
    """Report GPU memory usage of the current CUDA device in gigabytes (1e9 bytes).

    Returns:
        Dict with ``allocated_gb``, ``reserved_gb`` and ``total_gb``, each
        rounded to two decimals. All three are 0 when CUDA is unavailable.
    """
    if not torch.cuda.is_available():
        return {"allocated_gb": 0, "reserved_gb": 0, "total_gb": 0}
    # Query the same (current) device for all three numbers so they are comparable.
    device = torch.cuda.current_device()
    # The attribute is `total_memory`; `total_mem` does not exist and raised AttributeError.
    total_bytes = torch.cuda.get_device_properties(device).total_memory
    return {
        "allocated_gb": round(torch.cuda.memory_allocated(device) / 1e9, 2),
        "reserved_gb": round(torch.cuda.memory_reserved(device) / 1e9, 2),
        "total_gb": round(total_bytes / 1e9, 2),
    }

def log_vram(tag: str = "") -> None:
    """Log one INFO line with the current VRAM figures, labelled with *tag*."""
    usage = get_vram_usage()
    allocated = usage["allocated_gb"]
    reserved = usage["reserved_gb"]
    total = usage["total_gb"]
    log.info(f"[VRAM {tag}] Allocated: {allocated} GB | Reserved: {reserved} GB | Total: {total} GB")

def free_vram() -> None:
    """Run the garbage collector, release cached CUDA memory, and log the result."""
    gc.collect()
    cuda = torch.cuda
    if cuda.is_available():
        cuda.empty_cache()
        cuda.synchronize()
    log.info("[VRAM] Cache cleared.")
    log_vram("after cleanup")

# ──────────────── Checkpoints ──────────────────
def find_latest_checkpoint(ckpt_dir: Path, prefix: str = "checkpoint-") -> Optional[Path]:
    """Return the checkpoint sub-directory with the highest numeric suffix.

    Only directories whose name starts with *prefix* are considered. The text
    after the last "-" is the step number; a non-numeric suffix sorts as 0.
    Returns None when *ckpt_dir* is missing or holds no matching directory.
    """
    if not ckpt_dir.exists():
        return None

    def _step(path: Path) -> int:
        suffix = path.name.split("-")[-1]
        return int(suffix) if suffix.isdigit() else 0

    candidates = [
        entry
        for entry in ckpt_dir.iterdir()
        if entry.is_dir() and entry.name.startswith(prefix)
    ]
    if not candidates:
        return None

    candidates.sort(key=_step)
    latest = candidates[-1]
    log.info(f"[Checkpoint] Found {len(candidates)} checkpoints. Latest: {latest.name}")
    return latest

def count_parameters(model: torch.nn.Module) -> dict:
    """Count a module's parameters.

    Returns:
        Dict with ``total`` and ``trainable`` element counts plus
        ``trainable_pct`` (two decimals; 0 for a parameter-free module).
    """
    total = 0
    trainable = 0
    for param in model.parameters():
        size = param.numel()
        total += size
        if param.requires_grad:
            trainable += size

    if total > 0:
        trainable_pct = round(100 * trainable / total, 2)
    else:
        trainable_pct = 0
    return {"total": total, "trainable": trainable, "trainable_pct": trainable_pct}

# ──────────────── Timer ────────────────────────
class Timer:
    """Context manager that logs how long its ``with`` body took.

    After exit, ``elapsed`` holds the duration in seconds. Exceptions raised
    in the body are not suppressed.
    """

    def __init__(self, label: str = "Block"):
        self.label, self.start, self.elapsed = label, 0.0, 0.0

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        finished = time.time()
        self.elapsed = finished - self.start
        log.info(f"[Timer] {self.label} took {self.elapsed:.1f}s")

# ──────────────── W&B Helpers ──────────────────
def init_wandb(config, run_name: str, tags: Optional[list] = None):
    """Start a W&B run, or return None when logging is disabled or init fails.

    Args:
        config: master config exposing ``wandb``, ``yolo`` and ``vlm`` sections.
        run_name: display name for the run.
        tags: optional list of tags; None becomes an empty list.

    Any exception during import or init is logged and swallowed so that
    training can continue without experiment tracking.
    """
    if not config.wandb.enabled:
        log.warning("[wandb] Disabled ‚Äî skipping init.")
        return None

    try:
        import wandb

        # wandb reads the key from the environment.
        os.environ["WANDB_API_KEY"] = config.wandb.api_key
        tracked_settings = {
            "yolo_model": config.yolo.model_variant,
            "vlm_model": config.vlm.model_id,
            "lora_r": config.vlm.lora_r,
        }
        run = wandb.init(
            project=config.wandb.project,
            entity=config.wandb.entity,
            name=run_name,
            tags=tags or [],
            config=tracked_settings,
            reinit=True,
        )
        log.info(f"[wandb] Run '{run_name}' initialized.")
        return run
    except Exception as e:
        log.error(f"[wandb] Init failed: {e}. Continuing without logging.")
        return None

def finish_wandb():
    """Close the active W&B run, if any; never raises.

    A missing ``wandb`` package is a silent no-op. Any other failure is
    logged as a warning (mirroring ``init_wandb``) instead of being dropped.
    """
    try:
        import wandb
        if wandb.run is not None:
            wandb.finish()
    except ImportError:
        # wandb is not installed, so there is nothing to close.
        pass
    except Exception as e:
        log.warning(f"[wandb] finish() failed: {e}")

# Log GPU memory once when this cell runs, then mark the cell as finished.
log_vram("initial")
print("‚úÖ Utilities ready.")

## 4 · Data Pipeline (SpaceNet 7 → YOLO Format)

In [None]:
import cv2
import rasterio
from rasterio.windows import Window
from shapely.geometry import shape, box
import geopandas as gpd
import albumentations as A
from tqdm import tqdm


class SpaceNet7Parser:
    """Discover SpaceNet 7 GeoTIFF tiles and read them with their GeoJSON labels.

    Expected layout: ``<root>/<images_subdir>/<AOI>/images/*.tif`` (or
    ``images_masked``) with labels in ``<AOI>/labels/*.geojson``.
    """

    def __init__(self, cfg=CFG.data):
        self.cfg = cfg
        self.root = Path(cfg.root)
        self.image_size = cfg.image_size
        self._samples: List[Dict] = []

    @staticmethod
    def _find_label(labels_dir: Path, stem: str) -> Optional[Path]:
        """Return the GeoJSON label file for image *stem*, or None if none matches."""
        # Exact naming conventions first.
        for name in (
            f"{stem}.geojson",
            f"{stem}_Buildings.geojson",
            f"Buildings_{stem}.geojson",
        ):
            candidate = labels_dir / name
            if candidate.exists():
                return candidate
        # Fuzzy fallback: either stem contains the other. glob() order is
        # unspecified, so sort to make the chosen file reproducible across runs.
        if labels_dir.exists():
            for gj in sorted(labels_dir.glob("*.geojson")):
                if stem in gj.stem or gj.stem in stem:
                    return gj
        return None

    def discover(self) -> List[Dict]:
        """Scan the dataset and return one record per GeoTIFF found.

        Each record has ``image_path``, ``label_path`` (None when no label was
        found), ``aoi``, ``timestamp`` (the image stem) and ``has_labels``.
        When ``cfg.max_samples`` is set and exceeded, a seeded random subset
        is returned; this reseeds the global ``random`` module.

        Raises:
            FileNotFoundError: the data root is missing or empty.
        """
        train_dir = self.root / self.cfg.images_subdir
        if not train_dir.exists():
            log.info(f"[Data] Searching for alternative structures under {self.root} ...")
            train_dir = self.root
            # Check existence explicitly so a missing root yields our message
            # rather than a bare iterdir() error.
            if not train_dir.exists() or not any(train_dir.iterdir()):
                raise FileNotFoundError(f"No data found at {self.root}")

        samples = []
        aoi_dirs = sorted([d for d in train_dir.iterdir() if d.is_dir()])

        for aoi_dir in aoi_dirs:
            aoi_name = aoi_dir.name
            images_dir = aoi_dir / "images"
            labels_dir = aoi_dir / "labels"
            if not images_dir.exists():
                images_dir = aoi_dir / "images_masked"
            if not images_dir.exists():
                continue

            for tif_path in sorted(images_dir.glob("*.tif")):
                stem = tif_path.stem
                label_path = self._find_label(labels_dir, stem)
                samples.append({
                    "image_path": tif_path,
                    "label_path": label_path,
                    "aoi": aoi_name,
                    "timestamp": stem,
                    "has_labels": label_path is not None,
                })

        if self.cfg.max_samples and len(samples) > self.cfg.max_samples:
            random.seed(self.cfg.seed)
            samples = random.sample(samples, self.cfg.max_samples)

        self._samples = samples
        log.info(f"[Data] Discovered {len(samples)} image-label pairs across {len(aoi_dirs)} AOIs.")
        labeled = sum(1 for s in samples if s["has_labels"])
        log.info(f"[Data] {labeled}/{len(samples)} have GeoJSON labels.")
        return samples

    def read_geotiff(self, path: Path) -> np.ndarray:
        """Read a GeoTIFF as a height x width x channels uint8 array.

        Only the first three bands are kept. Non-uint8 rasters are clipped to
        [0, 255] rather than rescaled.
        """
        with rasterio.open(path) as src:
            img = src.read()
        # rasterio returns (bands, rows, cols); move bands to the last axis.
        img = np.transpose(img, (1, 2, 0))
        if img.shape[2] > 3:
            img = img[:, :, :3]
        if img.dtype != np.uint8:
            img = np.clip(img, 0, 255).astype(np.uint8)
        return img

    def read_geojson(self, path: Optional[Path]) -> List[Dict]:
        """Load building footprints from a GeoJSON file.

        Returns one dict per usable geometry (``geometry``, ``bounds``,
        ``area``, ``properties``). Missing files, unreadable files and
        ``None`` paths yield an empty list; read errors are logged.
        """
        if path is None or not path.exists():
            return []
        try:
            gdf = gpd.read_file(path)
            buildings = []
            for _, row in gdf.iterrows():
                geom = row.geometry
                # Empty geometries count as valid but have no usable bounds;
                # letting one through would make bbox conversion fail for the
                # whole tile, so drop them here.
                if geom is None or geom.is_empty or not geom.is_valid:
                    continue
                buildings.append({
                    "geometry": geom,
                    "bounds": geom.bounds,
                    "area": geom.area,
                    "properties": {k: v for k, v in row.items() if k != "geometry"},
                })
            return buildings
        except Exception as e:
            log.warning(f"[Data] Failed to read {path}: {e}")
            return []

    def get_image_metadata(self, path: Path) -> Dict:
        """Return CRS (as a string), affine transform, bounds, width and height."""
        with rasterio.open(path) as src:
            return {
                "crs": str(src.crs), "transform": src.transform,
                "bounds": src.bounds, "width": src.width, "height": src.height,
            }


class YOLOFormatConverter:
    """Turn building footprints into YOLO ``(class, xc, yc, w, h)`` boxes.

    All four box values are normalised by the image size and clipped to [0, 1].
    """

    CLASS_BUILDING = 0

    def __init__(self, image_size: int = 640):
        self.image_size = image_size

    def polygon_to_yolo_bbox(self, geometry, img_width, img_height, geo_transform=None):
        """Convert one geometry's bounding box to a normalised YOLO tuple.

        With *geo_transform*, the bounds are mapped to pixel rows/columns
        first; without it they are treated as pixel coordinates. The box is
        clamped to the image. Returns None when the clamped box is 2 px or
        less in either dimension.
        """
        minx, miny, maxx, maxy = geometry.bounds
        if geo_transform is None:
            left, top = max(0, minx), max(0, miny)
            right, bottom = min(img_width, maxx), min(img_height, maxy)
        else:
            from rasterio.transform import rowcol
            # Map the (minx, maxy) and (maxx, miny) corners to pixel indices;
            # min/max below make the result independent of axis orientation.
            row_a, col_a = rowcol(geo_transform, minx, maxy)
            row_b, col_b = rowcol(geo_transform, maxx, miny)
            left = max(0, min(col_a, col_b))
            top = max(0, min(row_a, row_b))
            right = min(img_width, max(col_a, col_b))
            bottom = min(img_height, max(row_a, row_b))

        box_w = right - left
        box_h = bottom - top
        if box_w <= 2 or box_h <= 2:
            return None

        def _unit(value):
            return np.clip(value, 0.0, 1.0)

        return (
            self.CLASS_BUILDING,
            _unit((left + box_w / 2) / img_width),
            _unit((top + box_h / 2) / img_height),
            _unit(box_w / img_width),
            _unit(box_h / img_height),
        )

    def convert_sample(self, image_path, buildings, geo_transform=None):
        """Return YOLO boxes for every building that survives conversion.

        Image size is read from *image_path*; its own transform is used when
        *geo_transform* is None.
        """
        with rasterio.open(image_path) as src:
            img_w, img_h = src.width, src.height
            if geo_transform is None:
                geo_transform = src.transform
        converted = (
            self.polygon_to_yolo_bbox(bld["geometry"], img_w, img_h, geo_transform)
            for bld in buildings
        )
        return [bbox for bbox in converted if bbox is not None]


def build_augmentation_pipeline(cfg=CFG.data) -> A.Compose:
    """Build the albumentations pipeline for YOLO-format boxes.

    With ``cfg.augment`` the pipeline applies flips, rotations, colour jitter
    and noise before the final resize; otherwise it only resizes to
    ``cfg.image_size``. Boxes below 30% visibility after a transform are dropped.
    """
    jitter = cfg.color_jitter
    if cfg.augment:
        steps = [
            A.HorizontalFlip(p=cfg.flip_prob),
            A.VerticalFlip(p=cfg.flip_prob * 0.5),
            A.RandomRotate90(p=0.3),
            A.Rotate(limit=cfg.rotation_limit, p=0.4, border_mode=cv2.BORDER_CONSTANT),
            A.ColorJitter(
                brightness=jitter,
                contrast=jitter,
                saturation=jitter * 0.5,
                hue=jitter * 0.2,
                p=0.5,
            ),
            A.RandomBrightnessContrast(p=0.3),
            A.GaussNoise(p=0.1),
        ]
    else:
        steps = []
    # The resize always runs last so every output has the same shape.
    steps.append(A.Resize(cfg.image_size, cfg.image_size))

    box_settings = A.BboxParams(format="yolo", label_fields=["class_labels"], min_visibility=0.3)
    return A.Compose(steps, bbox_params=box_settings)


class YOLODatasetBuilder:
    """Convert labelled SpaceNet 7 tiles into a YOLO detection dataset on disk.

    Output layout under ``YOLO_DATA_DIR``: ``images/{train,val}/*.png``,
    ``labels/{train,val}/*.txt`` and a ``dataset.yaml`` describing both splits.
    """

    def __init__(self, cfg=CFG):
        self.cfg = cfg
        self.parser = SpaceNet7Parser(cfg.data)
        self.converter = YOLOFormatConverter(cfg.data.image_size)
        self.augmenter = build_augmentation_pipeline(cfg.data)
        self.output_dir = YOLO_DATA_DIR

    def build(self) -> Path:
        """Discover, split, convert and write the dataset; return the YAML path.

        Raises:
            ValueError: no discovered sample has a label file.
        """
        with Timer("YOLO Dataset Build"):
            samples = self.parser.discover()
            labeled_samples = [s for s in samples if s["has_labels"]]
            if not labeled_samples:
                raise ValueError(f"No labeled samples in {self.cfg.data.root}")

            # NOTE(review): the split is per image, so images from one AOI can
            # land in both train and val — confirm this is intended.
            random.seed(self.cfg.data.seed)
            random.shuffle(labeled_samples)
            split_idx = int(len(labeled_samples) * (1 - self.cfg.data.val_split))
            train_samples = labeled_samples[:split_idx]
            val_samples = labeled_samples[split_idx:]
            log.info(f"[Data] Split: {len(train_samples)} train, {len(val_samples)} val")

            for split in ["train", "val"]:
                (self.output_dir / "images" / split).mkdir(parents=True, exist_ok=True)
                (self.output_dir / "labels" / split).mkdir(parents=True, exist_ok=True)

            self._process_split(train_samples, "train", augment=True)
            self._process_split(val_samples, "val", augment=False)
            yaml_path = self._write_dataset_yaml()
            log.info(f"[Data] ‚úì YOLO dataset ready at {self.output_dir}")
            return yaml_path

    def _process_split(self, samples, split, augment):
        """Write one PNG and one label file per sample of *split*.

        Samples with no usable boxes are skipped. A failing sample is logged
        and does not abort the split.
        """
        img_dir = self.output_dir / "images" / split
        lbl_dir = self.output_dir / "labels" / split
        target = self.cfg.data.image_size
        for sample in tqdm(samples, desc=f"Processing {split}"):
            try:
                img = self.parser.read_geotiff(sample["image_path"])
                buildings = self.parser.read_geojson(sample["label_path"])
                meta = self.parser.get_image_metadata(sample["image_path"])
                bboxes = self.converter.convert_sample(
                    sample["image_path"], buildings, meta.get("transform")
                )
                if not bboxes:
                    continue
                yolo_bboxes = [(b[1], b[2], b[3], b[4]) for b in bboxes]
                class_labels = [b[0] for b in bboxes]

                if augment and self.cfg.data.augment:
                    try:
                        augmented = self.augmenter(
                            image=img, bboxes=yolo_bboxes, class_labels=class_labels,
                        )
                        img = augmented["image"]
                        yolo_bboxes = augmented["bboxes"]
                        class_labels = augmented["class_labels"]
                    except Exception as aug_err:
                        # Best-effort fallback: keep the sample un-augmented,
                        # but log it so systematic failures stay visible.
                        log.warning(
                            f"[Data] Augmentation failed for {sample['image_path'].name}: "
                            f"{aug_err}. Falling back to plain resize."
                        )
                        # Boxes are normalised, so a plain resize keeps them valid.
                        img = cv2.resize(img, (target, target))
                else:
                    img = cv2.resize(img, (target, target))

                # len() works whether the augmenter returns a list or an array.
                if len(yolo_bboxes) == 0:
                    continue

                stem = f"{sample['aoi']}_{sample['timestamp']}"
                cv2.imwrite(str(img_dir / f"{stem}.png"), cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
                with open(lbl_dir / f"{stem}.txt", "w") as f:
                    for cls_id, bbox in zip(class_labels, yolo_bboxes):
                        # int() keeps the class id an integer even if the
                        # augmenter hands labels back as floats.
                        f.write(
                            f"{int(cls_id)} {bbox[0]:.6f} {bbox[1]:.6f} {bbox[2]:.6f} {bbox[3]:.6f}\n"
                        )
            except Exception as e:
                log.warning(f"[Data] Failed {sample['image_path'].name}: {e}")

    def _write_dataset_yaml(self) -> Path:
        """Write ``dataset.yaml`` (single class: building) and return its path."""
        import yaml
        yaml_content = {
            "path": str(self.output_dir), "train": "images/train",
            "val": "images/val", "nc": 1, "names": ["building"],
        }
        yaml_path = self.output_dir / "dataset.yaml"
        with open(yaml_path, "w") as f:
            yaml.dump(yaml_content, f, default_flow_style=False)
        return yaml_path

    def get_stats(self) -> Dict:
        """Return per-split image count, box count and average boxes per image."""
        stats = {}
        for split in ["train", "val"]:
            img_dir = self.output_dir / "images" / split
            lbl_dir = self.output_dir / "labels" / split
            n_images = len(list(img_dir.glob("*.png")))
            total_boxes = 0
            for lbl_file in lbl_dir.glob("*.txt"):
                with open(lbl_file) as f:
                    total_boxes += sum(1 for _ in f)
            stats[split] = {
                "images": n_images, "total_bboxes": total_boxes,
                "avg_bboxes_per_image": round(total_boxes / max(n_images, 1), 1),
            }
        return stats

# Cell-completion marker.
print("‚úÖ Data pipeline ready.")

## 5 · Synthetic QA Generator (ChatML Conversations)

In [None]:
class DensityClassifier:
    """Map a building count to a density class and its descriptive attributes."""

    def __init__(self, cfg=CFG.qa):
        self.cfg = cfg

    def classify(self, building_count: int, image_area_m2: Optional[float] = None) -> Dict:
        """Return the density profile for a tile.

        The class is the first tier whose inclusive upper bound (``sparse_max``,
        ``moderate_max``, ``dense_max``) is not exceeded; anything above is
        "Urban Core". ``buildings_per_hectare`` divides the count by the area
        in hectares, using 409600 m² when no area is given and never less
        than one hectare.
        """
        tiers = (
            (self.cfg.sparse_max, {
                "density_class": "Sparse",
                "density_description": "Low-density suburban or rural area",
                "heat_island_risk": "Low",
                "green_space_assessment": "Abundant ‚Äî large open and vegetated areas visible",
                "livability_rating": "High ‚Äî spacious residential environment",
                "construction_intensity": "Minimal",
            }),
            (self.cfg.moderate_max, {
                "density_class": "Moderate",
                "density_description": "Moderate suburban density with mixed land use",
                "heat_island_risk": "Moderate",
                "green_space_assessment": "Moderate ‚Äî some green patches between structures",
                "livability_rating": "Good ‚Äî balanced density with accessible open areas",
                "construction_intensity": "Active ‚Äî ongoing development likely",
            }),
            (self.cfg.dense_max, {
                "density_class": "Dense",
                "density_description": "High-density urban area with tightly packed structures",
                "heat_island_risk": "High",
                "green_space_assessment": "Limited ‚Äî minimal vegetation corridors",
                "livability_rating": "Moderate ‚Äî constrained but functional residential zones",
                "construction_intensity": "High ‚Äî significant built-up coverage",
            }),
        )
        urban_core = {
            "density_class": "Urban Core",
            "density_description": "Hyper-dense urban core with maximum building coverage",
            "heat_island_risk": "Very High",
            "green_space_assessment": "Severely depleted ‚Äî critical lack of vegetation",
            "livability_rating": "Low ‚Äî crowded environment with limited open space",
            "construction_intensity": "Maximum ‚Äî near-complete land coverage",
        }
        profile = next(
            (attrs for upper_bound, attrs in tiers if building_count <= upper_bound),
            urban_core,
        )

        hectares = max(1, (image_area_m2 or 409600) / 10000)
        return {
            "building_count": building_count,
            **profile,
            "buildings_per_hectare": round(building_count / hectares, 1),
        }


# ──── QA Templates ─────────────────────────────
def _q_density_analysis(info):
    """Return the (question, answer) pair about the tile's density class."""
    question = (
        "Analyze the building density in this satellite image. "
        "What type of urban zone does this represent?"
    )
    headline = (
        f"This area shows a **{info['density_class']}** density pattern with approximately "
        f"{info['building_count']} buildings detected. "
    )
    description = f"{info['density_description']}. "
    rate = f"The estimated building density is {info['buildings_per_hectare']} buildings per hectare."
    return question, headline + description + rate

def _q_heat_island(info):
    """Return the (question, answer) pair about urban heat island risk.

    Reads ``heat_island_risk``, ``building_count`` and
    ``construction_intensity`` from *info*.
    """
    # The adverb belongs to the high-density branch only; previously it sat
    # outside the conditional and produced "significantly modestly affects".
    effect = (
        "significantly increases" if info['building_count'] > 30 else "modestly affects"
    )
    return (
        "What is the urban heat island risk for this area based on the visible building coverage?",
        f"The urban heat island risk is **{info['heat_island_risk']}**. With {info['building_count']} "
        f"structures detected, the built-up area {effect} surface temperature "
        f"relative to surrounding undeveloped land. Construction intensity: {info['construction_intensity']}."
    )

def _q_green_space(info):
    """Return the (question, answer) pair about green space availability."""
    question = "Assess the green space availability and environmental health of this area."

    # A single threshold (20 buildings) drives both the verdict and the advice.
    if info['building_count'] <= 20:
        coverage = "provides adequate cooling and biodiversity corridors"
        advice = "Maintain current balance"
    else:
        coverage = "is insufficient for effective microclimate regulation"
        advice = "Prioritize urban greening initiatives and rooftop gardens"

    zone = info['density_class'].lower()
    answer = (
        f"Green space assessment: {info['green_space_assessment']}. In this "
        f"{zone}-density zone, vegetation coverage "
        f"{coverage}. "
        f"Recommendation: {advice}."
    )
    return question, answer

def _q_livability(info):
    """Return the (question, answer) pair about residential livability."""
    question = (
        "Rate the residential livability of this zone. "
        "Would you recommend it for new housing development?"
    )

    if info['building_count'] <= 25:
        outlook = "this area has capacity for additional development while maintaining quality of life"
    else:
        outlook = (
            "further development should be carefully planned to avoid overcrowding "
            "and infrastructure strain"
        )

    zone = info['density_class'].lower()
    answer = (
        f"Livability rating: {info['livability_rating']}. With a {zone} "
        f"building density of {info['buildings_per_hectare']} structures per hectare, "
        f"{outlook}."
    )
    return question, answer

def _q_construction_trend(info):
    """Return the (question, answer) pair about construction activity and growth."""
    question = "What can you tell about the construction activity and urban growth pattern in this area?"
    count = info['building_count']

    # Growth stage: up to 15 is early, above 40 is mature, anything between is mid-growth.
    if count <= 15:
        stage = "an early-stage development area with significant growth potential"
    elif count > 40:
        stage = "a mature built environment"
    else:
        stage = "an actively developing zone in mid-growth phase"

    if count > 45:
        layout = "organic/informal growth patterns"
    else:
        layout = "planned development with identifiable street grids"

    answer = (
        f"Construction intensity: {info['construction_intensity']}. The {count} "
        f"detected structures suggest "
        f"{stage}. "
        f"The spatial distribution indicates "
        f"{layout}."
    )
    return question, answer

def _q_infrastructure(info):
    """Return the (question, answer) pair about infrastructure challenges."""
    question = "Based on the building density and layout, what infrastructure challenges might this area face?"
    count = info['building_count']

    if count <= 20:
        water = "Water and sewage ‚Äî adequate capacity likely available"
    else:
        water = "Water and sewage ‚Äî systems may be at or near capacity"

    if count <= 30:
        roads = "Road network ‚Äî sufficient for current density"
    else:
        roads = "Road network ‚Äî congestion risk is elevated"

    if count <= 25:
        power = "Power grid ‚Äî standard residential load"
    else:
        power = "Power grid ‚Äî peak demand management needed"

    # Overall stress uses its own, independent thresholds (15 / 35 / 50).
    if count <= 15:
        stress = "Low"
    elif count <= 35:
        stress = "Moderate"
    elif count <= 50:
        stress = "High"
    else:
        stress = "Critical"

    answer = (
        f"With {count} buildings in this tile, key infrastructure considerations include: "
        f"{water}. "
        f"{roads}. "
        f"{power}. "
        f"Overall infrastructure stress: {stress}."
    )
    return question, answer

def _q_planning_recommendation(info):
    """Return the (question, answer) pair with planning recommendations."""
    question = "If you were an urban planner, what would you recommend for this area's future development?"
    count = info['building_count']

    if count <= 25:
        plan = (
            "1) Controlled expansion with green buffer zones, 2) Mixed-use zoning, "
            "3) Investment in public transit corridors"
        )
    elif count <= 45:
        plan = (
            "1) Densification limits, 2) Mandatory green space ratios, "
            "3) Stormwater management infrastructure upgrades"
        )
    else:
        plan = (
            "1) Construction moratorium until infrastructure catches up, "
            "2) Energy efficiency retrofitting, "
            "3) Creating pocket parks to combat heat island effects"
        )

    zone = info['density_class'].lower()
    answer = (
        f"For this {zone}-density area ({count} structures), "
        f"I recommend: "
        f"{plan}."
    )
    return question, answer

def _q_environmental_impact(info):
    """Return the (question, answer) pair about the area's environmental footprint."""
    question = (
        "What is the environmental footprint of this built-up area? "
        "Discuss carbon implications and ecological connectivity."
    )
    count = info['building_count']

    if count <= 10:
        carbon = "Low ‚Äî minimal impervious surface"
    elif count <= 30:
        carbon = "Moderate ‚Äî significant impervious surfaces"
    else:
        carbon = "High ‚Äî extensive land sealing"

    if count <= 15:
        connectivity = "Intact ‚Äî wildlife corridors preserved"
    elif count <= 40:
        connectivity = "Fragmented ‚Äî habitat patches isolated"
    else:
        connectivity = "Severely disrupted ‚Äî near-complete habitat loss"

    if count <= 20:
        stormwater = "Natural infiltration adequate"
    else:
        stormwater = "Engineered drainage required to prevent flooding"

    answer = (
        f"Environmental analysis for {info['density_class']} zone ({count} structures): "
        f"Carbon footprint: {carbon}. "
        f"Ecological connectivity: {connectivity}. "
        f"Stormwater: {stormwater}."
    )
    return question, answer


# Registry of template functions; each maps a density-info dict to a
# (question, answer) pair. SyntheticQAGenerator draws from this list with
# random.sample, so reordering it changes which templates a given seed selects.
QA_TEMPLATES = [
    _q_density_analysis, _q_heat_island, _q_green_space, _q_livability,
    _q_construction_trend, _q_infrastructure, _q_planning_recommendation,
    _q_environmental_impact,
]


class SyntheticQAGenerator:
    """Generate synthetic multi-turn QA conversations for VLM fine-tuning.

    For every labeled sample returned by the parser, the building footprints
    are counted and classified, and a random subset of ``QA_TEMPLATES`` is
    rendered into a system/user/assistant message list. The conversations are
    shuffled, split into train/val and written as JSONL files under
    ``VLM_DATA_DIR``.

    All randomness (number of turns, template choice, shuffle order) is drawn
    from a private ``random.Random`` seeded with ``cfg.data.seed``. Seeding
    happens before any conversation is generated, so a whole run is
    reproducible, and the global ``random`` state is left untouched.
    """

    def __init__(self, cfg=CFG):
        self.cfg = cfg
        self.qa_cfg = cfg.qa
        self.parser = SpaceNet7Parser(cfg.data)
        self.classifier = DensityClassifier(cfg.qa)
        self.output_dir = VLM_DATA_DIR
        # Dedicated RNG so direct calls to _generate_conversation are seeded too.
        self._rng = random.Random(cfg.data.seed)

    def generate(self) -> Path:
        """Generate all conversations and write ``train.jsonl`` / ``val.jsonl``.

        Samples whose conversation cannot be built are logged and skipped.

        Returns:
            Path of the written training split.
        """
        with Timer("VLM QA Generation"):
            # Re-seed at the start of every run: the original seeded only
            # right before the shuffle, leaving template sampling unseeded.
            self._rng = random.Random(self.cfg.data.seed)

            samples = self.parser.discover()
            labeled = [s for s in samples if s["has_labels"]]
            conversations = []
            for sample in tqdm(labeled, desc="Generating QA pairs"):
                try:
                    conv = self._generate_conversation(sample)
                except Exception as e:
                    # Best effort: one unreadable label file must not abort the run.
                    log.warning(f"[QA] Failed for {sample['image_path'].name}: {e}")
                    continue
                if conv:
                    conversations.append(conv)

            self._rng.shuffle(conversations)
            split_idx = int(len(conversations) * (1 - self.cfg.data.val_split))
            train_path = self._save_jsonl(conversations[:split_idx], "train.jsonl")
            self._save_jsonl(conversations[split_idx:], "val.jsonl")
            log.info(f"[QA] Generated {split_idx} train, {len(conversations) - split_idx} val conversations.")
            return train_path

    def _generate_conversation(self, sample):
        """Build one conversation record for a labeled sample.

        Args:
            sample: Mapping with ``label_path``, ``image_path``, ``aoi`` and
                ``timestamp`` entries.

        Returns:
            Dict with ``id``, ``image``, ``building_count``, ``density_class``
            and the ``messages`` list (system prompt followed by alternating
            user/assistant turns).
        """
        buildings = self.parser.read_geojson(sample["label_path"])
        building_count = len(buildings)
        # No buildings -> pass None so the classifier can tell "unknown area"
        # apart from a measured area of zero.
        total_area = sum(b.get("area", 0) for b in buildings) if buildings else None
        density_info = self.classifier.classify(building_count, total_area)

        n_turns = self._rng.randint(self.qa_cfg.min_turns, self.qa_cfg.max_turns)
        # sample() draws without replacement, so cap at the registry size.
        selected_templates = self._rng.sample(QA_TEMPLATES, min(n_turns, len(QA_TEMPLATES)))

        messages = [{"role": "system", "content": self.qa_cfg.system_prompt}]
        for template_fn in selected_templates:
            question, answer = template_fn(density_info)
            messages.append({"role": "user", "content": question})
            messages.append({"role": "assistant", "content": answer})

        return {
            "id": f"{sample['aoi']}_{sample['timestamp']}",
            "image": str(sample["image_path"]),
            "building_count": building_count,
            "density_class": density_info["density_class"],
            "messages": messages,
        }

    def _save_jsonl(self, conversations, filename):
        """Write ``conversations`` as one JSON object per line and return the path."""
        path = self.output_dir / filename
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            for conv in conversations:
                # default=str stringifies values json cannot encode natively.
                f.write(json.dumps(conv, default=str) + "\n")
        log.info(f"[QA] Saved {len(conversations)} conversations to {path}")
        return path

    def get_stats(self) -> Dict:
        """Summarize the JSONL splits that exist on disk.

        Returns:
            Dict keyed by split name (``train`` / ``val``); each value holds
            the conversation count, the number of user turns and a
            density-class histogram. Missing split files are omitted.
        """
        stats = {}
        for split in ["train", "val"]:
            path = self.output_dir / f"{split}.jsonl"
            if not path.exists():
                continue
            with open(path, encoding="utf-8") as f:
                # Skip blank lines (e.g. a trailing newline added by an editor).
                convs = [json.loads(line) for line in f if line.strip()]
            density_dist = {}
            total_turns = 0
            for c in convs:
                dc = c.get("density_class", "Unknown")
                density_dist[dc] = density_dist.get(dc, 0) + 1
                total_turns += sum(1 for m in c["messages"] if m["role"] == "user")
            stats[split] = {
                "conversations": len(convs),
                "total_qa_turns": total_turns,
                "density_distribution": density_dist,
            }
        return stats

print("‚úÖ QA generator ready.")

## 6 · YOLO Building Detector Trainer

In [None]:
class YOLOTrainer:
    """Train, validate and export the YOLO building detector.

    Hyper-parameters are read from ``cfg.yolo``. Run artifacts are looked up
    under ``cfg.yolo.project / cfg.yolo.name / "weights"`` (``last.pt``,
    ``best.pt`` and periodic ``epoch<N>.pt`` files).
    """

    def __init__(self, dataset_yaml: Path, cfg=CFG):
        self.cfg = cfg
        self.yolo_cfg = cfg.yolo
        self.dataset_yaml = dataset_yaml
        self.model = None
        self._wandb_run = None

    def train(self) -> Path:
        """Run training and return the expected location of ``best.pt``.

        If ``cfg.yolo.resume`` is set and earlier weights exist, training
        starts from them; otherwise from ``cfg.yolo.model_variant``. A
        ``KeyboardInterrupt`` during training is logged and swallowed so the
        caller still gets the weights path.

        NOTE(review): ``resume=True`` is not passed to ``model.train``, so this
        looks like a warm start from the saved weights rather than a
        continuation of the interrupted run - confirm that is intended.

        Returns:
            Path where the best weights are expected; the file may be absent
            if training never produced a checkpoint.
        """
        from ultralytics import YOLO
        with Timer("YOLO Training"):
            log_vram("before YOLO load")
            resume_weights = self._find_resume_weights()
            if resume_weights and self.yolo_cfg.resume:
                log.info(f"[YOLO] Resuming from: {resume_weights}")
                self.model = YOLO(str(resume_weights))
            else:
                log.info(f"[YOLO] Starting fresh with {self.yolo_cfg.model_variant}")
                self.model = YOLO(self.yolo_cfg.model_variant)
            log_vram("after YOLO load")

            self._wandb_run = init_wandb(
                self.cfg, run_name="yolo-building-detector", tags=["yolo", "training"]
            )

            try:
                self.model.train(
                    data=str(self.dataset_yaml),
                    epochs=self.yolo_cfg.epochs,
                    batch=self.yolo_cfg.batch_size,
                    imgsz=self.yolo_cfg.image_size,
                    lr0=self.yolo_cfg.lr0, lrf=self.yolo_cfg.lrf,
                    patience=self.yolo_cfg.patience,
                    save_period=self.yolo_cfg.save_period,
                    workers=self.yolo_cfg.workers,
                    device=self.yolo_cfg.device,
                    project=str(self.yolo_cfg.project),
                    name=self.yolo_cfg.name, exist_ok=True,
                    pretrained=True, verbose=True,
                    hsv_h=0.015, hsv_s=0.4, hsv_v=0.3,
                    flipud=0.3, fliplr=0.5, mosaic=0.8, mixup=0.1,
                    plots=True, val=True,
                )
            except KeyboardInterrupt:
                log.warning("[YOLO] Training interrupted. Weights are saved.")
            finally:
                # Always close the W&B run, even on interruption or failure.
                finish_wandb()

            best_weights = self._get_best_weights()
            log.info(f"[YOLO] ‚úì Best weights: {best_weights}")
            log_vram("after YOLO training")
            return best_weights

    def validate(self) -> Dict:
        """Validate the best weights on the dataset.

        Returns:
            Dict with ``mAP50``, ``mAP50-95``, ``precision``, ``recall`` and
            ``f1``; an empty dict when no trained weights exist.
        """
        best = self._get_best_weights()
        if not best.exists():
            log.error("[YOLO] No trained weights found.")
            return {}
        from ultralytics import YOLO
        model = YOLO(str(best))
        results = model.val(
            data=str(self.dataset_yaml), batch=self.yolo_cfg.batch_size,
            imgsz=self.yolo_cfg.image_size, device=self.yolo_cfg.device,
        )
        box = results.box
        metrics = {
            "mAP50": box.map50 if hasattr(box, 'map50') else 0.0,
            "mAP50-95": box.map if hasattr(box, 'map') else 0.0,
            "precision": box.mp if hasattr(box, 'mp') else 0.0,
            "recall": box.mr if hasattr(box, 'mr') else 0.0,
        }
        # Floor the denominator so precision == recall == 0 yields f1 == 0.
        metrics["f1"] = 2 * metrics["precision"] * metrics["recall"] / max(metrics["precision"] + metrics["recall"], 1e-6)
        log.info(f"[YOLO] Validation metrics: {metrics}")
        return metrics

    def export_for_deployment(self, format="onnx") -> Path:
        """Export the best weights to a deployment format.

        Args:
            format: Target export format passed to ``model.export``.

        Returns:
            Path of the exported artifact.

        Raises:
            FileNotFoundError: If no best weights exist yet.
        """
        best = self._get_best_weights()
        if not best.exists():
            raise FileNotFoundError(f"YOLO weights not found at {best}")
        from ultralytics import YOLO
        model = YOLO(str(best))
        return Path(model.export(format=format, imgsz=self.yolo_cfg.image_size))

    def _find_resume_weights(self):
        """Return the most recent checkpoint of this run, or ``None``.

        ``last.pt`` wins; otherwise the periodic ``epoch<N>.pt`` file with the
        highest epoch number is used.
        """
        weights_dir = self.yolo_cfg.project / self.yolo_cfg.name / "weights"
        last_weights = weights_dir / "last.pt"
        if last_weights.exists():
            return last_weights
        if not weights_dir.exists():
            return None
        periodic = list(weights_dir.glob("epoch*.pt"))
        if not periodic:
            return None
        # Compare epoch numbers numerically: a plain string sort would rank
        # "epoch9.pt" above "epoch10.pt".
        return max(periodic, key=lambda p: (self._epoch_number(p), p.name))

    @staticmethod
    def _epoch_number(path: Path) -> int:
        """Parse ``N`` from an ``epoch<N>.pt`` file name; ``-1`` if not numeric."""
        suffix = path.stem[len("epoch"):]
        return int(suffix) if suffix.isdigit() else -1

    def _get_best_weights(self) -> Path:
        """Return the path where this run's ``best.pt`` is expected."""
        return self.yolo_cfg.project / self.yolo_cfg.name / "weights" / "best.pt"

    def cleanup(self):
        """Drop the model reference and release cached GPU memory."""
        if self.model is not None:
            del self.model
            self.model = None
        free_vram()
        log.info("[YOLO] Model unloaded, VRAM freed.")

print("‚úÖ YOLO trainer ready.")

## 7 · VLM Trainer (Qwen2-VL + LoRA)

In [None]:
from torch.utils.data import Dataset as TorchDataset


class GeoExtractVLMDataset(TorchDataset):
    """Map-style dataset over the conversation JSONL written by the QA generator.

    Each item is the conversation's message list converted to the
    list-of-parts content format; the conversation's image path, if any, is
    attached to the first user message only.
    """

    def __init__(self, jsonl_path: Path, processor, max_length: int = 1024, include_images: bool = True):
        """Load every non-blank JSONL line of ``jsonl_path`` into memory.

        Args:
            jsonl_path: File with one JSON conversation per line.
            processor: Stored on the instance; not used by this class itself.
            max_length: Stored on the instance; not used by this class itself.
            include_images: When false, items never carry an image part.
        """
        self.processor = processor
        self.max_length = max_length
        self.include_images = include_images
        self.conversations = []
        with open(jsonl_path, "r") as handle:
            stripped_lines = (raw.strip() for raw in handle)
            self.conversations.extend(json.loads(entry) for entry in stripped_lines if entry)
        log.info(f"[VLM Dataset] Loaded {len(self.conversations)} conversations from {jsonl_path.name}")

    def __len__(self):
        """Number of loaded conversations."""
        return len(self.conversations)

    def __getitem__(self, idx):
        """Return ``{"messages": [...], "id": ...}`` for conversation ``idx``.

        The ``id`` falls back to ``str(idx)`` when the record has none.
        """
        record = self.conversations[idx]
        turns = record["messages"]
        # Consumed by the first user turn, then cleared so later turns stay text-only.
        pending_image = record.get("image")

        rendered = []
        for turn in turns:
            speaker = turn["role"]
            parts = [{"type": "text", "text": turn["content"]}]
            if speaker == "user" and self.include_images and pending_image:
                parts.insert(0, {"type": "image", "image": pending_image})
                pending_image = None
            rendered.append({"role": speaker, "content": parts})

        return {"messages": rendered, "id": record.get("id", str(idx))}


class ChatMLCollator:
    """Collate conversation samples into a padded batch with assistant-only labels.

    Each sample's messages are rendered to ChatML text, the texts are
    tokenized together, and ``labels`` is a copy of ``input_ids`` in which
    every token outside an assistant reply is set to ``-100``.

    NOTE(review): the batch is text-only. Image parts referenced by the
    messages are not encoded (no pixel inputs are produced). The previous
    implementation opened and decoded every image and then discarded the
    result; that dead I/O was removed. Feeding images requires going through
    the processor and re-deriving the label offsets.

    NOTE(review): the closing ``<|im_end|>`` of each assistant reply is
    excluded from the loss, as before - confirm the model is still expected
    to learn when to stop.
    """

    # ChatML delimiters used to locate assistant replies in the rendered text.
    _ASSISTANT_HEADER = "<|im_start|>assistant"
    _TURN_END = "<|im_end|>"

    def __init__(self, processor, max_length: int = 1024):
        """
        Args:
            processor: A processor exposing ``.tokenizer``, or a tokenizer.
            max_length: Truncation length passed to the tokenizer.
        """
        self.processor = processor
        self.max_length = max_length
        self.tokenizer = processor.tokenizer if hasattr(processor, 'tokenizer') else processor

    def __call__(self, batch):
        """Tokenize ``batch`` and attach ``labels``.

        Args:
            batch: Sequence of dicts, each with a ``messages`` list.

        Returns:
            The tokenizer's encoding with an added ``labels`` tensor of the
            same shape as ``input_ids``.
        """
        texts = [self._render(sample["messages"]) for sample in batch]

        encoding = self.tokenizer(
            texts, padding=True, truncation=True,
            max_length=self.max_length, return_tensors="pt",
        )
        labels = encoding["input_ids"].clone()
        attention_mask = encoding["attention_mask"] if "attention_mask" in encoding else None
        if attention_mask is not None:
            # Mask padding by position, not by token id, so real EOS tokens
            # survive when the pad token is aliased to EOS.
            labels[attention_mask == 0] = -100
        elif self.tokenizer.pad_token_id is not None:
            labels[labels == self.tokenizer.pad_token_id] = -100
        labels = self._mask_non_assistant_tokens(texts, labels, attention_mask)
        encoding["labels"] = labels
        return encoding

    def _render(self, messages):
        """Render messages with the tokenizer's chat template, else manually."""
        try:
            return self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False)
        except Exception:
            # Deliberate best effort: any template failure falls back to plain ChatML.
            return self._manual_chatml(messages)

    def _assistant_char_spans(self, text):
        """Return ``(start, end)`` character spans of assistant reply bodies."""
        spans = []
        cursor = 0
        while True:
            header_at = text.find(self._ASSISTANT_HEADER, cursor)
            if header_at == -1:
                break
            newline_at = text.find("\n", header_at)
            if newline_at == -1:
                break
            content_start = newline_at + 1
            content_end = text.find(self._TURN_END, content_start)
            if content_end == -1:
                # Unterminated reply: treat the rest of the text as content.
                content_end = len(text)
            spans.append((content_start, content_end))
            cursor = content_end + len(self._TURN_END)
        return spans

    def _mask_non_assistant_tokens(self, texts, labels, attention_mask=None):
        """Set every label outside an assistant reply to ``-100`` (in place).

        Token positions are estimated by tokenizing the text before each reply
        and the reply itself separately. This assumes the batch tokenization
        added no leading special tokens and splits at the same boundaries -
        TODO confirm for the tokenizer in use.

        Args:
            texts: Rendered ChatML strings, one per row of ``labels``.
            labels: ``(batch, seq_len)`` tensor to modify.
            attention_mask: Optional mask from the same encoding; used to
                shift positions when the tokenizer pads on the left.

        Returns:
            ``labels``. Rows in which no assistant reply is found are left
            unchanged.
        """
        seq_len = labels.shape[1]
        left_padded = getattr(self.tokenizer, "padding_side", "right") == "left"
        for row, text in enumerate(texts):
            spans = self._assistant_char_spans(text)
            if not spans:
                continue

            # With left padding the real tokens sit at the end of the row.
            offset = 0
            if left_padded and attention_mask is not None:
                offset = seq_len - int(attention_mask[row].sum())

            keep = torch.zeros_like(labels[row], dtype=torch.bool)
            for start, end in spans:
                n_prefix = len(self.tokenizer.encode(text[:start], add_special_tokens=False))
                n_content = len(self.tokenizer.encode(text[start:end], add_special_tokens=False))
                # Clamp to seq_len (not seq_len - 1): a reply that begins past
                # the truncation point must keep zero tokens, not the last one.
                tok_start = min(offset + n_prefix, seq_len)
                tok_end = min(offset + n_prefix + n_content, seq_len)
                keep[tok_start:tok_end] = True
            labels[row][~keep] = -100
        return labels

    def _manual_chatml(self, messages):
        """Render messages as ChatML, keeping only the text parts of list content."""
        parts = []
        for msg in messages:
            role = msg["role"]
            if isinstance(msg["content"], list):
                content = " ".join(
                    p["text"] for p in msg["content"]
                    if isinstance(p, dict) and p.get("type") == "text"
                )
            else:
                content = msg["content"]
            parts.append(f"<|im_start|>{role}\n{content}<|im_end|>")
        return "\n".join(parts)


class VLMTrainer:
    """Fine-tune the vision-language model with 4-bit quantization and LoRA.

    ``train()`` runs the whole flow: load the quantized base model and
    processor, wrap the model with LoRA adapters, load the JSONL datasets,
    build a ``transformers.Trainer`` and train, then save the adapter and
    processor to ``<output_dir>/final_adapter``.
    """

    def __init__(self, cfg=CFG):
        self.cfg = cfg
        self.vlm_cfg = cfg.vlm
        self.model = None
        self.processor = None
        self.trainer = None
        # Set by _setup_trainer; initialised here so _run_training never hits
        # a missing attribute.
        self._resume_checkpoint = None

    def train(self) -> Path:
        """Run the full fine-tuning flow and return the final adapter directory."""
        with Timer("VLM Training"):
            log_vram("before VLM load")
            self._load_model()
            log_vram("after VLM load")
            self._apply_lora()
            train_dataset, val_dataset = self._load_datasets()
            self._setup_trainer(train_dataset, val_dataset)
            self._run_training()
            final_path = self._save_final()
            log.info(f"[VLM] ‚úì Training complete. Adapter saved to {final_path}")
            return final_path

    @staticmethod
    def _select_attn_implementation() -> str:
        """Pick the attention backend for model loading.

        Returns ``"flash_attention_2"`` only when a CUDA device with compute
        capability >= 8 is present and the ``flash_attn`` package can be
        imported; otherwise ``"eager"``.
        """
        import importlib.util

        # get_device_capability() raises without CUDA, so check availability first.
        if not torch.cuda.is_available():
            return "eager"
        if torch.cuda.get_device_capability()[0] < 8:
            return "eager"
        if importlib.util.find_spec("flash_attn") is None:
            log.warning("[VLM] flash_attn is not installed; using eager attention.")
            return "eager"
        return "flash_attention_2"

    def _load_model(self):
        """Load the quantized base model and its processor.

        NOTE(review): presumably ``model_id`` is a Qwen2-VL checkpoint; confirm
        that ``AutoModelForCausalLM`` resolves that architecture in the pinned
        transformers version.
        """
        from transformers import AutoModelForCausalLM, AutoProcessor, BitsAndBytesConfig
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=self.vlm_cfg.load_in_4bit,
            bnb_4bit_quant_type=self.vlm_cfg.bnb_4bit_quant_type,
            bnb_4bit_compute_dtype=getattr(torch, self.vlm_cfg.bnb_4bit_compute_dtype),
            bnb_4bit_use_double_quant=True,
        )
        log.info(f"[VLM] Loading {self.vlm_cfg.model_id} in 4-bit NF4...")
        self.model = AutoModelForCausalLM.from_pretrained(
            self.vlm_cfg.model_id, quantization_config=bnb_config,
            device_map="auto", trust_remote_code=True,
            torch_dtype=torch.float16,
            attn_implementation=self._select_attn_implementation(),
        )
        self.processor = AutoProcessor.from_pretrained(self.vlm_cfg.model_id, trust_remote_code=True)
        if self.processor.tokenizer.pad_token is None:
            # Padding is required for batching; reuse EOS when no pad token exists.
            self.processor.tokenizer.pad_token = self.processor.tokenizer.eos_token
            self.model.config.pad_token_id = self.model.config.eos_token_id
        params = count_parameters(self.model)
        log.info(f"[VLM] Model loaded. Total: {params['total']:,}, Trainable (pre-LoRA): {params['trainable']:,}")

    def _apply_lora(self):
        """Prepare the quantized model for training and attach LoRA adapters."""
        from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training, TaskType
        self.model = prepare_model_for_kbit_training(
            self.model, use_gradient_checkpointing=self.vlm_cfg.gradient_checkpointing,
        )
        lora_config = LoraConfig(
            r=self.vlm_cfg.lora_r, lora_alpha=self.vlm_cfg.lora_alpha,
            lora_dropout=self.vlm_cfg.lora_dropout,
            target_modules=self.vlm_cfg.lora_target_modules,
            bias="none", task_type=TaskType.CAUSAL_LM,
        )
        self.model = get_peft_model(self.model, lora_config)
        params = count_parameters(self.model)
        log.info(f"[VLM] LoRA applied. Trainable: {params['trainable']:,} ({params['trainable_pct']}%)")
        self.model.print_trainable_parameters()

    def _load_datasets(self):
        """Load the train split and, when present, the val split.

        Returns:
            ``(train_dataset, val_dataset)``; ``val_dataset`` is ``None`` when
            ``val.jsonl`` does not exist.

        Raises:
            FileNotFoundError: If ``train.jsonl`` is missing.
        """
        train_path = VLM_DATA_DIR / "train.jsonl"
        val_path = VLM_DATA_DIR / "val.jsonl"
        if not train_path.exists():
            raise FileNotFoundError(f"Training data not found at {train_path}. Run QA generator first!")
        train_dataset = GeoExtractVLMDataset(train_path, self.processor, max_length=self.vlm_cfg.max_seq_length)
        val_dataset = None
        if val_path.exists():
            val_dataset = GeoExtractVLMDataset(val_path, self.processor, max_length=self.vlm_cfg.max_seq_length)
        return train_dataset, val_dataset

    def _setup_trainer(self, train_dataset, val_dataset):
        """Build the ``Trainer`` and record the checkpoint to resume from, if any."""
        from transformers import TrainingArguments, Trainer
        resume_ckpt = None
        if self.vlm_cfg.resume_from_checkpoint:
            resume_ckpt = find_latest_checkpoint(Path(self.vlm_cfg.output_dir))
            if resume_ckpt:
                log.info(f"[VLM] Will resume from: {resume_ckpt}")

        # Evaluation (and best-model tracking) only when a non-empty val set exists.
        has_val = bool(val_dataset)
        training_args = TrainingArguments(
            output_dir=str(self.vlm_cfg.output_dir),
            num_train_epochs=self.vlm_cfg.epochs,
            per_device_train_batch_size=self.vlm_cfg.batch_size,
            per_device_eval_batch_size=self.vlm_cfg.batch_size,
            gradient_accumulation_steps=self.vlm_cfg.gradient_accumulation_steps,
            learning_rate=self.vlm_cfg.learning_rate,
            weight_decay=self.vlm_cfg.weight_decay,
            warmup_ratio=self.vlm_cfg.warmup_ratio,
            lr_scheduler_type=self.vlm_cfg.lr_scheduler_type,
            fp16=self.vlm_cfg.fp16, bf16=self.vlm_cfg.bf16,
            save_steps=self.vlm_cfg.save_steps, save_total_limit=3,
            save_strategy="steps",
            eval_strategy="steps" if has_val else "no",
            eval_steps=self.vlm_cfg.eval_steps if has_val else None,
            logging_steps=self.vlm_cfg.logging_steps, logging_first_step=True,
            report_to="wandb" if self.cfg.wandb.enabled else "none",
            run_name="vlm-geoextract",
            gradient_checkpointing=self.vlm_cfg.gradient_checkpointing,
            optim="paged_adamw_8bit", max_grad_norm=1.0,
            # The collator needs the raw "messages" column, so keep all columns.
            remove_unused_columns=False, dataloader_num_workers=2,
            seed=self.cfg.data.seed,
            load_best_model_at_end=has_val,
            metric_for_best_model="eval_loss" if has_val else None,
        )
        collator = ChatMLCollator(self.processor, max_length=self.vlm_cfg.max_seq_length)
        self.trainer = Trainer(
            model=self.model, args=training_args,
            train_dataset=train_dataset, eval_dataset=val_dataset,
            data_collator=collator,
        )
        self._resume_checkpoint = resume_ckpt

    def _run_training(self):
        """Train (optionally resuming); on Ctrl-C save an ``interrupted`` snapshot."""
        init_wandb(self.cfg, run_name="vlm-geoextract-lora", tags=["vlm", "lora", "qwen2-vl"])
        try:
            if self._resume_checkpoint:
                log.info(f"[VLM] Resuming from {self._resume_checkpoint}")
                self.trainer.train(resume_from_checkpoint=str(self._resume_checkpoint))
            else:
                log.info("[VLM] Starting training from scratch.")
                self.trainer.train()
        except KeyboardInterrupt:
            log.warning("[VLM] Training interrupted. Saving checkpoint...")
            # Wrap in Path like every other use, so a str output_dir also works.
            self.trainer.save_model(str(Path(self.vlm_cfg.output_dir) / "interrupted"))
        finally:
            finish_wandb()

    def _save_final(self) -> Path:
        """Save the adapter and processor to ``<output_dir>/final_adapter``."""
        final_dir = Path(self.vlm_cfg.output_dir) / "final_adapter"
        final_dir.mkdir(parents=True, exist_ok=True)
        self.model.save_pretrained(str(final_dir))
        self.processor.save_pretrained(str(final_dir))
        log.info(f"[VLM] Final adapter saved to {final_dir}")
        return final_dir

    def cleanup(self):
        """Drop model, processor and trainer references and free GPU memory."""
        self.model = None
        self.processor = None
        self.trainer = None
        free_vram()
        log.info("[VLM] Model unloaded, VRAM freed.")

print("‚úÖ VLM trainer ready.")

## 8 · Agentic Inference Pipeline

In [None]:
from PIL import Image as PILImage


class GeoExtractPipeline:
    """Two-stage inference: YOLO building detection followed by VLM analysis.

    The detector's building count is turned into a short textual context
    (count, average confidence, density class) that is prepended to the
    question sent to the vision-language model together with the image.
    """

    def __init__(self, cfg: InferenceConfig = CFG.inference, full_cfg=CFG):
        self.cfg = cfg
        self.full_cfg = full_cfg
        self.yolo_model = None
        self.vlm_model = None
        self.vlm_processor = None
        self._loaded = False

    def load(self):
        """Load both models; called lazily by ``analyze`` and ``chat``."""
        log_vram("before pipeline load")
        self._load_yolo()
        self._load_vlm()
        self._loaded = True
        log_vram("after pipeline load")
        log.info("[Pipeline] ‚úì Both models loaded and ready.")

    def _load_yolo(self):
        """Load YOLO weights, searching for any ``best.pt`` if the configured path is missing.

        Raises:
            FileNotFoundError: If neither the configured weights nor a
                fallback ``best.pt`` can be found.
        """
        from ultralytics import YOLO
        weights_path = self.cfg.yolo_weights
        if not weights_path.exists():
            # Fallback: look two directory levels up for any best.pt.
            candidates = list(self.cfg.yolo_weights.parent.parent.rglob("best.pt"))
            if candidates:
                weights_path = candidates[0]
            else:
                raise FileNotFoundError(f"YOLO weights not found at {self.cfg.yolo_weights}")
        self.yolo_model = YOLO(str(weights_path))
        log.info(f"[Pipeline] YOLO loaded from {weights_path}")

    def _load_vlm(self):
        """Load the 4-bit base VLM and, when available, the LoRA adapter.

        Without an adapter the plain base model is used. Both paths put the
        model in eval mode and make sure the tokenizer has a pad token.

        NOTE(review): presumably ``model_id`` is a Qwen2-VL checkpoint; confirm
        that ``AutoModelForCausalLM`` resolves that architecture in the pinned
        transformers version.
        """
        from transformers import AutoModelForCausalLM, AutoProcessor, BitsAndBytesConfig
        from peft import PeftModel
        vlm_cfg = self.full_cfg.vlm
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True, bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.float16, bnb_4bit_use_double_quant=True,
        )
        base_model = AutoModelForCausalLM.from_pretrained(
            vlm_cfg.model_id, quantization_config=bnb_config,
            device_map="auto", trust_remote_code=True, torch_dtype=torch.float16,
        )

        adapter_dir = self.cfg.vlm_adapter_dir / "final_adapter"
        if not adapter_dir.exists():
            # Fall back to any directory holding an adapter config (e.g. a checkpoint).
            candidates = list(self.cfg.vlm_adapter_dir.rglob("adapter_config.json"))
            adapter_dir = candidates[0].parent if candidates else None

        if adapter_dir is None:
            log.warning("[Pipeline] No LoRA adapter found ‚Äî using base model.")
            self.vlm_model = base_model
            processor_source = vlm_cfg.model_id
        else:
            self.vlm_model = PeftModel.from_pretrained(base_model, str(adapter_dir))
            processor_source = str(adapter_dir)

        self.vlm_model.eval()
        self.vlm_processor = AutoProcessor.from_pretrained(processor_source, trust_remote_code=True)
        if self.vlm_processor.tokenizer.pad_token is None:
            self.vlm_processor.tokenizer.pad_token = self.vlm_processor.tokenizer.eos_token
        if adapter_dir is not None:
            log.info(f"[Pipeline] VLM loaded with adapter from {adapter_dir}")

    def analyze(self, image, question=None):
        """Detect buildings in ``image`` and ask the VLM about it.

        Args:
            image: Path-like to an image file, or an already loaded image
                object (used as-is).
            question: Optional question; a comprehensive default is used when
                omitted.

        Returns:
            Dict with the image identifier, a UTC ISO-8601 timestamp,
            processing time in seconds, detection results, derived context,
            the question/response pair and model metadata.
        """
        if not self._loaded:
            self.load()
        start_time = time.time()
        if isinstance(image, (str, Path)):
            image_path = str(image)
            pil_image = PILImage.open(image_path).convert("RGB")
        else:
            pil_image = image
            image_path = "uploaded_image"

        yolo_results = self._run_yolo(pil_image)
        context = self._build_context(yolo_results)
        if question is None:
            question = (
                "Analyze this satellite image comprehensively. Assess the building "
                "density, urban heat island risk, green space availability, "
                "infrastructure stress, and provide urban planning recommendations."
            )
        vlm_response = self._run_vlm(pil_image, question, context)
        return {
            "image": image_path,
            # The trailing "Z" denotes UTC, so format gmtime rather than local time.
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "processing_time_s": round(time.time() - start_time, 2),
            "detection": yolo_results,
            "context": context,
            "analysis": {"question": question, "response": vlm_response},
            "metadata": {
                "yolo_model": str(self.cfg.yolo_weights.name),
                "vlm_model": self.full_cfg.vlm.model_id,
                "confidence_threshold": self.cfg.confidence_threshold,
            },
        }

    def batch_analyze(self, images, question=None):
        """Run ``analyze`` on each image; failures become ``{"image", "error"}`` entries."""
        results = []
        for img in images:
            try:
                results.append(self.analyze(img, question))
            except Exception as e:
                # Best effort per image, but leave a trace of what went wrong.
                log.warning(f"[Pipeline] Analysis failed for {img}: {e}")
                results.append({"image": str(img), "error": str(e)})
        return results

    def _run_yolo(self, image):
        """Run the detector and return count, per-box detections and mean confidence."""
        results = self.yolo_model(
            image, conf=self.cfg.confidence_threshold,
            iou=self.cfg.iou_threshold, verbose=False,
        )
        detections = []
        boxes = results[0].boxes if results else None
        if boxes is not None and len(boxes) > 0:
            # Move each tensor to the host once instead of once per box.
            coords = boxes.xyxy.cpu().tolist()
            scores = boxes.conf.cpu().tolist()
            classes = boxes.cls.cpu().tolist()
            detections = [
                {
                    "bbox": xyxy,
                    "confidence": float(score),
                    "class": int(cls_id),
                    "class_name": "building",
                }
                for xyxy, score, cls_id in zip(coords, scores, classes)
            ]
        avg_confidence = 0.0
        if detections:
            avg_confidence = round(float(np.mean([d["confidence"] for d in detections])), 3)
        return {
            "building_count": len(detections),
            "detections": detections,
            "avg_confidence": avg_confidence,
        }

    def _build_context(self, yolo_results):
        """Derive the density class and a context sentence from detection results."""
        count = yolo_results["building_count"]
        qa_cfg = self.full_cfg.qa
        if count <= qa_cfg.sparse_max:
            density = "Sparse"
        elif count <= qa_cfg.moderate_max:
            density = "Moderate"
        elif count <= qa_cfg.dense_max:
            density = "Dense"
        else:
            density = "Urban Core"
        return {
            "building_count": count, "density_class": density,
            "avg_detection_confidence": yolo_results["avg_confidence"],
            "context_prompt": (
                f"The building detection model has identified {count} structures "
                f"in this image with an average confidence of "
                f"{yolo_results['avg_confidence']:.1%}. This area is classified "
                f"as '{density}' density."
            ),
        }

    @staticmethod
    def _with_context(question, context):
        """Prefix ``question`` with the detection context, when one is available."""
        prompt = context.get("context_prompt") if context else None
        if not prompt:
            return question
        return f"Context from detection model: {prompt}\n\nQuestion: {question}"

    def _generate_reply(self, text, image, **sampling):
        """Encode ``text`` + ``image``, generate, and decode only the new tokens."""
        inputs = self.vlm_processor(
            text=[text], images=[image], return_tensors="pt", padding=True,
        ).to(self.vlm_model.device)
        with torch.no_grad():
            outputs = self.vlm_model.generate(
                **inputs, max_new_tokens=self.cfg.max_new_tokens, **sampling,
            )
        # Drop the prompt tokens so only the generated continuation is decoded.
        input_len = inputs["input_ids"].shape[1]
        return self.vlm_processor.tokenizer.decode(outputs[0][input_len:], skip_special_tokens=True).strip()

    def _run_vlm(self, image, question, context):
        """Ask the VLM a single question about ``image`` with detection context.

        NOTE(review): the manual fallback prompt contains no image placeholder
        although an image is still passed to the processor - confirm the
        processor accepts that combination.
        """
        messages = [
            {"role": "system", "content": self.full_cfg.qa.system_prompt},
            {"role": "user", "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": f"Context from detection model: {context['context_prompt']}\n\nQuestion: {question}"},
            ]},
        ]
        try:
            text = self.vlm_processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        except Exception:
            text = (
                f"<|im_start|>system\n{self.full_cfg.qa.system_prompt}<|im_end|>\n"
                f"<|im_start|>user\n{context['context_prompt']}\n{question}<|im_end|>\n"
                f"<|im_start|>assistant\n"
            )
        return self._generate_reply(
            text, image,
            do_sample=True, temperature=0.7, top_p=0.9, repetition_penalty=1.1,
        )

    def chat(self, image, conversation_history, new_question):
        """Continue a multi-turn conversation about ``image``.

        The image and the detection context are attached to the first user
        turn, mirroring ``_run_vlm``, so the prompt contains the image
        placeholder that matches the image handed to the processor. Detection
        runs only on the first turn; later turns reuse the context stored in
        the history.

        Args:
            image: Path-like to an image file, or an already loaded image.
            conversation_history: List of earlier turns (dicts with
                ``question``, ``response``, ``context``). It is appended to in
                place.
            new_question: The user's next question.

        Returns:
            Dict with the ``response`` and the updated ``conversation_history``.
        """
        if not self._loaded:
            self.load()
        if isinstance(image, (str, Path)):
            pil_image = PILImage.open(str(image)).convert("RGB")
        else:
            pil_image = image
        if not conversation_history:
            yolo_results = self._run_yolo(pil_image)
            context = self._build_context(yolo_results)
        else:
            context = conversation_history[0].get("context", {})

        user_turns = [turn.get("question", "") for turn in conversation_history] + [new_question]
        replies = [turn.get("response", "") for turn in conversation_history]

        messages = [{"role": "system", "content": self.full_cfg.qa.system_prompt}]
        for idx, user_text in enumerate(user_turns):
            if idx == 0:
                messages.append({"role": "user", "content": [
                    {"type": "image", "image": pil_image},
                    {"type": "text", "text": self._with_context(user_text, context)},
                ]})
            else:
                messages.append({"role": "user", "content": user_text})
            if idx < len(replies):
                messages.append({"role": "assistant", "content": replies[idx]})

        text = self.vlm_processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        response = self._generate_reply(text, pil_image, do_sample=True, temperature=0.7)
        conversation_history.append({"question": new_question, "response": response, "context": context})
        return {"response": response, "conversation_history": conversation_history}

    def cleanup(self):
        """Drop all model references and free GPU memory."""
        self.yolo_model = None
        self.vlm_model = None
        self.vlm_processor = None
        self._loaded = False
        free_vram()
        log.info("[Pipeline] All models unloaded.")

print("‚úÖ Inference pipeline ready.")

## 9 · Evaluation Module (Defense Metrics)

In [None]:
from sklearn.metrics import (
    precision_score, recall_score, f1_score,
    confusion_matrix, classification_report, accuracy_score,
)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt


class YOLOEvaluator:
    """Detection and counting metrics for the YOLO building detector."""

    def __init__(self, iou_threshold: float = 0.5):
        """
        Args:
            iou_threshold: Value forwarded as ``iou`` to ``model.val``.
        """
        self.iou_threshold = iou_threshold

    def evaluate(self, dataset_yaml: Path, weights_path: Path) -> Dict:
        """Validate ``weights_path`` on ``dataset_yaml`` and persist the metrics.

        Validation plots are written to ``EVAL_DIR / "yolo_eval"`` and the
        metrics to ``EVAL_DIR / "yolo_metrics.json"``.

        Returns:
            Dict with ``precision``, ``recall``, ``mAP50``, ``mAP50-95`` and
            ``f1`` as floats.
        """
        from ultralytics import YOLO
        log.info(f"[Eval-YOLO] Running validation with IoU={self.iou_threshold}...")
        model = YOLO(str(weights_path))
        results = model.val(
            data=str(dataset_yaml), iou=self.iou_threshold,
            conf=0.25, verbose=True, plots=True,
            # The output directory is selected through project/name; a
            # save_dir override is not an accepted val() setting.
            project=str(EVAL_DIR), name="yolo_eval", exist_ok=True,
        )
        box = results.box
        metrics = {
            "precision": float(box.mp) if hasattr(box, 'mp') else 0.0,
            "recall": float(box.mr) if hasattr(box, 'mr') else 0.0,
            "mAP50": float(box.map50) if hasattr(box, 'map50') else 0.0,
            "mAP50-95": float(box.map) if hasattr(box, 'map') else 0.0,
        }
        p, r = metrics["precision"], metrics["recall"]
        # Floor the denominator so p == r == 0 yields f1 == 0 instead of dividing by zero.
        metrics["f1"] = 2 * p * r / max(p + r, 1e-8)
        log.info(f"[Eval-YOLO] Results: {metrics}")
        out_path = EVAL_DIR / "yolo_metrics.json"
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with open(out_path, "w") as f:
            json.dump(metrics, f, indent=2)
        return metrics

    def evaluate_counting_accuracy(self, predictions, ground_truths) -> Dict:
        """Compare predicted and true building counts.

        Args:
            predictions: Sequence of predicted counts.
            ground_truths: Sequence of true counts, same length.

        Returns:
            Dict with ``mae``, ``rmse``, ``mape_pct`` (ground truths below 1
            are treated as 1 in the denominator), ``exact_match``,
            ``within_5`` and ``within_10``. Every value is NaN for empty input.

        Raises:
            ValueError: If the two inputs differ in shape.
        """
        preds = np.asarray(predictions, dtype=float)
        truths = np.asarray(ground_truths, dtype=float)
        if preds.shape != truths.shape:
            # Reject explicitly: NumPy would otherwise broadcast e.g. 1 vs N silently.
            raise ValueError(
                f"predictions and ground_truths must have the same shape, "
                f"got {preds.shape} and {truths.shape}"
            )

        if preds.size == 0:
            # Same result the mean of an empty array gives, without the warning.
            nan = float("nan")
            metrics = {
                "mae": nan, "rmse": nan, "mape_pct": nan,
                "exact_match": nan, "within_5": nan, "within_10": nan,
            }
        else:
            abs_err = np.abs(preds - truths)
            metrics = {
                "mae": float(np.mean(abs_err)),
                "rmse": float(np.sqrt(np.mean(abs_err ** 2))),
                "mape_pct": float(np.mean(abs_err / np.maximum(truths, 1)) * 100),
                "exact_match": float(np.mean(preds == truths)),
                "within_5": float(np.mean(abs_err <= 5)),
                "within_10": float(np.mean(abs_err <= 10)),
            }
        log.info(f"[Eval-YOLO] Counting accuracy: {metrics}")
        return metrics


class VLMEvaluator:
    """Scores predicted density classes against ground-truth density classes.

    Computes scikit-learn classification metrics, saves a confusion-matrix
    plot and writes the metrics as JSON under ``EVAL_DIR``.
    """

    DENSITY_CLASSES = ["Sparse", "Moderate", "Dense", "Urban Core"]

    def __init__(self, cfg=CFG):
        self.cfg = cfg
        self.class_to_idx = {c: i for i, c in enumerate(self.DENSITY_CLASSES)}

    def evaluate(self, gt_classes, pred_classes) -> Dict:
        """Compute classification metrics for paired class-name sequences.

        Pairs in which either name is not one of ``DENSITY_CLASSES`` are
        dropped before scoring.

        Args:
            gt_classes: Ground-truth class names.
            pred_classes: Predicted class names, aligned with ``gt_classes``.

        Returns:
            Dict with ``accuracy``, ``f1_macro``, ``f1_weighted``,
            ``precision_macro``, ``recall_macro``, ``per_class`` and
            ``confusion_matrix`` (nested lists). An empty dict when no valid
            pair remains.
        """
        log.info(f"[Eval-VLM] Evaluating {len(gt_classes)} samples...")
        gt_idx = [self.class_to_idx.get(c, -1) for c in gt_classes]
        pred_idx = [self.class_to_idx.get(c, -1) for c in pred_classes]
        valid = [(g, p) for g, p in zip(gt_idx, pred_idx) if g >= 0 and p >= 0]
        if not valid:
            log.error("[Eval-VLM] No valid predictions!")
            return {}
        dropped = min(len(gt_idx), len(pred_idx)) - len(valid)
        if dropped:
            log.warning(f"[Eval-VLM] Ignoring {dropped} samples with an unknown class label.")
        gt_valid = [g for g, _ in valid]
        pred_valid = [p for _, p in valid]

        # Pinning ``labels`` keeps ``target_names`` aligned with class indices.
        # Without it scikit-learn raises ValueError whenever one of the
        # density classes is absent from the evaluated samples.
        label_ids = list(range(len(self.DENSITY_CLASSES)))

        metrics = {
            "accuracy": float(accuracy_score(gt_valid, pred_valid)),
            "f1_macro": float(f1_score(gt_valid, pred_valid, average="macro", zero_division=0)),
            "f1_weighted": float(f1_score(gt_valid, pred_valid, average="weighted", zero_division=0)),
            "precision_macro": float(precision_score(gt_valid, pred_valid, average="macro", zero_division=0)),
            "recall_macro": float(recall_score(gt_valid, pred_valid, average="macro", zero_division=0)),
        }
        report = classification_report(
            gt_valid, pred_valid, labels=label_ids, target_names=self.DENSITY_CLASSES,
            output_dict=True, zero_division=0,
        )
        metrics["per_class"] = {
            cls: {"precision": report[cls]["precision"], "recall": report[cls]["recall"],
                  "f1": report[cls]["f1-score"], "support": report[cls]["support"]}
            for cls in self.DENSITY_CLASSES if cls in report
        }
        cm = confusion_matrix(gt_valid, pred_valid, labels=label_ids)
        metrics["confusion_matrix"] = cm.tolist()

        log.info(f"[Eval-VLM] Accuracy: {metrics['accuracy']:.4f}, F1-macro: {metrics['f1_macro']:.4f}")
        print("\n" + classification_report(
            gt_valid, pred_valid, labels=label_ids,
            target_names=self.DENSITY_CLASSES, zero_division=0,
        ))

        self._save_confusion_matrix_plot(cm)

        # Save metrics
        serializable = {k: v for k, v in metrics.items() if not isinstance(v, np.ndarray)}
        with open(EVAL_DIR / "vlm_metrics.json", "w") as f:
            json.dump(serializable, f, indent=2, default=str)
        return metrics

    def _save_confusion_matrix_plot(self, cm) -> None:
        """Render ``cm`` as an annotated heat map at ``EVAL_DIR / "vlm_confusion_matrix.png"``."""
        fig, ax = plt.subplots(figsize=(8, 6))
        im = ax.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
        ax.figure.colorbar(im, ax=ax)
        ax.set(xticks=np.arange(cm.shape[1]), yticks=np.arange(cm.shape[0]),
               xticklabels=self.DENSITY_CLASSES, yticklabels=self.DENSITY_CLASSES,
               title="Density Classification — Confusion Matrix",
               ylabel="Ground Truth", xlabel="Predicted")
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
        # Cells above half the maximum count get white text so the number
        # stays readable against the darker background.
        thresh = cm.max() / 2.0
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], "d"), ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")
        fig.tight_layout()
        fig.savefig(str(EVAL_DIR / "vlm_confusion_matrix.png"), dpi=150)
        plt.close(fig)

    def evaluate_from_pipeline(self, pipeline, val_jsonl=None, max_samples=100):
        """Run ``pipeline.analyze`` over validation samples and score the results.

        Args:
            pipeline: Object exposing ``analyze(image_path)``; the predicted
                class is read from ``result["context"]["density_class"]``.
            val_jsonl: Path (``str`` or ``Path``) to a JSONL file with one
                sample per line; defaults to ``VLM_DATA_DIR / "val.jsonl"``.
            max_samples: Number of leading lines of the file to consider.

        Returns:
            The dict produced by :meth:`evaluate`, or an empty dict when the
            validation file is missing.
        """
        # Accept plain strings as well as Path objects.
        val_jsonl = VLM_DATA_DIR / "val.jsonl" if val_jsonl is None else Path(val_jsonl)
        if not val_jsonl.exists():
            log.error(f"[Eval-VLM] Validation data not found: {val_jsonl}")
            return {}
        with open(val_jsonl) as f:
            lines = f.readlines()[:max_samples]
        # Blank lines (for example a trailing newline) are not JSON records.
        val_data = [json.loads(line) for line in lines if line.strip()]

        gt_classes, pred_classes = [], []
        for sample in tqdm(val_data, desc="VLM Evaluation"):
            gt_class = sample.get("density_class", "Unknown")
            image_path = sample.get("image", "")
            # Path("") resolves to the current directory and therefore
            # "exists", so an empty path has to be rejected explicitly.
            if not image_path or not Path(image_path).exists():
                continue
            try:
                result = pipeline.analyze(image_path)
                pred_class = result.get("context", {}).get("density_class", "Unknown")
            except Exception as e:
                # Best effort: one failing sample must not abort the whole run.
                log.warning(f"[Eval-VLM] Failed on {image_path}: {e}")
            else:
                gt_classes.append(gt_class)
                pred_classes.append(pred_class)
        return self.evaluate(gt_classes, pred_classes)


class GeoExtractEvaluator:
    """Runs the YOLO and (optionally) VLM evaluations and reports the results."""

    def __init__(self, cfg=CFG):
        self.cfg = cfg
        self.yolo_eval = YOLOEvaluator(cfg.evaluation.iou_threshold)
        self.vlm_eval = VLMEvaluator(cfg)

    def run_full_evaluation(self, dataset_yaml, yolo_weights, pipeline=None):
        """Evaluate the detector, optionally the VLM, and write a combined report.

        Args:
            dataset_yaml: Dataset YAML forwarded to the YOLO evaluator.
            yolo_weights: Weights path forwarded to the YOLO evaluator.
            pipeline: Optional inference pipeline; the VLM evaluation is
                skipped when it is ``None``.

        Returns:
            Dict with a ``"yolo"`` entry and, when a pipeline was given, a
            ``"vlm"`` entry. The same dict is written to
            ``EVAL_DIR / "full_evaluation_report.json"``.
        """
        wandb_run = init_wandb(self.cfg, run_name="evaluation", tags=["eval", "metrics"])
        all_metrics = {}

        # finish_wandb() sits in ``finally`` so a failing evaluation does not
        # leave the run open.
        try:
            with Timer("YOLO Evaluation"):
                yolo_metrics = self.yolo_eval.evaluate(dataset_yaml, yolo_weights)
                all_metrics["yolo"] = yolo_metrics
                if wandb_run:
                    import wandb
                    wandb.log({f"eval/yolo_{k}": v for k, v in yolo_metrics.items()})

            if pipeline is not None:
                with Timer("VLM Evaluation"):
                    vlm_metrics = self.vlm_eval.evaluate_from_pipeline(pipeline)
                    all_metrics["vlm"] = vlm_metrics
                    if wandb_run:
                        import wandb
                        # Only scalar entries are logged; nested per-class
                        # data and the confusion matrix are skipped here.
                        scalars = {k: v for k, v in vlm_metrics.items() if isinstance(v, (int, float))}
                        wandb.log({f"eval/vlm_{k}": v for k, v in scalars.items()})
                        cm_path = EVAL_DIR / "vlm_confusion_matrix.png"
                        if cm_path.exists():
                            wandb.log({"eval/confusion_matrix": wandb.Image(str(cm_path))})
        finally:
            finish_wandb()

        report_path = EVAL_DIR / "full_evaluation_report.json"
        with open(report_path, "w") as f:
            json.dump(all_metrics, f, indent=2, default=str)
        log.info(f"[Eval] ✓ Full report saved to {report_path}")
        self._print_summary(all_metrics)
        return all_metrics

    def _print_summary(self, metrics):
        """Print a boxed, human-readable summary; missing values show as 0."""
        print("\n" + "═" * 60)
        print("  GeoExtract v2 — EVALUATION SUMMARY")
        print("═" * 60)
        if "yolo" in metrics:
            y = metrics["yolo"]
            print("\n  ┌── YOLO Building Detection ──┐")
            print(f"  │ Precision:  {y.get('precision', 0):.4f}          │")
            print(f"  │ Recall:     {y.get('recall', 0):.4f}          │")
            print(f"  │ F1-Score:   {y.get('f1', 0):.4f}          │")
            print(f"  │ mAP@50:     {y.get('mAP50', 0):.4f}          │")
            print(f"  │ mAP@50-95:  {y.get('mAP50-95', 0):.4f}          │")
            # 29 = inner width of the box drawn by the header line above.
            print("  └" + "─" * 29 + "┘")
        if "vlm" in metrics:
            v = metrics["vlm"]
            print("\n  ┌── VLM Density Classification ──┐")
            print(f"  │ Accuracy:   {v.get('accuracy', 0):.4f}             │")
            print(f"  │ F1 (macro): {v.get('f1_macro', 0):.4f}             │")
            print(f"  │ F1 (wgt.):  {v.get('f1_weighted', 0):.4f}             │")
            print(f"  │ Precision:  {v.get('precision_macro', 0):.4f}             │")
            print(f"  │ Recall:     {v.get('recall_macro', 0):.4f}             │")
            # 32 = inner width of the box drawn by the header line above.
            print("  └" + "─" * 32 + "┘")
        print("═" * 60)

# Cell-completion marker (mis-encoded check-mark emoji repaired).
print("✅ Evaluation module ready.")

---

# 🚀 PIPELINE EXECUTION

All code is loaded. Now run each step sequentially.

## Step 1 · Build YOLO Dataset from SpaceNet 7

Parses GeoTIFF images + GeoJSON labels → YOLO bbox format with augmentations.

In [None]:
# Build the YOLO dataset; the returned value is reused by later steps as the
# dataset YAML for training and evaluation.
builder = YOLODatasetBuilder(CFG)
dataset_yaml = builder.build()

# Print dataset statistics
stats = builder.get_stats()
print("\n📊 Dataset Statistics:")
for split, s in stats.items():
    print(f"  {split}: {s['images']} images, {s['total_bboxes']} bboxes "
          f"(avg {s['avg_bboxes_per_image']} per image)")
print(f"\n📁 Dataset YAML: {dataset_yaml}")

## Step 2 · Generate Synthetic QA Pairs for VLM Training

In [None]:
# Generate the synthetic QA data used for VLM training.
qa_gen = SyntheticQAGenerator(CFG)
qa_path = qa_gen.generate()

# Print QA statistics
qa_stats = qa_gen.get_stats()
print("\n📊 QA Statistics:")
for split, s in qa_stats.items():
    print(f"  {split}: {s['conversations']} conversations, {s['total_qa_turns']} QA turns")
    print(f"    Density distribution: {s['density_distribution']}")

## Step 3 · Train YOLO Building Detector

**Estimated time: ~1–2 hours on T4 GPU (50 epochs)**  
Model auto-saves every 5 epochs. Safe to interrupt — training will auto-resume.

In [None]:
# Train the detector; train() hands back the weights reported below.
yolo_trainer = YOLOTrainer(dataset_yaml, CFG)
yolo_best_weights = yolo_trainer.train()
print(f"\n✅ YOLO training complete. Best weights: {yolo_best_weights}")

## Step 4 · Free YOLO VRAM → Train VLM with LoRA

**Estimated time: ~2–4 hours on T4 GPU (3 epochs)**  
Checkpoints are saved every 500 steps. Auto-resume is supported.

In [None]:
# Free YOLO from VRAM before loading VLM.
# The trainer reference is dropped before free_vram() so nothing keeps the
# model alive while memory is being reclaimed.
yolo_trainer.cleanup()
del yolo_trainer
free_vram()
print("🧹 YOLO unloaded. VRAM is free for VLM.")
log_vram("before VLM")

In [None]:
# Train the VLM; train() hands back the adapter location reported below.
vlm_trainer = VLMTrainer(CFG)
vlm_adapter_path = vlm_trainer.train()
print(f"\n✅ VLM training complete. Adapter: {vlm_adapter_path}")

In [None]:
# Free VLM trainer VRAM for inference.
# The trainer reference is dropped before free_vram() so nothing keeps the
# model alive while memory is being reclaimed.
vlm_trainer.cleanup()
del vlm_trainer
free_vram()
print("🧹 VLM trainer unloaded.")
log_vram("before inference")

## Step 5 · Run Agentic Inference Demo

YOLO detects buildings → the detection context is injected into the prompt → the VLM reasons about the scene.

In [None]:
# Load the full pipeline (both models)
pipeline = GeoExtractPipeline(CFG.inference, CFG)
pipeline.load()

# Find a sample image for demo. glob() yields files in an unspecified order,
# so sort to make the chosen demo image reproducible across runs.
sample_images = sorted((YOLO_DATA_DIR / "images" / "val").glob("*.png"))
if sample_images:
    demo_image = sample_images[0]
    print(f"\n🖼️ Analyzing: {demo_image.name}")
    result = pipeline.analyze(demo_image)

    print(f"\n🔍 Detection: {result['detection']['building_count']} buildings found")
    print(f"📊 Density: {result['context']['density_class']}")
    print(f"⏱️ Processing time: {result['processing_time_s']}s")
    print(f"\n💬 VLM Analysis:\n{result['analysis']['response']}")

    # Save demo result; default=str stringifies any value json cannot encode.
    demo_result_path = OUTPUT_DIR / "demo_result.json"
    with open(demo_result_path, "w") as f:
        json.dump(result, f, indent=2, default=str)
    print(f"\n📁 Full result saved to {demo_result_path}")
else:
    print("⚠️ No validation images found for demo. Run data pipeline first.")

## Step 6 · Full Evaluation (Defense Metrics)

In [None]:
# Evaluate the detector and, through the loaded pipeline, the VLM.
evaluator = GeoExtractEvaluator(CFG)
# The weights path is built from the checkpoint directory rather than taken
# from Step 3, so this cell does not depend on the training cell's variables.
yolo_weights = YOLO_CKPT_DIR / "building_detector" / "weights" / "best.pt"

all_metrics = evaluator.run_full_evaluation(
    dataset_yaml=dataset_yaml,
    yolo_weights=yolo_weights,
    pipeline=pipeline,
)

print(f"\n📁 Full report: {EVAL_DIR / 'full_evaluation_report.json'}")

In [None]:
# Final cleanup: release the pipeline's models and reclaim VRAM.
pipeline.cleanup()
del pipeline
free_vram()
print("\n🎉 GeoExtract v2 pipeline complete!")
print(f"📂 All outputs saved to: {OUTPUT_DIR}")

---

## ⏱️ Estimated Training Times (NVIDIA T4 16 GB)

| Step | Estimated Time | Notes |
|------|---------------|-------|
| **Data Processing** | 10–20 min | Depends on SpaceNet 7 subset size |
| **QA Generation** | 5–15 min | GeoJSON parsing + template instantiation |
| **YOLO Training** (50 epochs) | 1–2 hours | batch=16, img=640, YOLOv11-nano |
| **VLM Training** (3 epochs) | 2–4 hours | batch=2, grad_accum=8, LoRA on 4-bit |
| **Inference Demo** | 1–2 min | Single image end-to-end |
| **Evaluation** | 15–30 min | YOLO val + VLM density classification |
| **Total** | **~4–7 hours** | ✅ Within Kaggle 12-hour limit |

### 💡 Tips to Speed Up
- Set `CFG.data.max_samples = 500` for a faster debug run
- Reduce YOLO epochs: `CFG.yolo.epochs = 25`
- Reduce VLM epochs: `CFG.vlm.epochs = 1`
- The pipeline auto-resumes from checkpoints if the Kaggle session restarts