In [None]:
# Baseline Image Pairing(Vocab tree-based retriever)
import os
import shutil
import sqlite3
from pathlib import Path
from collections import defaultdict
import subprocess
import numpy as np

def match_index(filename, indices):
    """Return True when the trailing frame number of *filename* is in *indices*.

    The frame number is the text after the last underscore of the file name
    with its extension removed, e.g. ``frame_00001.jpg`` -> ``1``.

    Args:
        filename: File name whose stem ends in ``_<number>``.
        indices: Container of frame numbers supporting the ``in`` operator.

    Returns:
        bool: False when the name has no numeric suffix or when *indices*
        cannot be used for a membership test.
    """
    try:
        stem = os.path.splitext(filename)[0]
        frame_number = int(stem.split("_")[-1])  # e.g. frame_00001 -> 1
        return frame_number in indices
    except (TypeError, ValueError):
        # A malformed name or unusable ``indices`` means "not selected".
        # Only these two are caught so that KeyboardInterrupt/SystemExit and
        # genuine programming errors are no longer silently swallowed.
        return False
            
def pair_id_to_image_ids(pair_id):
    """Split a combined pair id into its two image ids.

    Inverts ``pair_id = image_id1 * 2147483647 + image_id2``.

    Args:
        pair_id: Integer pair id.

    Returns:
        tuple: ``(image_id1, image_id2)`` as integers.
    """
    # Integer division keeps image_id1 an int; the previous true division
    # returned a float, which is fragile as a dictionary key.
    image_id1, image_id2 = divmod(pair_id, 2147483647)
    return image_id1, image_id2

def extract_best_matches(database_path):
    """Pick, for every ``new_frame_*`` image, the ``frame_*`` image with the highest score.

    Reads the ``images`` and ``two_view_geometries`` tables of the SQLite
    database at *database_path*. The ``rows`` column of each
    ``two_view_geometries`` entry is used as the score of that pair.

    Args:
        database_path: Path to the SQLite database file.

    Returns:
        dict: ``new_frame_*`` image name -> best matching ``frame_*`` image name.
    """
    conn = sqlite3.connect(database_path)
    try:
        cursor = conn.cursor()

        # image_id -> name
        cursor.execute("SELECT image_id, name FROM images")
        id_to_name = {row[0]: row[1] for row in cursor.fetchall()}

        # Group candidate (old name, score) tuples by new image name.
        candidates = defaultdict(list)
        cursor.execute("SELECT pair_id, rows FROM two_view_geometries WHERE rows > 0")
        for pair_id, score in cursor.fetchall():
            image_id1, image_id2 = pair_id_to_image_ids(pair_id)
            name1, name2 = id_to_name[image_id1], id_to_name[image_id2]
            print(name1, name2)
            if name1.startswith("frame_") and name2.startswith("new_frame_"):
                candidates[name2].append((name1, score))
            elif name2.startswith("frame_") and name1.startswith("new_frame_"):
                candidates[name1].append((name2, score))
    finally:
        # Close the connection even when a query or a lookup fails.
        conn.close()

    best_matches = {}
    for t2_name, pair_list in candidates.items():
        # max() returns the first of equally scored candidates, which is the
        # same pick a stable descending sort would put first.
        t1_name, best_score = max(pair_list, key=lambda pair: pair[1])
        print(t1_name, t2_name, best_score)
        best_matches[t2_name] = t1_name

    return best_matches

def copy_best_pairs(src_folder_1, src_folder_2, dest_dir, best_match, indices):
    """Copy each matched image pair into *dest_dir* and log the pairing.

    For every ``new_*`` image the matched old image is copied under the new
    image's name with ``new_`` replaced by ``initial_paired_``; the new image
    keeps its name. One line per copied pair is written to
    ``matched_pairs.txt`` inside *dest_dir*.

    Args:
        src_folder_1: Folder holding the old (matched) images.
        src_folder_2: Folder holding the new images.
        dest_dir: Output folder; created when missing.
        best_match: Mapping of new image name -> old image name.
        indices: Optional collection of frame numbers; when not None, only
            new images whose frame number is in it are copied.

    Raises:
        ValueError: If a key of *best_match* does not start with ``new_``.
    """
    os.makedirs(dest_dir, exist_ok=True)
    log_path = os.path.join(dest_dir, "matched_pairs.txt")

    # Every log line contains a non-ASCII arrow, so the encoding is pinned
    # instead of depending on the platform default.
    with open(log_path, "w", encoding="utf-8") as log_file:
        for t2_name, t1_name in best_match.items():
            if not t2_name.startswith("new_"):
                raise ValueError(f"❌ t2_name은 'new_'로 시작해야 합니다: {t2_name}")

            if indices is not None and not match_index(t2_name, indices):
                continue  # frame number not selected

            t1_dest = t2_name.replace("new_", "initial_paired_", 1)
            t2_dest = t2_name

            shutil.copy(os.path.join(src_folder_1, t1_name), os.path.join(dest_dir, t1_dest))
            shutil.copy(os.path.join(src_folder_2, t2_name), os.path.join(dest_dir, t2_dest))
            log_file.write(f"{t2_name}, {t1_name} → {t2_dest}, {t1_dest}\n")

    print(f"✅ 선택된 페어 복사 및 기록 완료 → {dest_dir}")

def run_vocab_tree_matching_pipeline(src1, src2, dest_dir, db_path, indices):
    """Extract the best matches from the database, print them and copy the pairs.

    Args:
        src1: Folder with the old images.
        src2: Folder with the new images.
        dest_dir: Folder receiving the copied pairs.
        db_path: Path to the SQLite database to read matches from.
        indices: Optional frame-number filter forwarded to the copy step.
    """
    pairing = extract_best_matches(db_path)
    print(pairing)
    copy_best_pairs(src1, src2, dest_dir, pairing, indices)

# Usage example
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Naju_90m_ChangeDetection_workspace/trial5"

# Input folders, database and optional frame-index filter.
src_folder_1 = os.path.join(workspace_dir, "initial/ns_processed/images")
src_folder_2 = os.path.join(workspace_dir, "new_correspondence/ns_processed/images")
database_path = os.path.join(workspace_dir, "new_correspondence/ns_processed/colmap/database.db")
indices_path = os.path.join(workspace_dir, "new_correspondence/renders/image_indices.npy")

# The destination is the same whether or not an index file exists.
dest_folder = os.path.join(workspace_dir, "CD_PAIR/baseline_cd_pair")

if not os.path.exists(indices_path):
    indices = None
    print(f"No file found at {indices_path}, indices set to None.")
else:
    indices = np.load(indices_path)
    print(f"Loaded {len(indices)} indices from {indices_path}")

run_vocab_tree_matching_pipeline(src_folder_1, src_folder_2, dest_folder, database_path, indices)


In [None]:
# Baseline Image Pairing(RMSE-based retriever)
import os
import shutil
import sqlite3
from pathlib import Path
from collections import defaultdict
import subprocess
import numpy as np

def match_index(filename, indices):
    """Return True when the trailing frame number of *filename* is in *indices*.

    The frame number is the text after the last underscore of the file name
    with its extension removed, e.g. ``frame_00001.jpg`` -> ``1``.

    Args:
        filename: File name whose stem ends in ``_<number>``.
        indices: Container of frame numbers supporting the ``in`` operator.

    Returns:
        bool: False when the name has no numeric suffix or when *indices*
        cannot be used for a membership test.
    """
    try:
        stem = os.path.splitext(filename)[0]
        frame_number = int(stem.split("_")[-1])  # e.g. frame_00001 -> 1
        return frame_number in indices
    except (TypeError, ValueError):
        # A malformed name or unusable ``indices`` means "not selected".
        # Only these two are caught so that KeyboardInterrupt/SystemExit and
        # genuine programming errors are no longer silently swallowed.
        return False
            
def pair_id_to_image_ids(pair_id):
    """Split a combined pair id into its two image ids.

    Inverts ``pair_id = image_id1 * 2147483647 + image_id2``.

    Args:
        pair_id: Integer pair id.

    Returns:
        tuple: ``(image_id1, image_id2)`` as integers.
    """
    # Integer division keeps image_id1 an int; the previous true division
    # returned a float, which is fragile as a dictionary key.
    image_id1, image_id2 = divmod(pair_id, 2147483647)
    return image_id1, image_id2

def load_keypoints(conn):
    """Load the (x, y) position of every keypoint stored in the database.

    The blob of each row is decoded as float32 and reshaped using the row's
    own ``rows``/``cols`` values, so 2-, 4- and 6-column keypoint layouts are
    all handled (the previous hard-coded 6 columns failed on the others).

    Args:
        conn: Open ``sqlite3`` connection whose ``keypoints`` table has
            ``image_id``, ``rows``, ``cols`` and ``data`` columns.

    Returns:
        dict: ``image_id`` -> float32 array of shape ``(rows, 2)``. Images
        with no stored keypoints map to an empty ``(0, 2)`` array.
    """
    cursor = conn.cursor()
    cursor.execute("SELECT image_id, rows, cols, data FROM keypoints")
    kp_dict = {}
    for image_id, rows, cols, kp_blob in cursor.fetchall():
        if kp_blob is None or not rows:
            # NULL / empty blob: keep the image present but without points.
            kp_dict[image_id] = np.empty((0, 2), dtype=np.float32)
            continue
        table = np.frombuffer(kp_blob, dtype=np.float32).reshape(rows, cols)
        kp_dict[image_id] = table[:, :2]  # first two columns are x, y
    return kp_dict

def extract_best_matches_by_rmse(database_path):
    """Pick, for every ``new_frame_*`` image, the ``frame_*`` image with the lowest keypoint RMSE.

    Candidate pairs are the ``two_view_geometries`` rows with ``rows > 0``.
    For each such pair the index pairs stored in the ``matches`` table are
    used to look up keypoint positions, and the RMSE of the pixel offsets
    between matched keypoints is computed.

    Args:
        database_path: Path to the SQLite database file.

    Returns:
        dict: ``new_frame_*`` image name -> ``frame_*`` image name with the
        smallest RMSE.
    """
    conn = sqlite3.connect(database_path)
    try:
        cursor = conn.cursor()

        # image_id -> name
        cursor.execute("SELECT image_id, name FROM images")
        id_to_name = {row[0]: row[1] for row in cursor.fetchall()}

        keypoints = load_keypoints(conn)

        # pair_id -> (N, 2) array of keypoint index pairs
        cursor.execute("SELECT pair_id, data FROM matches WHERE data IS NOT NULL")
        match_data = {
            pair_id: np.frombuffer(blob, dtype=np.uint32).reshape(-1, 2)
            for pair_id, blob in cursor.fetchall()
        }

        # Only pairs with a non-empty two-view geometry are considered.
        cursor.execute("SELECT pair_id, rows FROM two_view_geometries WHERE rows > 0")
        verified_pairs = cursor.fetchall()
    finally:
        # Close the connection even when a query or decoding step fails.
        conn.close()

    matches_by_new_img = defaultdict(list)
    for pair_id, _num_rows in verified_pairs:
        image_id1, image_id2 = pair_id_to_image_ids(pair_id)
        if image_id1 not in keypoints or image_id2 not in keypoints:
            continue
        name1, name2 = id_to_name[image_id1], id_to_name[image_id2]

        # Keep only old/new pairs and work out which side is which.
        if name1.startswith("frame_") and name2.startswith("new_frame_"):
            t1_name, t2_name = name1, name2
        elif name2.startswith("frame_") and name1.startswith("new_frame_"):
            t1_name, t2_name = name2, name1
        else:
            continue

        matches = match_data.get(pair_id)
        if matches is None or len(matches) == 0:
            continue

        # Column 0 indexes image_id1's keypoints and column 1 image_id2's
        # (COLMAP database format), independent of which image is old/new.
        # Indexing by old/new role mixed up the columns for swapped pairs.
        # The RMSE is symmetric, so no re-ordering is needed afterwards.
        kp_first = keypoints[image_id1][matches[:, 0]]
        kp_second = keypoints[image_id2][matches[:, 1]]

        rmse = np.sqrt(np.mean(np.sum((kp_first - kp_second) ** 2, axis=1)))
        matches_by_new_img[t2_name].append((t1_name, rmse, len(matches)))

    # The best match is the candidate with the smallest RMSE.
    best_match = {}
    for t2_name, candidates in matches_by_new_img.items():
        best = min(candidates, key=lambda cand: cand[1])
        print(f"[{t2_name}] ← {best[0]} | RMSE: {best[1]:.2f}, matches: {best[2]}")
        best_match[t2_name] = best[0]

    return best_match

def extract_top_k_matches(database_path, top_k=5):
    """Return the *top_k* highest scoring ``frame_*`` candidates for every ``new_frame_*`` image.

    The ``rows`` column of each ``two_view_geometries`` entry (``rows > 0``)
    is used as the score of that pair.

    Args:
        database_path: Path to the SQLite database file.
        top_k: Maximum number of candidates kept per new image.

    Returns:
        dict: new image name -> list of ``(old image name, score)`` tuples,
        highest score first.
    """
    conn = sqlite3.connect(database_path)
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT image_id, name FROM images")
        id_to_name = {row[0]: row[1] for row in cursor.fetchall()}

        matches = defaultdict(list)
        cursor.execute("SELECT pair_id, rows FROM two_view_geometries WHERE rows > 0")
        for pair_id, score in cursor.fetchall():
            image_id1, image_id2 = pair_id_to_image_ids(pair_id)
            name1, name2 = id_to_name[image_id1], id_to_name[image_id2]
            if name1.startswith("frame_") and name2.startswith("new_frame_"):
                matches[name2].append((name1, score))
            elif name2.startswith("frame_") and name1.startswith("new_frame_"):
                matches[name1].append((name2, score))
    finally:
        # Close the connection even when a query or a lookup fails.
        conn.close()

    top_matches = {}
    for new_name, pair_list in matches.items():
        # Stable descending sort: equally scored candidates keep DB order.
        ranked = sorted(pair_list, key=lambda pair: pair[1], reverse=True)
        top_matches[new_name] = ranked[:top_k]

    return top_matches


def copy_best_pairs(src_folder_1, src_folder_2, dest_dir, best_match, indices):
    """Copy each matched image pair into *dest_dir* and log the pairing.

    For every ``new_*`` image the matched old image is copied under the new
    image's name with ``new_`` replaced by ``initial_paired_``; the new image
    keeps its name. One line per copied pair is written to
    ``matched_pairs.txt`` inside *dest_dir*.

    Args:
        src_folder_1: Folder holding the old (matched) images.
        src_folder_2: Folder holding the new images.
        dest_dir: Output folder; created when missing.
        best_match: Mapping of new image name -> old image name.
        indices: Optional collection of frame numbers; when not None, only
            new images whose frame number is in it are copied.

    Raises:
        ValueError: If a key of *best_match* does not start with ``new_``.
    """
    os.makedirs(dest_dir, exist_ok=True)
    log_path = os.path.join(dest_dir, "matched_pairs.txt")

    # Every log line contains a non-ASCII arrow, so the encoding is pinned
    # instead of depending on the platform default.
    with open(log_path, "w", encoding="utf-8") as log_file:
        for t2_name, t1_name in best_match.items():
            if not t2_name.startswith("new_"):
                raise ValueError(f"❌ t2_name은 'new_'로 시작해야 합니다: {t2_name}")

            if indices is not None and not match_index(t2_name, indices):
                continue  # frame number not selected

            t1_dest = t2_name.replace("new_", "initial_paired_", 1)
            t2_dest = t2_name

            shutil.copy(os.path.join(src_folder_1, t1_name), os.path.join(dest_dir, t1_dest))
            shutil.copy(os.path.join(src_folder_2, t2_name), os.path.join(dest_dir, t2_dest))
            log_file.write(f"{t2_name}, {t1_name} → {t2_dest}, {t1_dest}\n")

    print(f"✅ 선택된 페어 복사 및 기록 완료 → {dest_dir}")

def run_rmse_based_matching_pipeline(src1, src2, dest_dir, db_path, indices):
    """Select the lowest-RMSE match per new image and copy the resulting pairs.

    Args:
        src1: Folder with the old images.
        src2: Folder with the new images.
        dest_dir: Folder receiving the copied pairs.
        db_path: Path to the SQLite database to read matches from.
        indices: Optional frame-number filter forwarded to the copy step.
    """
    pairing = extract_best_matches_by_rmse(db_path)
    copy_best_pairs(src1, src2, dest_dir, pairing, indices)


# Usage example
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Naju_90m_ChangeDetection_workspace/trial5"
src_folder_1 = os.path.join(workspace_dir, "initial/ns_processed/images")
src_folder_2 = os.path.join(workspace_dir, "new_correspondence/ns_processed/images")
database_path = os.path.join(workspace_dir, "new_correspondence/ns_processed/colmap/database.db")
indices_path = os.path.join(workspace_dir, "new_correspondence/renders/image_indices.npy")

# The destination is the same whether or not an index file exists.
dest_folder = os.path.join(workspace_dir, "CD_PAIR/baseline_cd_pair2")

if not os.path.exists(indices_path):
    indices = None
    print(f"No file found at {indices_path}, indices set to None.")
else:
    indices = np.load(indices_path)
    print(f"Loaded {len(indices)} indices from {indices_path}")

run_rmse_based_matching_pipeline(src_folder_1, src_folder_2, dest_folder, database_path, indices)


In [None]:
# NeRF image pair generation 
import os
import numpy as np
import shutil

def match_index(filename, indices):
    """Return True when the trailing frame number of *filename* is in *indices*.

    The frame number is the text after the last underscore of the file name
    with its extension removed, e.g. ``frame_00001.jpg`` -> ``1``.

    Args:
        filename: File name whose stem ends in ``_<number>``.
        indices: Container of frame numbers supporting the ``in`` operator.

    Returns:
        bool: False when the name has no numeric suffix or when *indices*
        cannot be used for a membership test.
    """
    try:
        stem = os.path.splitext(filename)[0]
        frame_number = int(stem.split("_")[-1])  # e.g. frame_00001 -> 1
        return frame_number in indices
    except (TypeError, ValueError):
        # A malformed name or unusable ``indices`` means "not selected".
        # Only these two are caught so that KeyboardInterrupt/SystemExit and
        # genuine programming errors are no longer silently swallowed.
        return False
            
def collect_jpg_pairs(src_dir1, src_dir2, dest_dir, indices, prefix = ""):
    """Gather two sets of ``.jpg`` files into one pair folder.

    From *src_dir1* only ``.jpg`` files whose name starts with *prefix* are
    taken and copied under their own name. From *src_dir2* every ``.jpg``
    file is taken and copied with ``initial_paired_`` prepended to its name.

    Args:
        src_dir1: Folder scanned for ``<prefix>*.jpg`` files.
        src_dir2: Folder scanned for ``*.jpg`` files.
        dest_dir: Output folder; created when missing.
        indices: When not None, only files accepted by ``match_index`` for
            these indices are copied (applies to both folders).
        prefix: Required name prefix for files taken from *src_dir1*.
    """
    os.makedirs(dest_dir, exist_ok=True)

    def _selected(name):
        # Without an index filter every file passes.
        return indices is None or match_index(name, indices)

    first_batch = [
        name for name in os.listdir(src_dir1)
        if name.startswith(prefix) and name.endswith(".jpg") and _selected(name)
    ]
    second_batch = [
        name for name in os.listdir(src_dir2)
        if name.endswith(".jpg") and _selected(name)
    ]

    # Files from src_dir1 keep their names.
    for name in first_batch:
        shutil.copyfile(os.path.join(src_dir1, name), os.path.join(dest_dir, name))

    # Files from src_dir2 get the initial_paired_ prefix.
    for name in second_batch:
        shutil.copyfile(os.path.join(src_dir2, name), os.path.join(dest_dir, "initial_paired_" + name))

    print(f"✅ 저장 완료: {len(first_batch)}개(new_) + {len(second_batch)}개(initial_) → {dest_dir}")


# Usage example
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Sintanjin_80m_CD_workspace/longlong_trial1"
src_folder1 = os.path.join(workspace_dir, "new_correspondence/ns_processed/new_images_2")
src_folder2 = os.path.join(workspace_dir, "new_correspondence/renders")

indices_path = os.path.join(workspace_dir, "new_correspondence/renders/image_indices.npy")

# The destination is the same whether or not an index file exists.
dest_dir = os.path.join(workspace_dir, "CD_PAIR/nerf_cd_pair")

if not os.path.exists(indices_path):
    indices = None
    print(f"No file found at {indices_path}, indices set to None.")
else:
    indices = np.load(indices_path)
    print(f"Loaded {len(indices)} indices from {indices_path}")

prefix = "new_"  # use "new_" for the correspondence-search images
collect_jpg_pairs(src_folder1, src_folder2, dest_dir, indices, prefix)


In [None]:
# SIFT-based RMSE Calculator
import matplotlib.pyplot as plt
import cv2
import json
import numpy as np
import os
from glob import glob

def draw_feature_matches(img1, kp1, img2, kp2, matches, max_display=50, save_path=None):
    """Show the feature matches between two images side by side.

    Args:
        img1, img2: The two images.
        kp1, kp2: Keypoints of *img1* and *img2*.
        matches: Matches between the keypoints; at most the first
            *max_display* are drawn.
        max_display: Upper bound on the number of drawn matches.
        save_path: When truthy, the figure is also saved to this path.
    """
    shown = matches[:max_display]
    canvas_bgr = cv2.drawMatches(
        img1, kp1, img2, kp2, shown, None,
        flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS,
    )
    # Converted BGR -> RGB before handing the image to matplotlib.
    canvas_rgb = cv2.cvtColor(canvas_bgr, cv2.COLOR_BGR2RGB)

    shown_count = min(len(matches), max_display)
    plt.figure(figsize=(16, 8))
    plt.title(f"Feature Matching (top {shown_count} matches)")
    plt.imshow(canvas_rgb)
    plt.axis('off')
    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
    plt.show()



import numpy as np

def rotation_error_deg(R_est):
    """Return the rotation angle, in degrees, between *R_est* and the identity rotation.

    Args:
        R_est: 3x3 rotation matrix.

    Returns:
        Angle in degrees derived from the trace of ``R_est @ I.T``.
    """
    reference = np.eye(3, dtype=int)  # ground truth is the identity rotation
    relative = R_est @ reference.T
    cos_angle = (np.trace(relative) - 1) / 2.0
    cos_angle = np.clip(cos_angle, -1.0, 1.0)  # keep arccos in its domain
    return np.degrees(np.arccos(cos_angle))

def translation_direction_error_deg(t_est):
    """Return the angle, in degrees, between *t_est* and the +Z direction.

    Both vectors are normalised to unit length inside the function, so only
    the direction of *t_est* matters.

    Args:
        t_est: Translation vector with three elements (any shape that
            flattens to length 3).

    Returns:
        Angle in degrees between the two directions.
    """
    reference = np.array([[0], [0], [1]]).flatten()
    unit_gt = reference / np.linalg.norm(reference)
    unit_est = t_est.flatten() / np.linalg.norm(t_est)
    cosine = np.clip(np.dot(unit_gt, unit_est), -1.0, 1.0)  # keep arccos in its domain
    return np.degrees(np.arccos(cosine))



def resize_image(img, resize_factor=None, max_size=None):
    """Downscale an image and report the scale that was applied.

    Exactly one of the two options should be given; *resize_factor* wins
    when both are set.

    Args:
        img: Image array; only ``img.shape[:2]`` (height, width) is read.
        resize_factor: Shrink both sides by this factor (scale is
            ``1 / resize_factor``).
        max_size: Shrink so that neither side exceeds this many pixels;
            images that already fit are returned unchanged.

    Returns:
        tuple: ``(resized_img, scale)``.

    Raises:
        ValueError: If neither *resize_factor* nor *max_size* is provided.
    """
    h, w = img.shape[:2]

    if resize_factor is not None:
        scale = 1.0 / resize_factor
    elif max_size is not None:
        scale = min(max_size / h, max_size / w)
        if scale >= 1.0:
            # Already small enough: hand back the input untouched.
            return img, 1.0
    else:
        # The message previously named a non-existent "downscale_factor".
        raise ValueError("Either resize_factor or max_size must be provided.")

    # Clamp to at least one pixel so tiny images never ask for a 0-sized output.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    resized_img = cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)
    return resized_img, scale


def evaluate_pose_match(src_dir, transforms_path, altitude_m, downscale_factor = 1, resize_factor = 1, sensor_pixel_size_mm = None, prefix=""):
    """Print the SIFT-based alignment RMSE of every image pair found in *src_dir*.

    Pairs are ``initial_paired_frame_*.jpg`` together with
    ``<prefix>frame_*.jpg`` of the same frame name. For each pair SIFT
    matches are ratio-tested, filtered by a RANSAC fundamental matrix, and
    the RMSE of the inlier offsets is reported in pixels and metres.
    Offsets of 30 m or more are discarded before the RMSE is computed.

    Args:
        src_dir: Folder containing both images of every pair.
        transforms_path: JSON file passed to ``load_camera_intrinsics``.
        altitude_m: Value passed to ``calculate_gsd`` as the altitude.
        downscale_factor: Forwarded as the third argument of
            ``calculate_gsd``.
        resize_factor: Images are shrunk by this factor before matching.
        sensor_pixel_size_mm: Not used by this function.
        prefix: Name prefix of the second image of each pair.

    Returns:
        None. Results are printed.
    """
    image_pairs = []
    for img1_path in sorted(glob(os.path.join(src_dir, "initial_paired_frame_*.jpg"))):
        basename = os.path.basename(img1_path).replace("initial_paired_", "")
        img2_path = os.path.join(src_dir, f"{prefix}{basename}")
        if os.path.exists(img2_path):
            image_pairs.append((img1_path, img2_path))

    rmses_pixel = []
    rmses_meter = []
    sift = cv2.SIFT_create()
    matcher = cv2.BFMatcher()

    # Load the camera parameters. Only K is needed; star-unpacking accepts
    # any extra values (dist_coeffs, w, h) the loader returns, where the old
    # two-name unpack raised ValueError against the 4-value loader.
    K, *_ = load_camera_intrinsics(transforms_path)

    gsd_m_per_pixel = calculate_gsd(altitude_m, K, downscale_factor)

    for idx, (img1_path, img2_path) in enumerate(image_pairs):
        img1 = cv2.imread(img1_path, cv2.IMREAD_GRAYSCALE)
        img2 = cv2.imread(img2_path, cv2.IMREAD_GRAYSCALE)
        if img1 is None or img2 is None:
            # imread returns None instead of raising on unreadable files.
            print(f"❌ Failed to read {img1_path} or {img2_path}")
            continue

        print(f"original: {img1.shape}")
        img1, scale = resize_image(img1, resize_factor=resize_factor)
        img2, _ = resize_image(img2, resize_factor=resize_factor)
        print(f"reshaped: {img1.shape}")

        kp1, des1 = sift.detectAndCompute(img1, None)
        kp2, des2 = sift.detectAndCompute(img2, None)

        if des1 is None or des2 is None:
            print(f"❌ No features in {img1_path}")
            continue

        matches = matcher.knnMatch(des1, des2, k=2)

        # Ratio test. knnMatch may return fewer than two neighbours when the
        # second image has a single descriptor, so guard the unpacking.
        good_matches = [
            pair[0] for pair in matches
            if len(pair) == 2 and pair[0].distance < 0.8 * pair[1].distance
        ]

        if len(good_matches) < 8:
            print(f"❌ Not enough good matches: {len(good_matches)}")
            continue

        pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches])
        pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches])

        # Guided matching: keep only inliers of a RANSAC fundamental matrix.
        F, inliers = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC, ransacReprojThreshold=3.0, confidence=0.99)
        if inliers is None:
            print(f"❌ Fundamental matrix estimation failed for {img1_path}")
            continue

        inliers = inliers.ravel().astype(bool)
        inlier_pts1 = pts1[inliers]
        inlier_pts2 = pts2[inliers]
        inlier_matches = [m for m, inlier in zip(good_matches, inliers) if inlier]

        # Visualise the last pair only.
        if idx == len(image_pairs) - 1:
            draw_feature_matches(img1, kp1, img2, kp2, inlier_matches)

        # RMSE: offsets are measured on the resized images, so dividing by
        # the scale converts them back to original-resolution pixels.
        meter_dists = np.linalg.norm(inlier_pts1 - inlier_pts2, axis=1) * gsd_m_per_pixel / scale
        valid_idx = meter_dists < 30.0

        if np.sum(valid_idx) == 0:
            print("⚠️ No inliers under 30m threshold.")
            continue

        filtered_pts1 = inlier_pts1[valid_idx]
        filtered_pts2 = inlier_pts2[valid_idx]

        filtered_rmse_pixel = np.sqrt(np.mean(np.sum((filtered_pts1 - filtered_pts2)**2, axis=1)))
        filtered_rmse_meter = filtered_rmse_pixel * gsd_m_per_pixel / scale

        print(f"✅ RMSE (inliers < 30m): {filtered_rmse_pixel:.4f} px | {filtered_rmse_meter:.4f} m (Count: {np.sum(valid_idx)})")

        rmses_pixel.append(filtered_rmse_pixel)
        rmses_meter.append(filtered_rmse_meter)

    if not rmses_pixel:
        # Avoid the nan + RuntimeWarning of taking the mean of an empty list.
        print("⚠️ No image pair produced a valid RMSE.")
        return

    mean_rmse_pixel = np.mean(rmses_pixel)
    mean_rmse_meter = np.mean(rmses_meter)

    print(f"✅ Mean RMSE: {mean_rmse_pixel:.4f} px | {mean_rmse_meter:.4f} m")

In [47]:
from glob import glob
import cv2
import os

from glob import glob
import cv2
import os
import numpy as np

def resize_image(img, resize_factor=None, max_size=None):
    """Downscale an image and report the scale that was applied.

    Exactly one of the two options should be given; *resize_factor* wins
    when both are set.

    Args:
        img: Image array; only ``img.shape[:2]`` (height, width) is read.
        resize_factor: Shrink both sides by this factor (scale is
            ``1 / resize_factor``).
        max_size: Shrink so that neither side exceeds this many pixels;
            images that already fit are returned unchanged.

    Returns:
        tuple: ``(resized_img, scale)``.

    Raises:
        ValueError: If neither *resize_factor* nor *max_size* is provided.
    """
    h, w = img.shape[:2]

    if resize_factor is not None:
        scale = 1.0 / resize_factor
    elif max_size is not None:
        scale = min(max_size / h, max_size / w)
        if scale >= 1.0:
            # Already small enough: hand back the input untouched.
            return img, 1.0
    else:
        # The message previously named a non-existent "downscale_factor".
        raise ValueError("Either resize_factor or max_size must be provided.")

    # Clamp to at least one pixel so tiny images never ask for a 0-sized output.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    resized_img = cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)
    return resized_img, scale

def warp_img1_to_img2_and_save(src_dir, output_dir, resize_factor, prefix=""):
    """Warp every ``initial_paired_`` image onto its partner and save the results.

    For each pair ``initial_paired_frame_*.jpg`` / ``<prefix>frame_*.jpg`` in
    *src_dir* a homography is estimated from SIFT matches on downscaled
    grayscale copies, rescaled to full resolution, and used to warp the first
    image into the second image's frame. The second image is masked to the
    area covered by the warped first image. Pairs for which matching or
    estimation fails are skipped with a message.

    Outputs written to *output_dir*: the warped first image and the masked
    second image under their original file names, plus the full-resolution
    homography as ``homography/<frame name>.npy``.

    Args:
        src_dir: Folder containing both images of every pair.
        output_dir: Output folder; created when missing.
        resize_factor: Images are shrunk by this factor before matching.
        prefix: Name prefix of the second image of each pair; it is stripped
            from the homography file name.
    """
    os.makedirs(output_dir, exist_ok=True)

    homography_dir = os.path.join(output_dir, "homography")
    os.makedirs(homography_dir, exist_ok=True)

    image_pairs = []
    for img1_path in sorted(glob(os.path.join(src_dir, "initial_paired_frame_*.jpg"))):
        basename = os.path.basename(img1_path).replace("initial_paired_", "")
        img2_path = os.path.join(src_dir, f"{prefix}{basename}")
        if os.path.exists(img2_path):
            image_pairs.append((img1_path, img2_path))

    # Detector and matcher are stateless across pairs; build them once.
    sift = cv2.SIFT_create()
    matcher = cv2.BFMatcher()

    for img1_path, img2_path in image_pairs:
        img1_rgb = cv2.imread(img1_path)
        img2_rgb = cv2.imread(img2_path)
        if img1_rgb is None or img2_rgb is None:
            # imread returns None instead of raising on unreadable files.
            print(f"❌ Failed to read {img1_path} or {img2_path}")
            continue

        img1 = cv2.cvtColor(img1_rgb, cv2.COLOR_BGR2GRAY)
        img2 = cv2.cvtColor(img2_rgb, cv2.COLOR_BGR2GRAY)

        img1_resize, scale1 = resize_image(img1, resize_factor)
        img2_resize, _ = resize_image(img2, resize_factor)

        kp1, des1 = sift.detectAndCompute(img1_resize, None)
        kp2, des2 = sift.detectAndCompute(img2_resize, None)

        if des1 is None or des2 is None:
            print(f"❌ No features in {img1_path}")
            continue

        matches = matcher.knnMatch(des1, des2, k=2)
        # Ratio test; knnMatch may return fewer than two neighbours.
        good_matches = [
            pair[0] for pair in matches
            if len(pair) == 2 and pair[0].distance < 0.7 * pair[1].distance
        ]

        if len(good_matches) < 10:
            print(f"❌ Not enough good matches: {len(good_matches)}")
            continue

        pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches])
        pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches])

        F, inliers = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC, ransacReprojThreshold=3.0, confidence=0.99)
        if inliers is None or inliers.sum() < 10:
            print(f"❌ Fundamental matrix failed or insufficient inliers.")
            continue

        pts1 = pts1[inliers.ravel() == 1]
        pts2 = pts2[inliers.ravel() == 1]

        H, inliers = cv2.findHomography(pts1, pts2, cv2.RANSAC, 5.0)
        # Count defensively: the mask can be None when estimation fails, and
        # the old failure message then crashed on ``inliers.sum()``.
        num_inliers = 0 if inliers is None else int(inliers.sum())
        if H is None or num_inliers < 10:
            print(f"❌ Homography too unstable or insufficient inliers: {num_inliers}")
            continue

        # H maps downscaled img1 pixels to downscaled img2 pixels; conjugate
        # with the up-scaling matrix to get the full-resolution homography.
        scale_matrix = np.diag([1 / scale1, 1 / scale1, 1.0])
        H_scaled = scale_matrix @ H @ np.linalg.inv(scale_matrix)

        print(f"✅ Inliers after RANSAC for {os.path.basename(img1_path)}: {num_inliers}")

        height, width = img2.shape
        corrected_image1 = cv2.warpPerspective(img1_rgb, H_scaled, (width, height))

        # Warp an all-ones image to get the coverage mask. Use the returned
        # array (sized like img2); the old ``dst`` buffer was sized like img1.
        mask_warped = cv2.warpPerspective(
            np.ones_like(img1_rgb, dtype=np.uint8), H_scaled, (width, height),
            borderMode=cv2.BORDER_CONSTANT, borderValue=0,
        )
        corrected_image2 = img2_rgb * mask_warped

        base_img2_path = os.path.basename(img2_path)
        warped_img1_path = os.path.basename(img1_path)

        cv2.imwrite(os.path.join(output_dir, base_img2_path), corrected_image2)
        cv2.imwrite(os.path.join(output_dir, warped_img1_path), corrected_image1)

        # Save the homography under the frame name with the prefix removed.
        homography_name = os.path.splitext(base_img2_path)[0] + ".npy"
        homography_name = homography_name[len(prefix):]
        np.save(os.path.join(homography_dir, homography_name), H_scaled)

        print(f"✅ Saved aligned: {warped_img1_path}")
        print(f"✅ Saved homography: {homography_name}")

# Example usage: rectify the baseline pairs.
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Sintanjin_80m_CD_workspace/longlong_trial1"
# Alternative input/output for the NeRF pairs:
# src_dir = os.path.join(workspace_dir, "CD_PAIR/nerf_cd_pair")
# output_dir = os.path.join(workspace_dir, "CD_PAIR/nerf_cd_pair_rectified")
src_dir = os.path.join(workspace_dir, "CD_PAIR/baseline_cd_pair")
output_dir = os.path.join(workspace_dir, "CD_PAIR/baseline_cd_pair_rectified")
warp_img1_to_img2_and_save(
    src_dir,
    output_dir,
    resize_factor=4,
    prefix="new_",
)

✅ Inliers after RANSAC for initial_paired_frame_00045.jpg: 363
✅ Saved aligned: initial_paired_frame_00045.jpg
✅ Saved homography: frame_00045.npy
✅ Inliers after RANSAC for initial_paired_frame_00047.jpg: 371
✅ Saved aligned: initial_paired_frame_00047.jpg
✅ Saved homography: frame_00047.npy
✅ Inliers after RANSAC for initial_paired_frame_00049.jpg: 252
✅ Saved aligned: initial_paired_frame_00049.jpg
✅ Saved homography: frame_00049.npy
✅ Inliers after RANSAC for initial_paired_frame_00051.jpg: 273
✅ Saved aligned: initial_paired_frame_00051.jpg
✅ Saved homography: frame_00051.npy
✅ Inliers after RANSAC for initial_paired_frame_00053.jpg: 205
✅ Saved aligned: initial_paired_frame_00053.jpg
✅ Saved homography: frame_00053.npy
✅ Inliers after RANSAC for initial_paired_frame_00055.jpg: 170
✅ Saved aligned: initial_paired_frame_00055.jpg
✅ Saved homography: frame_00055.npy
✅ Inliers after RANSAC for initial_paired_frame_00057.jpg: 214
✅ Saved aligned: initial_paired_frame_00057.jpg
✅ Saved

In [None]:
# Calculator base functions

def calculate_gsd(altitude_m, K, sensor_pixel_size_mm = None):
    """Return ``altitude_m / K[0, 0]`` and print it as the GSD in m/pixel.

    Args:
        altitude_m: Altitude value used as the numerator.
        K: Matrix whose ``[0, 0]`` entry is the focal length in pixels
            along x.
        sensor_pixel_size_mm: Unused; the physical pixel size cancels out
            of the GSD formula when the focal length is given in pixels.

    Returns:
        The printed GSD value.
    """
    focal_px = K[0, 0]
    gsd_m_per_pixel = altitude_m / focal_px

    print(f"📏 GSD (m/pixel): {gsd_m_per_pixel:.6f}")
    return gsd_m_per_pixel

def load_camera_intrinsics(json_path):
    """Read camera intrinsics from a JSON file.

    Required keys: ``fl_x``, ``fl_y``, ``cx``, ``cy``, ``w``, ``h``.
    Optional keys ``k1``, ``k2``, ``p1``, ``p2`` default to 0.0.

    Args:
        json_path: Path to the JSON file.

    Returns:
        tuple: ``(K, dist_coeffs, w, h)`` where ``K`` is the 3x3 intrinsic
        matrix and ``dist_coeffs`` is ``[k1, k2, p1, p2, 0]``.
    """
    with open(json_path, 'r') as f:
        meta = json.load(f)

    fx, fy, cx, cy = (meta[key] for key in ('fl_x', 'fl_y', 'cx', 'cy'))
    K = np.array([
        [fx, 0, cx],
        [0, fy, cy],
        [0, 0, 1],
    ])

    # Missing distortion terms default to 0.0; the trailing 0 is the k3 slot.
    distortion = [meta.get(key, 0.0) for key in ('k1', 'k2', 'p1', 'p2')]
    dist_coeffs = np.array(distortion + [0])

    return K, dist_coeffs, meta['w'], meta['h']

In [43]:
# Click-based RMSE Calculator
import json
import numpy as np
import os 

def load_features(json_path, prefix_to_strip="", downscale = 1):
    """Load per-image keypoints from a JSON list of ``{filename, keypoints}`` items.

    Args:
        json_path: Path to the JSON file.
        prefix_to_strip: When non-empty, removed from the start of every
            filename that begins with it.
        downscale: Factor every keypoint coordinate is multiplied by.

    Returns:
        dict: (possibly stripped) filename -> float32 keypoint array
        multiplied by *downscale*.
    """
    with open(json_path, 'r') as f:
        records = json.load(f)

    def _key(name):
        # Drop the prefix only when one was requested and the name has it.
        if prefix_to_strip and name.startswith(prefix_to_strip):
            return name[len(prefix_to_strip):]
        return name

    return {
        _key(record['filename']): np.array(record['keypoints'], dtype=np.float32) * downscale
        for record in records
    }

def compute_rmse(points_a, points_b):
    """Compute the RMSE between two sets of exactly five corresponding 2D points.

    A point counts as usable when at least one of its coordinates is >= 0;
    a pair is used only when both of its points are usable.

    Args:
        points_a: Array of five (x, y) points.
        points_b: Array of five (x, y) points.

    Returns:
        tuple: ``(rmse, used_count)``; ``(None, 0)`` when either input does
        not hold five points or no pair is usable.
    """
    if not (len(points_a) == 5 and len(points_b) == 5):
        print("error occured. length of points is not 5!")
        return None, 0

    def _usable(point):
        return point[0] >= 0 or point[1] >= 0

    keep = [
        i for i in range(len(points_a))
        if _usable(points_a[i]) and _usable(points_b[i])
    ]
    if len(keep) == 0:
        return None, 0  # nothing to compare

    residual = points_a[keep] - points_b[keep]
    squared_dist = np.sum(residual**2, axis=1)
    return np.sqrt(np.mean(squared_dist)), len(keep)

def apply_homography_to_points(points_2d: np.ndarray, homography: np.ndarray) -> np.ndarray:
    """
    Map 2D points through a homography with ``cv2.perspectiveTransform``.

    Args:
        points_2d (np.ndarray): Nx2 array of 2D points (x, y).
        homography (np.ndarray): 3x3 homography matrix.

    Returns:
        np.ndarray: Nx2 array of transformed 2D points.
    """
    # OpenCV is given the points as a float32 array of shape (N, 1, 2).
    cv_input = points_2d.astype(np.float32).reshape(-1, 1, 2)
    cv_output = cv2.perspectiveTransform(cv_input, homography)
    # Flatten back to (N, 2).
    return cv_output.reshape(-1, 2)

def _warp_placed_points(points, homography):
    """Warp only the usable points; leave unset (both coordinates negative) points untouched."""
    pts = np.asarray(points, dtype=np.float32).reshape(-1, 2)
    # Same usability rule as compute_rmse: at least one coordinate >= 0.
    placed = (pts[:, 0] >= 0) | (pts[:, 1] >= 0)
    warped = pts.copy()
    if placed.any():
        warped[placed] = apply_homography_to_points(pts[placed], homography)
    return warped


def compare_features(json_path_a, json_path_b, transforms_path, height_m, down_a=1, down_b=1, prefix_a = "", prefix_b = "", homography_dir = None):
    """Compare keypoints of two feature files image by image and print RMSE statistics.

    Keypoints are loaded with ``load_features`` and matched by filename
    (after prefix stripping). When *homography_dir* is given, set A's
    keypoints are first mapped through the per-image homography stored as
    ``<homography_dir>/<frame name>.npy``. Pixel RMSE values are converted to
    metres with the GSD from ``calculate_gsd``.

    Args:
        json_path_a: Feature JSON of set A.
        json_path_b: Feature JSON of set B.
        transforms_path: JSON file passed to ``load_camera_intrinsics``.
        height_m: Value passed to ``calculate_gsd`` as the altitude.
        down_a: Factor set A's keypoints are multiplied by on load.
        down_b: Factor set B's keypoints are multiplied by on load.
        prefix_a: Filename prefix stripped from set A.
        prefix_b: Filename prefix stripped from set B.
        homography_dir: Optional folder with per-image homographies.

    Returns:
        None. Results are printed.
    """
    # Load the camera parameters; only K is used here.
    K, _dist_coeffs, _width, _height = load_camera_intrinsics(transforms_path)

    gsd_m_per_pixel = calculate_gsd(height_m, K)

    features_a = load_features(json_path_a, prefix_a, down_a)
    features_b = load_features(json_path_b, prefix_b, down_b)

    common_filenames = set(features_a.keys()) & set(features_b.keys())

    if not common_filenames:
        print("No common filenames found.")
        return

    rmse_list = []
    rmse_pixel_list = []
    used_pts_list = []
    fail_count = 0
    missing_homography_count = 0

    total_used_pts = 0
    total_possible_pts = 0

    for filename in sorted(common_filenames):
        raw_a = features_a[filename]
        raw_b = features_b[filename]

        if homography_dir is not None:
            homography_path = os.path.join(homography_dir, f"{os.path.splitext(filename)[0]}.npy")
            if not os.path.exists(homography_path):
                # Skip this image instead of aborting the whole comparison.
                print(f"{filename}: FAIL - homography not found: {homography_path}")
                missing_homography_count += 1
                continue
            h_12_down = np.load(homography_path)

            # Conjugate the stored homography with the down_a scaling so it
            # acts on set A's scaled keypoints: h_12 = S @ H @ S^-1.
            S = np.eye(3)
            S[0, 0] = down_a
            S[1, 1] = down_a
            S_inv = np.linalg.inv(S)
            h_12 = S @ h_12_down @ S_inv

            # Unset points must stay recognisable for compute_rmse, so they
            # are excluded from the warp rather than transformed into
            # arbitrary (possibly positive) coordinates.
            raw_a = _warp_placed_points(raw_a, h_12)

        rmse_pixel, used_pts = compute_rmse(raw_a, raw_b)

        possible_pts = min(len(raw_a), len(raw_b))
        total_possible_pts += possible_pts
        total_used_pts += used_pts

        if rmse_pixel is None:
            print(f"{filename}: FAIL - no valid keypoints to compare")
            fail_count += 1
            continue

        rmse_meter = rmse_pixel * gsd_m_per_pixel
        rmse_list.append((filename, rmse_meter))
        rmse_pixel_list.append((filename, rmse_pixel))
        used_pts_list.append((filename, used_pts))
        print(f"{filename}: RMSE = {rmse_meter:.2f}m, {rmse_pixel:.2f}pixel (used {used_pts}/{possible_pts} points)")

    if rmse_list:
        sorted_rmse = sorted(rmse_list, key=lambda x: x[1])
        sorted_used = sorted(used_pts_list, key=lambda x: x[1])

        mean_rmse = np.mean([r[1] for r in rmse_list])
        mean_rmse_pixel = np.mean([r[1] for r in rmse_pixel_list])
        print(f"\n[✔] Average RMSE over {len(rmse_list)} images: {mean_rmse:.2f}m / {mean_rmse_pixel:.2f}pixels")

        print("\n[🔺] Top 3 highest RMSE files:")
        for fname, val in sorted_rmse[-3:][::-1]:
            print(f"  {fname}: {val:.2f}m")

        print("\n[🔻] Top 3 lowest RMSE files:")
        for fname, val in sorted_rmse[:3]:
            print(f"  {fname}: {val:.2f}m")

        print("\n[📉] Top 3 lowest used keypoints:")
        for fname, val in sorted_used[:3]:
            print(f"  {fname}: {val} points")
    else:
        print("\n[⚠] No valid images to compare.")

    if total_possible_pts > 0:
        used_ratio = total_used_pts / total_possible_pts
        print(f"\n[ℹ] Average used keypoint ratio: {used_ratio:.2%} ({total_used_pts}/{total_possible_pts})")

    print(f"\n[!] Failed images due to invalid keypoints: {fail_count}")
    if missing_homography_count:
        print(f"[!] Skipped images due to missing homography: {missing_homography_count}")

In [44]:
# Evaluation run: Naju 90 m workspace, NeRF pair set.
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Naju_90m_ChangeDetection_workspace/trial5"
height_m = 90
prefix_a = "initial_paired_"
prefix_b = "new_"
down_a = 1
down_b = 1
testset = "nerf_cd_pair"

# Per-image homographies (for the gps version).
homography_dir = os.path.join(workspace_dir, "CD_PAIR/nerf_cd_pair_rectified/homography")

# Feature files carry a "_<factor>" suffix when their factor is not 1.
initial_features_path = os.path.join(
    workspace_dir,
    f'CD_PAIR/{testset}/initial_paired_features.json' if down_a == 1
    else f'CD_PAIR/{testset}/initial_paired_features_{down_a}.json',
)
new_features_path = os.path.join(
    workspace_dir,
    'CD_PAIR/new_features.json' if down_b == 1
    else f'CD_PAIR/new_features_{down_b}.json',
)

transforms_path = os.path.join(workspace_dir, "initial/ns_processed/transforms.json")

compare_features(
    initial_features_path,
    new_features_path,
    transforms_path,
    height_m=height_m,
    down_a=down_a,
    down_b=down_b,
    prefix_a=prefix_a,
    prefix_b=prefix_b,
    homography_dir=homography_dir,
)

📏 GSD (m/pixel): 0.029846
frame_00001.jpg: RMSE = 0.22m, 7.24pixel (used 5/5 points)
frame_00004.jpg: RMSE = 0.26m, 8.70pixel (used 5/5 points)
frame_00007.jpg: RMSE = 0.18m, 5.87pixel (used 5/5 points)
frame_00010.jpg: RMSE = 0.21m, 7.13pixel (used 5/5 points)
frame_00013.jpg: RMSE = 0.24m, 8.15pixel (used 5/5 points)
frame_00016.jpg: RMSE = 0.22m, 7.27pixel (used 5/5 points)
frame_00019.jpg: RMSE = 0.27m, 9.08pixel (used 5/5 points)
frame_00022.jpg: RMSE = 0.24m, 8.12pixel (used 5/5 points)
frame_00044.jpg: RMSE = 0.26m, 8.73pixel (used 5/5 points)
frame_00047.jpg: RMSE = 0.36m, 12.00pixel (used 5/5 points)
frame_00050.jpg: RMSE = 0.23m, 7.71pixel (used 5/5 points)
frame_00053.jpg: RMSE = 0.20m, 6.53pixel (used 5/5 points)
frame_00056.jpg: RMSE = 0.19m, 6.36pixel (used 5/5 points)
frame_00059.jpg: RMSE = 0.14m, 4.80pixel (used 5/5 points)
frame_00062.jpg: RMSE = 0.24m, 8.05pixel (used 5/5 points)
frame_00065.jpg: RMSE = 0.26m, 8.83pixel (used 5/5 points)
frame_00088.jpg: RMSE = 0.18m

In [None]:
# Evaluation run: Sintanjin 80 m workspace, correspondence-search NeRF pairs (filtered).
# NOTE(review): compare_features as defined in this file accepts down_a / down_b
# and has no downscale_factor parameter, so the call below would raise TypeError
# against that signature. Replace the keyword once it is confirmed which feature
# set (A, B or both) the factor 2 was meant for.
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Sintanjin_80m_CD_workspace/longlong_trial1"
initial_features_path = os.path.join(workspace_dir, 'CD_PAIR/correspondence_search_nerf_cd_pair_filter/initial_paired_features.json')
new_features_path = os.path.join(workspace_dir, 'CD_PAIR/correspondence_search_nerf_cd_pair_filter/features.json')
transforms_path = os.path.join(workspace_dir, "initial/ns_processed/transforms.json")

height_m = 80

compare_features(initial_features_path, new_features_path, transforms_path, height_m = height_m, downscale_factor = 2,  prefix_a="initial_paired_")

In [None]:
# Evaluation run: Sintanjin 80 m workspace, baseline pairs (filtered).
# NOTE(review): compare_features as defined in this file accepts down_a / down_b
# and has no downscale_factor parameter, so the call below would raise TypeError
# against that signature. With a factor of 1 the keyword can probably just be
# dropped (down_a / down_b default to 1) — confirm.
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Sintanjin_80m_CD_workspace/longlong_trial1"
initial_features_path = os.path.join(workspace_dir, 'CD_PAIR/baseline_cd_pair_filter/initial_paired_features.json')
new_features_path = os.path.join(workspace_dir, 'CD_PAIR/baseline_cd_pair_filter/features.json')
transforms_path = os.path.join(workspace_dir, "initial/ns_processed/transforms.json")

height_m = 80
compare_features(initial_features_path, new_features_path, transforms_path, height_m = height_m, downscale_factor = 1, prefix_a="initial_paired_")

In [None]:
# Evaluation run: Sintanjin 80 m workspace, second baseline pair set (filtered).
# NOTE(review): compare_features as defined in this file accepts down_a / down_b
# and has no downscale_factor parameter, so the call below would raise TypeError
# against that signature. With a factor of 1 the keyword can probably just be
# dropped (down_a / down_b default to 1) — confirm.
workspace_dir = "/workspace/Laboratory/02.Rapid3DReconstruction/00.workspace/Sintanjin_80m_CD_workspace/longlong_trial1"
initial_features_path = os.path.join(workspace_dir, 'CD_PAIR/baseline_cd_pair2_filter/initial_paired_features.json')
new_features_path = os.path.join(workspace_dir, 'CD_PAIR/baseline_cd_pair2_filter/features.json')
transforms_path = os.path.join(workspace_dir, "initial/ns_processed/transforms.json")

height_m = 80
compare_features(initial_features_path, new_features_path, transforms_path, height_m = height_m, downscale_factor = 1, prefix_a="initial_paired_")

In [None]:
import cv2
import numpy as np
from typing import Tuple

def rectify(image1: np.ndarray, image2: np.ndarray, resize_factor: float, resize_image_func) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Warp the initial image onto the new image's pixel grid using a homography.

    SIFT features are detected on grayscale, resized copies of both images,
    matched with a brute-force matcher and Lowe's ratio test (0.7), filtered
    with a RANSAC fundamental-matrix fit, and finally used to estimate a
    RANSAC homography. The homography is rescaled from resized-image
    coordinates back to full-resolution coordinates before warping.

    Args:
        image1 (np.ndarray): Initial image, 3-channel BGR (it is converted
            with ``cv2.COLOR_BGR2GRAY``).
        image2 (np.ndarray): New image to align with, 3-channel BGR.
        resize_factor (float): Passed through unchanged to ``resize_image_func``.
        resize_image_func (function): Called as
            ``resize_image_func(gray_image, resize_factor)``; must return
            ``(resized_image, scale)``. ``scale`` is interpreted as
            resized / original, i.e. ``resized_xy = scale * original_xy``.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]:
            - image1 warped into image2's frame (same height/width as image2),
            - image2 multiplied by the warped validity mask, so pixels that
              image1 does not cover are zeroed,
            - the 3x3 homography mapping full-resolution image1 pixel
              coordinates to full-resolution image2 pixel coordinates.

    Raises:
        ValueError: If no descriptors are found, fewer than 10 matches pass
            the ratio test, the fundamental-matrix fit keeps fewer than 10
            inliers, or the homography fit fails / keeps fewer than 10 inliers.
    """
    sift = cv2.SIFT_create()

    # cvtColor allocates a new output, so the inputs do not need to be copied.
    img1 = cv2.cvtColor(image1, cv2.COLOR_BGR2GRAY)
    img2 = cv2.cvtColor(image2, cv2.COLOR_BGR2GRAY)

    img1_resize, scale1 = resize_image_func(img1, resize_factor)
    img2_resize, scale2 = resize_image_func(img2, resize_factor)

    kp1, des1 = sift.detectAndCompute(img1_resize, None)
    kp2, des2 = sift.detectAndCompute(img2_resize, None)

    if des1 is None or des2 is None:
        raise ValueError("❌ No features found in one or both images.")

    matcher = cv2.BFMatcher()
    matches = matcher.knnMatch(des1, des2, k=2)
    # knnMatch can return fewer than k neighbours per query (e.g. when the
    # train set holds a single descriptor); the ratio test needs both.
    good_matches = [
        pair[0]
        for pair in matches
        if len(pair) == 2 and pair[0].distance < 0.7 * pair[1].distance
    ]

    if len(good_matches) < 10:
        raise ValueError(f"❌ Not enough good matches: {len(good_matches)}")

    pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches])
    pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches])

    # Epipolar pre-filter: only the inlier mask is used, F itself is discarded.
    F, inliers = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC, ransacReprojThreshold=3.0, confidence=0.99)
    if inliers is None or inliers.sum() < 10:
        raise ValueError("❌ Fundamental matrix estimation failed or too few inliers.")

    keep = inliers.ravel() == 1
    pts1 = pts1[keep]
    pts2 = pts2[keep]

    H_12, inliers = cv2.findHomography(pts1, pts2, cv2.RANSAC, 5.0)
    if H_12 is None or inliers is None or inliers.sum() < 10:
        raise ValueError("❌ Homography too unstable or insufficient inliers.")

    # H_12 lives in resized coordinates. Lift it to full resolution:
    #   full image1 --(scale1)--> resized image1 --H_12--> resized image2
    #   --(1/scale2)--> full image2.
    # Each side uses its own scale; they differ when the images differ in size.
    to_resized_1 = np.diag([scale1, scale1, 1.0])
    to_full_2 = np.diag([1.0 / scale2, 1.0 / scale2, 1.0])
    homography_12 = to_full_2 @ H_12 @ to_resized_1

    height, width = image2.shape[:2]
    corrected_image1 = cv2.warpPerspective(image1, homography_12, (width, height))

    # Warp an all-ones image to find which image2 pixels are covered by image1.
    # Use the returned array: a pre-allocated dst is silently ignored by OpenCV
    # when its shape does not match the requested output size.
    mask_warped = cv2.warpPerspective(
        np.ones_like(image1, dtype=np.uint8),
        homography_12,
        (width, height),
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=0,
    )

    corrected_image2 = image2 * mask_warped

    return corrected_image1, corrected_image2, homography_12
