# COLMAP Pose Estimation for Stack Sessions

Process raw capture sessions (ultrawide camera) through COLMAP SfM to get 6DoF poses.

**Requirements:** Google Colab (GPU optional but speeds up feature extraction)

**Pipeline:**
1. Mount Google Drive, unzip sessions
2. Subsample frames (60fps → 10fps for better feature matching)
3. Run COLMAP: feature extraction → sequential matching → SfM
4. Interpolate poses back to 60fps
5. Write poses.json back to session

## 1. Setup

In [None]:
import os
os.environ['QT_QPA_PLATFORM'] = 'offscreen'

!pip install pycolmap -q
!pip install opencv-python-headless scipy matplotlib -q

import json
import shutil
import subprocess
from pathlib import Path

import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.transform import Rotation, Slerp
from scipy.interpolate import interp1d

try:
    import pycolmap
    print(f"pycolmap {pycolmap.__version__} installed")
except ImportError:
    print("ERROR: pycolmap not installed")

print("Setup complete")

## 2. Mount Google Drive & Select Session

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Set the path to your session directory on Google Drive
DRIVE_SESSIONS_DIR = Path('/content/drive/MyDrive/stack_sessions')

In [None]:
# Unzip uploaded session archives (skip already-extracted ones)
import subprocess
if DRIVE_SESSIONS_DIR.exists():
    zips = sorted(DRIVE_SESSIONS_DIR.glob('*.zip'))
    print(f"Found {len(zips)} zip files")
    for z in zips:
        session_dir = DRIVE_SESSIONS_DIR / z.stem
        if session_dir.exists():
            print(f"  {z.name}: already extracted, skipping")
        else:
            print(f"  {z.name}: extracting...")
            subprocess.run(['unzip', '-q', '-o', str(z), '-d', str(DRIVE_SESSIONS_DIR)], check=True)
    print("Done!")
else:
    print(f"Upload zipped sessions to Google Drive: My Drive/stack_sessions/")

In [None]:
# List available sessions
if DRIVE_SESSIONS_DIR.exists():
    sessions = sorted([d for d in DRIVE_SESSIONS_DIR.iterdir() if d.is_dir() and d.name.startswith('session_')])
    print(f"Found {len(sessions)} sessions:")
    for s in sessions:
        meta_file = s / 'metadata.json'
        if meta_file.exists():
            with open(meta_file) as f:
                meta = json.load(f)
            source = meta.get('captureSource', 'iphone_arkit')
            processed = meta.get('slamProcessed', False)
            n_frames = meta.get('rgbFrameCount', '?')
            status = 'done' if processed else ('arkit' if source == 'iphone_arkit' else 'needs SLAM')
            print(f"  {s.name}: {n_frames} frames, source={source}, status={status}")
        else:
            print(f"  {s.name}: no metadata")
else:
    print(f"No sessions found at {DRIVE_SESSIONS_DIR}")

In [None]:
# Select session to process
SESSION_NAME = 'session_2026-02-20_143000'  # <-- Change this
SESSION_DIR = DRIVE_SESSIONS_DIR / SESSION_NAME

assert SESSION_DIR.exists(), f"Session not found: {SESSION_DIR}"

# Load metadata
with open(SESSION_DIR / 'metadata.json') as f:
    metadata = json.load(f)
print(json.dumps(metadata, indent=2))

## 3. Subsample Frames & Prepare COLMAP

60fps is too dense for feature matching — subsample to ~10fps, run COLMAP, then interpolate back.

In [None]:
# Load all frame paths
rgb_dir = SESSION_DIR / 'rgb'
all_frame_paths = sorted(rgb_dir.glob('*.jpg'))
N_total = len(all_frame_paths)
print(f"Total frames: {N_total}")

# Preview first frame
first_frame = cv2.imread(str(all_frame_paths[0]))
first_frame = cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB)
H, W = first_frame.shape[:2]
plt.figure(figsize=(8, 6))
plt.imshow(first_frame)
plt.title(f"Frame 0: {W}x{H}")
plt.axis('off')
plt.show()

# Subsample: every 6th frame (60fps → 10fps)
SUBSAMPLE = 6
sub_indices = list(range(0, N_total, SUBSAMPLE))
# Always include last frame for full coverage
if sub_indices[-1] != N_total - 1:
    sub_indices.append(N_total - 1)
print(f"Subsampled: {len(sub_indices)} frames (every {SUBSAMPLE}th)")

# Copy subsampled frames to local working directory (faster than Drive)
WORK_DIR = Path('/content/colmap_work')
WORK_DIR.mkdir(exist_ok=True)
(WORK_DIR / 'images').mkdir(exist_ok=True)
(WORK_DIR / 'sparse').mkdir(exist_ok=True)

# Use sequential naming so COLMAP processes them in order
for new_idx, orig_idx in enumerate(sub_indices):
    src = all_frame_paths[orig_idx]
    dst = WORK_DIR / 'images' / f'{new_idx:06d}.jpg'
    if not dst.exists():
        shutil.copy2(src, dst)

print(f"Copied {len(sub_indices)} frames to {WORK_DIR / 'images'}")

## 4. Run COLMAP

Install COLMAP CLI and run: feature extraction → sequential matching → sparse reconstruction.

In [None]:
# Install COLMAP CLI
!apt install -y colmap 2>&1 | tail -3
!colmap -h 2>&1 | head -2

In [None]:
db_path = WORK_DIR / 'database.db'
image_path = WORK_DIR / 'images'
sparse_path = WORK_DIR / 'sparse'

# Remove old database if re-running
if db_path.exists():
    db_path.unlink()

# 1. Feature extraction (CPU SIFT) — single camera, SIMPLE_RADIAL model
!colmap feature_extractor \
    --database_path {db_path} \
    --image_path {image_path} \
    --ImageReader.camera_model SIMPLE_RADIAL \
    --ImageReader.single_camera 1 \
    --SiftExtraction.use_gpu 0 \
    --SiftExtraction.max_image_size 480 \
    --SiftExtraction.max_num_features 4096

print("\nFeature extraction done")

In [None]:
# 2. Sequential matching (CPU, exploits video frame ordering)
!colmap sequential_matcher \
    --database_path {db_path} \
    --SiftMatching.use_gpu 0 \
    --SequentialMatching.overlap 10 \
    --SequentialMatching.loop_detection 0

print("\nSequential matching done")

# 3. Incremental SfM (mapper) — no GPU needed
if sparse_path.exists():
    shutil.rmtree(sparse_path)
sparse_path.mkdir()

!colmap mapper \
    --database_path {db_path} \
    --image_path {image_path} \
    --output_path {sparse_path} \
    --Mapper.init_min_num_inliers 50 \
    --Mapper.ba_refine_focal_length 1 \
    --Mapper.ba_refine_extra_params 1

# Check reconstruction
recon_dirs = sorted(sparse_path.iterdir())
print(f"\nReconstructions: {len(recon_dirs)}")
for d in recon_dirs:
    images_file = d / 'images.bin'
    if images_file.exists():
        r = pycolmap.Reconstruction(str(d))
        print(f"  {d.name}: {r.num_images()} images registered / {len(sub_indices)} total")

## 5. Extract Poses & Interpolate to 60fps

COLMAP gives poses for subsampled frames. Interpolate (Slerp for rotations, linear for translation) to recover poses for all original frames.

In [None]:
# Load best reconstruction (most registered images)
best_recon_dir = max(recon_dirs, key=lambda d: pycolmap.Reconstruction(str(d)).num_images() if (d / 'images.bin').exists() else 0)
recon = pycolmap.Reconstruction(str(best_recon_dir))
print(f"Using reconstruction: {best_recon_dir.name} ({recon.num_images()} images)")

# Extract poses for subsampled frames
sub_poses = {}  # orig_frame_idx → 4x4 pose matrix
for image_id, image in recon.images.items():
    fname = image.name  # e.g. "000042.jpg"
    sub_idx = int(Path(fname).stem)
    orig_idx = sub_indices[sub_idx]

    # COLMAP gives cam_from_world (world→camera transform)
    # We want world_from_cam (camera→world, i.e., camera pose in world frame)
    cfw = image.cam_from_world()  # Rigid3d
    R_cw = np.array(cfw.rotation.matrix())
    t_cw = np.array(cfw.translation)

    # Invert: world_from_cam
    R_wc = R_cw.T
    t_wc = -R_wc @ t_cw

    pose = np.eye(4)
    pose[:3, :3] = R_wc
    pose[:3, 3] = t_wc
    sub_poses[orig_idx] = pose

print(f"Got poses for {len(sub_poses)} / {len(sub_indices)} subsampled frames")

# Check coverage
registered_indices = sorted(sub_poses.keys())
print(f"Frame range: {registered_indices[0]} to {registered_indices[-1]}")

# Visualize subsampled trajectory
positions = np.array([sub_poses[i][:3, 3] for i in registered_indices])
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
ax.plot(positions[:, 0], positions[:, 1], positions[:, 2], 'b.-', linewidth=0.5, markersize=2)
ax.scatter(positions[0, 0], positions[0, 1], positions[0, 2], c='g', s=100, label='Start')
ax.scatter(positions[-1, 0], positions[-1, 1], positions[-1, 2], c='r', s=100, label='End')
ax.set_xlabel('X'); ax.set_ylabel('Y'); ax.set_zlabel('Z')
ax.legend()
ax.set_title(f'COLMAP Trajectory ({len(registered_indices)} keyframes)')
plt.show()

In [ ]:
# Interpolate poses to all original frames (60fps)
key_indices = np.array(registered_indices, dtype=float)
key_translations = np.array([sub_poses[i][:3, 3] for i in registered_indices])
key_rotations = Rotation.from_matrix([sub_poses[i][:3, :3] for i in registered_indices])

# Target: all frame indices between first and last registered
first_reg, last_reg = registered_indices[0], registered_indices[-1]
all_indices = np.arange(first_reg, last_reg + 1, dtype=float)

# Interpolate translations (linear)
interp_tx = interp1d(key_indices, key_translations[:, 0], kind='linear')
interp_ty = interp1d(key_indices, key_translations[:, 1], kind='linear')
interp_tz = interp1d(key_indices, key_translations[:, 2], kind='linear')

# Interpolate rotations (Slerp)
slerp = Slerp(key_indices, key_rotations)

# Build full pose array
poses_4x4 = np.zeros((N_total, 4, 4), dtype=np.float64)
poses_valid = np.zeros(N_total, dtype=bool)

for idx in all_indices:
    i = int(idx)
    t = np.array([interp_tx(idx), interp_ty(idx), interp_tz(idx)])
    R = slerp(idx).as_matrix()
    poses_4x4[i, :3, :3] = R
    poses_4x4[i, :3, 3] = t
    poses_4x4[i, 3, 3] = 1.0
    poses_valid[i] = True

n_interpolated = poses_valid.sum()
print(f"Interpolated {n_interpolated} poses (frames {first_reg}–{last_reg} of {N_total})")
if first_reg > 0 or last_reg < N_total - 1:
    print(f"  Note: frames 0–{first_reg-1} and {last_reg+1}–{N_total-1} have no pose (outside COLMAP range)")

## 6. Write Poses to Session

In [ ]:
# Build poses.json in StackCapture format
poses_list = []
for i in range(N_total):
    if poses_valid[i]:
        poses_list.append({
            'timestamp': float(i) / 60.0,
            'rgbIndex': i,
            'depth': None,
            'transform': poses_4x4[i].tolist(),
        })

# Write poses.json
poses_file = SESSION_DIR / 'poses.json'
with open(poses_file, 'w') as f:
    json.dump(poses_list, f, indent=2)
print(f"Wrote {len(poses_list)} poses to {poses_file}")

# Also save COLMAP intrinsics to calib.txt
for cam_id, cam in recon.cameras.items():
    params = cam.params  # SIMPLE_RADIAL: [f, cx, cy, k]
    calib_str = f"{params[0]:.4f} {params[0]:.4f} {params[1]:.4f} {params[2]:.4f}"
    calib_file = SESSION_DIR / 'calib.txt'
    calib_file.write_text(calib_str)
    print(f"Wrote intrinsics: {calib_str} (k={params[3]:.4f})")
    break

# Update metadata
metadata['slamProcessed'] = True
metadata['poseCount'] = len(poses_list)
with open(SESSION_DIR / 'metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)
print(f"Updated metadata: slamProcessed=true")

print(f"\nSession {SESSION_NAME} ready for training!")

## 7. Batch Process All Sessions

In [None]:
# Process all unprocessed sessions
def process_session_colmap(session_dir, subsample=6):
    """Run COLMAP SfM on a single session, write poses.json."""
    env = {**os.environ, 'QT_QPA_PLATFORM': 'offscreen'}

    rgb_dir = session_dir / 'rgb'
    frame_paths = sorted(rgb_dir.glob('*.jpg'))
    n_total = len(frame_paths)

    # Subsample
    sub_indices = list(range(0, n_total, subsample))
    if sub_indices[-1] != n_total - 1:
        sub_indices.append(n_total - 1)

    # Working directory
    work = Path('/content/colmap_work')
    if work.exists():
        shutil.rmtree(work)
    work.mkdir()
    (work / 'images').mkdir()
    (work / 'sparse').mkdir()

    for new_idx, orig_idx in enumerate(sub_indices):
        shutil.copy2(frame_paths[orig_idx], work / 'images' / f'{new_idx:06d}.jpg')

    db = work / 'database.db'

    # Feature extraction (CPU)
    result = subprocess.run([
        'colmap', 'feature_extractor',
        '--database_path', str(db),
        '--image_path', str(work / 'images'),
        '--ImageReader.camera_model', 'SIMPLE_RADIAL',
        '--ImageReader.single_camera', '1',
        '--SiftExtraction.use_gpu', '0',
        '--SiftExtraction.max_image_size', '480',
        '--SiftExtraction.max_num_features', '4096',
    ], capture_output=True, text=True, env=env)
    if result.returncode != 0:
        return False, f"Feature extraction failed: {result.stderr[-200:]}"

    # Sequential matching (CPU)
    result = subprocess.run([
        'colmap', 'sequential_matcher',
        '--database_path', str(db),
        '--SiftMatching.use_gpu', '0',
        '--SequentialMatching.overlap', '10',
        '--SequentialMatching.loop_detection', '0',
    ], capture_output=True, text=True, env=env)
    if result.returncode != 0:
        return False, f"Matching failed: {result.stderr[-200:]}"

    # Mapper
    result = subprocess.run([
        'colmap', 'mapper',
        '--database_path', str(db),
        '--image_path', str(work / 'images'),
        '--output_path', str(work / 'sparse'),
        '--Mapper.init_min_num_inliers', '50',
        '--Mapper.ba_refine_focal_length', '1',
        '--Mapper.ba_refine_extra_params', '1',
    ], capture_output=True, text=True, env=env)
    if result.returncode != 0:
        return False, f"Mapper failed: {result.stderr[-200:]}"

    # Find best reconstruction
    recon_dirs = sorted((work / 'sparse').iterdir())
    if not recon_dirs:
        return False, "No reconstruction produced"

    best_dir = max(recon_dirs, key=lambda d: pycolmap.Reconstruction(str(d)).num_images() if (d / 'images.bin').exists() else 0)
    recon = pycolmap.Reconstruction(str(best_dir))

    # Extract subsampled poses
    sub_poses = {}
    for img_id, img in recon.images.items():
        sub_idx = int(Path(img.name).stem)
        orig_idx = sub_indices[sub_idx]
        cfw = img.cam_from_world()  # Rigid3d
        R_cw = np.array(cfw.rotation.matrix())
        t_cw = np.array(cfw.translation)
        R_wc = R_cw.T
        t_wc = -R_wc @ t_cw
        pose = np.eye(4)
        pose[:3, :3] = R_wc
        pose[:3, 3] = t_wc
        sub_poses[orig_idx] = pose

    if len(sub_poses) < 3:
        return False, f"Only {len(sub_poses)} images registered"

    # Interpolate to 60fps
    reg_indices = sorted(sub_poses.keys())
    key_idx = np.array(reg_indices, dtype=float)
    key_t = np.array([sub_poses[i][:3, 3] for i in reg_indices])
    key_R = Rotation.from_matrix([sub_poses[i][:3, :3] for i in reg_indices])

    first, last = reg_indices[0], reg_indices[-1]
    all_idx = np.arange(first, last + 1, dtype=float)

    interp_x = interp1d(key_idx, key_t[:, 0])
    interp_y = interp1d(key_idx, key_t[:, 1])
    interp_z = interp1d(key_idx, key_t[:, 2])
    slerp = Slerp(key_idx, key_R)

    poses_list = []
    for idx in all_idx:
        i = int(idx)
        t = np.array([interp_x(idx), interp_y(idx), interp_z(idx)])
        R = slerp(idx).as_matrix()
        pose = np.eye(4)
        pose[:3, :3] = R
        pose[:3, 3] = t
        poses_list.append({
            'timestamp': float(i) / 60.0,
            'rgbIndex': i,
            'depth': None,
            'transform': pose.tolist(),
        })

    # Write poses.json
    with open(session_dir / 'poses.json', 'w') as f:
        json.dump(poses_list, f, indent=2)

    # Write calib.txt
    for cam_id, cam in recon.cameras.items():
        p = cam.params
        (session_dir / 'calib.txt').write_text(f"{p[0]:.4f} {p[0]:.4f} {p[1]:.4f} {p[2]:.4f}")
        break

    # Update metadata
    with open(session_dir / 'metadata.json') as f:
        meta = json.load(f)
    meta['slamProcessed'] = True
    meta['poseCount'] = len(poses_list)
    with open(session_dir / 'metadata.json', 'w') as f:
        json.dump(meta, f, indent=2)

    return True, f"{recon.num_images()}/{len(sub_indices)} keyframes → {len(poses_list)} poses"


# Process all sessions
sessions = sorted([d for d in DRIVE_SESSIONS_DIR.iterdir() if d.is_dir() and d.name.startswith('session_')])
for s in sessions:
    meta_file = s / 'metadata.json'
    if not meta_file.exists():
        continue
    with open(meta_file) as f:
        meta = json.load(f)
    if meta.get('slamProcessed', False):
        print(f"  {s.name}: already processed, skipping")
        continue
    if meta.get('captureSource') == 'iphone_arkit':
        print(f"  {s.name}: ARKit session, skipping")
        continue

    print(f"  {s.name}: processing...", end=' ')
    ok, msg = process_session_colmap(s)
    status = 'OK' if ok else 'FAILED'
    print(f"{status} — {msg}")