Implementation of the self-calibration pipeline on an ETH3D SLAM dataset.  
Given only a sequence of RGB images from a single moving camera, it:

1. Downloads and loads a monocular ETH3D sequence.
2. Detects and matches local features between frames.
3. Selects geometrically consistent frame pairs and estimates their Fundamental matrices (F).
4. Recovers an initial camera intrinsic matrix (K) (focal length and principal point) from epipolar geometry.
5. Initializes camera poses and triangulates an initial set of 3D points.
6. Incrementally extends the reconstruction across many frames using PnP + triangulation.
7. Performs global reprojection-based outlier filtering.
8. Refines focal length and principal point with a small-scale bundle-adjustment-like (BA) optimization.
9. Saves the estimated intrinsics, camera poses, and 3D points, and optionally compares them against ground truth.

In [13]:
!pip -q install opencv-python-headless==4.10.0.84 numpy scipy

import os, sys, glob, json, shutil, zipfile, pathlib
import numpy as np
import cv2
from matplotlib import pyplot as plt
from dataclasses import dataclass
from typing import List, Tuple, Dict
import re

In [2]:
# Download a monocular sequence from the ETH3D SLAM dataset
import urllib.request, os, zipfile

root = "/content/eth3d"
os.makedirs(root, exist_ok=True)
name = "einstein_1"   # dataset's name
mono_url = f"https://www.eth3d.net/data/slam/datasets/{name}_mono.zip"
dst_zip  = f"{root}/{name}_mono.zip"
dst_dir  = f"{root}/training"

os.makedirs(dst_dir, exist_ok=True)
print("Downloading:", mono_url)
urllib.request.urlretrieve(mono_url, dst_zip)

with zipfile.ZipFile(dst_zip, 'r') as z:
    z.extractall(dst_dir)
os.remove(dst_zip)

# Locate folder with RGB images
rgb_dir = glob.glob(f"{dst_dir}/{name}/**/rgb", recursive=True)[0]
print("Images dir:", rgb_dir, "num imgs:", len(glob.glob(rgb_dir+'/*.png')))


Downloading: https://www.eth3d.net/data/slam/datasets/einstein_1_mono.zip
Images dir: /content/eth3d/training/einstein_1/rgb num imgs: 487


In [3]:
# Load all PNG images from the RGB directory of ETH3D format
img_paths = sorted(glob.glob(os.path.join(rgb_dir, "*.png")))
assert len(img_paths) > 0, f"No PNGs in {rgb_dir}"
images = [cv2.imread(p, cv2.IMREAD_COLOR) for p in img_paths]
assert all(im is not None for im in images), "Some images failed to load"

# Extract image dimensions
H, W = images[0].shape[:2]
print(f"Loaded {len(images)} frames, HxW={H}x{W}")


Loaded 487 frames, HxW=458x739


In [4]:
# Define control parameters for speed/robustness
STRIDE     = 2      # Downsample frame sequence by this stride
MAX_FRAMES = 150    # Limit total number of frames
TARGET_W   = 1280   # Rescale width (used for feature extraction)
FEATURE    = "sift" # Feature type: "sift" | "orb"

# Apply frame stride and limit number of frames
images = images[::STRIDE][:MAX_FRAMES]
H, W = images[0].shape[:2]
cx, cy = W * 0.5, H * 0.5 # Principal point is assumed to be the image center
print(f"Using {len(images)} frames after stride, HxW={H}x{W}")

# Initialize intrinsic matrix K
f0 = 1.2 * max(H, W)
fx = float(f0)
fy = float(f0)
K = np.array([[fx, 0.0, cx],
              [0.0, fy,  cy],
              [0.0, 0.0, 1.0]], dtype=np.float64)

# Store intrinsic priors (used later for regularization in Bundle Adjustment)
K_prior = (fx, fy, cx, cy)


# Data container for storing keypoints and descriptors per frame
@dataclass
class FrameData:
    kps: List[cv2.KeyPoint] # List of keypoints detected in the image
    desc: np.ndarray        # Corresponding descriptor matrix
    pts_px: np.ndarray      # Pixel coordinates (N x 2) at original resolution

# Create a feature extractor
def create_feature(name="sift"):
    name = (name or "sift").lower()
    if name == "sift":
        try:
            return cv2.SIFT_create(nfeatures=3000)
        except Exception:
            pass
    return cv2.ORB_create(nfeatures=3000, scaleFactor=1.2, nlevels=10, edgeThreshold=15, fastThreshold=7)

# Apply CLAHE (Contrast Limited Adaptive Histogram Equalization) to enhance image contrast
def preprocess_gray(g):
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    return clahe.apply(g)

# Convert an image to grayscale
def to_gray(img):
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if img.ndim==3 else img

def match_descriptors(d1, d2, ratio=0.80, min_matches=50):
    """
    Perform two-way descriptor matching:
    - Forward: KNN + Lowe's ratio test.
    - Reverse: 1-NN verification to ensure mutual correspondence.
    Returns a list of matches that satisfy both filters.
    """
    # Sanity checks to ensure both descriptor sets are non-empty
    if d1 is None or d2 is None:
        return []
    if len(d1) == 0 or len(d2) == 0:
        return []

    # Choose matcher type based on descriptor data type
    is_bin = (d1.dtype == np.uint8 and d2.dtype == np.uint8)
    if is_bin:
        matcher_fwd = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
        matcher_rev = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
        d1_use, d2_use = d1, d2
        d2_use_rev, d1_use_rev = d2, d1
    else:
        d1_use = np.asarray(d1, dtype=np.float32)
        d2_use = np.asarray(d2, dtype=np.float32)
        d2_use_rev = d2_use
        d1_use_rev = d1_use
        index_params = dict(algorithm=1, trees=5)
        search_params = dict(checks=64)
        matcher_fwd = cv2.FlannBasedMatcher(index_params, search_params)
        matcher_rev = cv2.FlannBasedMatcher(index_params, search_params)

    # Perform KNN matching (k=2) for Lowe's ratio filtering
    knn = matcher_fwd.knnMatch(d1_use, d2_use, k=2)
    ratio_pass = []
    for pair in knn:
        if len(pair) < 2:
            continue
        m, n = pair
        if m.distance < ratio * n.distance:
            ratio_pass.append(m)

    if not ratio_pass:
        return []

    # Perform reverse 1-NN matching to check mutual correspondence
    knn_rev = matcher_rev.knnMatch(d2_use_rev, d1_use_rev, k=1)
    rev_map = {}
    for pair in knn_rev:
        if not pair:
            continue
        r = pair[0]
        rev_map[r.queryIdx] = r.trainIdx

    # Mutual match check: only retain matches where forward and reverse agree
    mutual = []
    seen = set()  # avoid duplicates
    for m in ratio_pass:
        if rev_map.get(m.trainIdx, -1) == m.queryIdx:
            key = (m.queryIdx, m.trainIdx)
            if key not in seen:
                seen.add(key)
                mutual.append(m)

    # If mutual matches are too few, skip further processing
    if len(mutual) < min_matches:
        return []

    return mutual

# Robust estimation of the Fundamental matrix
def ransac_F(pts1, pts2):
    F, mask = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC,
                                     ransacReprojThreshold=3.0, confidence=0.995, maxIters=4000)
    # Retry with more iterations if initial estimation fails
    if F is None or F.shape!=(3,3):
        F, mask = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC,
                                         ransacReprojThreshold=3.0, confidence=0.995, maxIters=8000)
    mask = (mask.ravel().astype(bool) if mask is not None else np.zeros(len(pts1), bool))
    return F, mask


Using 150 frames after stride, HxW=458x739


In [5]:
feat = create_feature(FEATURE)

# Initialize the list to store extracted feature information for each frame
frames: List[FrameData] = []

for img in images:
    # Compute the scaling factor to resize the image to the target working width
    scale = min(1.0, TARGET_W / float(img.shape[1]))
    img_small = cv2.resize(img, (int(img.shape[1]*scale), int(img.shape[0]*scale)),
                           interpolation=cv2.INTER_AREA) if scale<1.0 else img
    gray = preprocess_gray(to_gray(img_small))

    # Detect keypoints and compute descriptors using the selected feature detector
    kps_small, desc = feat.detectAndCompute(gray, None)

    # Map keypoints back to original scale so the rest of geometry uses full-res pixels
    kps, pts_px = [], []
    inv = 1.0 / scale

    if scale == 1.0:
        # No scaling was applied
        kps = kps_small
        pts_px = [kp.pt for kp in kps_small]
    else:
        for kp in kps_small:
            x = kp.pt[0] * inv
            y = kp.pt[1] * inv

            # Scale the keypoint size appropriately and preserve its original attributes
            size   = max(1.0, kp.size * inv)
            angle  = float(kp.angle) if kp.angle is not None else -1.0
            resp   = float(kp.response)
            octave = int(kp.octave)
            clsid  = int(kp.class_id)

            new_kp = cv2.KeyPoint(float(x), float(y), float(size), float(angle), resp, octave, clsid)

            # Append the new keypoint and its position to the lists
            kps.append(new_kp)
            pts_px.append([x, y])

    pts_px = np.array(pts_px, dtype=np.float32)
    frames.append(FrameData(kps=kps, desc=desc, pts_px=pts_px))


print("Features extracted for", len(frames), "frames")


Features extracted for 150 frames


In [6]:
# Normalize pixel coordinates using camera intrinsics
def normalize_points(pts_px, K):
    fx, fy = K[0,0], K[1,1]; cx, cy = K[0,2], K[1,2]
    x = (pts_px[:,0]-cx)/fx; y = (pts_px[:,1]-cy)/fy
    return np.stack([x,y], axis=1)

In [7]:
# List to store high-quality image pairs with sufficient parallax and valid epipolar geometry
good_pairs = []

# Helper function to skip visually similar consecutive frames
def frames_too_similar(i, j, thr=1.5):
    g1 = to_gray(images[i]).astype(np.float32)
    g2 = to_gray(images[j]).astype(np.float32)
    return float(np.mean(np.abs(g1 - g2))) < thr

# Main loop to iterate over the frame sequence and identify geometrically valid pairs
i = 0
while i < len(frames)-1:
    tried = False

    # Try pairing current frame with the next or the one after
    for j in (i+1, i+2 if i+2 < len(frames) else i+1):
        if j >= len(frames) or frames_too_similar(i,j):
            continue
        f1, f2 = frames[i], frames[j]

        # Match descriptors with Lowe's ratio test + mutual check
        matches = match_descriptors(f1.desc, f2.desc, ratio=0.85)
        if len(matches) < 60:
            continue

        # Extract matched keypoint coordinates
        pts1 = np.array([f1.kps[m.queryIdx].pt for m in matches], np.float32)
        pts2 = np.array([f2.kps[m.trainIdx].pt for m in matches], np.float32)
        F, mask = ransac_F(pts1, pts2)
        inl = int(mask.sum())

        print(f"[debug] {i}-{j} matches = {len(matches)}")
        if F is not None:
            print(f"[debug]     inliers = {inl}, cond(F) = {np.linalg.cond(F):.1e}")

        # Check quality of fundamental matrix and number of inliers
        if F is not None and inl >= 30 and np.linalg.cond(F) < 1e20:
            good = [matches[k] for k in range(len(matches)) if mask[k]]
            x1n = normalize_points(pts1[mask], K)
            x2n = normalize_points(pts2[mask], K)

            # Estimate relative pose using the essential matrix derived from F
            E_tmp = K.T @ F @ K
            _, Rtmp, ttmp, _ = cv2.recoverPose(
                E_tmp,
                cv2.undistortPoints(pts1[mask].reshape(-1,1,2), K, None),
                cv2.undistortPoints(pts2[mask].reshape(-1,1,2), K, None)
            )

            # Create normalized ray directions for both views
            b1 = np.column_stack([x1n, np.ones(len(x1n))])
            b2 = (Rtmp @ np.column_stack([x2n, np.ones(len(x2n))]).T).T
            b1 /= np.linalg.norm(b1, axis=1, keepdims=True)
            b2 /= np.linalg.norm(b2, axis=1, keepdims=True)

            # Compute parallax angles (in degrees) between rays
            ang = np.degrees(np.arccos(np.clip(np.sum(b1*b2, axis=1), -1, 1)))

            if np.median(ang) < 1.5:
                continue

            good_pairs.append((i, j, good, F))
            i = j
            tried = True
            break

    if not tried:
        print(f"[warn] Skipping weak pair at {i}-{i+1}")
        i += 1
    #if len(good_pairs) >= 60:
    #    print(f"[info] Enough good pairs: {len(good_pairs)}. Stop.")
    #    break

assert len(good_pairs) > 0, "No good pairs found. Try larger STRIDE or different sequence."
print("Good pairs:", len(good_pairs))


[debug] 0-1 matches = 118
[debug]     inliers = 108, cond(F) = 8.7e+16
[debug] 1-2 matches = 116
[debug]     inliers = 107, cond(F) = 7.7e+16
[debug] 2-3 matches = 128
[debug]     inliers = 115, cond(F) = 2.5e+16
[debug] 3-4 matches = 162
[debug]     inliers = 153, cond(F) = 7.9e+17
[debug] 4-5 matches = 154
[debug]     inliers = 144, cond(F) = inf
[debug] 4-6 matches = 143
[debug]     inliers = 130, cond(F) = 1.4e+17
[debug] 6-7 matches = 185
[debug]     inliers = 176, cond(F) = 5.8e+16
[debug] 6-8 matches = 169
[debug]     inliers = 158, cond(F) = 3.7e+17
[debug] 8-9 matches = 191
[debug]     inliers = 176, cond(F) = 5.5e+16
[debug] 8-10 matches = 184
[debug]     inliers = 167, cond(F) = 2.1e+17
[debug] 10-11 matches = 199
[debug]     inliers = 193, cond(F) = 1.0e+17
[debug] 11-12 matches = 202
[debug]     inliers = 190, cond(F) = 2.6e+17
[debug] 11-13 matches = 185
[debug]     inliers = 175, cond(F) = 1.8e+17
[debug] 13-14 matches = 229
[debug]     inliers = 223, cond(F) = 7.5e+17
[

In [8]:
# Estimate initial focal length from Fundamental matrix via grid search and SVD analysis
def estimate_focal_from_F(F, H, W, cx, cy):
    # Construct intrinsic matrix with given focal length and fixed principal point
    def score(f):
        K = np.array([[f,0,cx],[0,f,cy],[0,0,1.0]], float)
        E = K.T @ F @ K
        _, s, _ = np.linalg.svd(E)
        return (s[0]-s[1])**2 + (s[2])**2

    # Initial grid search range: 60% to 250% of max image dimension
    maxWH = float(max(W,H))
    lo, hi = 0.6*maxWH, 2.5*maxWH
    grid = np.linspace(lo, hi, 20)
    vals = [score(f) for f in grid]

    # Select best candidate based on minimum score
    f = grid[int(np.argmin(vals))]

    # Local refinement with decreasing step size
    for step in [0.25, 0.1, 0.05, 0.02]:
        span = step*maxWH
        loc = np.linspace(max(lo, f-span), min(hi, f+span), 10)
        f = loc[int(np.argmin([score(ff) for ff in loc]))]
    return float(f)

f_cands = []

# For each good fundamental matrix, attempt to estimate focal length
for _,_,_,F in good_pairs:
    try:
        f_cands.append(estimate_focal_from_F(F, H, W, cx, cy))
    except:
        pass

# Aggregate and validate focal length estimates
min_f = 0.6*max(W,H)
if len(f_cands)==0:
    f_init = 1.2*max(W,H)
else:
    f_init = float(np.median(f_cands))
    if not np.isfinite(f_init) or f_init < min_f:
        f_init = 1.0*max(W,H)

# Initialize intrinsics matrix K with estimated focal length
fx = float(f_init)
fy = float(f_init)
K  = np.array([[fx, 0, cx],
               [0,  fy, cy],
               [0,   0,  1.0]], float)
K_prior = (fx, fy, cx, cy)
print(f"[Init intrinsics] f≈{fx:.1f} px, cx={cx:.1f}, cy={cy:.1f}")



[Init intrinsics] f≈443.4 px, cx=369.5, cy=229.0


In [9]:
# Triangulates 3D points from corresponding normalized image coordinates
def triangulate_points(P0, P1, x0, x1):
    x0_h = np.vstack([x0.T, np.ones((1,x0.shape[0]))])
    x1_h = np.vstack([x1.T, np.ones((1,x1.shape[0]))])
    X_h = cv2.triangulatePoints(P0, P1, x0_h[:2,:], x1_h[:2,:])
    X = (X_h[:3,:]/X_h[3,:]).T.copy()
    return X

# Computes the median parallax angle between rays originating from two camera positions to each triangulated 3D point
def parallax_angle(R, t, pts3d):
    """
    Estimates median parallax angle between rays from two cameras to 3D points.
    R, t: rotation and translation from camera 1 to 2
    pts3d: Nx3 triangulated 3D points (in world frame of camera 1)
    Returns angle in degrees.
    """
    rays1 = pts3d / np.linalg.norm(pts3d, axis=1, keepdims=True)
    rays2 = ((R @ pts3d.T) + t.reshape(3, 1)).T
    rays2 = rays2 / np.linalg.norm(rays2, axis=1, keepdims=True)

    cos_angles = np.clip(np.sum(rays1 * rays2, axis=1), -1.0, 1.0)
    angles_rad = np.arccos(cos_angles)
    return np.median(angles_rad) * 180.0 / np.pi  # degrees

# Projects 3D points X into 2D image coordinates using camera intrinsics K and extrinsic parameters given by rvec and tvec
def project_points(X, rvec, tvec, K):
    img_pts, _ = cv2.projectPoints(X.astype(np.float32),
                                   rvec.reshape(3,1).astype(np.float32),
                                   tvec.reshape(3,1).astype(np.float32),
                                   K.astype(np.float32), None)
    return img_pts.reshape(-1,2)

# Camera poses, 3D points and observations initialization
poses: Dict[int, Tuple[np.ndarray,np.ndarray]] = {}
points3d: Dict[int, np.ndarray] = {}
observations: Dict[int, List[Tuple[int, np.ndarray]]] = {}

# Select first seed pair (i0, j0) and estimate relative pose
i0, j0, matches0, F01 = good_pairs[0]
pts1 = np.array([frames[i0].kps[m.queryIdx].pt for m in matches0], np.float32)
pts2 = np.array([frames[j0].kps[m.trainIdx].pt for m in matches0], np.float32)
E = K.T @ F01 @ K
_, R01, t01, inl01 = cv2.recoverPose(
    E,
    cv2.undistortPoints(pts1.reshape(-1,1,2), K, None),
    cv2.undistortPoints(pts2.reshape(-1,1,2), K, None)
)
inl01 = inl01.ravel().astype(bool)

# Check if there is enough parallax between rays in the first pair
x1n_seed = normalize_points(pts1[inl01], K)
x2n_seed = normalize_points(pts2[inl01], K)
b1 = np.column_stack([x1n_seed, np.ones(len(x1n_seed))])
b2 = (R01 @ np.column_stack([x2n_seed, np.ones(len(x2n_seed))]).T).T
b1 /= np.linalg.norm(b1, axis=1, keepdims=True)
b2 /= np.linalg.norm(b2, axis=1, keepdims=True)
ang = np.degrees(np.arccos(np.clip(np.sum(b1*b2, axis=1), -1, 1)))
if np.median(ang) < 3.0:
    raise RuntimeError("Seed pair has too little parallax; increase STRIDE or choose another start.")

# Initialize first two camera poses
poses[i0] = (np.zeros(3), np.zeros(3))
rvec1, _ = cv2.Rodrigues(R01)
poses[j0] = (rvec1.ravel(), t01.ravel())

# Triangulate initial 3D points from first good pair
x1n = normalize_points(pts1[inl01], K)
x2n = normalize_points(pts2[inl01], K)
P0 = np.hstack([np.eye(3), np.zeros((3,1))])
P1 = np.hstack([R01, t01])
X01 = triangulate_points(P0, P1, x1n, x2n)
angle = parallax_angle(R01, t01, X01)

# Filter out 3D points based on cheirality and reprojection error
def _repr_err(R, t, X, m):
    pr = project_points(X[None,:], cv2.Rodrigues(R)[0].ravel()*0 + 0, t, K)[0] if X.ndim==1 else project_points(X, np.zeros(3), t, K)
    return np.linalg.norm(pr - m, axis=-1)

# Filter and validate initial triangulated 3D points between first two frames
kept = 0
for j, X in enumerate(X01):
    z0 = (np.eye(3) @ X + np.zeros(3))[2]
    z1 = (R01 @ X + t01.ravel())[2]
    if z0 <= 0 or z1 <= 0:
        continue

    # Reprojection error sanity check for both views
    pr0 = project_points(X[None,:], np.zeros(3), np.zeros(3), K)[0]
    pr1 = project_points(X[None,:], cv2.Rodrigues(R01)[0].ravel(), t01.ravel(), K)[0]
    if np.linalg.norm(pr0 - pts1[inl01][j]) < 2.5 and np.linalg.norm(pr1 - pts2[inl01][j]) < 2.5:
        pid = len(points3d)
        points3d[pid] = X
        observations[pid] = [(i0, pts1[inl01][j]), (j0, pts2[inl01][j])]
        kept += 1
if kept < 50:
    print(f"[warn] Few seed 3D points kept: {kept}")

# Begin incremental reconstruction using remaining good image pairs
for (ia, ib, matches, F_ab) in good_pairs[1:]:
    f_cur, f_nxt = frames[ia], frames[ib]
    pts2d, pts3d_list = [], []
    for m in matches:
        pt_cur = np.array(f_cur.kps[m.queryIdx].pt)

        # Brute-force matching to associate 2D-3D correspondences
        found = False
        for pid, obs in observations.items():
            if any((fi==ia and np.linalg.norm(meas-pt_cur) < 0.5) for (fi,meas) in obs):
                pts3d_list.append(points3d[pid])
                pts2d.append(f_nxt.kps[m.trainIdx].pt)
                found = True
                break
    if len(pts3d_list) >= 6:
        success, rvec, tvec, inl = cv2.solvePnPRansac(
          np.array(pts3d_list, np.float32), np.array(pts2d, np.float32),
          K, None, iterationsCount=2000, reprojectionError=2.5, confidence=0.999
        )

        if not success:
            # Recover pose directly from F matrix
            p_a = np.array([f_cur.kps[m.queryIdx].pt for m in matches], np.float32)
            p_b = np.array([f_nxt.kps[m.trainIdx].pt for m in matches], np.float32)
            Eab = K.T @ F_ab @ K
            _, R, t, _ = cv2.recoverPose(
                Eab,
                cv2.undistortPoints(p_a.reshape(-1,1,2), K, None),
                cv2.undistortPoints(p_b.reshape(-1,1,2), K, None)
            )
            rvec, _ = cv2.Rodrigues(R); tvec = t
    else:
        # Estimate pose via essential matrix if too few correspondences
        p_a = np.array([f_cur.kps[m.queryIdx].pt for m in matches], np.float32)
        p_b = np.array([f_nxt.kps[m.trainIdx].pt for m in matches], np.float32)
        Eab = K.T @ F_ab @ K
        _, R, t, _ = cv2.recoverPose(
            Eab,
            cv2.undistortPoints(p_a.reshape(-1,1,2), K, None),
            cv2.undistortPoints(p_b.reshape(-1,1,2), K, None)
        )
        rvec, _ = cv2.Rodrigues(R); tvec = t

    # Reject views with insufficient parallax for reliable triangulation
    pts_i = np.array([f_cur.kps[m.queryIdx].pt for m in matches], np.float32)
    pts_j = np.array([f_nxt.kps[m.trainIdx].pt for m in matches], np.float32)
    x_i_n = normalize_points(pts_i, K)
    x_j_n = normalize_points(pts_j, K)

    R_est, _ = cv2.Rodrigues(rvec)
    b1 = np.column_stack([x_i_n, np.ones(len(x_i_n))])
    b2 = (R_est @ np.column_stack([x_j_n, np.ones(len(x_j_n))]).T).T
    b1 = b1 / np.linalg.norm(b1, axis=1, keepdims=True)
    b2 = b2 / np.linalg.norm(b2, axis=1, keepdims=True)
    ang = np.degrees(np.arccos(np.clip(np.sum(b1 * b2, axis=1), -1, 1)))
    if np.median(ang) < 3.0:
        continue

    # Pose assignment
    poses.setdefault(ia, (np.zeros(3), np.zeros(3)))
    poses[ib] = (rvec.ravel(), tvec.ravel())

    # Triangulate new points
    R_i, _ = cv2.Rodrigues(poses[ia][0]); t_i = poses[ia][1].reshape(3,1)
    R_j, _ = cv2.Rodrigues(poses[ib][0]); t_j = poses[ib][1].reshape(3,1)
    P_i = np.hstack([R_i, t_i]); P_j = np.hstack([R_j, t_j])

    pts_i = np.array([f_cur.kps[m.queryIdx].pt for m in matches], np.float32)
    pts_j = np.array([f_nxt.kps[m.trainIdx].pt for m in matches], np.float32)
    x_i = normalize_points(pts_i, K); x_j = normalize_points(pts_j, K)
    X_ij = triangulate_points(P_i, P_j, x_i, x_j)

    # Validate and store newly triangulated points
    for k, Xk in enumerate(X_ij):
        z_i = (R_i @ Xk + t_i.ravel())[2]
        z_j = (R_j @ Xk + t_j.ravel())[2]
        if z_i <= 0 or z_j <= 0:
            continue

        pr_i = project_points(Xk[None, :], poses[ia][0], poses[ia][1], K)[0]
        pr_j = project_points(Xk[None, :], poses[ib][0], poses[ib][1], K)[0]

        if (np.linalg.norm(pr_i - pts_i[k]) < 2.5) and (np.linalg.norm(pr_j - pts_j[k]) < 2.5):
            matched_pid = None
            pt_i = pts_i[k]

            for pid, obs in observations.items():
                for cam_id, pt in obs:
                    if cam_id == ia and np.linalg.norm(pt - pt_i) < 0.8:
                        matched_pid = pid
                        break
                if matched_pid is not None:
                    break

            if matched_pid is not None:
                # Associate this observation to an existing 3D point
                observations[matched_pid].append((ib, pts_j[k]))
            else:
                # Register a new 3D point
                pid = len(points3d)
                points3d[pid] = Xk
                observations.setdefault(pid, []).append((ia, pts_i[k]))
                observations[pid].append((ib, pts_j[k]))



[warn] Few seed 3D points kept: 42


In [10]:
# Accumulate all reprojection errors for each 3D point across all its observations
all_errs = []
track_err = {}

# Iterate over all 3D points and their associated 2D observations
for pid, obs in observations.items():
    errs = []
    for (fi, m_px) in obs:
        if fi not in poses:
            continue
        rvec, tvec = poses[fi]

        # Project the 3D point into the image using the current estimated camera pose
        pr = project_points(points3d[pid][None,:], rvec, tvec, K)[0]
        errs.append(float(np.linalg.norm(pr - m_px)))
    if errs:
        track_err[pid] = errs
        all_errs.extend(errs)

# Detect outliers using robust statistics
if all_errs:
    # Compute first and third quartile of reprojection errors
    q1, q3 = np.percentile(all_errs, [25, 75])
    iqr = max(q3 - q1, 1e-6)
    thr = q3 + 2.0*iqr

    # Remove outlier observations for each 3D point
    for pid, errs in list(track_err.items()):
        keep_obs = []
        for (fi, m_px), e in zip(observations[pid], errs):
            if e < thr:
                keep_obs.append((fi, m_px))
        if len(keep_obs) >= 2:
            observations[pid] = keep_obs
        else:
            observations.pop(pid, None)
            points3d.pop(pid, None)


In [11]:
try:
    from scipy.optimize import least_squares
    SCIPY_OK = True
except Exception:
    SCIPY_OK = False

# Refine focal length and principal point
def ba_refine_f_cx_cy(observations, poses, points3d, f0, cx0, cy0, W, H):
    if not SCIPY_OK:
        return f0, cx0, cy0

    # Collect all valid 3D-2D correspondences
    obs_list = []
    for pid, obs in observations.items():
        X = points3d[pid]
        for (fi, m_px) in obs:
            if fi in poses:
                rvec, tvec = poses[fi]
                obs_list.append((X, rvec, tvec, m_px))
    if len(obs_list) < 40:
        return f0, cx0, cy0
    print(f"[BA] observations used: {len(obs_list)}")

    # Define residuals for least-squares optimization
    def residuals(theta):
        f, cx_fit, cy_fit = theta
        K_loc = np.array([[f,0,cx_fit],[0,f,cy_fit],[0,0,1.0]], float)
        res = []

        # Reprojection errors per observation
        for X, rvec, tvec, m_px in obs_list:
            pr, _ = cv2.projectPoints(np.asarray(X).astype(np.float32),
                                      rvec.reshape(3,1).astype(np.float32),
                                      tvec.reshape(3,1).astype(np.float32),
                                      K_loc.astype(np.float32), None)
            pr = pr.reshape(-1,2)[0]
            res.extend((pr - m_px).tolist())

        # Soft regularization: encourage cx,cy to be near image center
        lam_c = 1e-1
        res.append(lam_c * ((cx_fit - (W*0.5)) / (0.5*W)))
        res.append(lam_c * ((cy_fit - (H*0.5)) / (0.5*H)))

        # Soft prior on f to stay near initial estimate
        lam_f = 1e-4
        res.append(lam_f * ((f - f0) / max(W, H)))

        return np.array(res, float)

    # Define search bounds and initial guess
    f_lo = 0.6*max(W,H); f_hi = 3.0*max(W,H)
    cx_lo, cx_hi = 0.3*W, 0.7*W
    cy_lo, cy_hi = 0.3*H, 0.7*H
    x0 = np.array([f0, np.clip(cx0, cx_lo, cx_hi), np.clip(cy0, cy_lo, cy_hi)], float)

    # Run least-squares optimizer
    try:
        res = least_squares(residuals, x0=x0,
                            bounds=([f_lo, cx_lo, cy_lo], [f_hi, cx_hi, cy_hi]),
                            loss="soft_l1", f_scale=2.0, max_nfev=200, verbose=0)
        f, cx, cy = map(float, res.x)
    except Exception:
        f, cx, cy = f0, cx0, cy0
    return f, cx, cy

# Apply BA-based refinement to intrinsics
f_refined, cx_ref, cy_ref = ba_refine_f_cx_cy(
    observations, poses, points3d, f_init, cx, cy, W, H
)

# Final clipping and safety checks for intrinsics
f_floor = 0.6*max(W,H)
if not np.isfinite(f_refined) or f_refined < f_floor:
    f_refined = max(f_init, 0.8*max(W,H))

cx_ref = float(np.clip(cx_ref, 0.3*W, 0.7*W))
cy_ref = float(np.clip(cy_ref, 0.3*H, 0.7*H))

# Final K matrix construction
K_ref = np.array([[f_refined, 0.0, cx_ref],
                  [0.0,      f_refined, cy_ref],
                  [0.0,      0.0,       1.0]], dtype=np.float64)

K = K_ref.copy()
fx, fy, cx, cy = float(K[0,0]), float(K[1,1]), float(K[0,2]), float(K[1,2])

assert abs(fx - f_refined) < 1e-6 and abs(fy - f_refined) < 1e-6, "fx/fy != f_refined"
assert abs(cx - cx_ref)    < 1e-6 and abs(cy - cy_ref)    < 1e-6, "cx/cy != cx_ref/cy_ref"

print(f"[Refined intrinsics] f≈{fx:.1f} px, cx={cx:.1f}, cy={cy:.1f}")

out_dir = "/content/out_sfm"
os.makedirs(out_dir, exist_ok=True)

np.save(os.path.join(out_dir, "K.npy"), K)


[BA] observations used: 32630
[Refined intrinsics] f≈443.4 px, cx=369.5, cy=229.0


In [14]:
K_est = np.load("/content/out_sfm/K.npy")
fx_e, fy_e, cx_e, cy_e = K_est[0,0], K_est[1,1], K_est[0,2], K_est[1,2]

seq_dir = str(pathlib.Path(rgb_dir).parent)
calib_path = os.path.join(seq_dir, "calibration.txt")
print("calibration.txt:", calib_path, os.path.exists(calib_path))

fx_gt=fy_gt=cx_gt=cy_gt=None
if os.path.exists(calib_path):
    with open(calib_path, "r") as f:
        txt = f.read()
    nums = re.findall(r"[-+]?\d*\.\d+|\d+", txt)
    vals = list(map(float, nums))
    for i in range(len(vals)-3):
        fx_gt, fy_gt, cx_gt, cy_gt = vals[i:i+4]
        if 50 < fx_gt < 20000 and 50 < fy_gt < 20000:
            break

    if fx_gt is not None:
        rel_fx = abs(fx_e - fx_gt)/fx_gt
        rel_fy = abs(fy_e - fy_gt)/fy_gt
        dx = abs(cx_e - cx_gt); dy = abs(cy_e - cy_gt)

        print(f"[GT compare] fx: est={fx_e:.1f} gt={fx_gt:.1f}  rel.err={100*rel_fx:.2f}%")
        print(f"[GT compare] fy: est={fy_e:.1f} gt={fy_gt:.1f}  rel.err={100*rel_fy:.2f}%")
        print(f"[GT compare] cx: est={cx_e:.1f} gt={cx_gt:.1f}  |Δ|={dx:.2f}px")
        print(f"[GT compare] cy: est={cy_e:.1f} gt={cy_gt:.1f}  |Δ|={dy:.2f}px")
    else:
        print("Ground-truth intrinsics not parsed; check file format.")
else:
    print("No ground truth file found for this sequence (test set or different layout).")


calibration.txt: /content/eth3d/training/einstein_1/calibration.txt True
[GT compare] fx: est=443.4 gt=726.3  rel.err=38.95%
[GT compare] fy: est=443.4 gt=726.3  rel.err=38.95%
[GT compare] cx: est=369.5 gt=354.6  |Δ|=14.85px
[GT compare] cy: est=229.0 gt=186.5  |Δ|=42.53px
