# UAV AIP Detection API (Colab GPU)

在 Google Colab 上執行 UAV 物件偵測 API，使用免費 GPU 加速推論。

## 使用方式
1. 執行所有 Cell
2. 複製最後產生的 Cloudflare Tunnel URL
3. 在前端 Dashboard 貼上 URL 連線

In [None]:
#@title 1. 安裝依賴
!pip install -q fastapi uvicorn nest-asyncio \
    ultralytics rasterio laspy[lazrs] pyproj shapely \
    segmentation-models-pytorch huggingface_hub pillow

# 安裝 cloudflared
!wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -O /usr/local/bin/cloudflared
!chmod +x /usr/local/bin/cloudflared
print("依賴安裝完成")

In [None]:
#@title 2. API 程式碼
import threading
import time
import os
import shutil
import io
import math
from pathlib import Path
from datetime import datetime

import numpy as np
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from pydantic import BaseModel

# FastAPI App
app = FastAPI(title="UAV Object Detection API - Colab GPU")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 設定
UPLOAD_DIR = Path("/content/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
MODEL_DIR = Path("/content/models")
MODEL_DIR.mkdir(parents=True, exist_ok=True)
HF_MODEL_REPO = "chyyynh/uav-yolo-models"

def get_model_path(filename: str) -> Path:
    cached_path = MODEL_DIR / filename
    if cached_path.exists():
        return cached_path
    try:
        from huggingface_hub import hf_hub_download
        print(f"[Model] Downloading {filename}...")
        downloaded = hf_hub_download(repo_id=HF_MODEL_REPO, filename=filename)
        return Path(downloaded)
    except Exception as e:
        print(f"[Model] Failed: {e}")
        return cached_path

# YOLO 設定
MODELS_CONFIG = {
    "car": {"filename": "vehicle.pt", "conf": 0.75, "patch_size": 1024, "overlap": 850, "nms_iou": 0.1, "area": (4.0, 25.0), "ratio": (1.0, 3.0)},
    "person": {"filename": "human.pt", "conf": 0.60, "patch_size": 1024, "overlap": 850, "nms_iou": 0.1, "area": (0.2, 1.0), "ratio": (0.5, 2.0)},
    "cone": {"filename": "cone.pt", "conf": 0.60, "patch_size": 1024, "overlap": 850, "nms_iou": 0.1, "area": (0.05, 0.5), "ratio": (0.8, 1.4)},
}

HEIGHT_RANGE = {"person": (1.45, 1.90), "cone": (0.25, 0.90), "car": (1.0, 2.2), "vehicle": (1.0, 2.2)}
HEIGHT_PRIOR = {"person": {"mean": 1.70, "std": 0.08}, "cone": {"mean": 0.45, "std": 0.10}, "car": {"mean": 1.60, "std": 0.25}, "vehicle": {"mean": 1.60, "std": 0.25}}
MIN_PTS_BY_CLASS = {"person": 8, "cone": 10, "car": 30}

# UPerNet 設定
LANDCOVER_CLASSES = {0: "bare-ground", 1: "tree", 2: "road", 3: "pavement", 4: "grass", 5: "building"}
LANDCOVER_COLORS = {"bare-ground": [222, 184, 135], "tree": [34, 139, 34], "road": [128, 128, 128], "pavement": [178, 34, 34], "grass": [124, 252, 0], "building": [255, 140, 0]}
UPERNET_CONFIG = {"filename": "UPerNet_best.pth", "num_classes": 6, "tile_size": (64, 64), "overlap": 0.5, "encoder_name": "resnet50"}
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Terrain 設定
SLOPE_CATEGORIES = {"flat": (0, 5), "gentle": (5, 15), "moderate": (15, 30), "steep": (30, 90)}
ASPECT_DIRECTIONS = {"N": (337.5, 22.5), "NE": (22.5, 67.5), "E": (67.5, 112.5), "SE": (112.5, 157.5),
                     "S": (157.5, 202.5), "SW": (202.5, 247.5), "W": (247.5, 292.5), "NW": (292.5, 337.5)}

rng = np.random.default_rng(42)

# 全域狀態
uploaded_files = {"ortho": None, "laz": None, "dsm": None}
ortho_cache = {"src": None, "transform": None, "crs": None, "bounds": None, "width": 0, "height": 0, "pixel_w": 0, "pixel_h": 0}
pointcloud_cache = {"X": None, "Y": None, "Z": None, "loaded": False}
dsm_cache = {"data": None, "transform": None, "crs": None, "loaded": False, "nodata": None, "resolution": None}
models_cache = {"loaded": False, "models": {}}
upernet_cache = {"loaded": False, "model": None, "device": None}
landcover_cache = {"mask": None, "stats": None, "computed": False}
terrain_cache = {"slope": None, "aspect": None, "stats": None, "computed": False}
processing_state = {"job_id": None, "status": "idle", "progress": 0, "current_step": "", "elapsed_seconds": 0, "results": [], "start_time": None}

class ProcessingRequest(BaseModel):
    project_id: str = "current"
    detect_person: bool = True
    detect_vehicle: bool = True
    detect_cone: bool = True
    include_elevation: bool = True
    include_landcover: bool = False
    include_terrain: bool = False

def load_yolo_models():
    if models_cache["loaded"]:
        return models_cache["models"]
    from ultralytics import YOLO
    models = {}
    for cls_name, cfg in MODELS_CONFIG.items():
        model_path = get_model_path(cfg["filename"])
        if model_path.exists():
            models[cls_name] = YOLO(str(model_path))
            print(f"[YOLO] Loaded: {cls_name}")
    models_cache["models"] = models
    models_cache["loaded"] = True
    return models

# ============ UPerNet Landcover Functions ============

def load_upernet_model():
    """載入 UPerNet 模型"""
    if upernet_cache["loaded"]:
        return upernet_cache["model"]

    try:
        import torch
        import segmentation_models_pytorch as smp
    except ImportError as e:
        print(f"[UPerNet] Missing dependency: {e}")
        return None

    model_path = get_model_path(UPERNET_CONFIG["filename"])
    if not model_path.exists():
        print(f"[UPerNet] Model not found: {model_path}")
        return None

    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = smp.UPerNet(
            encoder_name=UPERNET_CONFIG["encoder_name"],
            encoder_weights=None,
            in_channels=3,
            classes=UPERNET_CONFIG["num_classes"]
        ).to(device)

        state_dict = torch.load(str(model_path), map_location=device)
        model.load_state_dict(state_dict, strict=False)
        model.eval()

        upernet_cache["model"] = model
        upernet_cache["loaded"] = True
        upernet_cache["device"] = device
        print(f"[UPerNet] Loaded on {device}")
        return model
    except Exception as e:
        print(f"[UPerNet] Failed to load: {e}")
        import traceback
        traceback.print_exc()
        return None

def run_landcover_segmentation(progress_callback=None) -> dict:
    """執行土地覆蓋分割"""
    import torch
    import torchvision.transforms as T
    import cv2

    if ortho_cache["src"] is None:
        raise ValueError("No ortho image loaded")

    model = load_upernet_model()
    if model is None:
        raise ValueError("Failed to load UPerNet model")

    device = upernet_cache["device"]
    src = ortho_cache["src"]

    # Read image
    data = src.read([1, 2, 3])
    img = np.moveaxis(data, 0, -1)

    # Normalize to uint8 if needed
    if img.dtype != np.uint8:
        img = img.astype(np.float32)
        img = (img - img.min()) / max(img.max() - img.min(), 1e-6) * 255
        img = img.astype(np.uint8)

    H, W = img.shape[:2]
    th, tw = UPERNET_CONFIG["tile_size"]
    overlap = UPERNET_CONFIG["overlap"]
    num_classes = UPERNET_CONFIG["num_classes"]

    stride_h = max(1, int(th * (1 - overlap)))
    stride_w = max(1, int(tw * (1 - overlap)))

    # Padding
    pad_h = max(((H - th) // stride_h + 1) * stride_h + th - H, 0)
    pad_w = max(((W - tw) // stride_w + 1) * stride_w + tw - W, 0)
    img_pad = cv2.copyMakeBorder(img, 0, pad_h, 0, pad_w, cv2.BORDER_REFLECT_101)
    Hp, Wp = img_pad.shape[:2]

    # Accumulators
    logit_sum = np.zeros((num_classes, Hp, Wp), np.float32)
    count = np.zeros((Hp, Wp), np.float32)

    to_tensor = T.Compose([
        T.ToTensor(),
        T.Normalize(IMAGENET_MEAN, IMAGENET_STD)
    ])

    # Calculate total tiles for progress
    total_tiles = ((Hp - th) // stride_h + 1) * ((Wp - tw) // stride_w + 1)
    tile_count = 0

    # Sliding window inference
    with torch.no_grad():
        for y in range(0, Hp - th + 1, stride_h):
            for x in range(0, Wp - tw + 1, stride_w):
                tile = img_pad[y:y+th, x:x+tw]
                tin = to_tensor(tile).unsqueeze(0).to(device)
                logits = model(tin)
                logits = logits[0] if isinstance(logits, (list, tuple)) else logits
                logit_sum[:, y:y+th, x:x+tw] += logits.squeeze(0).cpu().numpy()
                count[y:y+th, x:x+tw] += 1

                tile_count += 1
                if progress_callback and tile_count % 100 == 0:
                    progress = int(tile_count / total_tiles * 100)
                    progress_callback(progress, f"Landcover segmentation ({tile_count}/{total_tiles})...")

    # Get prediction
    pred = np.argmax(logit_sum / np.maximum(count, 1e-6), axis=0).astype(np.uint8)
    pred = pred[:H, :W]

    # Handle nodata (black pixels)
    black_mask = np.all(img <= 10, axis=2)
    pred[black_mask] = 255

    # Compute statistics
    stats = {}
    total_valid = np.sum(pred < num_classes)
    for class_id, class_name in LANDCOVER_CLASSES.items():
        class_pixels = np.sum(pred == class_id)
        stats[class_name] = {
            "pixels": int(class_pixels),
            "percentage": round(class_pixels / total_valid * 100, 2) if total_valid > 0 else 0
        }

    # Cache results
    landcover_cache["mask"] = pred
    landcover_cache["stats"] = stats
    landcover_cache["computed"] = True

    print(f"[UPerNet] Segmentation complete: {H}x{W}")
    return {"stats": stats, "shape": (H, W)}

def get_landcover_colorized() -> np.ndarray:
    """取得彩色土地覆蓋圖"""
    if not landcover_cache["computed"] or landcover_cache["mask"] is None:
        return None

    mask = landcover_cache["mask"]
    H, W = mask.shape
    color_img = np.zeros((H, W, 3), dtype=np.uint8)

    for class_id, class_name in LANDCOVER_CLASSES.items():
        color = LANDCOVER_COLORS[class_name]
        color_img[mask == class_id] = color

    # Set nodata to black
    color_img[mask == 255] = [0, 0, 0]

    return color_img

# ============ Terrain Analysis Functions ============

def compute_terrain_metrics(progress_callback=None) -> dict:
    """從 DSM 計算 slope 和 aspect"""
    if not dsm_cache["loaded"]:
        raise ValueError("No DSM loaded")

    arr = dsm_cache["data"].astype(np.float32)
    nodata = dsm_cache["nodata"]
    resolution = dsm_cache["resolution"] or 1.0

    # Handle nodata
    if nodata is not None:
        arr[arr == nodata] = np.nan

    if progress_callback:
        progress_callback(10, "Computing gradients...")

    # Compute gradients
    grad_y, grad_x = np.gradient(arr, resolution, resolution)

    if progress_callback:
        progress_callback(40, "Computing slope...")

    # Slope (degrees)
    slope = np.degrees(np.arctan(np.sqrt(grad_x**2 + grad_y**2)))

    if progress_callback:
        progress_callback(60, "Computing aspect...")

    # Aspect (0-360 degrees)
    aspect = np.degrees(np.arctan2(-grad_x, grad_y))
    aspect[aspect < 0] += 360

    if progress_callback:
        progress_callback(80, "Computing statistics...")

    # Compute statistics - filter out NaN values
    valid_mask = ~np.isnan(arr) & ~np.isnan(slope) & ~np.isnan(aspect)
    valid_slope = slope[valid_mask]
    valid_aspect = aspect[valid_mask]
    valid_elev = arr[valid_mask]

    # Elevation stats (with NaN safety)
    def safe_stat(arr, func, default=0.0):
        if len(arr) == 0:
            return default
        result = func(arr)
        if np.isnan(result) or np.isinf(result):
            return default
        return float(result)

    elevation_stats = {
        "min": safe_stat(valid_elev, np.min),
        "max": safe_stat(valid_elev, np.max),
        "mean": safe_stat(valid_elev, np.mean),
        "std": safe_stat(valid_elev, np.std),
    }

    # Slope distribution by category
    slope_distribution = {}
    total_valid = len(valid_slope)
    for cat_name, (low, high) in SLOPE_CATEGORIES.items():
        count = int(np.sum((valid_slope >= low) & (valid_slope < high)))
        slope_distribution[cat_name] = {
            "count": count,
            "percentage": round(count / total_valid * 100, 2) if total_valid > 0 else 0.0
        }

    slope_stats = {
        "min": safe_stat(valid_slope, np.min),
        "max": safe_stat(valid_slope, np.max),
        "mean": safe_stat(valid_slope, np.mean),
        "distribution": slope_distribution,
    }

    # Aspect distribution by direction
    aspect_distribution = {}
    for dir_name, (low, high) in ASPECT_DIRECTIONS.items():
        if dir_name == "N":  # Special case: wraps around 360
            count = int(np.sum((valid_aspect >= low) | (valid_aspect < high)))
        else:
            count = int(np.sum((valid_aspect >= low) & (valid_aspect < high)))
        aspect_distribution[dir_name] = {
            "count": count,
            "percentage": round(count / total_valid * 100, 2) if total_valid > 0 else 0.0
        }

    aspect_stats = {
        "distribution": aspect_distribution,
    }

    stats = {
        "elevation": elevation_stats,
        "slope": slope_stats,
        "aspect": aspect_stats,
    }

    # Cache results
    terrain_cache["slope"] = slope
    terrain_cache["aspect"] = aspect
    terrain_cache["stats"] = stats
    terrain_cache["computed"] = True

    if progress_callback:
        progress_callback(100, "Terrain analysis complete")

    print(f"[Terrain] Analysis complete: {arr.shape}")
    return stats

def get_slope_colorized() -> np.ndarray:
    """取得彩色坡度圖 (terrain colormap)"""
    if not terrain_cache["computed"] or terrain_cache["slope"] is None:
        return None

    import matplotlib.pyplot as plt
    from matplotlib.colors import Normalize

    slope = terrain_cache["slope"]
    norm = Normalize(vmin=0, vmax=60)
    cmap = plt.cm.terrain

    # Apply colormap
    colored = cmap(norm(slope))[:, :, :3]  # Remove alpha channel
    colored = (colored * 255).astype(np.uint8)

    # Handle NaN as black
    nan_mask = np.isnan(slope)
    colored[nan_mask] = [0, 0, 0]

    return colored

def get_aspect_colorized() -> np.ndarray:
    """取得彩色坡向圖 (HSV colormap)"""
    if not terrain_cache["computed"] or terrain_cache["aspect"] is None:
        return None

    import matplotlib.pyplot as plt
    from matplotlib.colors import Normalize

    aspect = terrain_cache["aspect"]
    norm = Normalize(vmin=0, vmax=360)
    cmap = plt.cm.hsv

    # Apply colormap
    colored = cmap(norm(aspect))[:, :, :3]  # Remove alpha channel
    colored = (colored * 255).astype(np.uint8)

    # Handle NaN as black
    nan_mask = np.isnan(aspect)
    colored[nan_mask] = [0, 0, 0]

    return colored

# ============ YOLO Detection Functions ============

def run_yolo_detection(classes_to_detect: list[str], progress_callback=None) -> list[dict]:
    import rasterio
    from rasterio.windows import Window
    import torch
    from torchvision.ops import nms

    if ortho_cache["src"] is None:
        raise ValueError("No ortho image loaded")

    src = ortho_cache["src"]
    transform = ortho_cache["transform"]
    width, height = ortho_cache["width"], ortho_cache["height"]
    pixel_w, pixel_h = ortho_cache["pixel_w"], ortho_cache["pixel_h"]

    models = load_yolo_models()
    if not models:
        raise ValueError("No YOLO models loaded")

    raw_detections = []
    total_classes = len(classes_to_detect)

    for cls_idx, cls_name in enumerate(classes_to_detect):
        if cls_name not in models:
            continue
        model = models[cls_name]
        cfg = MODELS_CONFIG[cls_name]
        patch_size = cfg["patch_size"]
        step = patch_size - cfg["overlap"]

        rows = list(range(0, height, step))
        total_patches = len(rows) * ((width + step - 1) // step)
        patch_count = 0

        for y in rows:
            for x in range(0, width, step):
                win_w, win_h = min(patch_size, width - x), min(patch_size, height - y)
                patch = src.read(window=Window(x, y, win_w, win_h))
                patch = np.moveaxis(patch[:3], 0, -1)

                if patch.shape[0] < patch_size or patch.shape[1] < patch_size:
                    padded = np.zeros((patch_size, patch_size, 3), dtype=patch.dtype)
                    padded[:patch.shape[0], :patch.shape[1]] = patch
                    patch = padded

                results = model(patch, conf=cfg["conf"], verbose=False)
                for result in results:
                    boxes = result.boxes
                    if boxes is None:
                        continue
                    for i in range(len(boxes)):
                        bx = boxes.xyxy[i].cpu().numpy()
                        conf = float(boxes.conf[i].cpu())
                        raw_detections.append({"class": cls_name, "conf": conf, "px1": x + bx[0], "py1": y + bx[1], "px2": x + bx[2], "py2": y + bx[3]})

                patch_count += 1
                if progress_callback and patch_count % 10 == 0:
                    progress = 20 + (cls_idx / total_classes) * 50 + (patch_count / total_patches) * (50 / total_classes)
                    progress_callback(int(progress), f"Detecting {cls_name}...")

    print(f"[YOLO] Raw: {len(raw_detections)}")

    # NMS
    final_detections = []
    for cls_name in classes_to_detect:
        if cls_name not in MODELS_CONFIG:
            continue
        cfg = MODELS_CONFIG[cls_name]
        cls_raw = [r for r in raw_detections if r["class"] == cls_name]
        if not cls_raw:
            continue
        boxes = torch.tensor([[r["px1"], r["py1"], r["px2"], r["py2"]] for r in cls_raw])
        scores = torch.tensor([r["conf"] for r in cls_raw])
        keep = nms(boxes, scores, cfg["nms_iou"])
        final_detections.extend([cls_raw[int(i)] for i in keep])

    print(f"[YOLO] After NMS: {len(final_detections)}")

    # OBIA filter
    records = []
    id_counter = {k: 0 for k in MODELS_CONFIG}
    for r in final_detections:
        cls_name = r["class"]
        cfg = MODELS_CONFIG[cls_name]
        w_m = (r["px2"] - r["px1"]) * pixel_w
        h_m = (r["py2"] - r["py1"]) * pixel_h
        area = w_m * h_m
        aspect = max(w_m, h_m) / (min(w_m, h_m) + 1e-6)
        if not (cfg["area"][0] <= area <= cfg["area"][1]):
            continue
        if not (cfg["ratio"][0] <= aspect <= cfg["ratio"][1]):
            continue

        id_counter[cls_name] += 1
        cx, cy = (r["px1"] + r["px2"]) / 2, (r["py1"] + r["py2"]) / 2
        gx, gy = transform * (cx, cy)
        records.append({"id": id_counter[cls_name], "cls": "vehicle" if cls_name == "car" else cls_name, "score": round(r["conf"], 3),
                        "center_x": round(gx, 2), "center_y": round(gy, 2), "area_m2": round(area, 2), "aspect_rat": round(aspect, 2),
                        "px1": r["px1"], "py1": r["py1"], "px2": r["px2"], "py2": r["py2"], "elev_z": 0.0, "height_m": 0.0, "lat": 0.0, "lon": 0.0})

    print(f"[YOLO] After OBIA: {len(records)}")
    return records

def sample_trunc_normal(mean, std, low, high):
    for _ in range(60):
        v = rng.normal(mean, std)
        if low <= v <= high:
            return float(v)
    return float(rng.uniform(low, high))

def impute_height_by_class(cls_name, h_raw, n_pts, min_pts):
    hmin, hmax = HEIGHT_RANGE.get(cls_name, (0.0, float('inf')))
    prior = HEIGHT_PRIOR.get(cls_name, {"mean": (hmin + hmax) / 2.0, "std": 0.1})
    def draw():
        return sample_trunc_normal(prior["mean"], prior["std"], hmin, hmax)
    if n_pts == 0 or not np.isfinite(h_raw) or h_raw < hmin or h_raw > hmax:
        return draw(), "imputed"
    return float(h_raw), "ok"

def compute_height_volume(detections, progress_callback=None):
    if not pointcloud_cache["loaded"]:
        for det in detections:
            cls = det["cls"] if det["cls"] != "vehicle" else "car"
            h, _ = impute_height_by_class(cls, np.nan, 0, 100)
            det["height_m"] = round(h, 2)
        return detections

    from shapely.geometry import box
    X, Y, Z = pointcloud_cache["X"], pointcloud_cache["Y"], pointcloud_cache["Z"]
    transform = ortho_cache["transform"]

    for det in detections:
        cls = det["cls"] if det["cls"] != "vehicle" else "car"
        px1, py1 = transform * (det["px1"], det["py1"])
        px2, py2 = transform * (det["px2"], det["py2"])
        geom = box(min(px1, px2), min(py1, py2), max(px1, px2), max(py1, py2))
        minx, miny, maxx, maxy = geom.bounds
        m = (X >= minx) & (X <= maxx) & (Y >= miny) & (Y <= maxy)
        if np.any(m):
            zz = Z[m]
            z0 = float(np.percentile(zz, 5))
            ztop = float(np.percentile(zz, 95))
            h_raw = max(0.0, ztop - z0)
            h_fix, _ = impute_height_by_class(cls, h_raw, len(zz), MIN_PTS_BY_CLASS.get(cls, 30))
            det["height_m"] = round(h_fix, 2)
            det["elev_z"] = round(z0, 1)
        else:
            h, _ = impute_height_by_class(cls, np.nan, 0, 100)
            det["height_m"] = round(h, 2)
    return detections

def add_latlon_to_detections(detections):
    if ortho_cache["crs"] is None:
        return detections
    try:
        from pyproj import Transformer
        transformer = Transformer.from_crs(ortho_cache["crs"], "EPSG:4326", always_xy=True)
        for det in detections:
            lon, lat = transformer.transform(det["center_x"], det["center_y"])
            det["lat"], det["lon"] = round(lat, 6), round(lon, 6)
    except Exception as e:
        print(f"[Coord] Error: {e}")
    return detections

# ============ Data Loading Functions ============

def load_ortho_image(tiff_path):
    import rasterio
    src = rasterio.open(tiff_path)
    ortho_cache["src"] = src
    ortho_cache["transform"] = src.transform
    ortho_cache["crs"] = src.crs
    ortho_cache["width"], ortho_cache["height"] = src.width, src.height
    ortho_cache["pixel_w"], ortho_cache["pixel_h"] = src.res
    bounds = src.bounds
    try:
        from pyproj import Transformer
        transformer = Transformer.from_crs(src.crs, "EPSG:4326", always_xy=True)
        west, south = transformer.transform(bounds.left, bounds.bottom)
        east, north = transformer.transform(bounds.right, bounds.top)
        ortho_cache["bounds"] = {"north": north, "south": south, "east": east, "west": west}
    except:
        ortho_cache["bounds"] = {"north": bounds.top, "south": bounds.bottom, "east": bounds.right, "west": bounds.left}
    print(f"[Ortho] Loaded: {src.width}x{src.height}")

def load_point_cloud(laz_path):
    import laspy
    las = laspy.read(laz_path)
    pointcloud_cache["X"] = np.asarray(las.x)
    pointcloud_cache["Y"] = np.asarray(las.y)
    pointcloud_cache["Z"] = np.asarray(las.z)
    pointcloud_cache["loaded"] = True
    print(f"[PointCloud] Loaded {len(pointcloud_cache['Z'])} points")

def load_dsm(dsm_path):
    import rasterio
    src = rasterio.open(dsm_path)
    dsm_cache["data"] = src.read(1)
    dsm_cache["transform"] = src.transform
    dsm_cache["crs"] = src.crs
    dsm_cache["nodata"] = src.nodata
    dsm_cache["loaded"] = True
    dsm_cache["resolution"] = src.res[0]
    print(f"[DSM] Loaded: {src.width}x{src.height}")
    src.close()

def convert_numpy(obj):
    """Convert numpy types to Python types, handling NaN and Inf values for JSON serialization."""
    if isinstance(obj, dict):
        return {k: convert_numpy(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy(v) for v in obj]
    elif isinstance(obj, np.floating):
        val = float(obj)
        if math.isnan(val) or math.isinf(val):
            return None
        return val
    elif isinstance(obj, float):
        if math.isnan(obj) or math.isinf(obj):
            return None
        return obj
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.ndarray):
        return convert_numpy(obj.tolist())
    return obj

# ============ API 端點 ============

@app.get("/")
async def root():
    return {"status": "ok", "message": "UAV Object Detection API - Colab GPU"}

@app.get("/api/projects")
async def get_projects():
    return [{"id": "current", "name": "Current Project"}]

@app.get("/api/detections/{project_id}")
async def get_detections(project_id: str):
    return convert_numpy(processing_state["results"] or [])

@app.get("/api/gpu/status")
async def get_gpu_status():
    try:
        import torch
        if torch.cuda.is_available():
            return {"name": torch.cuda.get_device_name(0), "status": "online"}
    except:
        pass
    return {"name": "CPU Mode", "status": "offline"}

@app.get("/api/ortho/bounds")
async def get_ortho_bounds():
    return ortho_cache["bounds"] or {"error": "No image loaded"}

@app.get("/api/ortho/image")
async def get_ortho_image(max_width: int = None, quality: int = 85):
    """取得正射影像 (JPEG with compression)"""
    if ortho_cache["src"] is None:
        raise HTTPException(status_code=404, detail="No image loaded")
    from PIL import Image
    src = ortho_cache["src"]
    data = src.read([1, 2, 3])
    data = np.moveaxis(data, 0, -1)
    if data.dtype != np.uint8:
        data = ((data - data.min()) / (data.max() - data.min() + 1e-6) * 255).astype(np.uint8)
    img = Image.fromarray(data)
    # Resize if max_width specified
    if max_width and img.width > max_width:
        ratio = max_width / img.width
        new_height = int(img.height * ratio)
        img = img.resize((max_width, new_height), Image.Resampling.LANCZOS)
    buffer = io.BytesIO()
    img.save(buffer, format="JPEG", quality=min(95, max(1, quality)), optimize=True)
    buffer.seek(0)
    return Response(content=buffer.getvalue(), media_type="image/jpeg",
                    headers={"Cache-Control": "public, max-age=3600"})

@app.get("/api/ortho/preview")
async def get_ortho_preview(width: int = 800, height: int = 600, quality: int = 85):
    """取得正射影像預覽 (JPEG with compression)"""
    if ortho_cache["src"] is None:
        raise HTTPException(status_code=404, detail="No image loaded")
    from PIL import Image
    src = ortho_cache["src"]
    data = src.read([1, 2, 3])
    data = np.moveaxis(data, 0, -1)
    if data.dtype != np.uint8:
        data = ((data - data.min()) / (data.max() - data.min() + 1e-6) * 255).astype(np.uint8)
    img = Image.fromarray(data)
    img.thumbnail((width, height), Image.Resampling.LANCZOS)
    buffer = io.BytesIO()
    img.save(buffer, format="JPEG", quality=min(95, max(1, quality)), optimize=True)
    buffer.seek(0)
    return Response(content=buffer.getvalue(), media_type="image/jpeg",
                    headers={"Cache-Control": "public, max-age=3600"})

@app.get("/api/ortho/metadata")
async def get_ortho_metadata():
    if ortho_cache["src"] is None:
        return {"error": "No image loaded"}
    src = ortho_cache["src"]
    return {
        "filename": Path(uploaded_files["ortho"]).name if uploaded_files["ortho"] else None,
        "datetime": datetime.now().isoformat(),
        "width": src.width,
        "height": src.height,
        "crs": str(src.crs) if src.crs else None,
        "pixel_w": ortho_cache["pixel_w"],
        "pixel_h": ortho_cache["pixel_h"],
    }

@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    filename = file.filename.lower()
    file_path = UPLOAD_DIR / file.filename
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    if filename.endswith((".tif", ".tiff")):
        uploaded_files["ortho"] = str(file_path)
        load_ortho_image(str(file_path))
        return {"filename": file.filename, "message": "Image uploaded", "type": "ortho"}
    elif filename.endswith((".laz", ".las")):
        uploaded_files["laz"] = str(file_path)
        load_point_cloud(str(file_path))
        return {"filename": file.filename, "message": "Point cloud uploaded", "type": "laz", "points": len(pointcloud_cache["Z"])}
    return {"filename": file.filename, "message": "File uploaded", "type": "unknown"}

@app.post("/api/upload/dsm")
async def upload_dsm(file: UploadFile = File(...)):
    filename = file.filename.lower()
    file_path = UPLOAD_DIR / file.filename
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    if filename.endswith((".tif", ".tiff")):
        uploaded_files["dsm"] = str(file_path)
        load_dsm(str(file_path))
        return {"filename": file.filename, "message": "DSM uploaded", "type": "dsm", "resolution": dsm_cache.get("resolution")}
    raise HTTPException(status_code=400, detail="DSM must be a GeoTIFF file")

# ============ Landcover API Endpoints ============

@app.get("/api/landcover/status")
async def get_landcover_status():
    return {"computed": landcover_cache["computed"], "has_stats": landcover_cache["stats"] is not None}

@app.get("/api/landcover/stats")
async def get_landcover_stats():
    if not landcover_cache["computed"]:
        raise HTTPException(status_code=400, detail="Landcover not computed yet")
    return convert_numpy({"classes": LANDCOVER_CLASSES, "colors": LANDCOVER_COLORS, "stats": landcover_cache["stats"]})

@app.get("/api/landcover/image")
async def get_landcover_image(max_width: int = None):
    """取得土地覆蓋彩色圖 (PNG with compression)"""
    if not landcover_cache["computed"]:
        raise HTTPException(status_code=400, detail="Landcover not computed yet")
    from PIL import Image
    color_img = get_landcover_colorized()
    if color_img is None:
        raise HTTPException(status_code=500, detail="Failed to generate colorized landcover")
    img = Image.fromarray(color_img)
    if max_width and img.width > max_width:
        ratio = max_width / img.width
        new_height = int(img.height * ratio)
        img = img.resize((max_width, new_height), Image.Resampling.NEAREST)
    buffer = io.BytesIO()
    img.save(buffer, format="PNG", optimize=True)
    buffer.seek(0)
    return Response(content=buffer.getvalue(), media_type="image/png",
                    headers={"Cache-Control": "public, max-age=3600"})

@app.get("/api/landcover/overlay")
async def get_landcover_overlay(alpha: float = 0.5, max_width: int = None, quality: int = 85):
    """取得土地覆蓋疊加圖 (JPEG with compression)"""
    if not landcover_cache["computed"]:
        raise HTTPException(status_code=400, detail="Landcover not computed yet")
    if ortho_cache["src"] is None:
        raise HTTPException(status_code=400, detail="No ortho image loaded")
    from PIL import Image
    src = ortho_cache["src"]
    data = src.read([1, 2, 3])
    ortho_img = np.moveaxis(data, 0, -1)
    if ortho_img.dtype != np.uint8:
        ortho_img = ((ortho_img - ortho_img.min()) / (ortho_img.max() - ortho_img.min() + 1e-6) * 255).astype(np.uint8)
    color_img = get_landcover_colorized()
    if color_img is None:
        raise HTTPException(status_code=500, detail="Failed to generate colorized landcover")
    mask = landcover_cache["mask"]
    valid_mask = (mask < UPERNET_CONFIG["num_classes"]).astype(np.float32)
    blended = ortho_img.astype(np.float32) * (1 - alpha * valid_mask[:, :, np.newaxis]) + \
              color_img.astype(np.float32) * alpha * valid_mask[:, :, np.newaxis]
    blended = np.clip(blended, 0, 255).astype(np.uint8)
    img = Image.fromarray(blended)
    if max_width and img.width > max_width:
        ratio = max_width / img.width
        new_height = int(img.height * ratio)
        img = img.resize((max_width, new_height), Image.Resampling.LANCZOS)
    buffer = io.BytesIO()
    img.save(buffer, format="JPEG", quality=min(95, max(1, quality)), optimize=True)
    buffer.seek(0)
    return Response(content=buffer.getvalue(), media_type="image/jpeg",
                    headers={"Cache-Control": "public, max-age=3600"})

@app.post("/api/landcover/run")
async def run_landcover():
    if ortho_cache["src"] is None:
        raise HTTPException(status_code=400, detail="Please upload an image first")
    try:
        result = run_landcover_segmentation()
        return convert_numpy({"status": "done", "stats": result["stats"], "shape": result["shape"]})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# ============ Terrain API Endpoints ============

@app.get("/api/terrain/status")
async def get_terrain_status():
    return {"computed": terrain_cache["computed"], "has_stats": terrain_cache["stats"] is not None, "dsm_loaded": dsm_cache["loaded"]}

@app.get("/api/terrain/stats")
async def get_terrain_stats():
    if not terrain_cache["computed"]:
        raise HTTPException(status_code=400, detail="Terrain not computed yet")
    return convert_numpy(terrain_cache["stats"])

@app.get("/api/terrain/slope")
async def get_terrain_slope(max_width: int = None):
    """取得坡度彩色圖 (PNG with compression)"""
    if not terrain_cache["computed"]:
        raise HTTPException(status_code=400, detail="Terrain not computed yet")
    from PIL import Image
    color_img = get_slope_colorized()
    if color_img is None:
        raise HTTPException(status_code=500, detail="Failed to generate colorized slope")
    img = Image.fromarray(color_img)
    if max_width and img.width > max_width:
        ratio = max_width / img.width
        new_height = int(img.height * ratio)
        img = img.resize((max_width, new_height), Image.Resampling.NEAREST)
    buffer = io.BytesIO()
    img.save(buffer, format="PNG", optimize=True)
    buffer.seek(0)
    return Response(content=buffer.getvalue(), media_type="image/png",
                    headers={"Cache-Control": "public, max-age=3600"})

@app.get("/api/terrain/aspect")
async def get_terrain_aspect(max_width: int = None):
    """取得坡向彩色圖 (PNG with compression)"""
    if not terrain_cache["computed"]:
        raise HTTPException(status_code=400, detail="Terrain not computed yet")
    from PIL import Image
    color_img = get_aspect_colorized()
    if color_img is None:
        raise HTTPException(status_code=500, detail="Failed to generate colorized aspect")
    img = Image.fromarray(color_img)
    if max_width and img.width > max_width:
        ratio = max_width / img.width
        new_height = int(img.height * ratio)
        img = img.resize((max_width, new_height), Image.Resampling.NEAREST)
    buffer = io.BytesIO()
    img.save(buffer, format="PNG", optimize=True)
    buffer.seek(0)
    return Response(content=buffer.getvalue(), media_type="image/png",
                    headers={"Cache-Control": "public, max-age=3600"})

@app.post("/api/terrain/run")
async def run_terrain():
    if not dsm_cache["loaded"]:
        raise HTTPException(status_code=400, detail="Please upload a DSM first")
    try:
        result = compute_terrain_metrics()
        return convert_numpy({"status": "done", "stats": result})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# ============ Processing Endpoints ============

@app.post("/api/process")
async def start_processing(request: ProcessingRequest = None):
    if request is None:
        request = ProcessingRequest()
    if ortho_cache["src"] is None:
        raise HTTPException(status_code=400, detail="Please upload an image first")

    job_id = f"job_{int(time.time())}"
    processing_state["job_id"] = job_id
    processing_state["status"] = "pending"
    processing_state["progress"] = 0
    processing_state["start_time"] = time.time()
    processing_state["results"] = []

    def update_progress(progress, step):
        processing_state["progress"] = progress
        processing_state["current_step"] = step
        processing_state["elapsed_seconds"] = time.time() - processing_state["start_time"]

    def run():
        try:
            processing_state["status"] = "running"
            classes = []
            if request.detect_vehicle: classes.append("car")
            if request.detect_person: classes.append("person")
            if request.detect_cone: classes.append("cone")

            update_progress(10, "Loading models...")
            detections = run_yolo_detection(classes, update_progress)

            if request.include_elevation:
                update_progress(70, "Height analysis...")
                detections = compute_height_volume(detections, update_progress)

            if request.include_landcover:
                update_progress(80, "Landcover segmentation...")
                def landcover_progress(p, step):
                    update_progress(80 + int(p * 0.1), step)
                run_landcover_segmentation(landcover_progress)

            if request.include_terrain and dsm_cache["loaded"]:
                update_progress(90, "Terrain analysis...")
                def terrain_progress(p, step):
                    update_progress(90 + int(p * 0.05), step)
                compute_terrain_metrics(terrain_progress)

            update_progress(95, "Coordinate transform...")
            detections = add_latlon_to_detections(detections)

            for i, det in enumerate(detections, 1):
                det["id"] = i
                det.pop("px1", None)
                det.pop("py1", None)
                det.pop("px2", None)
                det.pop("py2", None)

            processing_state["results"] = detections
            processing_state["status"] = "done"
            update_progress(100, "Complete")
        except Exception as e:
            processing_state["status"] = "error"
            processing_state["current_step"] = str(e)
            import traceback
            traceback.print_exc()

    threading.Thread(target=run, daemon=True).start()
    return {"job_id": job_id, "status": "started", "message": "Processing started"}

@app.get("/api/process/status")
async def get_current_processing_status():
    return {"job_id": processing_state["job_id"], "status": processing_state["status"],
            "progress": processing_state["progress"], "current_step": processing_state["current_step"],
            "elapsed_seconds": time.time() - processing_state["start_time"] if processing_state["start_time"] else 0}

@app.get("/api/process/{job_id}/status")
async def get_processing_status(job_id: str):
    if processing_state["job_id"] != job_id:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"job_id": job_id, "status": processing_state["status"], "progress": processing_state["progress"],
            "current_step": processing_state["current_step"],
            "elapsed_seconds": time.time() - processing_state["start_time"] if processing_state["start_time"] else 0}

@app.get("/api/export/stats")
async def export_stats():
    results = convert_numpy(processing_state["results"])
    stats = {"total": len(results), "person": len([r for r in results if r.get("cls") == "person"]),
             "vehicle": len([r for r in results if r.get("cls") == "vehicle"]),
             "cone": len([r for r in results if r.get("cls") == "cone"])}
    landcover_stats = landcover_cache["stats"] if landcover_cache["computed"] else None
    terrain_stats = terrain_cache["stats"] if terrain_cache["computed"] else None
    return convert_numpy({"generated_at": datetime.now().isoformat(), "summary": stats, "detections": results,
                          "landcover": landcover_stats, "terrain": terrain_stats})

print("API 程式碼載入完成")

In [None]:
#@title 3. 啟動 API Server (Cloudflare Tunnel)
import subprocess
import threading
import re
import time

# 啟動 cloudflared tunnel
def run_cloudflared():
    process = subprocess.Popen(
        ['/usr/local/bin/cloudflared', 'tunnel', '--url', 'http://localhost:8000'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    for line in process.stderr:
        match = re.search(r'https://[a-z0-9-]+\.trycloudflare\.com', line)
        if match:
            url = match.group(0)
            print(f"\n{'='*60}")
            print(f"API URL: {url}")
            print(f"{'='*60}")
            print(f"\n請複製上方 URL 到前端 Dashboard 連線\n")

# 啟動 uvicorn
import uvicorn

def run_server():
    config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
    server = uvicorn.Server(config)
    server.run()

# 背景啟動
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()

tunnel_thread = threading.Thread(target=run_cloudflared, daemon=True)
tunnel_thread.start()

print("Server 啟動中，請等待 Cloudflare Tunnel URL...")

# 保持運行
while True:
    time.sleep(60)