## 导入所需的库文件

In [None]:
import os
import time
import numpy as np
import cv2
from typing import List, Dict, Any,Tuple
from scipy.spatial.transform import Rotation as R_scipy
import open3d as o3d
from contextlib import redirect_stdout
from feature_extraction import load_or_extract_features
from feature_matching import load_or_match_image_pairs, filter_matches_by_homography, filter_two_view_geometry
from initial_recon import print_pose_info, pick_initial_pair
from pnp_recon import pnp_pose, visualize_with_open3d, triangulate_points
from bundle_adjustment import BundleAdjustment,TorchBundleAdjustment
from incremental_sfm import IncrementalSfM,FastIncrementalSfM

In [6]:
# User parameters: adjust these paths before running
IMAGE_FOLDER = r"D:\lyz\Python\CV\AI4701\final project\images"
INTRINSICS_FILE = r"D:\lyz\Python\CV\AI4701\final project\camera_intrinsic.txt"
OUTPUT_DIR = r"D:\lyz\Python\CV\AI4701\final project\output"
SAVE_FEATURES = False
LOAD_FEATURES = True
VERBOSE = False



## Cell 2: Load Images and Intrinsics

In [None]:
# %%

def load_images(image_folder: str) -> List[str]:
    paths = []
    for root, dirs, files in os.walk(image_folder):
        # 1. 只保留图片文件
        image_files = [
            f for f in files
            if f.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        # 2. 按文件名逆序排序（"163225" > "163224"）
        image_files.sort(reverse=True)
        # 3. 拼接完整路径
        for f in image_files:
            paths.append(os.path.join(root, f))
    return paths


def load_intrinsics(k_path: str) -> np.ndarray:
    if not os.path.exists(k_path):
        raise FileNotFoundError(f"相机内参文件未找到：{k_path}")
    K = np.loadtxt(k_path, dtype=np.float32)
    if K.shape != (3, 3):
        raise ValueError(f"相机内参形状错误，应为3x3，但得到: {K.shape}")
    return K

# Load at module level
image_paths = load_images(IMAGE_FOLDER)
K = load_intrinsics(INTRINSICS_FILE)



## Cell 3: Feature Extraction and Matching

In [8]:
# %%

def run_feature_pipeline(paths: List[str], K: np.ndarray, output_dir: str,
                         save: bool, load: bool, verbose: bool=True):
    def _run_quiet(func, *args, **kwargs):
        if verbose:
            return func(*args, **kwargs)
        with open(os.devnull, 'w') as devnull:
            with redirect_stdout(devnull):
                return func(*args, **kwargs)

    print("Extracting features...")
    features = _run_quiet(
        load_or_extract_features,
        paths, method='sift', save=save, load=load, output_dir=output_dir
    )

    print("Matching features...")
    raw_matches = _run_quiet(
        load_or_match_image_pairs,
        features, method='sift', save=save, load=load, output_dir=output_dir
    )

    # RANSAC filtering
    n = len(features)
    al_matches = [[[] for _ in range(n)] for __ in range(n)]
    for i in range(n):
        for j in range(n):
            if i == j: continue
            kp1, _ = features[i]
            kp2, _ = features[j]
            m = raw_matches[i][j]
            if len(m) < 4:
                continue
            inliers, _ = _run_quiet(filter_two_view_geometry, kp1, kp2, m, K)
            al_matches[i][j] = inliers

    # Flatten into pairs
    matches_list = []
    for i in range(n):
        for j in range(i+1, n):
            if len(al_matches[i][j]) >= 4:
                matches_list.append((i, j, al_matches[i][j]))

    return features, al_matches, matches_list

# Execute
features, al_matches, matches_list = run_feature_pipeline(
    image_paths, K, OUTPUT_DIR, SAVE_FEATURES, LOAD_FEATURES, VERBOSE
)



Extracting features...
Matching features...



## Cell 4: Initial Pose Estimation

In [9]:

def initialize_reconstruction(K: np.ndarray,
                              al_matches: List[List[List[cv2.DMatch]]],
                              features: List[Tuple[List[cv2.KeyPoint], np.ndarray]]):
    print("Estimating initial pose...")
    keypoints = [kp for kp, _ in features]
    i, j, R, t, inliers = pick_initial_pair(K, al_matches, keypoints=keypoints)
    print(f"Initial pair: {i}, {j}")
    print_pose_info(R, t, len(inliers), len(inliers))
    return i, j, R, t, inliers

init_i, init_j, R0, t0, init_inliers = initialize_reconstruction(K, al_matches, features)

Estimating initial pose...
Initial pair: 16, 38
SCENE INITIALIZATION RESULTS

1. Feature Matching:
   - Total matches: 2257
   - Inlier matches: 2257
   - Inlier ratio: 1.000

2. Rotation Matrix (R):
[[ 9.99907392e-01  2.48507082e-03 -1.33802883e-02]
 [-9.01863977e-04  9.93125232e-01  1.17053238e-01]
 [ 1.35791875e-02 -1.17030331e-01  9.93035502e-01]]

3. Translation Vector (t):
[-0.06339221 -0.98179335 -0.17906214]

4. Rotation as Rodrigues vector: [-0.11731431 -0.01351113 -0.00169741]
5. Rotation in degrees: [-6.72161498 -0.77413044 -0.09725446]
6. Translation distance (normalized): 1.0000
7. Total rotation angle: 6.77 degrees


## Cell 5: Incremental SfM Reconstruction

In [10]:
sfm = IncrementalSfM(K, optimize_intrinsics=False)
sfm.features = {idx: feat for idx, feat in enumerate(features)}
sfm.matches = {(i,j): m for i,j,m in matches_list}

print("Initializing SfM...")
succ = sfm.initialize_reconstruction((init_i, init_j))
if not succ:
    raise RuntimeError("初始化失败，请检查初始对或匹配质量。")

print("Running incremental reconstruction...")
succ = sfm.reconstruct_incremental(sfm.features, sfm.matches, (init_i, init_j))
if not succ:
    raise RuntimeError("增量重建中断。")

# Collect results
poses = []
for idx, cam in sfm.cameras.items():
    if idx in sfm.registered_cams:
        poses.append({'R': cam.R, 't': cam.t})
points_3d = np.vstack([pt.xyz for pt in sfm.points_3d.values()])
point_observations = {pid: pt.observations for pid, pt in sfm.points_3d.items()}

Initializing SfM...
Running incremental reconstruction...
初始化成功，注册了相机 16 和 38
初始3D点数量: 4382

第 1 次迭代：尝试注册图像 18
成功注册图像 18，内点数: 1542
新增3D点: 11，总点数: 4393

第 2 次迭代：尝试注册图像 19
成功注册图像 19，内点数: 1591
新增3D点: 10，总点数: 4403

第 3 次迭代：尝试注册图像 37
成功注册图像 37，内点数: 1434
新增3D点: 1069，总点数: 5472

第 4 次迭代：尝试注册图像 35
成功注册图像 35，内点数: 1448
新增3D点: 229，总点数: 5701

第 5 次迭代：尝试注册图像 36
成功注册图像 36，内点数: 1696
新增3D点: 13，总点数: 5714
清理了 4376 个质量差的3D点

第 6 次迭代：尝试注册图像 21
成功注册图像 21，内点数: 714
新增3D点: 325，总点数: 1663

第 7 次迭代：尝试注册图像 15
图像 15 PnP失败

第 8 次迭代：尝试注册图像 22
成功注册图像 22，内点数: 954
新增3D点: 2537，总点数: 4200

第 9 次迭代：尝试注册图像 39
成功注册图像 39，内点数: 3574
新增3D点: 1272，总点数: 5472

第 10 次迭代：尝试注册图像 14
图像 14 PnP失败

第 11 次迭代：尝试注册图像 17
成功注册图像 17，内点数: 58
新增3D点: 570，总点数: 6042

第 12 次迭代：尝试注册图像 0
图像 0 PnP失败

第 13 次迭代：尝试注册图像 20
成功注册图像 20，内点数: 1840
新增3D点: 0，总点数: 6042

第 14 次迭代：尝试注册图像 53
成功注册图像 53，内点数: 3596
新增3D点: 5199，总点数: 11241

第 15 次迭代：尝试注册图像 23
成功注册图像 23，内点数: 4342
新增3D点: 3664，总点数: 14905

第 16 次迭代：尝试注册图像 34
成功注册图像 34，内点数: 5279
新增3D点: 1828，总点数: 16733

第 17 次迭代：尝试

## Cell 6: Recentering and Camera Matrices

In [12]:
def recenter_poses_and_points(
    poses: List[Dict[str, Any]],
    points_3d: np.ndarray,
    world_filename: str,
    paths: List[str]
) -> Tuple[List[Dict[str, np.ndarray]], np.ndarray]:
    """
    对相机位姿和三维点做“重心化”处理：
    - world_filename 对应的那台相机 将成为新的原点且朝向不变
    - 其它相机与三维点 都按 R_w*(X - t_w) 变换到新坐标系

    Args:
        poses: List of {'R': (3,3), 't': (3,)}，原始世界坐标系下的相机外参
        points_3d: (N_pts,3) ndarray，原始世界坐标系下的三维点
        world_filename: 要作为新世界原点的那张图的文件名（例如 image_paths[0] 的 basename）
        paths:     List of 对应的所有图片完整路径

    Returns:
        new_poses: List of {'R': (3,3), 't': (3,)}，重心化后的相机外参
        new_points: (N_pts,3) ndarray，重心化后的三维点坐标
    """
    # 找到 world 相机在 poses/paths 中的索引
    idx_world = next(
        i for i, p in enumerate(paths)
        if os.path.basename(p) == world_filename
    )
    R_w = poses[idx_world]['R']
    t_w = poses[idx_world]['t'].reshape(3,)

    # 1) 重心化相机位姿
    new_poses = []
    for pose in poses:
        R_k = pose['R']
        t_k = pose['t'].reshape(3,)

        # 新的旋转、平移
        Rk_new = R_k @ R_w.T
        tk_new = t_k - (Rk_new @ t_w)

        new_poses.append({'R': Rk_new, 't': tk_new})

    # 2) 重心化三维点：X' = R_w @ (X - t_w)
    #    points_3d: (N_pts,3) → (3, N_pts) → 变换 → (N_pts,3)
    shifted = points_3d - t_w[None, :]        # (N_pts,3)
    new_points = (R_w @ shifted.T).T         # (N_pts,3)

    return new_poses, new_points


sorted_pids = sorted(points_3d.keys())
points = np.vstack([points_3d[pid].reshape(1,3) for pid in sorted_pids])
world_filename = "DJI_20200223_163225_243.jpg"
# 2. 把 camera_poses 转成和 image_paths 一一对应的列表
poses_list = []
valid_paths = []
for idx, path in enumerate(image_paths):
    cam = poses.get(idx)
    if cam is None:
        continue
    R = cam['R']
    t = cam['t'].reshape(3,)    # 从 (3,1) → (3,)
    poses_list.append({'R': R, 't': t})
    valid_paths.append(path)

# 3. 调用 recenter_poses_and_points
new_poses_list, new_points = recenter_poses_and_points(
    poses_list,
    points,
    world_filename,
    valid_paths
)

# 4. 把 new_poses_list 再转回和 camera_poses 格式一致的 dict
new_camera_poses = {}
for idx, pose in enumerate(new_poses_list):
    new_camera_poses[idx] = {
        'R': pose['R'],
        't': pose['t'].reshape(3,1)   # 恢复成 (3,1)
    }

# 5. 构造保存用的 cams 列表
from scipy.spatial.transform import Rotation as R_scipy
cams = []
for idx in sorted(new_camera_poses.keys()):
    cam = new_camera_poses[idx]
    T = np.eye(4, dtype=np.float32)
    T[:3,:3] = cam['R']
    T[:3, 3] = cam['t'].reshape(3,)
    cams.append(T)

# new_poses 是 [{'R':..., 't':...}, …]
# new_points 形状 (N_pts,3)


AttributeError: 'numpy.ndarray' object has no attribute 'keys'

## Cell 7: Bundle Adjustment

In [None]:

def run_bundle_adjustment(K: np.ndarray,
                          cameras: np.ndarray,
                          points: np.ndarray,
                          observations: List[Tuple[float,float]],
                          pt_idxs: List[int],
                          cam_idxs: List[int]):
    ba = TorchBundleAdjustment(camera_intrinsics=K, verbose=True,init_cameras=cameras, init_points=points)
    opt_cams, opt_pts, info = ba.optimize(
        cameras=cameras,
        points_3d=points,
        observations=observations,
        point_indices=pt_idxs,
        camera_indices=cam_idxs
    )
    return opt_cams, opt_pts, info

# Prepare BA inputs
obs, pidxs, cidxs = [], [], []
for pid, cams in point_observations.items():
    for cid, xy in cams.items():
        obs.append(xy)
        pidxs.append(pid)
        cidxs.append(cid)

for cam in poses:
    T = np.eye(4)
    T[:3,:3] = cam['R']
    T[:3,3] = cam['t'].reshape(3,)
    camera_matrices.append(T)
camera_matrices = np.array(camera_matrices, dtype=np.float32)
points_3d = np.array(points_3d, dtype=np.float32)

opt_cams, opt_pts, ba_info = run_bundle_adjustment(
    K, camera_matrices, points_3d, obs, pidxs, cidxs
)
print(ba_info)


## Cell 8: Save Results

In [11]:

def save_outputs(points: np.ndarray, cameras: List[np.ndarray], output_dir: str):
    os.makedirs(output_dir, exist_ok=True)
    ply = os.path.join(output_dir, 'optimized_point_cloud.ply')
    with open(ply, 'w') as f:
        f.write('ply\nformat ascii 1.0\n')
        f.write(f'element vertex {points.shape[0]}\n')
        f.write('property float x\nproperty float y\nproperty float z\nend_header\n')
        for p in points:
            f.write(f"{p[0]} {p[1]} {p[2]}\n")
    traj = os.path.join(output_dir, 'optimized_camera_trajectory.txt')
    with open(traj, 'w') as f:
        f.write('# CamIdx tx ty tz qx qy qz qw\n')
        for i, T in enumerate(cameras):
            pos = T[:3,3]
            quat = R_scipy.from_matrix(T[:3,:3]).as_quat()
            f.write(f"{i} {pos[0]} {pos[1]} {pos[2]} {quat[0]} {quat[1]} {quat[2]} {quat[3]}\n")
    print(f"Saved PLY: {ply}\nSaved trajectory: {traj}")

cams = []
for i, cam in enumerate(poses):
    T = np.eye(4)
    T[:3,:3] = cam['R']
    T[:3,3] = cam['t'].reshape(3,)
    cams.append(T)


save_outputs(points_3d, cams, OUTPUT_DIR)

# %%
# Cell 9: Visualization
pcd = o3d.io.read_point_cloud(os.path.join(OUTPUT_DIR, 'optimized_point_cloud.ply'))
points = np.asarray(pcd.points)
poses_vis = []
with open(os.path.join(OUTPUT_DIR, 'optimized_camera_trajectory.txt'), 'r') as f:
    for line in f:
        if line.startswith('#'): continue
        parts = line.split()
        _, tx, ty, tz, qx, qy, qz, qw = parts
        Rm = R_scipy.from_quat([float(qx),float(qy),float(qz),float(qw)]).as_matrix()
        poses_vis.append({'R': Rm, 't': np.array([float(tx),float(ty),float(tz)]).reshape(3,1)})

print(f"Loaded {len(points)} points and {len(poses_vis)} cameras.")
visualize_with_open3d(points, poses_vis)


Saved PLY: D:\lyz\Python\CV\AI4701\final project\output\optimized_point_cloud.ply
Saved trajectory: D:\lyz\Python\CV\AI4701\final project\output\optimized_camera_trajectory.txt
Loaded 299451 points and 46 cameras.
