In [2]:
from lib.validate_inputs import validate
import numpy as np
import cv2
import pickle
import json
import matplotlib.pyplot as plt
import pydegensac
import open3d as o3d

from pathlib import Path
from easydict import EasyDict as edict

from lib.classes import (Camera, Imageds, Features, Targets)
from lib.config import parse_yaml_cfg
from lib.sfm.two_view_geometry import Two_view_geometry, Single_camera_geometry
from lib.sfm.triangulation import Triangulate
from lib.match_pairs import match_pair
from lib.track_matches import track_matches

from lib.geometry import (project_points,
                          compute_reprojection_error
                          )
from lib.utils import (build_dsm,
                       generate_ortophoto,
                       )
from lib.point_clouds import (create_point_cloud,
                              write_ply,
                              )
from lib.visualization import (display_point_cloud,
                               display_pc_inliers,
                               plot_features,
                               plot_projections,
                               )
from lib.misc import (convert_to_homogeneous,
                      convert_from_homogeneous,
                      create_directory,
                      )

from thirdparty.transformations import affine_matrix_from_points

# Parse options from yaml file
cfg_file = 'config/config_base.yaml'
cfg = parse_yaml_cfg(cfg_file)

# Inizialize Variables
cams = cfg.paths.cam_names
features = dict.fromkeys(cams)  # @TODO: put this in an inizialization function

# Create Image Datastore objects
images = dict.fromkeys(cams)  # @TODO: put this in an inizialization function
for cam in cams:
    images[cam] = Imageds(cfg.paths.imdir / cam)

cfg = validate(cfg, images)

''' Perform matching and tracking '''
# Load matching and tracking configurations
with open(cfg.matching_cfg) as f:
    opt_matching = edict(json.load(f))
with open(cfg.tracking_cfg) as f:
    opt_tracking = edict(json.load(f))

# epoch = 0
if cfg.proc.do_matching:
    for cam in cams:
        features[cam] = []

    for epoch in cfg.proc.epoch_to_process:
        print(f'Processing epoch {epoch}...')

        # opt_matching = cfg.matching.copy()
        epochdir = Path(cfg.paths.resdir) / f'epoch_{epoch}'

        #-- Find Matches at current epoch --#
        print(f'Run Superglue to find matches at epoch {epoch}')
        opt_matching.output_dir = epochdir
        pair = [
            images[cams[0]].get_image_path(epoch),
            images[cams[1]].get_image_path(epoch)
        ]
        # Call matching function
        matchedPts, matchedDescriptors, matchedPtsScores = match_pair(
            pair, cfg.images.bbox, opt_matching
        )

        # Store matches in features structure
        for jj, cam in enumerate(cams):
            # Dict keys are the cameras names, internal list contain epoches
            features[cam].append(Features())
            features[cam][epoch].append_features({
                'kpts': matchedPts[jj],
                'descr': matchedDescriptors[jj],
                'score': matchedPtsScores[jj]
            })
            # @TODO: Store match confidence!

        #=== Track previous matches at current epoch ===#
        if cfg.proc.do_tracking and epoch > 0:
            print(f'Track points from epoch {epoch-1} to epoch {epoch}')

            trackoutdir = epochdir / f'from_t{epoch-1}'
            opt_tracking['output_dir'] = trackoutdir
            pairs = [
                [images[cams[0]].get_image_path(epoch-1),
                    images[cams[0]].get_image_path(epoch)],
                [images[cams[1]].get_image_path(epoch-1),
                    images[cams[1]].get_image_path(epoch)],
            ]
            prevs = [
                features[cams[0]][epoch-1].get_features_as_dict(),
                features[cams[1]][epoch-1].get_features_as_dict()
            ]
            # Call actual tracking function
            tracked_cam0, tracked_cam1 = track_matches(
                pairs, cfg.images.bbox, prevs, opt_tracking)
            # @TODO: keep track of the epoch in which feature is matched
            # @TODO: Check bounding box in tracking
            # @TODO: clean tracking code

            # Store all matches in features structure
            features[cams[0]][epoch].append_features(tracked_cam0)
            features[cams[1]][epoch].append_features(tracked_cam1)

        # Run Pydegensac to estimate F matrix and reject outliers
        F, inlMask = pydegensac.findFundamentalMatrix(
            features[cams[0]][epoch].get_keypoints(),
            features[cams[1]][epoch].get_keypoints(),
            px_th=1.5, conf=0.99999, max_iters=10000,
            laf_consistensy_coef=-1.0,
            error_type='sampson',
            symmetric_error_check=True,
            enable_degeneracy_check=True,
        )
        print(f'Matching at epoch {epoch}: pydegensac found {inlMask.sum()} \
            inliers ({inlMask.sum()*100/len(features[cams[0]][epoch]):.2f}%)')
        features[cams[0]][epoch].remove_outliers_features(inlMask)
        features[cams[1]][epoch].remove_outliers_features(inlMask)

        # Write matched points to disk
        im_stems = images[cams[0]].get_image_stem(
            epoch), images[cams[1]].get_image_stem(epoch)
        for jj, cam in enumerate(cams):
            features[cam][epoch].save_as_txt(
                epochdir / f'{im_stems[jj]}_mktps.txt')
        with open(epochdir / f'{im_stems[0]}_{im_stems[1]}_features.pickle', 'wb') as f:
            pickle.dump(features, f, protocol=pickle.HIGHEST_PROTOCOL)
        last_match_path = create_directory('res/last_epoch')
        with open(last_match_path / 'last_features.pickle', 'wb') as f:
            pickle.dump(features, f, protocol=pickle.HIGHEST_PROTOCOL)

        print('Matching completed')

elif not features[cams[0]]:
    last_match_path = 'res/last_epoch/last_features.pickle'
    with open(last_match_path, 'rb') as f:
        features = pickle.load(f)
        print("Loaded previous matches")
else:
    print("Features already present, nothing was changed.")


Input parameters are valid.

Image datastores created successfully.
Loaded previous matches


In [5]:
''' SfM '''

# Initialize variables @TODO: build function for variable inizialization
cameras = dict.fromkeys(cams)
cameras[cams[0]], cameras[cams[1]] = [], []
pcd = []
tform = []
# @TODO: store this information in exif inside an Image Class
im_height, im_width = 4000, 6000


# Read target image coordinates and object coordinates 
# @TODO: include onbject coordinate reading in methods
targets = [Targets(cam_id=[0, 1],  im_coord_path=cfg.georef.target_paths),]
targets[0].obj_coor = np.loadtxt("data/target_world.csv",
                              delimiter=",")

# Camera baseline
baseline_world = np.linalg.norm(
    cfg.georef.camera_centers_world[0] - cfg.georef.camera_centers_world[1]
    )

for epoch in cfg.proc.epoch_to_process:
    # epoch = 0
    print(f'Reconstructing epoch {epoch}...')

    # Initialize Intrinsics
    ''' Inizialize Camera Intrinsics at every epoch setting them equal to
        the reference cameras ones.
    '''
    # @TODO: replace append with insert or a more robust data structure...
    for cam in cams:
        cameras[cam].append(
            Camera(
                width=im_width,
                height=im_height,
                calib_path=cfg.paths.caldir / f'{cam}.txt'
            )
        )
            
    #--- Perform Space resection of the first camera by using GCPs ---#
    ''' Initialize Single_camera_geometry class with a cameras object'''
    # Temporary block to copy targets infos for every epoch
    if len(targets) == 1:
        targets.append(targets[0])
    
    space_resection = Single_camera_geometry(cameras[cams[0]][epoch])
    space_resection.space_resection(
        targets[epoch].get_im_coord(cam_id=0),
        targets[epoch].get_obj_coord()
        )
    # Store result in camera 0 object
    cameras[cams[0]][epoch] = space_resection.camera
    
    #--- Perform Relative orientation of the two cameras ---#
    ''' Initialize Two_view_geometry class with a list containing the two cameras and a list contaning the matched features location on each camera.
    '''
    relative_ori = Two_view_geometry(
        [cameras[cams[0]][epoch], cameras[cams[1]][epoch]],
        [features[cams[0]][epoch].get_keypoints(),
         features[cams[1]][epoch].get_keypoints()],
    )
    relative_ori.relative_orientation(
        threshold=1.5, 
        confidence=0.999999, 
        scale_factor=261.60624502293524,
        )
    # Store result in camera 1 object
    cameras[cams[1]][epoch] = relative_ori.cameras[1]
    
    # cam_baseline = np.linalg.norm(
    #     cameras[cams[0]][0].get_C_from_pose() -
    #     cameras[cams[1]][0].get_C_from_pose()
    # )
    # scale_fct = baseline_world / cam_baseline
    # scale_fct = 261.60624502293524
    
    # Fix the EO of both the cameras as those estimated in the first epoch
    # if epoch > 0:
    #     for cam in cams:
    #         cameras[cam][epoch] = cameras[cam][0]
    #     print('Camera exterior orientation fixed to that of the master cameras.')

    #--- Triangulate Points ---#
    ''' Initialize Triangulate class with a list containing the two cameras
        and a list contaning the matched features location on each camera.
        Triangulated points are saved as points3d proprierty of the
        Triangulate object (eg., triangulation.points3d)
    '''
    triangulation = Triangulate(
        [cameras[cams[0]][epoch], 
         cameras[cams[1]][epoch]],
        [features[cams[0]][epoch].get_keypoints(),
         features[cams[1]][epoch].get_keypoints()]
    )
    triangulation.triangulate_two_views()
    triangulation.interpolate_colors_from_image(
        images[cams[1]][epoch],
        cameras[cams[1]][epoch],
        convert_BRG2RGB=True,
    )

  
    # Create point cloud and save .ply to disk
    pcd_epc = create_point_cloud(
        triangulation.points3d, triangulation.colors)

    # Filter outliers in point cloud with SOR filter
    if cfg.other.do_SOR_filter:
        _, ind = pcd_epc.remove_statistical_outlier(nb_neighbors=10,
                                                    std_ratio=3.0)
        #     display_pc_inliers(pcd_epc, ind)
        pcd_epc = pcd_epc.select_by_index(ind)
        print("Point cloud filtered by Statistical Oulier Removal")

    # Write point cloud to disk and store it in Point Cloud List
    write_ply(pcd_epc, f'res/pt_clouds/sparse_pts_t{epoch}.ply')
    pcd.append(pcd_epc)

print('Done.')


# Visualize point cloud
display_point_cloud(
    pcd,
    [cameras[cams[0]][epoch], cameras[cams[1]][epoch]],
    plot_scale=10,
)

Reconstructing epoch 0...
Using OPENCV camera model.
Using OPENCV camera model.
Space resection succeded. Number of inlier points: 12/14
Relative Orientation - valid points: 3285/3769
Point triangulation succeded: 1.0.
Loaded image IMG_1289.jpg
Points color interpolated
Point cloud filtered by Statistical Oulier Removal
Done.


In [6]:

# sparse_loc = o3d.io.read_point_cloud('data/belvedere_20220728_sparse_LOC.ply')
# display_point_cloud(
#     [pcd[0], sparse_loc],
#     [cameras[cams[0]][epoch], cameras[cams[1]][epoch]],
#     plot_scale=10,
# )


#  Coregistration
# if cfg.proc.do_coregistration:
#     # @TODO: make wrappers to handle RS transformations

#     # Triangulate targets
#     triangulate = Triangulate(
#         [cameras[cams[0]][epoch], cameras[cams[1]][epoch]],
#         [targets.get_im_coord(0)[epoch],
#          targets.get_im_coord(1)[epoch]],
#     )
#     targets.append_obj_cord(triangulate.triangulate_two_views())

#     # Estimate rigid body transformation between first epoch RS and current epoch RS
#     # @TODO: make a wrapper for this
#     v0 = np.concatenate((cameras[cams[0]][0].C,
#                          cameras[cams[1]][0].C,
#                          targets.get_obj_coord()[0].reshape(3, 1),
#                          ), axis=1)
#     v1 = np.concatenate((cameras[cams[0]][epoch].C,
#                          cameras[cams[1]][epoch].C,
#                          targets.get_obj_coord()[epoch].reshape(3, 1),
#                          ), axis=1)
#     tform.append(affine_matrix_from_points(
#         v1, v0, shear=False, scale=False, usesvd=True))
#     print('Point cloud coregistered based on {len(v0)} points.')
# elif epoch > 0:
#        # Fix the EO of both the cameras as those estimated in the first epoch
#        for cam in cams:
#             cameras[cam][epoch] = cameras[cam][0]
#         print('Camera exterior orientation fixed to that of the master cameras.')

# if cfg.proc.do_coregistration:
#     # Apply rigid body transformation to triangulated points
#     # @TODO: make wrapper for apply transformation to arrays
#     pts = np.dot(tform[epoch],
#                  convert_to_homogeneous(triangulation.points3d.T)
#                  )
#     triangulation.points3d = convert_from_homogeneous(pts).T


In [11]:
'''Export image coordinates'''

# from lib.io import export_keypoints_for_calge
def export_keypoints_for_calge(
    filename: str,
    features: Features,
    imageds: Imageds,
    epoch: int = None,
    pixel_size_micron: float = None,
) -> None:
    """ Write keypoints image coordinates to csv file,
    sort by camera, as follows:
    cam1, kpt1, x, y
    cam1, kpt2, x, y
    ...
    cam1, kptM, x, y
    cam2, kpt1, x, y
    ....
    camN, kptM, x, y

    Args:
        filename (str): path of the output csv file
        features (calsses.Features):
        imageds (calsses.Imageds):
        epoch (int, default = None):
        pixel_size_micron (float, default = None) [micron]
    """

    if epoch is not None:

        cams = list(imageds.keys())

        # Write header to file
        file = open(filename, "w")
        if pixel_size_micron is not None:
            file.write("image_name, feature_id, xi, eta\n")
            img = imageds[cams[0]][epoch]
            img_size = img.shape[:2]
        else:
            file.write("image_name, feature_id, x, y\n")

        for cam in cams:
            image_name = imageds[cam].get_image_name(epoch)
            
            # Write image name line 
            # NB: must be manually modified if it contains characters of symbols
            file.write(f"{image_name}\n")

            for id, kpt in enumerate(features[cam][epoch].get_keypoints()):
                x, y = kpt

                # If pixel_size_micron is not empty, convert image coordinates from x-y (row,column) image coordinate system to xi-eta image coordinate system (origin at the center of the image, xi towards right, eta upwards)
                if pixel_size_micron is not None:
                    xi = (x - img_size[1]/2) * pixel_size_micron
                    eta = (img_size[0]/2 - y) * pixel_size_micron

                    file.write(
                        f"{id:05}{xi:10.1f}{eta:15.1f} \n")
                else:
                    file.write(
                        f"{id:05}{x:10.1f}{y:15.1f} \n")
            # Write end image line
            file.write(f"-99\n")

        file.close()
        print("Marker exported successfully")
    else:
        print('please, provide the epoch number.')
        return

''' Export object coordinates'''
def export_points3D_for_calge(
    filename: str,
    points3D: np.ndarray,
) -> None:
    """ Write 3D world coordinates of matched points to csv file,
    sort by camera, as follows:
    marker1, X, Y, Z
    ...
    markerM, X, Y, Z

    Args:
        filename (str): path of the output csv file
        points3D (np.ndarray):
    """

    # Write header to file
    file = open(filename, "w")
    file.write("point_id, X, Y, Z\n")
    
    for id, pt in enumerate(points3D):
        file.write(f"{id:05}{pt[0]:20.4f}{pt[1]:25.4f}{pt[2]:24.4f}\n")

    file.close()
    print("Points exported successfully")


epoch = 0
export_keypoints_for_calge('simulaCalge/keypoints_280722.txt',
                           features=features,
                           imageds=images,
                           epoch=epoch,
                           pixel_size_micron=3.773
                           )
export_points3D_for_calge('simulaCalge/points3D_280722.txt',
                           points3D=np.asarray(pcd[epoch].points)
                           )


Loaded image IMG_2814.jpg
Marker exported successfully
Points exported successfully


In [12]:
# import open3d as o3d

# # from traceback import print_tb

# from lib.visualization import display_point_cloud
# from thirdparty.transformations import euler_from_matrix, euler_matrix

# cam1_loc = ref_cams[cams[0]]
# cam2_loc = ref_cams[cams[1]]

# # IMG_1289.jpg
# # loc = np.array([312.930747,300.536595,135.159918]).reshape(3,-1)
# # angles = np.deg2rad(np.array([-83.914310,80.177810,174.222339]))

# # IMG_2814.jpg
# loc = np.array([151.703845, 99.171778, 91.618363]).reshape(3, -1)
# ang = np.deg2rad(np.array([100.281584, 54.781799, -12.652574]))
# R1_world2cam = euler_matrix(ang[0], ang[1], ang[2])[:3, :3]
# # print(R_world2cam1_loc)

# # Read point cloud in LOC RS
# sparse_loc = o3d.io.read_point_cloud('data/belvedere_20220728_sparse_LOC.ply')

# # Define cam1_locera EO


# def build_pose_matrix(R: np.ndarray, C: np.ndarray) -> np.ndarray:
#     # Check for input dimensions
#     if R.shape != (3, 3):
#         raise ValueError(
#             'Wrong dimension of the R matrix. It must be a 3x3 numpy array')
#     if C.shape == (3,) or C.shape == (1, 3):
#         C = C.T
#     elif C.shape != (3, 1):
#         raise ValueError(
#             'Wrong dimension of the C vector. It must be a 3x1 or a 1x3 numpy array')

#     pose = np.eye(4)
#     pose[0:3, 0:3] = R
#     pose[0:3, 3:4] = C
#     return pose


# def compute_camera_EO(camera: Camera,
#                       extrinsics: np.ndarray = None,
#                       pose: np.ndarray = None
#                       ) -> Camera:

#     if extrinsics is not None:
#         camera.extrinsics = extrinsics
#         camera.extrinsics_to_pose()
#         camera.update_camera_from_extrinsics()
#         return camera

#     if pose is not None:
#         camera.pose = pose
#         camera.pose_to_extrinsics()
#         camera.update_camera_from_extrinsics()
#         return camera

#     else:
#         raise ValueError(
#             'Not enough data to build Camera External Orientation matrixes.')


# cam1toWorld = build_pose_matrix(R1_world2cam.T, loc) @ cameras[cams[0]][0].pose
# cam1_loc = compute_camera_EO(cam1_loc, pose=cam1toWorld)

# cam2toWorld = cam1_loc.pose @ cameras[cams[1]][0].pose
# cam2_loc = compute_camera_EO(cam2_loc, pose=cam2toWorld)


# # Visualize point cloud
# display_point_cloud(
#     sparse_loc,
#     [cam1_loc, cam2_loc],
#     plot_scale=7,
# )
