In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install torch torchvision matplotlib scipy h5py tqdm

In [None]:
!git clone https://github.com/magicleap/SuperGluePretrainedNetwork.git
%cd SuperGluePretrainedNetwork

In [None]:
import os
import sys
import numpy as np
import cv2 as cv
import torch
from pathlib import Path
import json
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import shutil
from datetime import datetime
import glob
import zipfile
import time
import gc

sys.path.append('/content/SuperGluePretrainedNetwork')
from models.matching import Matching
from models.utils import (compute_pose_error, compute_epipolar_error,
                          estimate_pose, make_matching_plot,
                          error_colormap, AverageTimer, pose_auc)

In [None]:
def adjust_K_for_orientation(img_shape, K):
    """
    Adjust camera matrix based on image orientation
    """
    h, w = img_shape[:2]
    if w > h:
        K_adjusted = np.array([
            [K[1, 1], 0, K[1, 2]],  # fy->fx, cy->cx
            [0, K[0, 0], K[0, 2]],
            [0, 0, 1]
        ])
    else:
        K_adjusted = K.copy()

    return K_adjusted

In [None]:
def adjust_K_for_scaling(K, scale_factor):
    """
    Adjust camera matrix for image scaling
    """
    K_scaled = K.copy()
    K_scaled[0, 0] *= scale_factor  # fx
    K_scaled[1, 1] *= scale_factor  # fy
    K_scaled[0, 2] *= scale_factor  # cx
    K_scaled[1, 2] *= scale_factor  # cy

    return K_scaled

In [None]:
def adjust_camera_matrix(img_shape, K, scale_factor=1.0):
    K_oriented = adjust_K_for_orientation(img_shape, K)
    K_adjusted = adjust_K_for_scaling(K_oriented, scale_factor)

    return K_adjusted

In [None]:
config = {
    'superpoint': {
        'nms_radius': 3,
        'keypoint_threshold': 0.005,
        'max_keypoints': 100000
    },
    'superglue': {
        'weights': 'outdoor',
        'sinkhorn_iterations': 30,
        'match_threshold': 0.3,
    }
}

def enhance_sculpture_details(img):
    # Convert to LAB color space
    if len(img.shape) == 3:
        lab = cv.cvtColor(img, cv.COLOR_BGR2LAB)
        l, a, b = cv.split(lab)
    else:
        l = img.copy()
        lab = None

    # Apply CLAHE to enhance contrast
    clahe = cv.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)

    # Apply sharpening
    kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
    cl = cv.filter2D(cl, -1, kernel)

    # Edge enhancement
    edges = cv.Canny(cl, 50, 150)
    edges_dilated = cv.dilate(edges, np.ones((3,3), np.uint8))

    cl_float = cl.astype(np.float32)
    edges_float = edges_dilated.astype(np.float32)
    cl = cv.addWeighted(cl_float, 1.0, edges_float, 0.3, 0)
    cl = np.clip(cl, 0, 255).astype(np.uint8)

    if lab is not None:
        enhanced_lab = cv.merge((cl, a, b))
        enhanced = cv.cvtColor(enhanced_lab, cv.COLOR_LAB2BGR)
    else:
        enhanced = cl

    return enhanced

def resize_image(img, max_size=1024):
    h, w = img.shape[:2]
    scale = min(max_size/w, max_size/h)
    if scale < 1.0:
        new_size = (int(w*scale), int(h*scale))
        resized = cv.resize(img, new_size, interpolation=cv.INTER_AREA)
        return resized, scale
    else:
        return img, 1.0

def compute_reprojection_error(P1, P2, pts1, pts2, pts3d):
    pts3d_h = np.hstack([pts3d, np.ones((pts3d.shape[0], 1))])
    proj1 = (P1 @ pts3d_h.T).T
    proj2 = (P2 @ pts3d_h.T).T
    proj1 = proj1[:, :2] / proj1[:, 2:3]
    proj2 = proj2[:, :2] / proj2[:, 2:3]
    err1 = np.linalg.norm(proj1 - pts1, axis=1)
    err2 = np.linalg.norm(proj2 - pts2, axis=1)
    rmse = np.sqrt(np.mean(np.hstack([err1, err2]) ** 2))
    return float(rmse)

def visualize_camera_trajectory(cam_centers, save_path=None):
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')

    ax.scatter(cam_centers[:, 0], cam_centers[:, 1], cam_centers[:, 2],
              c='blue', marker='o', s=50, label='Camera Centers')

    ax.plot(cam_centers[:, 0], cam_centers[:, 1], cam_centers[:, 2],
           c='red', linewidth=2, label='Camera Path')

    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
    ax.set_title('Camera Trajectory')

    if save_path:
        plt.savefig(save_path, dpi=200, bbox_inches='tight')
    return fig, ax

def load_images_from_folder(folder_path, extensions=('.jpg', '.jpeg', '.png'), max_size=1024):
    images = {}
    image_paths = []
    scales = {}

    for ext in extensions:
        image_paths.extend(sorted(glob.glob(os.path.join(folder_path, f'*{ext}'))))

    if not image_paths:
        raise ValueError(f"No images found in {folder_path}")

    print(f"Found {len(image_paths)} images in {folder_path}")

    for path in tqdm(image_paths, desc="Loading images"):
        img = cv.imread(path)
        if img is not None:
            name = os.path.splitext(os.path.basename(path))[0]
            resized_img, scale = resize_image(img, max_size=max_size)
            images[name] = resized_img
            scales[name] = scale

    return images, image_paths, scales

def extract_features_with_superpoint(images, device='cuda', batch_size=4):
    """Use SuperPoint to extract features"""
    matching = Matching(config).eval().to(device)
    keypoints = {}
    descriptors = {}
    scores = {}  # Add keypoint scores

    image_names = list(images.keys())
    for i in range(0, len(image_names), batch_size):
        batch_names = image_names[i:i+batch_size]

        for name in tqdm(batch_names, desc=f"Extracting features"):
            img = images[name]
            enhanced = enhance_sculpture_details(img)
            if len(enhanced.shape) == 3:
                gray = cv.cvtColor(enhanced, cv.COLOR_BGR2GRAY)
            else:
                gray = enhanced.copy()

            gray_tensor = torch.from_numpy(gray).float().to(device)[None, None] / 255.0
            with torch.no_grad():
                pred = matching.superpoint({'image': gray_tensor})
                kpts = pred['keypoints'][0].cpu().numpy()
                desc = pred['descriptors'][0].cpu().numpy()
                score = pred['scores'][0].cpu().numpy()  # Save keypoint scores

                # Combine points and sizes (x, y, size)
                kp_with_size = np.hstack([kpts, 8 * np.ones((kpts.shape[0], 1))])

                keypoints[name] = kp_with_size
                descriptors[name] = desc.T  # Transpose to match OpenCV format
                scores[name] = score

            torch.cuda.empty_cache()
            del gray_tensor, pred
            gc.collect()

    return keypoints, descriptors, scores

def match_features_with_superglue(images, keypoints, descriptors, scores, device='cuda'):
    """Use SuperGlue for feature matching"""
    matching = Matching(config).eval().to(device)
    image_names = sorted(list(images.keys()))
    matches_dict = {}

    for i in tqdm(range(len(image_names) - 1), desc="Matching image pairs"):
        name0, name1 = image_names[i], image_names[i+1]
        img0, img1 = images[name0], images[name1]

        # Convert to grayscale
        if len(img0.shape) == 3:
            gray0 = cv.cvtColor(img0, cv.COLOR_BGR2GRAY)
        else:
            gray0 = img0.copy()

        if len(img1.shape) == 3:
            gray1 = cv.cvtColor(img1, cv.COLOR_BGR2GRAY)
        else:
            gray1 = img1.copy()

        data = {
            'image0': torch.from_numpy(gray0).float().to(device)[None, None] / 255.0,
            'image1': torch.from_numpy(gray1).float().to(device)[None, None] / 255.0,
            'keypoints0': torch.from_numpy(keypoints[name0][:, :2]).float().to(device)[None],
            'keypoints1': torch.from_numpy(keypoints[name1][:, :2]).float().to(device)[None],
            'descriptors0': torch.from_numpy(descriptors[name0].T).float().to(device)[None],
            'descriptors1': torch.from_numpy(descriptors[name1].T).float().to(device)[None],
            'scores0': torch.from_numpy(scores[name0]).float().to(device)[None],
            'scores1': torch.from_numpy(scores[name1]).float().to(device)[None]
        }

        with torch.no_grad():
            pred = matching(data)

        matches = pred['matches0'][0].cpu().numpy()
        confidence = pred['matching_scores0'][0].cpu().numpy()

        good_matches = []
        for i, m in enumerate(matches):
            if m != -1 and confidence[i] > 0.5:  # 仅保留高置信度匹配
                good_matches.append(cv.DMatch(i, int(m), float(1.0 - confidence[i])))

        matches_dict[(name0, name1)] = {
            'matches': good_matches,
            'confidence': confidence
        }

        print(f"Matched {name0}-{name1}: {len(good_matches)} matches")

        torch.cuda.empty_cache()
        del data, pred
        gc.collect()

    return matches_dict

def estimate_pose_from_matches(kps1, kps2, matches, K, K1=None, K2=None, scale1=1.0, scale2=1.0):
    """
    Compute F, E, R, t using matched keypoints
    """
    if K1 is None:
        K1 = K.copy()
    if K2 is None:
        K2 = K.copy()

    if isinstance(matches[0], cv.DMatch):
        # DMatch objects
        pts1 = np.float32([kps1[m.queryIdx][:2] for m in matches])
        pts2 = np.float32([kps2[m.trainIdx][:2] for m in matches])
    else:
        # Index pairs [idx1, idx2]
        pts1 = np.float32([kps1[m[0]][:2] for m in matches])
        pts2 = np.float32([kps2[m[1]][:2] for m in matches])

    # Apply scale if needed
    if scale1 != 1.0:
        pts1 *= scale1
    if scale2 != 1.0:
        pts2 *= scale2

    F, mask = cv.findFundamentalMat(
        pts1, pts2, cv.FM_RANSAC,
        ransacReprojThreshold=1.0,
        confidence=0.999
    )

    if F is None or mask is None or np.sum(mask) < 8:
        print(f"[ERR] findFundamentalMat失败: 内点太少 ({np.sum(mask) if mask is not None else 0})")
        return None

    inlier_mask = mask.ravel() == 1
    inlier_pts1 = pts1[inlier_mask]
    inlier_pts2 = pts2[inlier_mask]
    inlier_matches = [m for i, m in enumerate(matches) if inlier_mask[i]]
    inlier_count = len(inlier_pts1)
    inlier_ratio = inlier_count / len(pts1)

    if inlier_count < 8:
        print(f"[ERR] RANSAC后内点太少 ({inlier_count})")
        return None

    E, mask = cv.findEssentialMat(pts1, pts2, K1, method=cv.RANSAC, prob=0.999, threshold=1.0)

    try:
        success, R, t, mask_pose = cv.recoverPose(E, inlier_pts1, inlier_pts2, K1)
    except cv.error as e:
        print(f"[ERR] recoverPose失败: {e}")
        return None

    if not success or np.sum(mask_pose) < 5:
        print(f"[ERR] recoverPose失败: 有效点太少 ({np.sum(mask_pose)})")
        return None

    P1 = K1 @ np.hstack((np.eye(3), np.zeros((3, 1))))
    P2 = K2 @ np.hstack((R, t))
    pts4d = cv.triangulatePoints(P1, P2, inlier_pts1.T, inlier_pts2.T)
    pts3d = (pts4d[:3] / pts4d[3]).T
    reproj_error = compute_reprojection_error(P1, P2, inlier_pts1, inlier_pts2, pts3d)

    result = {
        "F": F,
        "E": E,
        "R": R,
        "t": t,
        "inlier_matches": inlier_matches,
        "inliers1": inlier_pts1,
        "inliers2": inlier_pts2,
        "inlier_count": inlier_count,
        "inlier_ratio": inlier_ratio,
        "reproj_error": reproj_error,
        "pts3d": pts3d
    }

    return result

def visualize_matches(img1, img2, kps1, kps2, matches, out_path=None):
    kp_cv1 = [cv.KeyPoint(x=f[0], y=f[1], size=8.0) for f in kps1]
    kp_cv2 = [cv.KeyPoint(x=f[0], y=f[1], size=8.0) for f in kps2]

    img_matches = cv.drawMatches(
        img1, kp_cv1, img2, kp_cv2, matches, None,
        flags=cv.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
    )

    font = cv.FONT_HERSHEY_SIMPLEX
    cv.putText(img_matches, f"Matches: {len(matches)}", (10, 30), font, 0.6, (0, 255, 0), 2)
    cv.putText(img_matches, f"Inliers: {len([m for m in matches if m.distance < 0.5])}",
               (10, 60), font, 0.6, (0, 255, 0), 2)

    if out_path:
        cv.imwrite(out_path, img_matches)

    return img_matches

def process_image_sequence(images, keypoints, descriptors, scores, K, output_dir, original_scales=None):
    matches_dir = os.path.join(output_dir, "matches")
    viz_dir = os.path.join(output_dir, "viz")
    poses_dir = os.path.join(output_dir, "poses")

    os.makedirs(matches_dir, exist_ok=True)
    os.makedirs(viz_dir, exist_ok=True)
    os.makedirs(poses_dir, exist_ok=True)

    image_names = sorted(list(images.keys()))
    summary_records = []
    pose_list = []

    match_results = match_features_with_superglue(images, keypoints, descriptors, scores)

    for i in tqdm(range(len(image_names) - 1), desc="Processing matches"):
        name1, name2 = image_names[i], image_names[i+1]
        pair_key = (name1, name2)

        if pair_key not in match_results:
            print(f"[WARN] No matches found for {name1}-{name2}, skipping")
            continue

        matches = match_results[pair_key]['matches']

        if len(matches) < 30:
            print(f"[WARN] Too few matches ({len(matches)}) for {name1}-{name2}, skipping")
            continue

        scale1 = original_scales[name1] if original_scales else 1.0
        scale2 = original_scales[name2] if original_scales else 1.0

        img1_shape, img2_shape = images[name1].shape, images[name2].shape
        K1 = adjust_camera_matrix(img1_shape, K.copy(), scale1)
        K2 = adjust_camera_matrix(img2_shape, K.copy(), scale2)

        print(f"[INFO] Original K:\n{K}")
        print(f"[INFO] Adjusted K1 (shape={img1_shape[:2]}, scale={scale1}):\n{K1}")
        print(f"[INFO] Adjusted K2 (shape={img2_shape[:2]}, scale={scale2}):\n{K2}")

        pose_result = estimate_pose_from_matches(
          keypoints[name1], keypoints[name2], matches, K,
          K1=K1, K2=K2
        )

        if pose_result is None:
            print(f"[WARN] Failed to estimate pose for {name1}-{name2}, skipping")
            continue

        F, E, R, t = pose_result["F"], pose_result["E"], pose_result["R"], pose_result["t"]
        inliers1, inliers2 = pose_result["inliers1"], pose_result["inliers2"]
        inlier_matches = pose_result["inlier_matches"]
        inlier_count = pose_result["inlier_count"]
        inlier_ratio = pose_result["inlier_ratio"]
        reproj_error = pose_result["reproj_error"]
        pts3d = pose_result["pts3d"]


        try:
            img1, img2 = images[name1], images[name2]
            viz_img = visualize_matches(
                img1, img2, keypoints[name1][:, :2], keypoints[name2][:, :2],
                inlier_matches,
                out_path=os.path.join(viz_dir, f"{name1}_{name2}_matches.jpg")
            )
        except Exception as e:
            print(f"[WARN] Failed to create visualization for {name1}-{name2}: {e}")
            viz_path = os.path.join(viz_dir, f"{name1}_{name2}_matches.jpg")
            cv.imwrite(viz_path, viz_img)
        except Exception as e:
            print(f"[WARN] Failed to create visualization for {name1}-{name2}: {e}")

        match_data = {
            "image_1": name1,
            "image_2": name2,
            "num_matches": len(matches),
            "inlier_count": inlier_count,
            "inlier_ratio": float(inlier_ratio),
            "reproj_rmse": float(reproj_error),
            "F": F.tolist(),
            "E": E.tolist(),
            "R": R.tolist(),
            "t": t.flatten().tolist()
        }

        match_path = os.path.join(matches_dir, f"{name1}_{name2}_pose.json")
        with open(match_path, "w") as f:
            json.dump(match_data, f, indent=2)

        inlier_path = os.path.join(matches_dir, f"{name1}_{name2}_inliers.npz")
        np.savez_compressed(inlier_path, pts1=inliers1, pts2=inliers2)
        pose_path = os.path.join(poses_dir, f"{name1}_{name2}.txt")
        np.savetxt(pose_path, np.hstack([R.flatten(), t.flatten()])[None], fmt="%.6f")
        summary_records.append([
            "sculpture",
            f"{name1}-{name2}",
            len(matches),
            inlier_count,
            round(float(inlier_ratio), 3),
            round(float(reproj_error), 3),
            "OK"
        ])

        pose_list.append((R, t))

        print(f"[DONE] {name1}-{name2}: inliers={inlier_count}, ratio={inlier_ratio:.2f}, RMSE={reproj_error:.2f}")
        gc.collect()

    non_sequential_dir = os.path.join(output_dir, "non_sequential")
    os.makedirs(os.path.join(non_sequential_dir, "matches"), exist_ok=True)
    os.makedirs(os.path.join(non_sequential_dir, "poses"), exist_ok=True)

    summary_path = os.path.join(output_dir, "superglue_summary.csv")
    with open(summary_path, "w", newline="") as f:
        import csv
        writer = csv.writer(f)
        writer.writerow(["scene", "pair", "num_matches", "inlier_count", "inlier_ratio", "reproj_error", "status"])
        writer.writerows(summary_records)

    return summary_records, pose_list

def accumulate_camera_poses(pose_list):
    cam_centers = []
    T_world = np.eye(4)
    cam_centers.append(T_world[:3, 3].copy())

    for R, t in pose_list:
        T_rel = np.eye(4)
        T_rel[:3, :3] = R
        T_rel[:3, 3] = t.flatten()

        T_rel_inv = np.eye(4)
        T_rel_inv[:3, :3] = R.T
        T_rel_inv[:3, 3] = -R.T @ t.flatten()

        T_world = T_world @ T_rel_inv
        cam_centers.append(T_world[:3, 3].copy())

    return np.array(cam_centers)

def save_features_to_drive(features_dir, keypoints, descriptors, scores=None):
    os.makedirs(features_dir, exist_ok=True)

    for name, kps in keypoints.items():
        np.save(os.path.join(features_dir, f"{name}_keypoints.npy"), kps)
        np.save(os.path.join(features_dir, f"{name}_descriptors.npy"), descriptors[name])
        if scores is not None:
            np.save(os.path.join(features_dir, f"{name}_scores.npy"), scores[name])

    print(f"Saved features to {features_dir}")
    return features_dir

def create_downloadable_zip(output_dir, zip_name="superglue_results.zip"):
    zip_path = os.path.join(output_dir, zip_name)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(output_dir):
            for file in files:
                if file != zip_name:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, output_dir)
                    zipf.write(file_path, arcname)

    print(f"Created downloadable ZIP at {zip_path}")
    return zip_path

In [None]:
SCENE_NAME = 'sculpture'
MAX_IMAGE_SIZE = 1024
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

BASE_DIR = '/content/drive/MyDrive/sfm'
IMAGE_DIR = '/content/drive/MyDrive/sfm/reaching_high_sculpture_fps2'
#IMAGE_DIR = '/content/drive/MyDrive/sfm/arch'
OUTPUT_DIR = '/content/drive/MyDrive/sfm/results/sculpture'

os.makedirs(OUTPUT_DIR, exist_ok=True)

if DEVICE == 'cuda':
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
    print(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
else:
    print("CUDA not available, using CPU. This will be much slower.")

print(f"Looking for images in {IMAGE_DIR}...")
images, image_paths, original_scales = load_images_from_folder(IMAGE_DIR, max_size=MAX_IMAGE_SIZE)

if not images:
    raise ValueError("No images found! Please upload images first.")

print(f"Successfully loaded {len(images)} images")
sample_image = list(images.values())[0]
print(f"Sample image shape: {sample_image.shape}")

print("Extracting SuperPoint features...")
start_time = time.time()
keypoints, descriptors, scores = extract_features_with_superpoint(images, device=DEVICE)
feature_time = time.time() - start_time
print(f"Feature extraction took {feature_time:.2f} seconds")

features_dir = os.path.join(OUTPUT_DIR, 'features')
save_features_to_drive(features_dir, keypoints, descriptors, scores)

calib_path = os.path.join(BASE_DIR, 'iphone_calibration.npz')
print(f"Loading camera calibration from {calib_path}")
data = np.load(calib_path, allow_pickle=True)
K = data["K"]
print(K)

print("Matching features and estimating poses...")
start_time = time.time()
summary_records, pose_list = process_image_sequence(images, keypoints, descriptors, scores, K, OUTPUT_DIR, original_scales)
matching_time = time.time() - start_time
print(f"Feature matching and pose estimation took {matching_time:.2f} seconds")

if pose_list:
    print("Computing camera trajectory...")
    cam_centers = accumulate_camera_poses(pose_list)
    np.save(os.path.join(OUTPUT_DIR, 'camera_trajectory.npy'), cam_centers)
    os.makedirs(os.path.join(OUTPUT_DIR, 'viz'), exist_ok=True)
    fig, ax = visualize_camera_trajectory(cam_centers,
                        save_path=os.path.join(OUTPUT_DIR, 'viz', 'camera_trajectory.png'))
    plt.show()
else:
    print("No poses estimated, cannot compute trajectory.")

zip_path = create_downloadable_zip(OUTPUT_DIR)

try:
    from google.colab import files
    files.download(zip_path)
    print(f"You can download the results from: {zip_path}")
except:
    print(f"Results saved to: {zip_path}")

successful_pairs = len(summary_records)
if successful_pairs > 0:
    avg_inliers = np.mean([r[3] for r in summary_records])
    avg_ratio = np.mean([r[4] for r in summary_records])
    avg_error = np.mean([r[5] for r in summary_records])

    print("\n=== SuperGlue Matching Results ===")
    print(f"Total image pairs processed: {successful_pairs}")
    print(f"Average inliers per pair: {avg_inliers:.1f}")
    print(f"Average inlier ratio: {avg_ratio:.3f}")
    print(f"Average reprojection error: {avg_error:.3f} pixels")
    print(f"Results saved to {OUTPUT_DIR}")
    print(f"ZIP file created at {zip_path}")
else:
    print("\n=== No successful matches found ===")
    print("Please check your images and try again with different parameters.")

print("\nProcessing complete!")