In [None]:
from pathlib import Path

import h5py
import numpy as np
import open3d as o3d
import pandas as pd
import pycolmap
from deep_image_matching.thirdparty.transformations import (
    affine_matrix_from_points,
    decompose_matrix,
)
from deep_image_matching.triangulation import db_from_existing_poses
from deep_image_matching.utils import COLMAPDatabase, OutputCapture

root_path = Path("datasets/belv_20230725")
image_dir = root_path / "images"

sfm_dir = root_path / "results_superpoint+lightglue_bruteforce_quality_highest"
sfm_rec_path = sfm_dir / "reconstruction"

# Output path for the triangulated markers
output_path = root_path / "marker_triang"
db_path = output_path / "database_markers.db"

# Image coordinates of the markers
markers_file = root_path / "markers_image_20230725.csv"

# World coordinates of the markers
georef_points = root_path / "markers_utm.csv"

output_path.mkdir(exist_ok=True, parents=True)

# Get image list
images = sorted(image_dir.glob("*"))

# Define a subset of the markers to use (leve none to use all)
# markers_to_use = ["D38", "T2", "F2", "F4", "F10", "F20"]
markers_to_use = ["D38", "F2", "F4", "F10", "F20"]


# Dense reconstruction path to georeference
dense_rec_path = root_path / "results_roma_bruteforce_quality_high/dense_model"

In [None]:
# Read marker image coordinates and create keypoints dictionary with the form:
# {"image_name": keypoints_array}
# {
#     "image1.jpg": np.array([[x1, y1], [x2, y2], ...]),
#     "image2.jpg": np.array([[x1, y1], [x2, y2], ...]),
#     ...
# }
markers_image = pd.read_csv(markers_file, header=None, names=["image", "marker", "x", "y"])
markers_image.sort_values(["image", "marker"], inplace=True, ascending=True)
if markers_to_use:
    markers_image = markers_image[markers_image["marker"].isin(markers_to_use)]

kpts = {}
for image, gr in markers_image.groupby("image"):
    image = image + ".JPG"
    kpts[image] = gr[["x", "y"]].values
for k, v in kpts.items():
    print(k, ":\n", v)

# Manually create 1-to-1 matches array
ids = np.arange(0, len(kpts[images[0].name]))
matches_idx = np.array([ids, ids]).astype(np.int64).T
print("matches idx:\n", matches_idx)

In [None]:
# # plot image with markers
# import cv2
# from matplotlib import pyplot as plt

# image_id = 1

# img = cv2.cvtColor(cv2.imread(str(images[image_id])), cv2.COLOR_BGR2RGB)
# plt.imshow(img)
# plt.scatter(
#     kpts[images[image_id].name][:, 0],
#     kpts[images[image_id].name][:, 1],
#     c="r",
#     s=10,
# )

In [None]:
# Save features to h5 file
features_h5 = output_path / "features.h5"
with h5py.File(features_h5, "w") as f:
    for image in images:
        image_name = image.name
        kp = kpts[image_name]
        f.create_group(image_name)
        f[image_name].create_dataset("keypoints", data=kp, dtype=np.float32)

# Save matches to h5 file
matches_h5 = output_path / "matches.h5"
with h5py.File(matches_h5, "w") as f:
    image0, image1 = images[0].name, images[1].name
    gr0 = f.create_group(image0)
    gr0.create_dataset(image1, data=matches_idx, dtype=np.int64)

pair_file = sfm_dir / "pairs.txt"

# # print features_h5 content
# with h5py.File(features_h5, "r") as f:
#     print(f.keys())
#     print(f[images[0].name].keys())
#     print(f[images[0].name]["keypoints"][:])
#     print(f[images[1].name].keys())
#     print(f[images[1].name]["keypoints"][:])

# # Print matches.h5 content
# with h5py.File(matches_h5, "r") as f:
#     print(f.keys())
#     g0 = f[images[0].name]
#     print(g0.keys())
#     g1 = g0[images[1].name]
#     print(g1.__array__())

In [None]:
sfm_rec = pycolmap.Reconstruction(sfm_rec_path)

# Create a new database with the dense features and the known camera poses

db_from_existing_poses(
    db_path,
    features_h5,
    matches_h5,
    sfm_rec_path,
    pair_file,
    do_geometric_verification=False,
)

In [None]:
# Define the options for the triangulation according to the IncrementalPipelineOptions available in pycolmap
# print(pycolmap.IncrementalPipelineOptions().summary())
opt = dict(
    min_num_matches=3,
    triangulation=dict(
        ignore_two_view_tracks=False,
    ),
)

In [None]:
# Run the triangulation with the known camera poses
verbose = True
with OutputCapture(verbose):
    with pycolmap.ostream():
        reconstruction = pycolmap.triangulate_points(
            sfm_rec,
            db_path,
            image_dir,
            output_path,
            options=opt,
        )

In [None]:
pts = reconstruction.points3D
for i, pt in sorted(pts.items()):
    print(i, pt)

In [None]:
img = reconstruction.images[1]
img.points2D

In [None]:
image_id = 1
local = []
for pt in reconstruction.images[image_id].points2D:
    if not isinstance(pt.point3D_id, int) or pt.point3D_id < 0:
        print(f"Point {pt.point2D_idx} is not triangulated")
        continue
    local.append(reconstruction.points3D[pt.point3D_id].xyz)
local = np.array(local)
print(local)

In [None]:
# Read the georeferenced points
georef = pd.read_csv(georef_points)
georef.index = georef["Label"]
georef.drop(columns=["Label"], inplace=True)
georef.sort_values("Label", inplace=True)
if markers_to_use:
    georef = georef[georef.index.isin(markers_to_use)]
print(georef)

In [None]:
# Estimate a Helmert transformation between the mean pcd and the ref.
T = affine_matrix_from_points(
    local.T, georef.to_numpy().T, shear=False, scale=True, usesvd=True
)
scale, _, angles, translation, _ = decompose_matrix(T)
scale_percent = scale.mean() - 1
angles_deg = np.rad2deg(angles)

print(f"Translation: {translation} m")
print(f"Angles: {angles_deg} deg")
print(f"Scale: {scale_percent:.6}%")

In [None]:
# Export the dense point cloud from roma to a ply file
dense_rec = pycolmap.Reconstruction(dense_rec_path)
dense_rec.export_PLY(dense_rec_path / "dense.ply")

In [None]:
# Read and transform the point cloud
pcd = o3d.io.read_point_cloud(str(dense_rec_path / "dense_merged_clean.ply"))
georef_pcd = pcd.transform(T)
o3d.io.write_point_cloud(str(dense_rec_path / "dense_georef.ply"), georef_pcd)

In [None]:
dense_rec_path