<a href="https://colab.research.google.com/github/amathkur/Ardupilot_Multiagent_Simulation/blob/main/Copy_of_Untitled2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================
# SAM2 (CPU-safe) + Single-Object Tracking + MiDaS Point Clouds
# ============================================
# Env: CPU SAFE
import os
# Set before torch is imported further down in this cell.
os.environ["CUDA_VISIBLE_DEVICES"] = ""   # hard-disable CUDA to avoid torch cuda init on CPU runtimes

# ---------------- Install deps ----------------
# The "!" lines are notebook shell commands, not Python statements.
# torch / torchvision come from the PyTorch CPU wheel index.
!pip install -q "torch>=2.5.1" "torchvision>=0.20.1" --index-url https://download.pytorch.org/whl/cpu
# SAM2 is installed straight from its GitHub repository.
!pip install -q git+https://github.com/facebookresearch/sam2.git
# OpenCV is pinned to an exact version; numpy, tqdm, timm and gradio are unpinned.
!pip install -q opencv-python-headless==4.10.0.84 numpy tqdm timm gradio

import cv2, numpy as np, torch, gradio as gr, tempfile, json
from tqdm import tqdm
from sam2.sam2_video_predictor import SAM2VideoPredictor

# Device string that the models and input tensors below are moved to.
device = "cpu"  # enforced
print("Device:", device)

# ---------- Globals shared across steps ----------
# Mutable module-level state shared by the callbacks below. load_video() fills
# every key except "bbox_xywh", which set_bbox() stores; run_part1() and
# run_part2() only read from it.
SESSION = {
    # Path of the uploaded video on disk.
    "video_path": None,
    # JPEG preview of the first frame, written to a fresh temp directory.
    "first_frame_path": None,
    # Selected box as a tuple (x, y, w, h).
    "bbox_xywh": None,
    # Output directory for this video ("<video basename>_sam2_single_obj").
    "out_dir": None,
    # "masked_object_only.mp4" inside out_dir.
    "masked_video_path": None,
    # "masks_png" directory inside out_dir (one PNG per tracked frame).
    "mask_dump_dir": None,
    # Frame width, frame height, frame count and frames-per-second of the video.
    "W": None, "H": None, "N": None, "fps": 30.0,
}

# ---------- Helper: write ASCII PLY ----------
def write_ply_ascii_xyzrgb(path, xyz, rgb):
    """Write a colored point cloud to ``path`` as an ASCII PLY file.

    Args:
        path: Destination file path; an existing file is overwritten.
        xyz: Array or sequence of N ``(x, y, z)`` rows. Coordinates are
            written with six decimal places.
        rgb: Array or sequence of N ``(r, g, b)`` rows. Each channel is
            converted with ``int()`` and declared as ``uchar`` in the header.

    Raises:
        ValueError: If ``xyz`` and ``rgb`` do not have the same number of rows.
    """
    n = len(xyz)
    # The header announces ``n`` vertices, so a shorter/longer color array
    # would otherwise produce a file whose body disagrees with its header.
    if len(rgb) != n:
        raise ValueError(
            f"xyz has {n} rows but rgb has {len(rgb)}; they must match"
        )

    header = (
        "ply\nformat ascii 1.0\n"
        f"element vertex {n}\n"
        "property float x\nproperty float y\nproperty float z\n"
        "property uchar red\nproperty uchar green\nproperty uchar blue\n"
        "end_header\n"
    )
    with open(path, "w", encoding="ascii") as f:
        f.write(header)
        # writelines() hands the whole body to the buffered writer instead of
        # issuing one write() call per vertex.
        f.writelines(
            f"{x:.6f} {y:.6f} {z:.6f} {int(r)} {int(g)} {int(b)}\n"
            for (x, y, z), (r, g, b) in zip(xyz, rgb)
        )

# ---------- UI Step 1: Load video & show first frame ----------
def load_video(file):
    """Open the uploaded video, save a first-frame preview and reset SESSION.

    Args:
        file: Value delivered by the ``gr.File`` input. Either ``None``, an
            object exposing the file path as ``.name``, or a plain path string.

    Returns:
        A 3-tuple matching the click outputs ``(first_frame, msg, sess_dump)``:
        the preview image path (or a ``gr.update`` clearing the image on
        failure), a status message, and the SESSION dict as pretty-printed
        JSON (``None`` on failure).
    """
    if file is None:
        return gr.update(value=None), "No video uploaded.", None

    # Accept both a file-like object carrying ``.name`` and a bare path string.
    vid_path = str(getattr(file, "name", file))

    cap = cv2.VideoCapture(vid_path)
    try:
        ok, frame = cap.read()
        if not ok:
            return gr.update(value=None), "Cannot read first frame.", None
        H, W = frame.shape[:2]
        # A missing/zero property falls back to 0 frames and 30 fps.
        N = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    finally:
        # Release the capture on every path, including unexpected errors.
        cap.release()

    first_frame_path = os.path.join(tempfile.mkdtemp(), "first_frame.jpg")
    # The frame read by OpenCV is written as-is; no channel flip is needed.
    if not cv2.imwrite(first_frame_path, frame):
        return gr.update(value=None), "Cannot write first-frame preview.", None

    base = os.path.splitext(os.path.basename(vid_path))[0]
    out_dir = os.path.join("/content", base + "_sam2_single_obj")
    os.makedirs(out_dir, exist_ok=True)
    mask_dump_dir = os.path.join(out_dir, "masks_png")
    os.makedirs(mask_dump_dir, exist_ok=True)
    masked_video_path = os.path.join(out_dir, "masked_object_only.mp4")

    SESSION.update({
        "video_path": vid_path,
        "first_frame_path": first_frame_path,
        # A box chosen for a previously loaded video does not apply to this one.
        "bbox_xywh": None,
        "W": W, "H": H, "N": N, "fps": fps,
        "out_dir": out_dir,
        "mask_dump_dir": mask_dump_dir,
        "masked_video_path": masked_video_path
    })
    msg = f"Loaded video: {vid_path}\nResolution: {W}x{H}, Frames: {N}, FPS: {fps:.2f}\nDraw a box, then click 'Run Part 1'."
    return first_frame_path, msg, json.dumps(SESSION, indent=2)

# ---------- UI Step 2: Validate and store the bounding box as (x, y, w, h) ----------
def set_bbox(bbox_json):
    """Validate a bounding box and store it in ``SESSION["bbox_xywh"]``.

    Args:
        bbox_json: The box to use. Either an object with a dict-style
            ``get`` providing ``x``, ``y``, ``width`` and ``height`` (missing
            entries default to 0), or a 4-item list/tuple ``(x, y, w, h)``.
            Any other value (for example an image array coming from an image
            component) is rejected with a status message instead of raising.

    Returns:
        A status string for the UI. SESSION is only modified when the box is
        valid, i.e. all four values convert to ``int`` and width and height
        are positive.
    """
    if bbox_json is None:
        return "No box selected."

    bad_input = "Invalid box: expected {x, y, width, height} or (x, y, w, h)."
    try:
        if callable(getattr(bbox_json, "get", None)):
            raw = [bbox_json.get(key, 0) for key in ("x", "y", "width", "height")]
        elif isinstance(bbox_json, (list, tuple)) and len(bbox_json) == 4:
            raw = list(bbox_json)
        else:
            return bad_input
        x, y, w, h = (int(value) for value in raw)
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, NaN or infinity cannot become pixel coordinates.
        return bad_input

    if w <= 0 or h <= 0:
        return "Invalid box."
    SESSION["bbox_xywh"] = (x, y, w, h)
    return f"BBox set to (x={x}, y={y}, w={w}, h={h})."

# ---------- Part 1: SAM2 tracking on CPU ----------
def run_part1():
    """Track the selected object through the video with SAM2 on the CPU.

    Reads the video path, output paths, frame geometry and bounding box from
    ``SESSION``. For every frame the tracker yields, the frame is written to
    the masked video with everything outside the object mask blacked out, and
    the binary mask is saved as ``mask_<frame index>.png`` in the mask
    directory.

    Returns:
        A status string for the UI: a prompt to finish an earlier step, an
        error message if the output video cannot be opened, or the success
        summary with the output locations.
    """
    if SESSION["video_path"] is None:
        return "Upload a video first."
    if SESSION["bbox_xywh"] is None:
        return "Select a bounding box first."

    video_path = SESSION["video_path"]
    mask_dump_dir = SESSION["mask_dump_dir"]
    masked_video_path = SESSION["masked_video_path"]
    W, H, N, fps = SESSION["W"], SESSION["H"], SESSION["N"], SESSION["fps"]
    x, y, w, h = SESSION["bbox_xywh"]

    # Load smaller SAM2 checkpoint to be CPU-friendlier. The device is passed
    # explicitly because the SAM2 builder otherwise targets CUDA.
    predictor = SAM2VideoPredictor.from_pretrained("facebook/sam2-hiera-small", device=device)

    # Init predictor state.
    # NOTE(review): SAM2's loader appears to accept only .mp4 files or a
    # directory of JPEG frames -- confirm other containers (e.g. .mov) work.
    state = predictor.init_state(video_path)

    # Add the bbox as the prompt for a single object on the first frame.
    # The box is given as (x0, y0, x1, y1) in pixels.
    box_prompt = np.array([x, y, x + w, y + h], dtype=np.float32)
    predictor.add_new_points_or_box(
        inference_state=state,
        frame_idx=0,
        obj_id=1,
        box=box_prompt,
    )

    # Prepare writer
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    vw = cv2.VideoWriter(masked_video_path, fourcc, fps, (W, H))
    cap = cv2.VideoCapture(video_path)
    pbar = tqdm(total=N, desc="SAM2 Tracking (CPU)")
    try:
        if not vw.isOpened():
            return f"Cannot open video writer for: {masked_video_path}"

        next_idx = 0  # index of the frame the next cap.read() will return
        for f_idx, obj_ids, mask_logits in predictor.propagate_in_video(state):
            # Frames normally arrive in order, so only seek when they do not.
            if f_idx != next_idx:
                cap.set(cv2.CAP_PROP_POS_FRAMES, f_idx)
            ok, frame_bgr = cap.read()
            next_idx = f_idx + 1 if ok else -1  # force a seek after a failed read
            if not ok or len(obj_ids) == 0:
                pbar.update(1)
                continue

            # The predictor yields per-object mask logits; positive = object.
            mask = (mask_logits[0] > 0.0).cpu().numpy()
            mask = mask.reshape(mask.shape[-2:])  # drop the leading channel axis
            if mask.shape != frame_bgr.shape[:2]:
                # Keep mask and decoded frame aligned if their sizes disagree.
                mask = cv2.resize(
                    mask.astype(np.uint8),
                    (frame_bgr.shape[1], frame_bgr.shape[0]),
                    interpolation=cv2.INTER_NEAREST,
                ).astype(bool)

            masked = np.zeros_like(frame_bgr)
            masked[mask] = frame_bgr[mask]
            vw.write(masked)

            mask_png = mask.astype(np.uint8) * 255
            cv2.imwrite(os.path.join(mask_dump_dir, f"mask_{f_idx:05d}.png"), mask_png)
            pbar.update(1)
    finally:
        # Release everything even if tracking fails part-way through.
        pbar.close()
        cap.release()
        vw.release()

    return f"✅ Part 1 done.\nMasked video: {masked_video_path}\nMasks dir: {mask_dump_dir}"

# ---------- Part 2: MiDaS depth + masked point clouds ----------
def run_part2(hfov_deg=60.0, min_points=50, depth_scale=1.0):
    """Estimate per-frame depth with MiDaS and export masked point clouds.

    For every video frame that has a ``mask_<frame index>.png`` in the mask
    directory, runs MiDaS, normalizes the prediction to roughly ``[0, 1]``,
    back-projects the pixels through a pinhole model and writes the pixels
    inside the mask as ``pointclouds/frame_<frame index>.ply``.

    Args:
        hfov_deg: Assumed horizontal field of view in degrees; sets the focal
            length ``fx = (W / 2) / tan(hfov / 2)`` (``fy`` is set equal).
        min_points: Minimum number of masked pixels required to write a PLY.
        depth_scale: Multiplier applied to the normalized depth.

    Returns:
        A status string for the UI: a prompt to run Part 1 first, or the
        number of PLY files saved and their directory.
    """
    video_path = SESSION["video_path"]
    mask_dump_dir = SESSION["mask_dump_dir"]
    out_dir = SESSION["out_dir"]
    if not video_path or not mask_dump_dir:
        return "Run Part 1 first."

    # Load MiDaS. "MiDaS_small" is the hub entrypoint that pairs with
    # ``small_transform``.
    midas = torch.hub.load("intel-isl/MiDaS", "MiDaS_small").to(device).eval()
    midas_transforms = torch.hub.load("intel-isl/MiDaS", "transforms")
    transform = midas_transforms.small_transform

    min_points = int(min_points)
    depth_scale = float(depth_scale)

    cap = cv2.VideoCapture(video_path)
    pbar = None
    try:
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        N = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)

        # Pinhole intrinsics: principal point at the image center, square pixels.
        fx = (W/2) / np.tan(np.radians(hfov_deg/2))
        fy = fx
        cx, cy = W/2, H/2

        pc_out_dir = os.path.join(out_dir, "pointclouds")
        os.makedirs(pc_out_dir, exist_ok=True)

        # Per-pixel ray directions depend only on the frame size, so they are
        # computed once instead of once per frame.
        ys, xs = np.indices((H, W)).astype(np.float32)
        ray_x = (xs - cx) / fx
        ray_y = (ys - cy) / fy

        pbar = tqdm(total=N, desc="Depth + PLY (CPU)")
        frame_idx = 0
        saved = 0
        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break

            # Frames without a readable mask from Part 1 are skipped.
            mask_path = os.path.join(mask_dump_dir, f"mask_{frame_idx:05d}.png")
            mask = None
            if os.path.isfile(mask_path):
                mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
            if mask is None:
                frame_idx += 1
                pbar.update(1)
                continue
            mask_bool = mask > 127

            rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            with torch.no_grad():
                inp = transform(rgb).to(device)
                pred = midas(inp)
                # Resize the network output back to the frame resolution.
                pred = torch.nn.functional.interpolate(
                    pred.unsqueeze(1),
                    size=(H, W),
                    mode="bicubic",
                    align_corners=False
                ).squeeze().cpu().numpy().astype(np.float32)

            # Normalize to [0,1], avoid zeros, apply optional scale.
            # NOTE(review): MiDaS is documented as predicting relative inverse
            # depth; using the normalized output directly as Z may invert
            # near/far -- confirm this is intended.
            depth = pred
            depth -= depth.min()
            denom = depth.max() + 1e-8
            depth = (depth / denom + 1e-6) * depth_scale

            X = ray_x * depth
            Y = ray_y * depth
            Z = depth
            XYZ = np.stack([X, Y, Z], axis=-1)

            XYZ_obj = XYZ[mask_bool]
            RGB_obj = frame_bgr[mask_bool][:, ::-1]  # BGR->RGB

            if XYZ_obj.shape[0] >= min_points:
                ply_path = os.path.join(pc_out_dir, f"frame_{frame_idx:05d}.ply")
                write_ply_ascii_xyzrgb(ply_path, XYZ_obj.astype(np.float32), RGB_obj.astype(np.uint8))
                saved += 1

            frame_idx += 1
            pbar.update(1)
    finally:
        # Release the capture and progress bar even if depth estimation fails.
        if pbar is not None:
            pbar.close()
        cap.release()
    return f"✅ Part 2 done. Saved {saved} PLYs in: {pc_out_dir}"

# ---------------- Build Gradio UI ----------------
# Four-step UI: load video -> enter box -> SAM2 tracking -> depth + point clouds.
# All callbacks report back through the single "Status" textbox.
with gr.Blocks(title="SAM2 Single-Object Mask + Point Cloud (CPU)") as demo:
    gr.Markdown("## SAM2 (CPU) → Single Object Masked Video → MiDaS Point Clouds")
    gr.Markdown("**Step 1. Upload video**")

    with gr.Row():
        in_file = gr.File(file_types=["video"], label="Video")
        first_frame = gr.Image(label="First frame (reference for box coordinates)")
    msg = gr.Textbox(label="Status", interactive=False)
    sess_dump = gr.Code(language="json", label="Session (debug)", interactive=False)

    load_btn = gr.Button("Load video & preview first frame")
    load_btn.click(fn=load_video, inputs=[in_file], outputs=[first_frame, msg, sess_dump])

    # The image component hands callbacks the image itself, not a selection
    # rectangle, so the box is entered numerically in pixel coordinates.
    gr.Markdown("**Step 2. Enter the box around your object (pixels), then confirm**")
    with gr.Row():
        bbox_x_in = gr.Number(value=0, precision=0, label="x (left)")
        bbox_y_in = gr.Number(value=0, precision=0, label="y (top)")
        bbox_w_in = gr.Number(value=100, precision=0, label="width")
        bbox_h_in = gr.Number(value=100, precision=0, label="height")
    set_bbox_btn = gr.Button("Use selected box")
    set_bbox_btn.click(
        # Empty number fields arrive as None; treat them as 0 so set_bbox
        # reports an invalid box instead of failing on the conversion.
        fn=lambda x, y, w, h: set_bbox(
            {"x": x or 0, "y": y or 0, "width": w or 0, "height": h or 0}
        ),
        inputs=[bbox_x_in, bbox_y_in, bbox_w_in, bbox_h_in],
        outputs=[msg],
    )

    gr.Markdown("**Step 3. Run Part 1 (SAM2 tracking)**")
    part1_btn = gr.Button("Run Part 1")
    part1_btn.click(fn=run_part1, outputs=[msg])

    gr.Markdown("**Step 4. Run Part 2 (Depth + Point Clouds)**")
    hfov = gr.Slider(40, 100, value=60, step=1, label="Approx HFOV (deg)")
    min_pts = gr.Slider(10, 2000, value=50, step=10, label="Min points per PLY")
    dscale = gr.Slider(0.1, 10.0, value=1.0, step=0.1, label="Depth scale (MiDaS is relative)")
    part2_btn = gr.Button("Run Part 2")
    part2_btn.click(fn=run_part2, inputs=[hfov, min_pts, dscale], outputs=[msg])

# share=True requests a public URL in addition to the local server.
demo.launch(debug=False, share=True)

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.5/154.5 kB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for SAM-2 (pyproject.toml) ... [?25l[?25hdone
  Building wheel for iopath (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.9/49.9 MB[0m [31m50.8 MB/s[0m eta [36m0:00:00[0m
[?25hDevice: cpu
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://6becbdee213adc7667.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the t



In [None]:
# Manual fallback for the box: edit the placeholder values below so they
# describe your object in pixel coordinates.
bbox_x, bbox_y = 0, 0        # top-left corner (x, y)
bbox_w, bbox_h = 100, 100    # box size (width, height)

# Store the box in the shared session as (x, y, w, h) and echo it back.
SESSION.update(bbox_xywh=(bbox_x, bbox_y, bbox_w, bbox_h))
print(f"Bounding box manually set to: {SESSION['bbox_xywh']}")

Bounding box manually set to: (0, 0, 100, 100)


In [None]:
import json

# Debug helper: show the shared session as indented JSON under a heading line.
print("Current SESSION state:", json.dumps(SESSION, indent=2), sep="\n")

Current SESSION state:
{
  "video_path": "/tmp/gradio/27b07c9cdc9ed78a84002bff7dab34ab039a5ff11e725f20c0c9744d64a9b299/ScreenRecording_09-16-2025 06-33-33_1.mov",
  "first_frame_path": "/tmp/tmpryig3pu9/first_frame.jpg",
  "bbox_xywh": null,
  "out_dir": "/content/ScreenRecording_09-16-2025 06-33-33_1_sam2_single_obj",
  "masked_video_path": "/content/ScreenRecording_09-16-2025 06-33-33_1_sam2_single_obj/masked_object_only.mp4",
  "mask_dump_dir": "/content/ScreenRecording_09-16-2025 06-33-33_1_sam2_single_obj/masks_png",
  "W": 526,
  "H": 512,
  "N": 429,
  "fps": 42.17597902670818
}
