In [43]:
import open3d as o3d
import e57
import numpy as np
import laspy

# source = o3d.io.read_point_cloud("source.e57")
# target = o3d.io.read_point_cloud("target.pcd")

In [2]:
def sample_test(sou, **kwargs):
    print(kwargs.keys())
    print(kwargs['voxel_size'])
    print(kwargs['distance_threshold'])

In [3]:
sample_test(11, voxel_size=10, distance_threshold=20)

dict_keys(['voxel_size', 'distance_threshold'])
10
20


In [64]:
def read_e57_as_pcd(pcd_path):
    pcd_e57 = e57.read_points(pcd_path)

    point_cloud = o3d.geometry.PointCloud()
    point_cloud.points = o3d.utility.Vector3dVector(pcd_e57.points)
    point_cloud.colors = o3d.utility.Vector3dVector(pcd_e57.color)
    return point_cloud

def merge_e57_point_clouds(source_path, target_path, output_path, **kwargs):
    """
    Merge two overlapping .e57 point clouds into a single point cloud.

    Args:
        source_path (str): Path to the source .e57 file.
        target_path (str): Path to the target .e57 file.
        output_path (str): Path to save the merged point cloud as .e57 or other formats.

    Returns:
        merged_pcd (open3d.geometry.PointCloud): Merged point cloud.
    """
    # Load source and target .e57 files
    print("Loading point clouds...")
    
    source_pcd = read_e57_as_pcd(source_path)
    target_pcd = read_e57_as_pcd(target_path)
    # source_pcd = o3d.io.read_point_cloud(source_path)
    # target_pcd = o3d.io.read_point_cloud(target_path)

    # Downsample point clouds for faster processing
    print("Downsampling point clouds...")
    if kwargs['voxel_size'] is not None:
        voxel_size = kwargs['voxel_size']
    else:
        voxel_size = 0.05  # Adjust voxel size as needed

    if kwargs['distance_threshold'] is not None:
        distance_threshold = kwargs['distance_threshold']
    else:
        distance_threshold = voxel_size * 2  # Adjust threshold as needed

    source_down = source_pcd.voxel_down_sample(voxel_size)
    target_down = target_pcd.voxel_down_sample(voxel_size)

    # Compute FPFH feature descriptors for both point clouds
    print("Computing FPFH features...")
    source_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        source_down, o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100)
    )
    target_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        target_down, o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100)
    )

    # Perform RANSAC-based initial alignment
    print("Performing initial alignment with RANSAC...")
    
    result_ransac = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh,
        max_correspondence_distance=distance_threshold,
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(),
        ransac_n=4,
        criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(400000, 500)
    )

    print("Initial alignment result:")
    print(result_ransac)

    # Perform fine-tuning with ICP
    print("Refining alignment with ICP...")
    result_icp = o3d.pipelines.registration.registration_icp(
        source_pcd, target_pcd, max_correspondence_distance=voxel_size,
        init=result_ransac.transformation,
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint()
    )

    print("ICP alignment result:")
    print(result_icp)

    # Apply transformation to source point cloud
    print("Applying transformation to source point cloud...")
    source_pcd.transform(result_icp.transformation)

    # Merge the point clouds
    print("Merging point clouds...")
    merged_pcd = source_pcd + target_pcd

    # Save the merged point cloud
    print(f"Saving merged point cloud to {output_path}...")
    o3d.io.write_point_cloud(output_path, merged_pcd)

    print("Point clouds merged successfully.")
    return merged_pcd

def load_e57_to_o3d(filename):
    """
    pye57를 사용하여 E57 파일에서 포인트 클라우드를 읽고,
    Open3D의 PointCloud 객체로 변환하는 함수.
    """
    # E57 파일 열기 (pye57 API는 다를 수 있음)
    e57_file = e57.read(filename)
    
    # 예제에서는 get_points() 메서드를 통해 numpy 배열 (N x D)을 반환한다고 가정
    # 첫 3열은 x, y, z 좌표, (옵션) 4~6열은 RGB 정보 (0~255 범위)
    points_data = e57_file.get_points()  
    # 예외 처리나 추가 전처리가 필요한 경우 여기에 추가
    
    # Open3D PointCloud 객체 생성
    pcd = o3d.geometry.PointCloud()
    # 좌표 할당 (x,y,z)
    pcd.points = o3d.utility.Vector3dVector(points_data[:, :3])
    
    # RGB 정보가 있다면 (예: 컬럼 수가 6 이상)
    if points_data.shape[1] >= 6:
        # 0~255 범위를 0~1로 정규화하여 할당
        colors = points_data[:, 3:6] / 255.0
        pcd.colors = o3d.utility.Vector3dVector(colors)
    
    return pcd

def preprocess_point_cloud(pcd, voxel_size):
    """
    다운샘플링 및 노멀 추정 수행
    """
    print(":: Downsampling with voxel size %.3f." % voxel_size)
    pcd_down = pcd.voxel_down_sample(voxel_size)
    
    print(":: Estimating normals.")
    pcd_down.estimate_normals(
        search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2.0, max_nn=30))
    return pcd_down

def run_icp(source, target, voxel_size):
    """
    o3d.pipelines.registration의 ICP를 사용한 정합 수행
    """
    # 전처리: 다운샘플링 및 노멀 계산
    source_down = preprocess_point_cloud(source, voxel_size)
    target_down = preprocess_point_cloud(target, voxel_size)
    
    # ICP 초기 매개변수
    threshold = voxel_size * 1.5
    trans_init = np.eye(4)
    
    print(":: Running ICP registration.")
    reg_result = o3d.pipelines.registration.registration_icp(
        source_down, target_down, threshold, trans_init,
        o3d.pipelines.registration.TransformationEstimationPointToPoint())
    
    print(":: ICP fitness:", reg_result.fitness)
    print(":: ICP inlier RMSE:", reg_result.inlier_rmse)
    print(":: Transformation:\n", reg_result.transformation)
    print(":: Transformation:\n", reg_result.transformation)
    return reg_result.transformation

def write_pcd2las(pcd, output_path):
    # point_format 3는 RGB 정보를 포함하는 포맷 (LAS 1.4에서 지원)
    header = laspy.LasHeader(point_format=3, version="1.4")
    # 좌표 값의 정밀도를 조정하기 위한 scale과 offset (데이터 범위에 맞게 조절)
    header.scale = [0.001, 0.001, 0.001]
    header.offset = [0, 0, 0]

    # LAS 데이터 객체 생성
    las = laspy.LasData(header)

    # 좌표값 할당
    las.x = np.asarray(pcd.points)[:,0]
    las.y = np.asarray(pcd.points)[:,1]
    las.z = np.asarray(pcd.points)[:,2]
    las.red   = (np.asarray(pcd.colors)[:,0]*255.).astype(np.int8)
    las.green = (np.asarray(pcd.colors)[:,1]*255.).astype(np.int8)
    las.blue  = (np.asarray(pcd.colors)[:,2]*255.).astype(np.int8)
    las.write(output_path)

voxel_size:다운샘플링 크기와 정렬의 정밀도에 영향을 미칩니다. 겹치는 영역이 작거나 더 세밀한 정렬이 필요할 경우 값을 줄이세요.

distance_threshold:RANSAC 및 ICP 정렬 시의 대응점 거리 허용값. 겹치는 영역이 적다면 값을 늘려야 합니다.

max_correspondence_distance: ICP의 대응점 거리 허용값으로, 병합 정밀도에 영향을 줍니다.

In [None]:
# Example usage
source_path = "./../daeyoung/CustomPCD/(Test)LectureRoom2_1_cleaned.e57"
target_path = "./../daeyoung/CustomPCD/(Test)LectureRoom2_2_cleaned.e57"
output_path = "./../daeyoung/CustomPCD/(Test)LectureRoom2_merged.e57"

merged_pcd = merge_e57_point_clouds(source_path, target_path, output_path, voxel_size=10, distance_threshold=20)

# # If overlapped points erased,
# merged_pcd = merged_pcd.voxel_down_sample(voxel_size=0.01)

# # Statistical noise removal
# cl, ind = merged_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
# merged_pcd = merged_pcd.select_by_index(ind)

# Visualize the merged point cloud
o3d.visualization.draw_geometries([merged_pcd])

## Upper ones not work, here's new ones

In [33]:
# def load_e57_to_o3d(filename):
#     """
#     pye57를 사용하여 E57 파일에서 포인트 클라우드를 읽고,
#     Open3D의 PointCloud 객체로 변환하는 함수.
#     """
#     # E57 파일 열기 (pye57 API는 다를 수 있음)
#     e57_file = e57.read(filename)
    
#     # 예제에서는 get_points() 메서드를 통해 numpy 배열 (N x D)을 반환한다고 가정
#     # 첫 3열은 x, y, z 좌표, (옵션) 4~6열은 RGB 정보 (0~255 범위)
#     points_data = e57_file.get_points()  
#     # 예외 처리나 추가 전처리가 필요한 경우 여기에 추가
    
#     # Open3D PointCloud 객체 생성
#     pcd = o3d.geometry.PointCloud()
#     # 좌표 할당 (x,y,z)
#     pcd.points = o3d.utility.Vector3dVector(points_data[:, :3])
    
#     # RGB 정보가 있다면 (예: 컬럼 수가 6 이상)
#     if points_data.shape[1] >= 6:
#         # 0~255 범위를 0~1로 정규화하여 할당
#         colors = points_data[:, 3:6] / 255.0
#         pcd.colors = o3d.utility.Vector3dVector(colors)
    
#     return pcd

# def preprocess_point_cloud(pcd, voxel_size):
#     """
#     다운샘플링 및 노멀 추정 수행
#     """
#     print(":: Downsampling with voxel size %.3f." % voxel_size)
#     pcd_down = pcd.voxel_down_sample(voxel_size)
    
#     print(":: Estimating normals.")
#     pcd_down.estimate_normals(
#         search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2.0, max_nn=30))
#     return pcd_down

# def run_icp(source, target, voxel_size):
#     """
#     o3d.pipelines.registration의 ICP를 사용한 정합 수행
#     """
#     # 전처리: 다운샘플링 및 노멀 계산
#     source_down = preprocess_point_cloud(source, voxel_size)
#     target_down = preprocess_point_cloud(target, voxel_size)
    
#     # ICP 초기 매개변수
#     threshold = voxel_size * 1.5
#     trans_init = np.eye(4)
    
#     print(":: Running ICP registration.")
#     reg_result = o3d.pipelines.registration.registration_icp(
#         source_down, target_down, threshold, trans_init,
#         o3d.pipelines.registration.TransformationEstimationPointToPoint())
    
#     print(":: ICP fitness:", reg_result.fitness)
#     print(":: ICP inlier RMSE:", reg_result.inlier_rmse)
#     print(":: Transformation:\n", reg_result.transformation)
#     print(":: Transformation:\n", reg_result.transformation)
#     return reg_result.transformation

# def write_pcd2las(pcd, output_path):
#     # point_format 3는 RGB 정보를 포함하는 포맷 (LAS 1.4에서 지원)
#     header = laspy.LasHeader(point_format=3, version="1.4")
#     # 좌표 값의 정밀도를 조정하기 위한 scale과 offset (데이터 범위에 맞게 조절)
#     header.scale = [0.001, 0.001, 0.001]
#     header.offset = [0, 0, 0]

#     # LAS 데이터 객체 생성
#     las = laspy.LasData(header)

#     # 좌표값 할당
#     las.x = np.asarray(pcd.points)[0]
#     las.y = np.asarray(pcd.points)[1]
#     las.z = np.asarray(pcd.points)[2]
#     las.red   = np.asarray(pcd.colors)[0]
#     las.green = np.asarray(pcd.colors)[1]
#     las.blue  = np.asarray(pcd.colors)[2]
#     las.write(output_path)

In [None]:
source_file = "./../daeyoung/CustomPCD/(Test)LectureRoom2_1_cleaned.e57"
target_file = "./../daeyoung/CustomPCD/(Test)LectureRoom2_2_cleaned.e57"
output_path = "./../daeyoung/CustomPCD/(Test)LectureRoom2_merged.e57"

print("Loading source point cloud from:", source_file)
source_pcd = read_e57_as_pcd(source_file)

print("Loading target point cloud from:", target_file)
target_pcd = read_e57_as_pcd(target_file)

# registration을 위한 voxel 크기 설정 (데이터 특성에 따라 조절)
voxel_size = 0.05  # 예: 5cm voxel

# 두 포인트 클라우드 정합 수행
transformation = run_icp(source_pcd, target_pcd, voxel_size)

# 소스 PCD 변환환
source_pcd.transform(transformation)

#포인트 클라우드 합침
merged_pcd = source_pcd + target_pcd
# merged_pcd_down = merged_pcd.voxel_down_sample(voxel_size) ## If needed

o3d.io.write_point_cloud("merged_point_cloud.ply", merged_pcd)

Loading source point cloud from: ./../daeyoung/CustomPCD/(Test)LectureRoom2_1_cleaned.e57
Loading target point cloud from: ./../daeyoung/CustomPCD/(Test)LectureRoom2_2_cleaned.e57
:: Downsampling with voxel size 0.050.
:: Estimating normals.
:: Downsampling with voxel size 0.050.
:: Estimating normals.
:: Running ICP registration.
:: ICP fitness: 0.9711419156004429
:: ICP inlier RMSE: 0.015293299127615446
:: Transformation:
 [[ 9.99999956e-01 -8.04108071e-05 -2.86265562e-04 -1.46346269e-02]
 [ 8.04230759e-05  9.99999996e-01  4.28469193e-05  3.82017844e-03]
 [ 2.86262115e-04 -4.28699397e-05  9.99999958e-01  2.75107145e-04]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
:: Transformation:
 [[ 9.99999956e-01 -8.04108071e-05 -2.86265562e-04 -1.46346269e-02]
 [ 8.04230759e-05  9.99999996e-01  4.28469193e-05  3.82017844e-03]
 [ 2.86262115e-04 -4.28699397e-05  9.99999958e-01  2.75107145e-04]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]


In [65]:
write_pcd2las(merged_pcd, "./../data/outputs/predPCDs/merged_point_cloud_LectureRoom.las")