
# Jetson RGB‑D Privacy Masking Tutorial (RealSense → Fast Pixelate)

This notebook is an **instructional companion** for your Jetson setup. It covers:

1. Environment checks (PyTorch, CUDA, TensorRT, ONNX Runtime)
2. Initializing `AVPrivacyMasker` and verifying providers
3. Building masks two ways: **depth‑guided** and **box‑only (elliptical)**
4. Running the anonymization loop on a directory of images (offline)
5. Computing **Dice** and **Recall** vs. SAM masks
6. (Optional) Live RealSense → **MJPEG over HTTP** streaming server
7. Troubleshooting

> **Note:** Some cells require your Jetson + camera. They’re safe to run; if a device/library is missing, the cells will explain what to install/enable.


## 1) Environment checks

In [1]:

import sys, platform
print("Python:", sys.version)
print("Platform:", platform.platform())

try:
    import torch
    print("Torch:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())
    if torch.cuda.is_available():
        print("CUDA device:", torch.cuda.get_device_name(0))
        import torch.backends.cudnn as cudnn
        print("cuDNN:", cudnn.version())
except Exception as e:
    print("PyTorch not available:", e)

try:
    import onnxruntime as ort
    print("ONNX Runtime:", ort.__version__)
    print("Available providers:", ort.get_available_providers())
except Exception as e:
    print("ONNX Runtime not available:", e)

try:
    import cv2
    print("OpenCV:", cv2.__version__)
except Exception as e:
    print("OpenCV not available:", e)


Python: 3.10.12 (main, May 27 2025, 17:12:29) [GCC 11.4.0]
Platform: Linux-5.15.148-tegra-aarch64-with-glibc2.35
Torch: 2.5.0a0+872d972e41.nv24.08
CUDA available: True
CUDA device: Orin
cuDNN: 90300
ONNX Runtime: 1.23.0
Available providers: ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']
OpenCV: 4.11.0


## 2) Initialize `AVPrivacyMasker`

In [4]:

# If your av_privacy_masker.py is not importable, set its folder here:
# import sys; sys.path.insert(0, '/path/to/your/module/folder')

from av_privacy_masker import AVPrivacyMasker
mp = AVPrivacyMasker(
    device="cuda",
    conf_thresh=0.5,
    anon_block=24,
    anon_noise=20,
    dilate_kernel=13,
    det_size=(640, 640),
    verbose=True,
    enable_depth_anon=False,
)

print("Masker initialized.")


Applied providers: ['CUDAExecutionProvider', 'CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}, 'CUDAExecutionProvider': {'sdpa_kernel': '0', 'use_tf32': '1', 'fuse_conv_bias': '0', 'prefer_nhwc': '0', 'tunable_op_max_tuning_duration_ms': '0', 'enable_skip_layer_norm_strict_mode': '0', 'tunable_op_tuning_enable': '0', 'tunable_op_enable': '0', 'use_ep_level_unified_stream': '0', 'device_id': '0', 'has_user_compute_stream': '0', 'gpu_external_empty_cache': '0', 'cudnn_conv_algo_search': 'EXHAUSTIVE', 'cudnn_conv1d_pad_to_nc1d': '0', 'gpu_mem_limit': '18446744073709551615', 'gpu_external_alloc': '0', 'gpu_external_free': '0', 'arena_extend_strategy': 'kNextPowerOfTwo', 'do_copy_in_default_stream': '1', 'enable_cuda_graph': '0', 'user_compute_stream': '0', 'cudnn_conv_use_max_workspace': '1'}}
model ignore: /home/jetsonuser/.insightface/models/buffalo_s/1k3d68.onnx landmark_3d_68
Applied providers: ['CUDAExecutionProvider', 'CPUExecutionProvider'], with options: {'CPUExec

## 3) Mask construction demo (box‑only vs depth‑guided)

In [5]:
import os
import numpy as np, cv2

# Read the demo image (OpenCV loads it as BGR)
bgr = cv2.imread("/home/jetsonuser/masking/demo/data/demo_image.png")
assert bgr is not None, "Image not found"

# Detect faces; boxes come back as [x1, y1, x2, y2] pixel corners (see the output below)
boxes = mp.detect_faces(bgr)  # e.g., [[204,127,405,428],[648,283,848,555]]
print(f"[detect_faces] boxes={boxes}")

# Box-only mask (elliptical, with padding and dilation inside the class)
mask_box = mp.build_mask_from_boxes(boxes, bgr.shape, pad_ratio=0.25, oval=True)

# Visualize: black out masked region
vis = bgr.copy()
mask_bool = mask_box.astype(bool) if mask_box.dtype != np.bool_ else mask_box
vis[mask_bool] = (0, 0, 0)

# Save preview (make sure path has a filename + extension)
out_dir = "/home/jetsonuser/masking/demo/data"
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, "mask_box_demo.png")
ok = cv2.imwrite(out_path, vis)
assert ok, f"Failed to write image to {out_path}"

print("Mask pixels (box-only):", int(np.count_nonzero(mask_box)))
print(f"Preview saved to {out_path}")

[detect_faces] boxes=[[405, 128, 530, 260], [129, 56, 253, 201]]
[detect_faces] boxes=[[405, 128, 530, 260], [129, 56, 253, 201]]
Mask pixels (box-only): 68714
Preview saved to /home/jetsonuser/masking/demo/data/mask_box_demo.png
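
The cell above only exercises the box‑only path. As a rough illustration of how the two mask styles differ, here is a NumPy‑only sketch. It is not the `AVPrivacyMasker` implementation: the function names, the `tol_mm` tolerance, and the assumption that depth is a `z16` image in millimetres with `0` meaning "no reading" are all hypothetical choices made for this example.

```python
import numpy as np

def ellipse_mask_from_boxes(boxes, shape, pad_ratio=0.25):
    """Bool HxW mask: one padded, axis-aligned ellipse per [x1, y1, x2, y2] box."""
    h, w = shape[:2]
    yy, xx = np.mgrid[0:h, 0:w]
    mask = np.zeros((h, w), dtype=bool)
    for x1, y1, x2, y2 in boxes:
        cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
        rx = (x2 - x1) / 2.0 * (1.0 + pad_ratio)  # padded semi-axes
        ry = (y2 - y1) / 2.0 * (1.0 + pad_ratio)
        if rx <= 0 or ry <= 0:
            continue  # skip degenerate boxes
        mask |= ((xx - cx) / rx) ** 2 + ((yy - cy) / ry) ** 2 <= 1.0
    return mask

def depth_guided_mask(depth, boxes, shape, tol_mm=300, pad_ratio=0.25):
    """Keep ellipse pixels whose depth is within tol_mm of the box's median depth."""
    h, w = shape[:2]
    out = np.zeros((h, w), dtype=bool)
    signed = depth.astype(np.int32)  # avoid unsigned wrap-around when subtracting
    for box in boxes:
        ell = ellipse_mask_from_boxes([box], shape, pad_ratio)
        valid = ell & (depth > 0)  # assumption: 0 means "no depth reading"
        if not valid.any():
            out |= ell  # no usable depth: fall back to the box-only ellipse
            continue
        med = int(np.median(depth[valid]))
        near = np.abs(signed - med) <= tol_mm
        # Depth holes inside the ellipse stay masked (errs on the side of privacy).
        out |= ell & (near | (depth == 0))
    return out

# Synthetic scene: a "face" at 1 m in front of a wall at 3 m, with one depth hole.
depth = np.full((100, 100), 3000, dtype=np.uint16)
depth[40:60, 40:60] = 1000
depth[50, 50] = 0
boxes = [[40, 40, 60, 60]]

box_mask = ellipse_mask_from_boxes(boxes, depth.shape)
depth_mask = depth_guided_mask(depth, boxes, depth.shape)
print("box-only pixels:", int(box_mask.sum()), "depth-guided pixels:", int(depth_mask.sum()))
```

In this sketch the depth‑guided mask is always a subset of the padded ellipse: it drops background pixels that fall inside the ellipse but sit far behind the face, which is the main reason to prefer it when aligned depth is available.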


## 4) Offline batch anonymization (images → masked images)

In [7]:
from pathlib import Path
import time
import cv2

in_dir = Path("/home/jetsonuser/masking/demo/data/demo_images")
out_dir = Path("/home/jetsonuser/masking/demo/data/demo_out")
out_dir.mkdir(parents=True, exist_ok=True)

def is_img(p):
    return p.suffix.lower() in {".jpg", ".jpeg", ".png", ".bmp"}

# Gather input images (sorted for a reproducible processing order)
paths = sorted(p for p in in_dir.glob("*") if is_img(p))
if not paths:
    print("Put a few images into", in_dir, "then rerun this cell.")
else:
    t0 = time.perf_counter()
    n = 0
    for p in paths:
        bgr = cv2.imread(str(p), cv2.IMREAD_COLOR)
        if bgr is None:
            continue

        # Detect faces
        boxes = mp.detect_faces(bgr)

        # Box-only mask (fast; no depth file in this demo)
        mask = mp.build_mask_from_boxes(
            boxes, bgr.shape, pad_ratio=0.25, oval=True
        )

        # Anonymize
        out = mp.fast_pixelate(
            bgr, mask, block=mp.anon_block, noise=mp.anon_noise
        )

        # Save anonymized output (same filename as input); count only successful writes
        if cv2.imwrite(str(out_dir / p.name), out):
            n += 1

    dt = time.perf_counter() - t0
    fps = n / dt if dt > 0 else 0
    print(f"Processed {n} images in {dt:.2f}s ({fps:.1f} FPS). Output:", out_dir)

[detect_faces] boxes=[[405, 128, 530, 260], [129, 56, 253, 201]]
Processed 1 images in 0.16s (6.3 FPS). Output: /home/jetsonuser/masking/demo/data/demo_out
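
For readers who want to see what block pixelation with noise can look like, the following is a minimal NumPy sketch. It is an assumption about the general technique, not the code behind `mp.fast_pixelate`: the function name `pixelate_masked`, the per‑pixel uniform noise, and the `seed` parameter are hypothetical, and the tile loop is written for clarity rather than speed.

```python
import numpy as np

def pixelate_masked(bgr, mask, block=24, noise=20, seed=None):
    """Replace masked pixels with their tile's mean color plus uniform noise."""
    h, w = bgr.shape[:2]
    rng = np.random.default_rng(seed)

    # Coarse image: every block x block tile is filled with its mean color.
    coarse = np.empty_like(bgr)
    for y in range(0, h, block):
        for x in range(0, w, block):
            tile = bgr[y:y + block, x:x + block]
            mean = tile.reshape(-1, tile.shape[-1]).mean(axis=0)
            coarse[y:y + block, x:x + block] = mean.astype(bgr.dtype)

    # Optional jitter in [-noise, +noise], clipped back to the uint8 range.
    if noise > 0:
        jitter = rng.integers(-noise, noise + 1, size=bgr.shape)
        coarse = np.clip(coarse.astype(np.int16) + jitter, 0, 255).astype(np.uint8)

    # Only masked pixels are replaced; everything else is returned untouched.
    out = bgr.copy()
    m = mask.astype(bool)
    out[m] = coarse[m]
    return out

# Small synthetic check: pixelate the top-left 24x24 tile of a random image.
rng = np.random.default_rng(0)
img = rng.integers(0, 256, size=(48, 72, 3), dtype=np.uint8)
mask = np.zeros((48, 72), dtype=bool)
mask[:24, :24] = True
out = pixelate_masked(img, mask, block=24, noise=0)
print("unmasked pixels unchanged:", bool(np.array_equal(out[~mask], img[~mask])))
```

Larger `block` values discard more detail, and the added noise makes the flat tiles harder to exploit for reconstruction, which is presumably why the masker exposes both `anon_block` and `anon_noise`.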


## 5) Dice & Recall vs SAM masks (if available)

In [None]:

import numpy as np
from pathlib import Path

def dice_and_recall(pred: np.ndarray, gt: np.ndarray):
    # pred, gt: HxW masks; any nonzero value counts as foreground,
    # so both {0,1} and {0,255} encodings give the same result
    pred = pred.astype(bool)
    gt = gt.astype(bool)
    tp = int((pred & gt).sum())
    fp = int((pred & ~gt).sum())
    fn = int((~pred & gt).sum())
    # Both masks empty -> treat as perfect agreement
    dice = (2*tp) / (2*tp + fp + fn) if (2*tp + fp + fn) > 0 else 1.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 1.0
    return dice, recall, tp, fp, fn

# Example layout:
# /path/to/sam_masks/session_key.npy   -> shape (T, H, W) uint8
# /path/to/our_masks/session_key.npy   -> shape (T, H, W) uint8
our_np_path = Path("/mnt/data/our_masks_example.npy")  # change to your file
sam_np_path = Path("/mnt/data/sam_masks_example.npy")

if our_np_path.exists() and sam_np_path.exists():
    our = np.load(our_np_path)    # (T,H,W)
    sam = np.load(sam_np_path)    # (T,H,W)
    T = min(len(our), len(sam))
    assert our.shape[1:]==sam.shape[1:], "H/W mismatch"
    dices = []; recalls = []
    for t in range(T):
        d, r, *_ = dice_and_recall(our[t].astype(np.uint8), sam[t].astype(np.uint8))
        dices.append(d); recalls.append(r)
    print(f"Macro Dice={np.mean(dices):.4f}, Macro Recall={np.mean(recalls):.4f}, Frames={T}")
else:
    print("Drop two matching .npy stacks into /mnt/data and update the paths above to compute metrics.")
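
To make the two metrics concrete: with true positives TP, false positives FP and false negatives FN, Dice = 2·TP / (2·TP + FP + FN) and Recall = TP / (TP + FN). The tiny worked example below uses hand‑made 2×3 masks (not real data) and a compact helper named `dice_recall`, which is defined here only for illustration.

```python
import numpy as np

def dice_recall(pred, gt):
    """Dice and recall for two binary masks; nonzero counts as foreground."""
    pred = np.asarray(pred).astype(bool)
    gt = np.asarray(gt).astype(bool)
    tp = int((pred & gt).sum())
    fp = int((pred & ~gt).sum())
    fn = int((~pred & gt).sum())
    dice = 2 * tp / (2 * tp + fp + fn) if (tp + fp + fn) else 1.0
    recall = tp / (tp + fn) if (tp + fn) else 1.0
    return dice, recall

gt = np.array([[1, 0, 0],
               [0, 1, 1]])
pred = np.array([[1, 1, 1],
                 [0, 1, 0]])

# TP=2, FP=2, FN=1  ->  Dice = 4/7, Recall = 2/3
d, r = dice_recall(pred, gt)
print(f"Dice={d:.4f} Recall={r:.4f}")          # Dice=0.5714 Recall=0.6667

# Masking everything: TP=3, FP=3, FN=0  ->  Dice = 6/9, Recall = 1
d_all, r_all = dice_recall(np.ones_like(gt), gt)
print(f"Dice={d_all:.4f} Recall={r_all:.4f}")  # Dice=0.6667 Recall=1.0000
```

The second case shows why both numbers are reported: over‑masking drives recall to 1.0 (no face pixel is missed) while Dice still penalizes the extra masked area.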


## 6) (Optional) Live RealSense → MJPEG over HTTP

In [None]:

# This mirrors your streaming server in a cell for convenience.
# It requires: pyrealsense2, Flask, and a connected RealSense camera.
import time, threading
from collections import deque
import numpy as np
import cv2

try:
    import pyrealsense2 as rs
    from flask import Flask, Response
    have_live = True
except Exception as e:
    have_live = False
    print("Live demo dependencies missing (pyrealsense2 / Flask):", e)

if have_live:
    def run_server(port=5001, jpeg_quality=80, preview_width=640, save=False):
        pipeline = rs.pipeline(); config = rs.config()
        W,H,FPS = 640,480,30
        config.enable_stream(rs.stream.color, W, H, rs.format.bgr8, FPS)
        config.enable_stream(rs.stream.depth, W, H, rs.format.z16, FPS)
        profile = pipeline.start(config)
        align = rs.align(rs.stream.color)

        # warm up model
        _ = mp.detect_faces(np.zeros((H,W,3), np.uint8))

        q = deque(maxlen=1)
        stop = False

        def cap():
            while not stop:
                f = pipeline.wait_for_frames()
                f = align.process(f)
                c = f.get_color_frame()
                d = f.get_depth_frame()
                if c and d:
                    q.append((np.asanyarray(c.get_data()), np.asanyarray(d.get_data())))
        t = threading.Thread(target=cap, daemon=True); t.start()

        app = Flask(__name__)
        latest = {"buf": None, "seq": 0}
        lock = threading.Lock()
        cond = threading.Condition(lock)

        def publisher():
            while not stop:
                # Take the newest frame and remove it, so the same frame is never processed twice
                try:
                    color, depth = q.pop()
                except IndexError:
                    time.sleep(0.001); continue
                boxes = mp.detect_faces(color)
                mask = mp.build_mask_numpy(depth, boxes, mp.kernel, mp._calc_depth_profile)
                view = mp.fast_pixelate(color, mask, block=mp.anon_block, noise=mp.anon_noise)
                ok, buf = cv2.imencode(".jpg", view, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality])
                if ok:
                    with lock:
                        latest["buf"] = buf
                        latest["seq"] += 1
                        cond.notify_all()
        threading.Thread(target=publisher, daemon=True).start()

        @app.route("/view")
        def view():
            def gen():
                boundary = b"--frame\r\n"; headers = b"Content-Type: image/jpeg\r\n\r\n"
                last = -1
                while True:
                    with lock:
                        cond.wait_for(lambda: latest["seq"] != last)
                        last = latest["seq"]; buf = latest["buf"]
                    yield boundary + headers + buf.tobytes() + b"\r\n"
            return Response(gen(), mimetype="multipart/x-mixed-replace; boundary=frame")

        print(f"Open http://localhost:{port}/view  (SSH: ssh -L {port}:localhost:{port} <user>@jetson)")
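        try:
            app.run(host="0.0.0.0", port=port, threaded=True, use_reloader=False)
        finally:
            # Runs on normal exit and on kernel interrupt: stop the worker loops,
            # let the capture thread leave wait_for_frames(), then release the camera
            stop = True
            t.join(timeout=1.0)
            pipeline.stop()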

    # To launch, uncomment:
    # run_server(port=5001, jpeg_quality=80, preview_width=640, save=False)
else:
    print("Install pyrealsense2 and Flask on the Jetson to run the live server.")
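
The server above hands frames from the publisher thread to each HTTP client through a "latest frame" slot guarded by a condition variable, then wraps every JPEG as one `multipart/x-mixed-replace` part. The sketch below isolates that pattern with the standard library only, so it can be tried without a camera or Flask. The `LatestFrame` class, `mjpeg_part` helper, and the `timeout` argument are hypothetical names introduced for this example.

```python
import threading

class LatestFrame:
    """Single-slot mailbox: writers overwrite, readers wait for a newer sequence number."""

    def __init__(self):
        self._cond = threading.Condition()
        self._buf = None
        self._seq = 0

    def publish(self, buf):
        """Store a new frame and wake every waiting reader."""
        with self._cond:
            self._buf = buf
            self._seq += 1
            self._cond.notify_all()

    def wait_newer(self, last_seq, timeout=None):
        """Block until the sequence number differs from last_seq.

        Returns (seq, buf), or None if the timeout expires first.
        """
        with self._cond:
            if not self._cond.wait_for(lambda: self._seq != last_seq, timeout=timeout):
                return None
            return self._seq, self._buf

def mjpeg_part(jpeg_bytes, boundary=b"frame"):
    """Wrap one encoded JPEG as a multipart/x-mixed-replace part."""
    return b"--" + boundary + b"\r\nContent-Type: image/jpeg\r\n\r\n" + jpeg_bytes + b"\r\n"

# Demo: a timer thread plays the role of the publisher.
slot = LatestFrame()
threading.Timer(0.05, slot.publish, args=(b"\xff\xd8fake-jpeg\xff\xd9",)).start()

seq, buf = slot.wait_newer(last_seq=0, timeout=2.0)
chunk = mjpeg_part(buf)
print("got frame", seq, "part length", len(chunk))
```

Slow clients simply skip frames because the slot only ever holds the newest one. Passing a `timeout` is a design choice for this sketch: it lets a generator wake up periodically and exit once the server is shutting down, instead of blocking forever.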



## 7) Troubleshooting

- **`torch.cuda.is_available()` returns `False`**: verify the JetPack/CUDA install and use the NVIDIA PyTorch wheel built for your JetPack version.
- **`onnxruntime` missing CUDA/TensorRT providers**: install the Jetson build of `onnxruntime-gpu` (Jetson AI Lab wheel), then confirm that `ort.get_available_providers()` lists `CUDAExecutionProvider` and, if configured, `TensorrtExecutionProvider`.
- **RealSense errors**: check the `udev` rules and that `pyrealsense2` matches your firmware; install the tools with `sudo apt install librealsense2-utils` and confirm the camera streams in `realsense-viewer`.
- **OOM / crashes building torchvision**: add swap, limit the build to one job (`-j1`), or skip torchvision and use OpenCV for transforms.
- **Flask stream not visible**: open the `/view` URL (`http://localhost:5001/view` with the default port); if remote, use an SSH tunnel: `ssh -L 5001:localhost:5001 user@jetson`.
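
For the provider item above, a small helper can make the fallback order explicit. This is a sketch under assumptions: `pick_providers` and `available_providers` are hypothetical helper names, the preference order is one reasonable choice rather than a requirement, and the commented‑out model path is a placeholder. Only `ort.get_available_providers()` and the `providers=` argument of `ort.InferenceSession` are real ONNX Runtime API.

```python
PREFERRED = (
    "TensorrtExecutionProvider",
    "CUDAExecutionProvider",
    "CPUExecutionProvider",
)

def pick_providers(available, preferred=PREFERRED):
    """Return the preferred providers that are actually available, in order."""
    chosen = [p for p in preferred if p in available]
    return chosen or ["CPUExecutionProvider"]  # CPU is the last-resort fallback

def available_providers():
    """Ask ONNX Runtime what it was built with; empty list if it is not installed."""
    try:
        import onnxruntime as ort
    except ImportError:
        return []
    return ort.get_available_providers()

providers = pick_providers(available_providers())
print("Requesting providers:", providers)

# Hypothetical usage with a model file of your own:
# import onnxruntime as ort
# session = ort.InferenceSession("model.onnx", providers=providers)
```

Note that the masker output in section 2 shows `CUDAExecutionProvider` and `CPUExecutionProvider` being applied even though TensorRT is listed as available, so dropping `TensorrtExecutionProvider` from `PREFERRED` would mirror that setup.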
