In [161]:
import numpy as np
import random

def select_keyframes(image_list, num_frames=10, seed=42):
    return [image_list[2]] if image_list else []


In [162]:
import pycolmap
print(pycolmap.__version__)


3.10.0


In [163]:
def write_tracking_pairs(keyframes, non_keyframes, output_path="outputs/pairs_tracking.txt"):
    with open(output_path, "w") as f:
        for kf in keyframes:
            for img in non_keyframes:
                f.write(f"{kf} {img}\n")
    print(f"Saved tracking pairs to {output_path}")

In [164]:
from read_bin import parse_colmap

pairs, image_list, point_list, extrinsic, intrinsics = parse_colmap("output/sparse/0")

Camera: [798.58179908 360.         480.        ]
Camera: [768.5379993 360.        480.       ]
Camera: [779.58372605 360.         480.        ]
Camera: [786.13156788 360.         480.        ]
Camera: [775.67766255 360.         480.        ]
Camera: [763.8473276 360.        480.       ]
Camera: [756.80149363 360.         480.        ]
Camera: [783.13015337 360.         480.        ]
Camera: [759.49818041 360.         480.        ]
Camera: [780.99718635 360.         480.        ]
Camera: [763.28404248 360.         480.        ]
Camera: [781.16713128 360.         480.        ]
Camera: [751.47161384 360.         480.        ]
Camera: [751.3932387 360.        480.       ]
Camera: [756.30666635 360.         480.        ]
Camera: [749.40304398 360.         480.        ]
Camera: [749.51754237 360.         480.        ]
Camera: [747.97435828 360.         480.        ]
Camera: [759.07320553 360.         480.        ]
Camera: [743.56767632 360.         480.        ]
Camera: [760.32649862 360.   

In [165]:
# print("point_list:", point_list)
# print(point_list[6568])

In [166]:
# print("intrinsics:", intrinsics)

In [167]:
keyframe_names = select_keyframes(image_list, num_frames=1, seed=42)
non_keyframes = [img for img in image_list if img not in keyframe_names]
write_tracking_pairs(keyframe_names, non_keyframes, output_path="outputs/pairs_tracking.txt")

try:
    key_frame_idx = image_list.index(keyframe_names[0])
except ValueError:
    print(f"{keyframe_names[0]} not found in image_list!")
print(f"Keyframe: {keyframe_names[0]}, ID: {key_frame_idx}")

Saved tracking pairs to outputs/pairs_tracking.txt
Keyframe: IMG_0217.jpg, ID: 2


In [168]:
features = pairs[key_frame_idx]
prev_pts = np.array([f[0] for f in features], dtype=np.float32).reshape(-1, 1, 2)
# features

In [169]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import random
key_frame_name = image_list[key_frame_idx]
keyframe_path = f"output/images/{key_frame_name}"
print("Keyframe path:", keyframe_path)
prev_img = cv2.imread(keyframe_path, cv2.IMREAD_GRAYSCALE)
h, w = prev_img.shape
R1, t1 = extrinsic[key_frame_idx]
print("R :",R1)
print("T :",t1)
pair = pairs[key_frame_idx]
# print("pair :", pair)
point_ids = [pid for (_, pid) in pair]
# print("points :", point_ids)
pts3d = [point_list[pid] for pid in point_ids if pid in point_list]
# print("Matched 3D points:", pts3d)


Keyframe path: output/images/IMG_0217.jpg
R : [[ 0.95911098  0.12124239 -0.25574678]
 [-0.12571105  0.99206625 -0.00113537]
 [ 0.25358009  0.03323914  0.96674314]]
T : [ 0.08263159  0.00228529 -0.01172038]


In [170]:
# 全部點都拿進來
sampled_point_ids = point_ids[:]   # 等同於 copy 一份完整的 id list
sampled_3d_points = [point_list[pid] for pid in sampled_point_ids]

# 如果 pairs 是一個 list of (2D, 3D) 對應關係
# 且 2D 在 pair[i][0]，那就直接取所有
sampled_2d_points = [pair[0][0] for pair in pairs]

# 轉 numpy array
pts3d = np.array(sampled_3d_points, dtype=np.float32)
xys   = np.array(sampled_2d_points, dtype=np.float32)


In [171]:
print(image_list)

['IMG_0226.jpg', 'IMG_0222.jpg', 'IMG_0217.jpg', 'IMG_0218.jpg', 'IMG_0223.jpg', 'IMG_0220.jpg', 'IMG_0221.jpg', 'IMG_0219.jpg', 'IMG_0225.jpg', 'IMG_0224.jpg', 'IMG_0215.jpg', 'IMG_0216.jpg', 'IMG_0211.jpg', 'IMG_0202.jpg', 'IMG_0212.jpg', 'IMG_0205.jpg', 'IMG_0201.jpg', 'IMG_0199.jpg', 'IMG_0208.jpg', 'IMG_0198.jpg', 'IMG_0209.jpg', 'IMG_0206.jpg', 'IMG_0207.jpg', 'IMG_0200.jpg', 'IMG_0210.jpg', 'IMG_0204.jpg', 'IMG_0203.jpg', 'IMG_0197.jpg']


In [172]:
import numpy as np
import cv2
import matplotlib.pyplot as plt

def project_and_visualize_points(
    name,                # 圖片名稱（不含路徑）
    target_frame_idx,       # 關鍵幀 index，用來取 intrinsics
    extrinsic,        # 所有相機姿態 dict：idx → (R, t)
    intrinsics,          # 相機內參 dict：idx → K
    pts3d,               # 3D 點 list 或 np.array
    image_shape,         # 圖像尺寸 (h, w)，用於邊界檢查
    img_root="output/images",  # 圖像資料夾路徑
    color_image=None,
    visualize=True       # 是否顯示圖像
):
    h, w = image_shape
    img_path = f"{img_root}/{name}"
    
    if color_image is not None:
        curr_img = color_image
    else:
        curr_img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if curr_img is None:
            raise FileNotFoundError(f"Image not found: {img_path}")

    R, t = extrinsic[target_frame_idx]
    K = intrinsics[target_frame_idx]

    pts3d_np = np.asarray(pts3d)
    pts_cam = (R @ pts3d_np.T + t.reshape(3, 1)).T  # shape (N, 3)

    pts_proj = (K @ pts_cam.T).T  # shape (N, 3)
    pts_proj /= pts_proj[:, 2:3]  # normalize by z

    point_colors = []
    if color_image is not None:
        vis_img = curr_img.copy()
        for pt2d in pts_proj[:, :2]:
            x, y = int(round(pt2d[0])), int(round(pt2d[1]))
            if 0 <= x < w and 0 <= y < h:
                color = curr_img[y, x]  # shape: (3,) in BGR
                point_colors.append(color[::-1])  # Convert BGR to RGB
                if visualize:
                    cv2.circle(vis_img, (x, y), 2, (0, 255, 0), -1)
            else:
                point_colors.append(np.array([0, 0, 0]))  # fallback color if out of bounds

    if visualize:
        vis_img = cv2.cvtColor(curr_img, cv2.COLOR_GRAY2BGR)
        for x, y in pts_proj[:, :2].astype(int):
            if 0 <= x < w and 0 <= y < h:
                cv2.circle(vis_img, (x, y), 2, (0, 255, 0), -1)

        plt.figure(figsize=(10, 6))
        plt.imshow(vis_img[..., ::-1])  # BGR to RGB
        plt.title(f"Projection onto {name}")
        plt.axis(False)
        plt.show()

    return pts_proj, np.array(point_colors)  # 可選回傳


In [173]:
print(f"Total images: {len(image_list)}")

Total images: 28


In [174]:
def projectpoints_cal(image_list, pts3d, extrinsic, intrinsics, visualize=False):
    """
    將 3D 點投影到所有影像，輸出 dict
    projected_pts[image_name] = (P, 2) numpy array
    """
    projected_pts = {}

    for idx, name in enumerate(image_list):
        img_path = f"output/images/{name}"
        curr_img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if curr_img is None:
            raise FileNotFoundError(f"找不到圖片 {img_path}")

        pts_proj, valid_mask = project_and_visualize_points(
            name=name,
            target_frame_idx=idx,
            extrinsic=extrinsic,
            intrinsics=intrinsics,
            pts3d=pts3d,
            image_shape=curr_img.shape,
            visualize=visualize
        )

        projected_pts[name] = pts_proj
    return projected_pts


In [175]:
projected_pts = projectpoints_cal(image_list, pts3d, extrinsic, intrinsics, visualize=False)

In [176]:
name = 'IMG_0226.jpg'
# print("pair :", pair)
origin_features = np.array([np.array(f, dtype=float) for (f, idx) in pair], dtype=float)
projected_features = projected_pts[name][:, :2]
# print(origin_features)
# print(projected_features)

In [177]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

def draw_matches_side_by_side(img1, img2, pts1, pts2, max_draw=200):
    """
    在兩張圖之間畫出匹配:
    img1, img2: 灰階或彩色圖片
    pts1, pts2: Nx2 numpy array, 分別是 img1 / img2 上的點
    """
    # 確保彩色
    if len(img1.shape) == 2:
        img1 = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
    if len(img2.shape) == 2:
        img2 = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)

    # 拼接圖片 (左右)
    h1, w1 = img1.shape[:2]
    h2, w2 = img2.shape[:2]
    H = max(h1, h2)
    canvas = np.zeros((H, w1 + w2, 3), dtype=np.uint8)
    canvas[:h1, :w1] = img1
    canvas[:h2, w1:w1 + w2] = img2

    # 畫點和線
    for (x1, y1), (x2, y2) in zip(pts1[:max_draw], pts2[:max_draw]):
        color = tuple(np.random.randint(0, 255, 3).tolist())
        cv2.circle(canvas, (int(x1), int(y1)), 3, color, -1)
        cv2.circle(canvas, (int(x2) + w1, int(y2)), 3, color, -1)
        cv2.line(canvas, (int(x1), int(y1)), (int(x2) + w1, int(y2)), color, 1)

    plt.figure(figsize=(16, 8))
    plt.imshow(cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB))
    plt.axis("off")
    plt.title(f"Matches shown: {min(len(pts1), max_draw)}")
    plt.show()

    return canvas


In [178]:
def refine_and_visualize(origin_features, projected_features,
                         kp1, des1, kp2, des2, indices,
                         img2,
                         search_radius=20, topN=2, ratio=0.8,
                         visualize=True, max_draw=1000):
    """
    origin_features: (N,2) keyframe 的 2D 點
    projected_features: (N,2) 投影到 img2 的 2D 點
    kp1, des1: SIFT from img1
    kp2, des2: SIFT from img2
    """
    refined_matches = []    # P*2 (img2 上的座標, 可能是 NaN)
    matched_indices = []
    kp2_pts = np.array([kp.pt for kp in kp2])

    for (orig_pt, proj_pt, indice) in zip(origin_features, projected_features, indices):
        # 找 origin 最近的 kp1 → 取 descriptor1
        dists1 = np.linalg.norm(np.array([kp.pt for kp in kp1]) - orig_pt, axis=1)
        idx1 = np.argmin(dists1)
        desc1 = des1[idx1]

        # 在 img2 投影點附近找候選 kp2
        dists2 = np.linalg.norm(kp2_pts - proj_pt, axis=1)
        candidate_idx = np.where(dists2 < search_radius)[0]

        if len(candidate_idx) == 0:
            # ❌ 沒找到 → 填 NaN
            refined_matches.append([np.nan, np.nan])
            matched_indices.append(indice)
            continue

        cand_desc = des2[candidate_idx]
        desc_dists = np.linalg.norm(cand_desc - desc1, axis=1)

        # 取距離最小的 topN
        best_rel_idx = np.argsort(desc_dists)[:topN]

        if len(best_rel_idx) == 1 or desc_dists[best_rel_idx[0]] < ratio * desc_dists[best_rel_idx[1]]:
            # ✅ 通過 ratio test
            best_j = candidate_idx[best_rel_idx[0]]
            refined_matches.append(kp2[best_j].pt)
            matched_indices.append(indice)
        else:
            # ❌ ratio test 失敗 → 填 NaN
            refined_matches.append([np.nan, np.nan])
            matched_indices.append(indice)

    refined_matches = np.array(refined_matches, dtype=np.float32).reshape(-1, 2)

    # ---------------- 視覺化 ----------------
    if visualize and len(refined_matches) > 0:
        img_vis = cv2.cvtColor(img2, cv2.COLOR_GRAY2BGR)
        for proj_pt, match_pt in zip(projected_features, refined_matches):
            u, v = map(int, proj_pt)
            if not np.isnan(match_pt).any():  # 只畫有匹配的
                mu, mv = map(int, match_pt)
                cv2.circle(img_vis, (u, v), 2, (0, 255, 0), -1)
                cv2.circle(img_vis, (mu, mv), 2, (0, 0, 255), -1)
                cv2.line(img_vis, (u, v), (mu, mv), (255, 0, 0), 1)
            else:
                cv2.circle(img_vis, (u, v), 2, (0, 255, 255), -1)  # 黃色表示 NaN 沒匹配到
        plt.figure(figsize=(12, 8))
        plt.imshow(cv2.cvtColor(img_vis, cv2.COLOR_BGR2RGB))
        plt.axis("off")
        plt.title(f"Refined matches (valid: {np.isfinite(refined_matches).all(axis=1).sum()})")
        plt.show()

    return refined_matches, matched_indices


In [179]:
def filter_origin_by_sift(origin_features, kp1, max_dist=5):
    """
    origin_features: (N,2) 投影點
    kp1: SIFT keypoints (list of cv2.KeyPoint)
    max_dist: 最大距離閾值 (像素)，太遠就不算匹配

    回傳:
      filtered_origins: 有 SIFT 支撐的投影點 (array)
      pairs: [(origin_pt, sift_pt, pixel_error, origin_idx), ...]
      indices: 被保留的 origin 在 origin_features 中的 index
    """
    origin_features = np.array(origin_features)
    filtered_origins = []
    pairs = []
    indices = []

    for kp in kp1:
        sift_pt = np.array(kp.pt)
        # 找最近的 origin
        dists = np.linalg.norm(origin_features - sift_pt, axis=1)
        idx = np.argmin(dists)
        nearest_origin = origin_features[idx]
        nearest_dist = dists[idx]

        if nearest_dist < max_dist:
            filtered_origins.append(nearest_origin)
            indices.append(idx)
            pairs.append((nearest_origin, tuple(sift_pt), nearest_dist, idx))

    return np.array(filtered_origins), pairs, np.array(indices)


In [180]:
def drawkeypoints(img ,kp):
    print(f"偵測到 {len(kp)} 個 keypoints")

    # 畫在灰階圖上
    img_vis = cv2.drawKeypoints(
        img, kp, None,
        flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS,  # 會顯示大小和方向
        color=(0, 255, 0)
    )

    # 顯示
    plt.figure(figsize=(12, 8))
    plt.imshow(cv2.cvtColor(img_vis, cv2.COLOR_BGR2RGB))
    plt.axis("off")
    plt.title("SIFT Keypoints on img")
    plt.show()

In [None]:
# origin_features = projected_pts[key_frame_name][:, :2]
# keyframe_path = f"output/images/{key_frame_name}"
# print("Keyframe path:", keyframe_path)
# img_path = f"output/images/{name}"
# mask_path = f"output/images_mask/{name}"
# mask1 = cv2.imread(f"output/images_mask/{key_frame_name}", cv2.IMREAD_GRAYSCALE)
# mask2 = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
# img1 = cv2.imread(keyframe_path, cv2.IMREAD_GRAYSCALE)
# img2 = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
sift = cv2.SIFT_create(nOctaveLayers=5, contrastThreshold=0.0005, sigma=1.0)
# kp1, des1 = sift.detectAndCompute(img1, mask1)
# drawkeypoints(img1, kp1)
# filtered_origins, pairs, indices = filter_origin_by_sift(origin_features, kp1, max_dist=5)

# # print("保留下來的 origin 數:", len(filtered_origins))
# # print("這些 origin 的 index:", indices[:10])
# # print("第一個 pair:", pairs[0])  # (origin_pt, sift_pt, pixel_error, origin_idx)
# filtered_projected = projected_features[indices]

# kp2, des2 = sift.detectAndCompute(img2, mask2)

# print("num of des:", len(des1), len(des2))

# refined_points, indices = refine_and_visualize(filtered_origins, filtered_projected,
#     kp1, des1, kp2, des2, indices, img2,
#     search_radius=15, topN=1
# )

# # draw_matches_side_by_side(img1, img2, filtered_origins, refined_points, max_draw=1000)

In [182]:
# # 使用 Fundamental matrix RANSAC 過濾
# F, mask = cv2.findFundamentalMat(
#     filtered_origins, refined_points,
#     cv2.FM_RANSAC,
#     ransacReprojThreshold=5.0,  # 容忍投影誤差 (pixel)
#     confidence=0.999
# )

# # 取 inliers
# inlier_proj = filtered_origins[mask.ravel() == 1]
# inlier_match = refined_points[mask.ravel() == 1]

# print(f"總匹配數: {len(filtered_projected)}, Inliers: {len(inlier_proj)}")


In [183]:
import matplotlib.pyplot as plt

def visualize_track_matrix(tracks, image_list):
    """
    tracks: (N, P, 2)
    image_list: list of image names
    """
    # 建立一個有效性矩陣 (True = 有值, False = NaN)
    valid_mask = ~np.isnan(tracks[:, :, 0])
    coverage = np.sum(valid_mask) / valid_mask.size * 100

    plt.figure(figsize=(14, 6))
    plt.imshow(valid_mask, cmap='Greens', interpolation='nearest', aspect='auto')
    plt.title(f"Track Filling Progress — {coverage:.2f}% filled")
    plt.xlabel("3D Point ID")
    plt.ylabel("Image Index")
    plt.yticks(np.arange(len(image_list)), image_list, fontsize=8)
    plt.colorbar(label="Has Match (1=True, 0=False)")
    plt.tight_layout()
    plt.show()

In [None]:
import numpy as np
import cv2

P = len(point_list)
N = len(image_list)
print(f"Total 3D points: {P}, Total images: {N}")
# 初始化 tracks [N張圖片, P個3D點, (x, y)]
tracks = np.full((N, P, 2), np.nan, dtype=np.float32)
colors = np.full((P, 3), np.nan, dtype=np.float32)  # 每個 3D 點的顏色 (R,G,B)

# === 主迴圈：遍歷每個 keyframe ===
for key_frame_idx, key_frame_name in enumerate(image_list):

    # 取得該 keyframe 的對應關係 (2D-3D pair)
    pair = pairs[key_frame_idx]
    point_ids = [pid for (_, pid) in pair]
    # print(max(point_ids))
    # 取出對應的 3D 座標
    pts3d = np.array([point_list[pid] for pid in point_ids if pid in point_list], dtype=np.float32)

    # 投影到所有影像
    projected_pts = projectpoints_cal(image_list, pts3d, extrinsic, intrinsics, visualize=False)

    # keyframe 自己的特徵
    img1_path = f"output/images/{key_frame_name}"
    mask1_path = f"output/images_mask/{key_frame_name}"
    img1 = cv2.imread(img1_path, cv2.IMREAD_GRAYSCALE)
    mask1 = cv2.imread(mask1_path, cv2.IMREAD_GRAYSCALE)
    kp1, des1 = sift.detectAndCompute(img1, mask1)

    origin_features = projected_pts[key_frame_name][:, :2]
    indices = point_ids  # 與 pts3d 對應的全域點 ID

    # === 跟其他影像比對 ===
    for i, image_name in enumerate(image_list):
        img2_path = f"output/images/{image_name}"
        mask2_path = f"output/images_mask/{image_name}"
        img2 = cv2.imread(img2_path, cv2.IMREAD_GRAYSCALE)
        mask2 = cv2.imread(mask2_path, cv2.IMREAD_GRAYSCALE)
        kp2, des2 = sift.detectAndCompute(img2, mask2)

        projected_features = projected_pts[image_name][:, :2]

        refined_points, matched_indices = refine_and_visualize(
            origin_features, projected_features,
            kp1, des1, kp2, des2, indices,
            img2, search_radius=15, topN=1, visualize=False
        )

        # print(f"{image_name}: {len(matched_indices)} matches")

        # 濾除 NaN (無效點)
        valid_mask = ~np.isnan(refined_points[:, 0]) & ~np.isnan(refined_points[:, 1])
        if np.sum(valid_mask) < 8:
            print(f"⚠️ 有效對應點不足 ({np.sum(valid_mask)} < 8)，跳過 F 計算")
            continue

        # === RANSAC 求 Fundamental matrix ===
        F, mask = cv2.findFundamentalMat(
            origin_features[valid_mask],
            refined_points[valid_mask],
            cv2.FM_RANSAC,
            ransacReprojThreshold=0.5,
            confidence=0.99
        )

        if F is None or mask is None:
            print(f"⚠️ {image_name}: F estimation failed")
            continue

        # 取 inliers
        inlier_mask = mask.ravel() == 1
        inlier_proj = origin_features[valid_mask][inlier_mask]
        inlier_match = refined_points[valid_mask][inlier_mask]
        inlier_track = np.array(matched_indices)[valid_mask][inlier_mask]

        # print(f"總匹配數: {len(valid_mask)}, Inliers: {len(inlier_track)}")

        # === 更新 tracks ===
        new_count = 0
        for j, pid in enumerate(inlier_track):
            if np.isnan(tracks[i, pid-2]).all():
                tracks[i, pid-2] = inlier_match[j]
                new_count += 1
                x, y = map(int, inlier_match[j])
                if 0 <= x < img2.shape[1] and 0 <= y < img2.shape[0]:
                    bgr = img2[y, x] if len(img2.shape) == 3 else [img2[y, x]] * 3
                    colors[pid-2] = np.array(bgr[::-1], dtype=np.float32)
        # visualize_track_matrix(tracks, image_list)


Total 3D points: 34116, Total images: 28
[[[np.float64(205.30935287475586), np.float64(714.631462097168)], np.int64(1)], [[np.float64(229.76417541503906), np.float64(437.5828456878662)], np.int64(2)], [[np.float64(314.692440032959), np.float64(497.1657943725586)], np.int64(4)], [[np.float64(218.3228874206543), np.float64(724.9718856811523)], np.int64(5)], [[np.float64(310.8298587799072), np.float64(488.3269500732422)], np.int64(6)], [[np.float64(243.5698413848877), np.float64(684.1219139099121)], np.int64(8)], [[np.float64(222.76674270629883), np.float64(726.8928909301758)], np.int64(9)], [[np.float64(276.7690086364746), np.float64(719.846076965332)], np.int64(11)], [[np.float64(255.3596305847168), np.float64(871.4274787902832)], np.int64(14)], [[np.float64(255.82529067993164), np.float64(714.8785972595215)], np.int64(17)], [[np.float64(246.32683753967285), np.float64(673.8870048522949)], np.int64(19)], [[np.float64(312.9979419708252), np.float64(423.0770015716553)], np.int64(20)], [[n

KeyboardInterrupt: 

In [None]:
def apply_distortion(extra_params, u, v):
    """
    Applies radial or OpenCV distortion to the given 2D points.

    Args:
        extra_params (torch.Tensor or numpy.ndarray): Distortion parameters of shape BxN, where N can be 1, 2, or 4.
        u (torch.Tensor or numpy.ndarray): Normalized x coordinates of shape Bxnum_tracks.
        v (torch.Tensor or numpy.ndarray): Normalized y coordinates of shape Bxnum_tracks.

    Returns:
        points2D (torch.Tensor): Distorted 2D points of shape BxNx2.
    """
    extra_params = _ensure_torch(extra_params)
    u = _ensure_torch(u)
    v = _ensure_torch(v)

    num_params = extra_params.shape[1]

    if num_params == 1:
        # Simple radial distortion
        k = extra_params[:, 0]
        u2 = u * u
        v2 = v * v
        r2 = u2 + v2
        radial = k[:, None] * r2
        du = u * radial
        dv = v * radial

    elif num_params == 2:
        # RadialCameraModel distortion
        k1, k2 = extra_params[:, 0], extra_params[:, 1]
        u2 = u * u
        v2 = v * v
        r2 = u2 + v2
        radial = k1[:, None] * r2 + k2[:, None] * r2 * r2
        du = u * radial
        dv = v * radial

    elif num_params == 4:
        # OpenCVCameraModel distortion
        k1, k2, p1, p2 = (extra_params[:, 0], extra_params[:, 1], extra_params[:, 2], extra_params[:, 3])
        u2 = u * u
        v2 = v * v
        uv = u * v
        r2 = u2 + v2
        radial = k1[:, None] * r2 + k2[:, None] * r2 * r2
        du = u * radial + 2 * p1[:, None] * uv + p2[:, None] * (r2 + 2 * u2)
        dv = v * radial + 2 * p2[:, None] * uv + p1[:, None] * (r2 + 2 * v2)
    else:
        raise ValueError("Unsupported number of distortion parameters")

    u = u.clone() + du
    v = v.clone() + dv

    return u, v

def img_from_cam_np(
    intrinsics: np.ndarray, points_cam: np.ndarray, extra_params: np.ndarray | None = None, default: float = 0.0
) -> np.ndarray:
    """
    Apply intrinsics (and optional radial distortion) to camera-space points.

    Args
    ----
    intrinsics  : (B,3,3) camera matrix K.
    points_cam  : (B,3,N) homogeneous camera coords  (x, y, z)ᵀ.
    extra_params: (B, N) or (B, k) distortion params (k = 1,2,4) or None.
    default     : value used for np.nan replacement.

    Returns
    -------
    points2D : (B,N,2) pixel coordinates.
    """
    # 1. perspective divide  ───────────────────────────────────────
    z = points_cam[:, 2:3, :]  # (B,1,N)
    points_cam_norm = points_cam / z  # (B,3,N)
    uv = points_cam_norm[:, :2, :]  # (B,2,N)

    # 2. optional distortion ──────────────────────────────────────
    if extra_params is not None:
        uu, vv = apply_distortion(extra_params, uv[:, 0], uv[:, 1])
        uv = np.stack([uu, vv], axis=1)  # (B,2,N)

    # 3. homogeneous coords then K multiplication ─────────────────
    ones = np.ones_like(uv[:, :1, :])  # (B,1,N)
    points_cam_h = np.concatenate([uv, ones], axis=1)  # (B,3,N)

    # batched mat-mul: K · [u v 1]ᵀ
    points2D_h = np.einsum("bij,bjk->bik", intrinsics, points_cam_h)  # (B,3,N)
    points2D = np.nan_to_num(points2D_h[:, :2, :], nan=default)  # (B,2,N)

    return points2D.transpose(0, 2, 1)  # (B,N,2)

In [None]:
def project_3D_points_np(
    points3D: np.ndarray,
    extrinsics: np.ndarray,
    intrinsics: np.ndarray | None = None,
    extra_params: np.ndarray | None = None,
    *,
    default: float = 0.0,
    only_points_cam: bool = False,
):
    """
    NumPy clone of ``project_3D_points``.

    Parameters
    ----------
    points3D          : (N,3) world-space points.
    extrinsics        : (B,3,4)  [R|t] matrix for each of B cameras.
    intrinsics        : (B,3,3)  K matrix (optional if you only need cam-space).
    extra_params      : (B,k) or (B,N) distortion parameters (k ∈ {1,2,4}) or None.
    default           : value used to replace NaNs.
    only_points_cam   : if True, skip the projection and return points_cam with points2D as None.

    Returns
    -------
    (points2D, points_cam) : A tuple where points2D is (B,N,2) pixel coords or None if only_points_cam=True,
                           and points_cam is (B,3,N) camera-space coordinates.
    """
    # ----- 0. prep sizes -----------------------------------------------------
    N = points3D.shape[0]  # #points
    B = extrinsics.shape[0]  # #cameras

    # ----- 1. world → homogeneous -------------------------------------------
    w_h = np.ones((N, 1), dtype=points3D.dtype)
    points3D_h = np.concatenate([points3D, w_h], axis=1)  # (N,4)

    # broadcast to every camera (no actual copying with np.broadcast_to) ------
    points3D_h_B = np.broadcast_to(points3D_h, (B, N, 4))  # (B,N,4)

    # ----- 2. apply extrinsics  (camera frame) ------------------------------
    # X_cam = E · X_hom
    # einsum:  E_(b i j)  ·  X_(b n j)  →  (b n i)
    points_cam = np.einsum("bij,bnj->bni", extrinsics, points3D_h_B)  # (B,N,3)
    points_cam = points_cam.transpose(0, 2, 1)  # (B,3,N)

    if only_points_cam:
        return None, points_cam

    # ----- 3. intrinsics + distortion ---------------------------------------
    if intrinsics is None:
        raise ValueError("`intrinsics` must be provided unless only_points_cam=True")

    points2D = img_from_cam_np(intrinsics, points_cam, extra_params=extra_params, default=default)

    return points2D, points_cam


def _build_pycolmap_intri(fidx, intrinsics, camera_type, extra_params=None):
    """
    Helper function to get camera parameters based on camera type.

    Args:
        fidx: Frame index
        intrinsics: Camera intrinsic parameters
        camera_type: Type of camera model
        extra_params: Additional parameters for certain camera types

    Returns:
        pycolmap_intri: NumPy array of camera parameters
    """
    if camera_type == "PINHOLE":
        pycolmap_intri = np.array(
            [intrinsics[fidx][0, 0], intrinsics[fidx][1, 1], intrinsics[fidx][0, 2], intrinsics[fidx][1, 2]]
        )
    elif camera_type == "SIMPLE_PINHOLE":
        focal = (intrinsics[fidx][0, 0] + intrinsics[fidx][1, 1]) / 2
        pycolmap_intri = np.array([focal, intrinsics[fidx][0, 2], intrinsics[fidx][1, 2]])
    elif camera_type == "SIMPLE_RADIAL":
        raise NotImplementedError("SIMPLE_RADIAL is not supported yet")
        focal = (intrinsics[fidx][0, 0] + intrinsics[fidx][1, 1]) / 2
        pycolmap_intri = np.array([focal, intrinsics[fidx][0, 2], intrinsics[fidx][1, 2], extra_params[fidx][0]])
    else:
        raise ValueError(f"Camera type {camera_type} is not supported yet")

    return pycolmap_intri


In [None]:
def batch_np_matrix_to_pycolmap(
    points3d,
    extrinsics,
    intrinsics,
    tracks,
    image_size,
    masks=None,
    max_reproj_error=None,
    max_points3D_val=3000,
    shared_camera=False,
    camera_type="SIMPLE_PINHOLE",
    extra_params=None,
    min_inlier_per_frame=30,
    image_list=None,
    points_rgb=None,
):
    """
    Convert Batched NumPy Arrays to PyCOLMAP

    Check https://github.com/colmap/pycolmap for more details about its format

    NOTE that colmap expects images/cameras/points3D to be 1-indexed
    so there is a +1 offset between colmap index and batch index


    NOTE: different from VGGSfM, this function:
    1. Use np instead of torch
    2. Frame index and camera id starts from 1 rather than 0 (to fit the format of PyCOLMAP)
    """
    # points3d: Px3
    # extrinsics: Nx3x4
    # intrinsics: Nx3x3
    # tracks: NxPx2
    # masks: NxP
    # image_size: 2, assume all the frames have been padded to the same size
    # where N is the number of frames and P is the number of tracks

    N, P, _ = tracks.shape
    assert len(extrinsics) == N
    assert len(intrinsics) == N
    assert len(points3d) == P
    assert image_size.shape[0] == 2

    reproj_mask = None

    if max_reproj_error is not None:
        projected_points_2d, projected_points_cam = project_3D_points_np(points3d, extrinsics, intrinsics)
        projected_diff = np.linalg.norm(projected_points_2d - tracks, axis=-1)
        projected_points_2d[projected_points_cam[:, -1] <= 0] = 1e6
        reproj_mask = projected_diff < max_reproj_error

    if masks is not None and reproj_mask is not None:
        masks = np.logical_and(masks, reproj_mask)
    elif masks is not None:
        masks = masks
    else:
        masks = reproj_mask

    assert masks is not None

    inliers_per_frame = masks.sum(1)
    print(f"[DEBUG] Final inliers per frame: {inliers_per_frame}")
    print(f"[DEBUG] Min inliers per frame: {inliers_per_frame.min()}, Max: {inliers_per_frame.max()}")

    if inliers_per_frame.min() < min_inlier_per_frame:
        print(f"[WARNING] Some frames have < {min_inlier_per_frame} inliers, skip BA.")
        return None, None


    # Reconstruction object, following the format of PyCOLMAP/COLMAP
    reconstruction = pycolmap.Reconstruction()

    inlier_num = masks.sum(0)
    valid_mask = inlier_num >= 2  # a track is invalid if without two inliers
    valid_idx = np.nonzero(valid_mask)[0]

    # Only add 3D points that have sufficient 2D points
    for vidx in valid_idx:
        pt = np.asarray(points3d[vidx], dtype=np.float64).reshape(3)
        rgb = np.asarray(points_rgb[vidx], dtype=np.uint8).reshape(3)
        pt = np.ascontiguousarray(pt)
        rgb = np.ascontiguousarray(rgb)
        track = pycolmap.Track()
        reconstruction.add_point3D(pt, track, rgb)


    num_points3D = len(valid_idx)
    camera = None
    # frame idx
    for fidx in range(N):
        # set camera
        if camera is None or (not shared_camera):
            pycolmap_intri = _build_pycolmap_intri(fidx, intrinsics, camera_type, extra_params)

            camera = pycolmap.Camera(
                model=camera_type, width=image_size[0], height=image_size[1], params=pycolmap_intri, camera_id=fidx + 1
            )

            # add camera
            reconstruction.add_camera(camera)

        # set image
        cam_from_world = pycolmap.Rigid3d(
            pycolmap.Rotation3d(extrinsics[fidx][:3, :3]), extrinsics[fidx][:3, 3]
        )  # Rot and Trans

        image = pycolmap.Image(
            id=fidx + 1, name=image_list[fidx], camera_id=camera.camera_id, cam_from_world=cam_from_world
        )

        points2D_list = []

        point2D_idx = 0

        # NOTE point3D_id start by 1
        for point3D_id in range(1, num_points3D + 1):
            original_track_idx = valid_idx[point3D_id - 1]

            if (reconstruction.points3D[point3D_id].xyz < max_points3D_val).all():
                if masks[fidx][original_track_idx]:
                    # It seems we don't need +0.5 for BA
                    point2D_xy = tracks[fidx][original_track_idx]
                    # Please note when adding the Point2D object
                    # It not only requires the 2D xy location, but also the id to 3D point
                    points2D_list.append(pycolmap.Point2D(point2D_xy, point3D_id))

                    # add element
                    track = reconstruction.points3D[point3D_id].track
                    track.add_element(fidx + 1, point2D_idx)
                    point2D_idx += 1

        assert point2D_idx == len(points2D_list)

        try:
            image.points2D = pycolmap.ListPoint2D(points2D_list)
            image.registered = True
        except:
            print(f"frame {fidx + 1} is out of BA")
            image.registered = False

        # add image
        reconstruction.add_image(image)

    return reconstruction, valid_mask



In [None]:
key_frame_idx = 0
key_frame_name = image_list[key_frame_idx]  # e.g., "0001.jpg"

# 讀 keyframe 的 RGB 圖片
img_path = f"output/images/{key_frame_name}"
key_img = cv2.imread(img_path, cv2.IMREAD_COLOR_RGB)
if key_img is None:
    raise FileNotFoundError(f"Image not found: {img_path}")

# 取得顏色
_, points_rgb_bgr = project_and_visualize_points(
    name=key_frame_name,
    target_frame_idx=key_frame_idx,
    extrinsic=extrinsic,
    intrinsics=intrinsics,
    pts3d=pts3d,
    image_shape=key_img.shape[:2],
    visualize=False,
    color_image=key_img  # 傳入 RGB 圖片
)

# 注意：轉換為 RGB（如果不是的話）
points_rgb = points_rgb_bgr  # 已經在 function 中轉成 RGB
image_size = np.array(img1.shape[::-1])  # (W, H)
extrinsics_array = np.stack([np.hstack((R, t.reshape(-1, 1))) for (R, t) in extrinsic])
intrinsics_array = np.stack(intrinsics)



In [None]:
# print(intrinsics)

In [None]:
print(point_list)

{20264: [np.float64(-0.04595829657452879), np.float64(0.09699112427718187), np.float64(0.34570307115324833)], 20263: [np.float64(-0.03592680232333226), np.float64(0.09533781138099552), np.float64(0.33815108798055893)], 20262: [np.float64(-0.00459388663533238), np.float64(-0.0532299773107558), np.float64(0.36021556781066544)], 20261: [np.float64(-0.059457174146674015), np.float64(0.15320468401477658), np.float64(0.35305580459016483)], 20260: [np.float64(-0.04383735294389313), np.float64(0.10531731973288162), np.float64(0.3416888657726338)], 20259: [np.float64(-0.010296038079397342), np.float64(0.11128815618580569), np.float64(0.3317281655450847)], 20258: [np.float64(-0.06548555616032135), np.float64(0.11790069552940226), np.float64(0.39045566882179616)], 20257: [np.float64(-0.034787551556119574), np.float64(-0.021296232218379188), np.float64(0.3196411983621306)], 20256: [np.float64(-0.03236645044261528), np.float64(0.080516273349206), np.float64(0.33740766900635266)], 20255: [np.float64

In [None]:
sorted_items = sorted(point_list.items())  # [(key, value), ...]
keys = np.array(sorted(point_list.keys()))
all_points = np.array([point_list[k] for k in keys])
print(len(all_points), tracks.shape)

34116 (28, 34116, 2)


In [None]:
import pycolmap

reconstruction, valid_track_mask = batch_np_matrix_to_pycolmap(
    all_points,
    extrinsics_array,
    intrinsics_array,
    tracks,
    image_size,
    masks=None,
    max_reproj_error=25,
    shared_camera=False,
    camera_type="SIMPLE_PINHOLE",
    image_list=image_list,
    points_rgb=colors,
)

if reconstruction is None:
    raise ValueError("No reconstruction can be built with BA")

print("fininshed building pycolmap reconstruction")
# # Bundle Adjustment
# ba_options = pycolmap.BundleAdjustmentOptions()
# pycolmap.bundle_adjustment(reconstruction, ba_options)

[DEBUG] Final inliers per frame: [3275 3278 2930 2921 3339 3156 3438 3219 3351 3380 3215 3294 4567 5047
 4158 5672 4956 4576 5184 4260 5092 5531 5418 4874 4748 5617 5387 4341]
[DEBUG] Min inliers per frame: 2921, Max: 5672
fininshed building pycolmap reconstruction


In [None]:
ba_options = pycolmap.BundleAdjustmentOptions()
pycolmap.bundle_adjustment(reconstruction, ba_options)

I20251016 10:12:30.241936 133972206380864 misc.cc:198] 
Global bundle adjustment
I20251016 10:12:33.010358 133972206380864 bundle_adjustment.cc:866] 
Bundle adjustment report:
    Residuals : 232392
   Parameters : 40806
   Iterations : 23
         Time : 2.64502 [s]
 Initial cost : 4.48819 [px]
   Final cost : 3.75385 [px]
  Termination : Convergence

I20251016 10:12:33.010385 133972206380864 timer.cc:91] Elapsed time: 0.046 [minutes]


In [None]:
output_dir = "results/sparse/0"
reconstruction.write(output_dir)
