# Leveraging Camera Triplets for Efficient and Accurate Structure-from-Motion

https://ee.iisc.ac.in/cvlab/research/camtripsfm/

In [2]:
import os
from pathlib import Path
from collections import defaultdict
import shutil
from tqdm import tqdm

import cv2
import numpy as np
import pycolmap
import networkx as nx

In [4]:
# hypterparams
min_inlier_score = 0  # don't add edges (image pairs) to the graph with num_inliers below this number
# 0.3 for small/medium scenes 0.6-0.8 for large scenes
score_threshold = 0.3 # averaged relative triplet score threshold, below this edges will be deleted
min_component_size = 40  # minimum num images in component to check
top_k = None  # keep k largest component only

In [None]:
# image list has distractors from other locations via megaloc retrieval (tympanon matching)
image_list = set(Path("image_list.txt").read_text().rstrip().split("\n"))

database_path = Path("recon_dir/database.db")
database_path_out = Path("recon_dir/database_triplets.db")

# dir for reconstruction
out_path = Path("recon_dir_triplets")
out_path.mkdir(exist_ok=True, parents=True)
image_dir = Path("image_dir_path")

In [None]:
def image_ids_to_pair_id(image_id1, image_id2):
    if image_id1 > image_id2:
        return 2147483647 * image_id2 + image_id1
    else:
        return 2147483647 * image_id1 + image_id2

def pair_id_to_image_ids(pair_id):
    image_id2 = pair_id % 2147483647
    image_id1 = int((pair_id - image_id2) / 2147483647)
    return image_id1, image_id2

In [None]:
def enumerate_triangles_nx(graph):
    """Use NetworkX's optimized triangle enumeration"""
    triangles = set()
    # nx.triangles returns {node: count} dict
    # We need to enumerate actual triangles
    for edge in graph.edges():
        common_neighbors = set(graph.neighbors(edge[0])) & set(graph.neighbors(edge[1]))
        for cn in common_neighbors:
            triangle = tuple(sorted([edge[0], edge[1], cn]))
            triangles.add(triangle)
    return list(triangles)

In [None]:
def score_edges(graph, inlier_counts):
    """
    For every triplet (a,b,c) in the graph:
    - compute relative edge score per edge in triplet
    - accumulate scores to get final per-edge score
    """
    # accumulate score sums & counts
    edge_scores_sum = defaultdict(float)
    edge_scores_cnt = defaultdict(int)
    triangles = enumerate_triangles_nx(G)
    
    for (a,b,c) in tqdm(triangles, desc="Scoring triplets"):
        # check if edges exist
        # if not graph.has_edge(a,b) or not graph.has_edge(b,c) or not graph.has_edge(a,c):
        #     continue

        eab, ebc, eac = image_ids_to_pair_id(a,b), image_ids_to_pair_id(b,c), image_ids_to_pair_id(a,c)
        # get inliers
        nab = inlier_counts.get(eab, 0)
        nbc = inlier_counts.get(ebc, 0)
        nac = inlier_counts.get(eac, 0)

        # max
        m = max(nab, nbc, nac, 1)
        # relative scores
        for e,n in [(eab,nab), (ebc,nbc), (eac,nac)]:
            score = float(n) / float(m)
            edge_scores_sum[e] += score
            edge_scores_cnt[e] += 1

    # final averages
    edge_score = {e: edge_scores_sum[e]/edge_scores_cnt[e] for e in inlier_counts.keys() if edge_scores_cnt[e]>0}

    return edge_score


In [None]:
def adaptive_threshold(graph, min_score=0.5):
    """
    Compute adaptive threshold based on graph connectivity.
    
    τ = m * (1 - dmax/|V|) + dmax/|V|
    
    where:
    - m is the minimum score edges should satisfy
    - dmax is the maximum node degree
    - |V| is the number of nodes
    """
    num_nodes = graph.number_of_nodes()
    if num_nodes == 0:
        return min_score
    
    # Maximum degree in the graph
    degrees = dict(graph.degree())
    dmax = max(degrees.values()) if degrees else 0
    
    # Adaptive threshold formula from paper
    tau = min_score * (1 - dmax / num_nodes) + (dmax / num_nodes)
    
    print(f"Adaptive threshold: τ = {tau:.4f} (dmax={dmax}, |V|={num_nodes}, m={min_score})")
    
    return tau

In [None]:
# database has to contain 2 view geometries and inlier matches (Ransac, F/E/H matrix)
with pycolmap.Database.open(database_path) as db:
    inlier_counts = db.read_two_view_geometry_num_inliers()
    image_data = db.read_all_images()
id_to_name = {image.image_id: image for image in image_data if image.name in image_list}

In [None]:
G = nx.Graph()
G.add_nodes_from(id_to_name.keys()) # image_ids
for pair_id, num_inliers in zip(*inlier_counts):
    image_id1, image_id2 = pair_id_to_image_ids(pair_id)
    if num_inliers >= min_inlier_score:
        G.add_edge(image_id1, image_id2)

In [None]:
# Find all connected components
components = list(nx.connected_components(G))

print(f"Found {len(components)} connected components")

# Sort by size (largest first)
components_sorted = sorted(components, key=len, reverse=True)

# Print component sizes
for i, comp in enumerate(components_sorted[:10]):  # Show top 10
    print(f"  Component {i+1}: {len(comp)} nodes")

# Filter by size
large_components = [comp for comp in components_sorted if len(comp) >= min_component_size]

print(f"Components with >= {min_component_size} nodes: {len(large_components)}")

# Optionally keep only top k
if top_k is not None:
    large_components = large_components[:top_k]
    print(f"Keeping top {top_k} largest components")

# Create subgraphs
component_graphs = [G.subgraph(comp).copy() for comp in large_components]

In [None]:
inlier_dict = {pair_id: num_inliers for pair_id, num_inliers in zip(*inlier_counts)}
edge_scores = score_edges(component_graphs[0], inlier_dict)

In [None]:
shutil.copy2(database_path, database_path_out)

In [None]:
tau = adaptive_threshold(component_graphs[0], min_score=score_threshold)

In [None]:
num_removed_edges = 0
with pycolmap.Database.open(database_path_out) as db:
    for pair_id, score in tqdm(edge_scores.items()):
        image_id1, image_id2 = pair_id_to_image_ids(pair_id)
        if score < tau:
            db.delete_inlier_matches(image_id1, image_id2)
            num_removed_edges += 1
print(f"{num_removed_edges} edges with scores lower than {tau=} {score_threshold=} removed")

In [None]:
pycolmap.incremental_mapping(database_path_out, image_dir,
                            out_path, options=dict(#min_num_matches=100, 
                                                   multiple_models=True,
                                                   ignore_watermarks=True,
                                                   snapshot_path=out_path/"snapshots",
                                                   snapshot_frames_freq=100,
                                                   ba_use_gpu=True))