In [None]:
import os 
os.environ['XDG_SESSION_TYPE'] = 'x11'
import open3d as o3d

In [None]:
from photogrammetry_ai import (
    PhotogrammetryPipeline,
    LightGlueMatcher,
    VGGTReconstructor,
    ICPAligner,
)
import os

image_dir = "/home/jourdelune/Images/colmap/input"

images = os.listdir(image_dir)
images = [
    os.path.join(image_dir, img)
    for img in images
    if img.lower().endswith((".jpg", ".jpeg", ".png"))
]


pipeline = PhotogrammetryPipeline(
    matcher=LightGlueMatcher(),  # used to create related batches of images
    reconstructor=VGGTReconstructor(),  # used to reconstruct the 3D points from the images
    aligner=ICPAligner(),  # used to merges the 3D points from the batches
    max_batch_size=4,
)


In [None]:
batches, missing_images = pipeline.build_batches(images, display_graph=True)

In [None]:
sequence = []
for batch in batches:
    sequence.extend(batch)
print(sequence)

In [None]:
import os
import numpy as np
import torch
import trimesh
from torch.nn import functional as F
from vggt.models.vggt import VGGT
from vggt.utils.geometry import unproject_depth_map_to_point_map
from vggt.utils.helper import create_pixel_coordinate_grid, randomly_limit_trues
from vggt.utils.load_fn import load_and_preprocess_images_square
from vggt.utils.pose_enc import pose_encoding_to_extri_intri

device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.bfloat16 if torch.cuda.get_device_capability()[0] >= 8 else torch.float16

model = VGGT.from_pretrained("facebook/VGGT-1B").to(device)

image_dir = "/home/jourdelune/Images/colmap/input"
image_names = sequence

vggt_fixed_resolution = 518
img_load_resolution = 1024
batch_size = 3  # max images per VGGT run

# Load all images
images_all, original_coords_all = load_and_preprocess_images_square(
    image_names, img_load_resolution
)

# Split into batches
total_images = images_all.shape[0]
batched_extrinsic, batched_intrinsic = [], []
batched_points_3d, batched_points_rgb, batched_points_xyf = [], [], []

print(f"Total images: {total_images}, Batch size: {batch_size}")

for i in range(0, total_images, batch_size):
    images_batch = images_all[i : i + batch_size].to(device)
    original_coords = original_coords_all[i : i + batch_size].to(device)

    # Resize and run VGGT
    images_resized = F.interpolate(
        images_batch,
        size=(vggt_fixed_resolution, vggt_fixed_resolution),
        mode="bilinear",
        align_corners=False,
    )

    with torch.no_grad():
        with torch.cuda.amp.autocast(dtype=dtype):
            images_input = images_resized[None]
            aggregated_tokens_list, ps_idx = model.aggregator(images_input)
            pose_enc = model.camera_head(aggregated_tokens_list)[-1]
            extrinsic, intrinsic = pose_encoding_to_extri_intri(
                pose_enc, images_input.shape[-2:]
            )
            depth_map, depth_conf = model.depth_head(
                aggregated_tokens_list, images_input, ps_idx
            )

    extrinsic = extrinsic.squeeze(0).cpu().numpy()
    intrinsic = intrinsic.squeeze(0).cpu().numpy()
    depth_map = depth_map.squeeze(0).cpu().numpy()
    depth_conf = depth_conf.squeeze(0).cpu().numpy()

    points_3d = unproject_depth_map_to_point_map(depth_map, extrinsic, intrinsic)

    image_size = np.array([vggt_fixed_resolution, vggt_fixed_resolution])
    num_frames, height, width, _ = points_3d.shape

    points_rgb = (images_resized.cpu().numpy() * 255).astype(np.uint8)
    points_rgb = points_rgb.transpose(0, 2, 3, 1)
    points_xyf = create_pixel_coordinate_grid(num_frames, height, width)

    conf_thres_value = 5.0
    max_points_for_colmap = 100000
    conf_mask = depth_conf >= conf_thres_value
    conf_mask = randomly_limit_trues(conf_mask, max_points_for_colmap)

    batched_extrinsic.append(extrinsic)
    batched_intrinsic.append(intrinsic)
    batched_points_3d.append(points_3d[conf_mask])
    batched_points_rgb.append(points_rgb[conf_mask])
    batched_points_xyf.append(points_xyf[conf_mask])

In [None]:
import copy
from typing import Union

import numpy as np

def merge_and_draw(source: o3d.geometry.PointCloud, target: o3d.geometry.PointCloud, transformation: np.ndarray = np.identity(4)) -> o3d.geometry.PointCloud:
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.transform(transformation)
    merged = source_temp + target_temp
    return merged


In [37]:
# generate source from the first batch
source = o3d.geometry.PointCloud()
source.points = o3d.utility.Vector3dVector(batched_points_3d[0])
source.colors = o3d.utility.Vector3dVector(batched_points_rgb[0] / 255.0)
source.estimate_normals()

In [36]:
# generate target from the second batch
target = o3d.geometry.PointCloud()
target.points = o3d.utility.Vector3dVector(batched_points_3d[1])
target.colors = o3d.utility.Vector3dVector(batched_points_rgb[1] / 255.0)
target.estimate_normals()

In [38]:
import copy

source_vis = copy.deepcopy(source)
target_vis = copy.deepcopy(target)

# Génère une transformation de translation (décalage)
translation = np.identity(4)
# translation[:3, 3] = [2, 0.2, -0.3]  # exemple de décalage (x, y, z)

target_vis.transform(translation)

o3d.visualization.draw_geometries([source_vis, target_vis])

In [39]:
mu, sigma = 0, 0.1 
threshold = 1.0
loss = o3d.pipelines.registration.TukeyLoss(k=sigma)
p2l = o3d.pipelines.registration.TransformationEstimationPointToPlane(loss)
result_icp = o3d.pipelines.registration.registration_icp(source, target,
                                                      threshold, translation,
                                                      p2l)

In [40]:
o3d.visualization.draw_geometries([merge_and_draw(source, target, result_icp.transformation)])