# 0. Cam parameter

In [1]:
import xml.etree.ElementTree as ET
import os
from PIL import Image
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from skimage import io

def read_camera_transforms(xml_file_path):
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    sensor_calibrations = {}
    camera_info = []
    for camera in root.iter('camera'):
        camera_id = camera.attrib.get('id')
        sensor_id = camera.attrib.get('sensor_id', 'default')
        label = camera.attrib.get('label', 'No Label')
        transform_text = camera.find('transform').text if camera.find('transform') is not None else "N/A"
        camera_info.append({
            'camera_id': camera_id,
            'label': label,
            'sensor_id': sensor_id,
            'transform': transform_text
        })
    return camera_info

file_path = r'camera.xml'
camera_info = read_camera_transforms(file_path)

In [2]:
import xml.etree.ElementTree as ET

def read_camera_calibrations(xml_file_path):
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    sensor_calibrations = {}
    for calibration in root.iter('calibration'):
        f = calibration.find('f').text if calibration.find('f') is not None else '0'
        cx = calibration.find('cx').text if calibration.find('cx') is not None else '0'
        cy = calibration.find('cy').text if calibration.find('cy') is not None else '0'
        k1 = calibration.find('k1').text if calibration.find('k1') is not None else '0'
        k2 = calibration.find('k2').text if calibration.find('k2') is not None else '0'
        k3 = calibration.find('k3').text if calibration.find('k3') is not None else '0'
        k4 = calibration.find('k4').text if calibration.find('k4') is not None else '0'
        p1 = calibration.find('p1').text if calibration.find('p1') is not None else '0'
        p2 = calibration.find('p2').text if calibration.find('p2') is not None else '0'
        b1 = calibration.find('b1').text if calibration.find('b1') is not None else '0'
        b2 = calibration.find('b2').text if calibration.find('b2') is not None else '0'                
        sensor_calibrations = {
            'f': f, 
            'cx': cx, 
            'cy': cy,
            'k1':k1,
            'k2':k2,
            'k3':k3,
            'k4':k4, 
            'p1':p1,
            'p2':p2,            
            'b1':b1,
            'b2':b2         
        }
    return sensor_calibrations

file_path = r'camera.xml'
camera_para = read_camera_calibrations(file_path)
print(camera_para)   

{'f': '2822.7761746467008', 'cx': '-1.4846167111733253', 'cy': '-19.719878146815457', 'k1': '0.03995859213266123', 'k2': '-0.0067492350191110757', 'k3': '-0.040375477160038357', 'k4': '0', 'p1': '4.3724736785751615e-05', 'p2': '-0.0014542416392983482', 'b1': '0', 'b2': '0'}


# 1. Read Annotation 

In [3]:
import json

def load_and_extract_data(file_path, index):
    
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    if index < 0 or index > len(data):
        return "Index is out of range, please enter a valid index."
    extracted_data = data[index]
    return extracted_data

In [4]:
id_mapping = {
    '1a': 0, '1b': 1, '1c': 2, '1d': 3, '2a': 4, '2b': 5, '2c': 6, '2d': 7, '2e': 8, '2f': 9, '3a': 10, '3b': 11, '3c': 12, '3d': 13, '3e': 14, '4a': 15, '4b': 16, '5a': 17, '5b': 18, '5c': 19, '6a': 20, '6b': 21, '7a': 22, '7b': 23, '7c': 24, '7d': 25, '7e': 26, '7f': 27, 
    '7g': 28, '7h': 29, '7i': 30, '7j': 31, '7m': 32, '8a': 33, '9a': 34, '10a': 35, '12a': 36, '12b': 37, '13a': 38, '13b': 39, '13c': 40, '14a': 41, '14b': 42, '15a': 43, '15b': 44, '15c': 45, '16a': 46, '16b': 47, '16c': 48, '16d': 49, '16e': 50, '17a': 51, '17b': 52,
    '17c': 53, '17f': 54, '18a': 55, '18b': 56, '18c': 57, '18d': 58, '18e': 59, '18f': 60, '18g': 61, '19a': 62, '19b': 63, '19c': 64, '20a': 65, '20b': 66, '20c': 67, '20h': 68, '20i': 69, '21a': 70, '21b': 71, '21c': 72, '23a': 73, '23b': 74, '23c': 75, '23d': 76,
    '23e': 77, '23f': 78, '23g': 79, '23h': 80, '24a': 81, '24b': 82, '24c': 83, '24d': 84, '24e': 85, '24f': 86, '24g': 87, '24h': 88, '25a': 89, '25b': 90, '25c': 91, '26a': 92, '27a': 93, '27b': 94, '27d': 95, '27e': 96, '28a': 97, '28b': 98, '29a': 99, '30a': 100,
    '31a': 101, '32a': 102, '33a': 103, '34a': 104, '35a': 105, '37a': 106, '37b': 107, '44a': 108, '44b': 109, '44c': 110, '44d': 111, '44e': 112, '45a': 113, '45b': 114, '45c': 115, '46a': 116, '46b': 117
}
reverse_mapping = {value: key for key, value in id_mapping.items()}

In [5]:
class_priority = id_mapping

def GenerateMask(camera_index, camera_info, class_priority):
    print(camera_info[camera_index])
    camera_id = int(camera_info[camera_index]['camera_id'])
    file_path = r'consolidated_results.json'
    index_to_extract = camera_id
    result = load_and_extract_data(file_path, index_to_extract)
    print(result)
    data = result
    image_name = data['image_file']
    folder_path = r'images'
    file_path = os.path.join(folder_path, image_name)
    if os.path.exists(file_path):
        image = Image.open(file_path)
        image_info = f"Image Name: {image_name}\nImage Size (Pixels): {image.size}"
        print(image_info)
    else:
        print(f"Image {image_name} not found in the folder.")
    anns_class = []
    anns_maskbound = []    
    class_id = None
    for pred in data['predictions']:
        class_id = int(pred['class'])
        bbox = pred['bbox'][0]
        class_label = reverse_mapping.get(class_id, 'Unknown')
        anns_class.append(class_label)
        anns_maskbound.append(bbox)
    
    if class_id is None:
        print("No valid class_id found, skipping.")
        return None, None, None, None
    print("anns_class:", anns_class)

    text = image_info
    lines = text.split('\n')
    file_name_line = lines[0]
    size_line = lines[1]
    img_name = file_name_line.split(': ')[1]
    img_label = img_name.split('.')[0]
    size_info = size_line.split(': ')[1].strip('()')
    img_width, img_height = size_info.split(', ')
    img_height = int(img_height)
    img_width = int(img_width)
    mask = np.zeros((img_height, img_width), dtype=np.uint8)
    anns_combined = list(zip(anns_class, anns_maskbound))
    anns_sorted = sorted(anns_combined, key=lambda x: class_priority.get(x[0], 99))
    print(anns_sorted)
    
    for cls, boundaries in anns_sorted:
        for boundary in boundaries:
            print("Boundary:", boundary)
            if len(boundaries) == 4:
                x1, y1, x2, y2 = boundaries
                pts = np.array([[x1, y1], [x2, y1], [x2, y2], [x1, y2]], dtype=np.int32).reshape((-1, 1, 2))
                cv2.fillPoly(mask, [pts], id_mapping[cls])

    return img_width, img_height, img_name, mask

# 2. Read Point Cloud

In [15]:
import open3d as o3dpoint
import numpy as np

point_cloud_file = r'point cloud.ply'

pc = o3dpoint.io.read_point_cloud(point_cloud_file)

point_cloud=np.asarray(pc.points)
ds_points=point_cloud[::300]

In [8]:
o3dpoint.visualization.draw_geometries([pc])

# 3. Establish Projection Model

## Reference:
1. Metashape Projection Model: https://www.agisoft.com/pdf/metashape-pro_2_1_en.pdf
2. Pinhole camera model: https://kornia.readthedocs.io/en/v0.1.2/pinhole.html
3. Metashape code for Brown's distortion model: https://www.agisoft.com/forum/index.php?topic=13117.0
4. Brown's distortion model in OpenCV: https://docs.opencv.org/4.x/d4/d94/tutorial_camera_calibration.html

In [9]:
from numpy.linalg import inv
from numpy import array
import math


def convert_point_to_pixel(point, camera_info, camera_para, img_width, img_height,img_name):
    img_name_label=img_name.split('.')[0]
    cam=next((camera for camera in camera_info if camera['label'] == img_name_label), None)

    transform=cam['transform']
    transform_list = list(map(float, transform.split()))
    transform_matrix = np.array(transform_list).reshape(4, 4)
    transform_matrix_3Dto2D=inv(transform_matrix)
    
    point_h = np.append(point, 1)   
    point_2D=transform_matrix_3Dto2D@point_h

    X, Y, Z= point_2D[0:3]
    x=X/Z
    y=Y/Z

    f=float(camera_para['f'])
    cx=float(camera_para['cx'])
    cy=float(camera_para['cy'])
    k1=float(camera_para['k1'])
    k2=float(camera_para['k2'])
    k3=float(camera_para['k3'])
    k4=float(camera_para['k4'])
    p1=float(camera_para['p1'])
    p2=float(camera_para['p2'])
    b1=float(camera_para['b1'])
    b2=float(camera_para['b2'])

    r = math.sqrt(x**2 + y**2)

    x_rad = x*(1 + k1*r**2 + k2*r**4 + k3*r**6 + k4*r**8)
    x_tang = (p1*(r**2 + 2*x**2) + 2*p2*x*y)
    y_rad = y*(1 + k1*r**2 + k2*r**4 + k3*r**6 + k4*r**8)
    y_tang = (p2*(r**2 + 2*y**2) + 2*p1*x*y)

    x_dash = x_rad + x_tang
    y_dash = y_rad + y_tang

    u = img_width*0.5 + cx + x_dash*f + x_dash*b1 + y_dash*b2
    v = img_height *0.5 + cy + y_dash*f
    
    return np.asarray([u,v])

In [10]:
def custom_project_points_to_image(point_cloud, camera_info, camera_para, img_width, img_height, img_name,mask):
    class_info_list = []

    if next_camera_position:
        camera_to_next_vector = [next_camera_position[j] - camera_position[j] for j in range(3)]
    else:
        camera_to_next_vector = None
        
    for point in point_cloud:
        
        pixel= convert_point_to_pixel(point, camera_info, camera_para, img_width, img_height,img_name)

        if (0<= pixel[0]<img_width) and (0<= pixel[1]<img_height):
            class_info = mask[int(pixel[1]), int(pixel[0])]
        else:
            class_info = -1

        if camera_to_next_vector is not None and class_info != -1:
            point_position = point
            camera_to_point_vector = [point_position[j] - camera_position[j] for j in range(3)]

            dot_product = sum(camera_to_next_vector[j] * camera_to_point_vector[j] for j in range(3))
            
            if dot_product <= 0:
                class_info = -1

        class_info_list.append(class_info)

    return class_info_list

In [11]:
import numpy as np
from collections import defaultdict, Counter

def extract_camera_position(transform):

    transform_values = list(map(float, transform.split()))
    
    print("Transform values:", transform_values) 
    
    position = [transform_values[3], transform_values[7], transform_values[11]]
    
    print("Extracted position:", position)
    
    return position

def apply_distance_filter(class_info_list, points, camera_position, max_distance):
    filtered_class_info = []
    valid_counts = []
    for point, class_info in zip(points, class_info_list):
        if np.linalg.norm(np.array(point) - np.array(camera_position)) > max_distance:
            filtered_class_info.append(0)  
            valid_counts.append(0)
        else:
            filtered_class_info.append(class_info)  
            valid_counts.append(1)  
    return filtered_class_info, valid_counts

In [12]:
import numpy as np
from collections import defaultdict, Counter

def majority_vote(values):
    count = Counter(values)
    count.pop(-1, None)  
    count.pop(0, None)   
    if count:
        return count.most_common(1)[0][0]
    else:
        return 0 

In [12]:
# 无向量筛选
# # 初始化存储每个摄像头结果的列表
# all_filtered_class_infos = []
# valid_projections = defaultdict(int)  # 累计每个点在有效范围内的出现次数

# # 处理前 i 个摄像头
# for i in range(139):
#     camera_index = i  
#     img_width, img_height, img_name, mask = GenerateMask(camera_index, camera_info, class_priority)
#     if img_width is None:
#         continue  # 如果返回值为 None，跳过当前循环
#     camera_position = extract_camera_position(camera_info[camera_index]['transform'])
#     class_info_list = custom_project_points_to_image(ds_points, camera_info, camera_para, img_width, img_height, img_name, mask)
    
#     max_distance = 100
#     filtered_class_info, valid_counts = apply_distance_filter(class_info_list, ds_points, camera_position, max_distance)
#     all_filtered_class_infos.append(filtered_class_info)
#     for idx, count in enumerate(valid_counts):
#         valid_projections[idx] += count  # 累加每个点在有效范围内的计数

# # 应用阈值和决策规则
# final_filtered_class_info = []
# threshold = 0
# for idx, labels in enumerate(zip(*all_filtered_class_infos)):  # 使用 zip(*) 转置列表
#     non_neg_labels = [label for label in labels if label not in (-1, 0)]
#     if valid_projections[idx] > 0 and len(non_neg_labels) / valid_projections[idx] > threshold:
#         final_label = majority_vote(non_neg_labels)   
#     else:
#         final_label = -1
#     final_filtered_class_info.append(final_label)

# # 输出最终的分类结果
# print(final_filtered_class_info)

In [16]:
all_filtered_class_infos = []
valid_projections = defaultdict(int) 

total_cameras = 149  

for i in range(total_cameras):
    camera_index = i  
    img_width, img_height, img_name, mask = GenerateMask(camera_index, camera_info, class_priority)
    
    print(len(ds_points))
    print(camera_para)
    print(img_width)
    print(img_height)
    print(img_name)
    print(mask)
    
    if img_width is None:
        continue
    camera_position = extract_camera_position(camera_info[camera_index]['transform'])

    if i < total_cameras - 1: 
        next_camera_position = extract_camera_position(camera_info[camera_index + 1]['transform'])
        camera_to_next_vector = [next_camera_position[j] - camera_position[j] for j in range(3)]
    else:
        camera_to_next_vector = None
        
    class_info_list = custom_project_points_to_image(ds_points, camera_info, camera_para, img_width, img_height, img_name, mask)
    print(len(class_info_list))
    
    max_distance = 6
    filtered_class_info, valid_counts = apply_distance_filter(class_info_list, ds_points, camera_position, max_distance)
    print(len(filtered_class_info))
    
    for idx, count in enumerate(valid_counts):
        valid_projections[idx] += count

    all_filtered_class_infos.append( filtered_class_info)

final_filtered_class_info = []
threshold = 0.1
for idx, labels in enumerate(zip(*all_filtered_class_infos)):
    non_neg_labels = [label for label in labels if label not in (-1, 0)]
    if valid_projections[idx] > 0 and len(non_neg_labels) / valid_projections[idx] > threshold:
        final_label = majority_vote(non_neg_labels)   
    else:
        final_label = -1
    final_filtered_class_info.append(final_label)

print(final_filtered_class_info)

{'camera_id': '0', 'label': 'frame_0000', 'sensor_id': '0', 'transform': '0.99939159700617797 -0.0079063785040407609 -0.033969471770875198 1.1525284631105057 -0.0080032229337263817 -0.99996428552287142 -0.0027158979005930117 0.71342949283616341 -0.033946785652172143 0.0029861107957024902 -0.99941918076761038 45.943179163308237 0 0 0 1'}
{'image_file': 'frame_0000.png', 'predictions': [{'class': 89.0, 'confidence': 0.9420522451400757, 'bbox': [[2544.64697265625, 1307.96875, 2634.67333984375, 1395.8896484375]]}]}
Image Name: frame_0000.png
Image Size (Pixels): (3840, 2160)
anns_class: ['25a']
[('25a', [2544.64697265625, 1307.96875, 2634.67333984375, 1395.8896484375])]
Boundary: 2544.64697265625
Boundary: 1307.96875
Boundary: 2634.67333984375
Boundary: 1395.8896484375
85041
{'f': '2822.7761746467008', 'cx': '-1.4846167111733253', 'cy': '-19.719878146815457', 'k1': '0.03995859213266123', 'k2': '-0.0067492350191110757', 'k3': '-0.040375477160038357', 'k4': '0', 'p1': '4.3724736785751615e-05

In [14]:
# from collections import Counter

# # Transpose the list of lists
# transposed_results = [list(group) for group in zip(*all_filtered_class_infos)]

# # Now transposed_results contains ["r1p1", "r2p1", "r3p1"], ["r1p2", "r2p2", "r3p2"], etc.
# #print(transposed_results)

# points_classes=np.array(transposed_results)

# #print(points_classes)

# weight_dict={
#     -1:0.1,
#     0:0.1,
#     34:3,   # Green 双向交通
#     103: 3,  # Blue 人行横道
#     98: 3,   # Yellow 允许某种车辆泊车
#     16: 3,   # Cyan 注意行人
#     47: 3,   # Magenta 禁止泊车
#     89: 3,   # Brown 必须越过安全岛
#     23: 3,   # Orange 碎石碾压
#     82: 3,   # Purple 应遵直行
#     33: 3,   #让先通过
#     22: 3,
#     11: 3
# }


# def weight_most_common_element(row):
#     count = Counter(row)
#     # Get a list of elements sorted by their frequency (most common first)
#     most_common_elements = count.most_common()
#     #print(most_common_elements)
#     max_ele_index=0
#     max_score=0
#     for i in range(len(most_common_elements)):
#         ele_index=most_common_elements[i][0]
#         ele_score=most_common_elements[i][1]*weight_dict[ele_index]
#         if (ele_score>max_score):
#             max_score=ele_score
#             max_ele_index=ele_index
#     #print(max_ele_index)
#     return max_ele_index
# #print(most_frequent_values)

# most_frequent_values = [weight_most_common_element(row) for row in points_classes]

# print(np.unique(most_frequent_values))

In [17]:
color_map = {
    -1: [0.5, 0.5, 0.5],
    0: [1.0, 0.0, 0.0],
    34: [0.0, 1.0, 0.0],
    103: [0.0, 0.0, 1.0],
    98: [1.0, 1.0, 0.0],
    16: [0.0, 1.0, 1.0],
    47: [1.0, 0.0, 1.0],
    89: [0.6, 0.4, 0.2],
    23: [1.0, 0.5, 0.0],
    82: [0.5, 0.0, 0.5],
    33: [0.1, 0.8, 0.6],
    110: [0.5, 0.7, 0.3],
    11: [0.8, 0.12, 0.9]
}

point_cloud_new = o3dpoint.geometry.PointCloud()

point_cloud_new.points=o3dpoint.utility.Vector3dVector(ds_points)

point_colors = [color_map[class_id] for class_id in final_filtered_class_info]

point_cloud_new.colors=o3dpoint.utility.Vector3dVector(point_colors)

o3dpoint.visualization.draw_geometries([point_cloud_new])

In [24]:
import open3d as o3d
import numpy as np

def save_to_ply(ds_points, labels, save_path):

    points = np.array(ds_points)
    colors = np.zeros((points.shape[0], 3))

    for i, label in enumerate(labels):
        if label == -1:
            colors[i] = [0.5, 0.5, 0.5]
        else:
            np.random.seed(label)
            colors[i] = np.random.rand(3) 

    point_cloud = o3d.geometry.PointCloud()
    point_cloud.points = o3d.utility.Vector3dVector(points)
    point_cloud.colors = o3d.utility.Vector3dVector(colors)

    o3d.io.write_point_cloud(save_path, point_cloud)
    print(f"Point cloud saved to {save_path}")

save_path = r'Results\final_classification_result.ply'
save_to_ply(ds_points, final_filtered_class_info, save_path)


Point cloud saved to Results\final_classification_result.ply


# 4. DBSCAN Point Cloud Clustering

In [21]:
import numpy as np
from sklearn.cluster import DBSCAN
from collections import Counter

filtered_points = np.array([point for point, label in zip(ds_points, final_filtered_class_info) if label != -1])
filtered_labels = [label for label in final_filtered_class_info if label != -1]

dbscan = DBSCAN(eps=0.5, min_samples=4)
clusters = dbscan.fit_predict(filtered_points)

cluster_centers = []
cluster_labels = []
for cluster_id in set(clusters):
    if cluster_id != -1:
        points_in_cluster = filtered_points[clusters == cluster_id]
        labels_in_cluster = [filtered_labels[i] for i in range(len(filtered_labels)) if clusters[i] == cluster_id]
        
        center = np.mean(points_in_cluster, axis=0)
        most_common_label = Counter(labels_in_cluster).most_common(1)[0][0]
        
        cluster_centers.append((cluster_id, center))
        cluster_labels.append(most_common_label)

for (cluster_id, center), label in zip(cluster_centers, cluster_labels):
    print(f"Cluster {cluster_id}: Center at {center}, Most common label: {label}")

Cluster 0: Center at [ -3.50825753  -0.64066676 -18.22353668], Most common label: 98
Cluster 1: Center at [ -1.48326631  -0.54011914 -12.35514891], Most common label: 47
Cluster 2: Center at [-1.57332944 -0.5051125  -9.22962968], Most common label: 103
Cluster 3: Center at [ 2.47328709 -0.46748228 -9.94541931], Most common label: 103
Cluster 4: Center at [ 2.67740933 -1.31298826 -9.66422399], Most common label: 89
Cluster 5: Center at [-1.24270044 -0.26636095 11.89683604], Most common label: 103
Cluster 6: Center at [ 2.79402073 -0.09982016 11.73270117], Most common label: 103
Cluster 7: Center at [-1.09689398e+00 -2.47833963e-02  2.69563244e+01], Most common label: 16
Cluster 8: Center at [ 2.86918072  0.19949126 26.91465777], Most common label: 16
Cluster 9: Center at [-1.04832193  0.09225679 36.02168798], Most common label: 47
Cluster 10: Center at [ 3.20124385 -0.4126371  35.86958915], Most common label: 89


In [23]:
cluster_center_points = o3dpoint.geometry.PointCloud()
cluster_center_points.points = o3dpoint.utility.Vector3dVector(np.array([center for _, center in cluster_centers]))

colors = {
    -1: [0.5, 0.5, 0.5],
    0: [1.0, 0.0, 0.0],
    34: [0.0, 1.0, 0.0],
    103: [0.0, 0.0, 1.0],
    98: [1.0, 1.0, 0.0],
    16: [0.0, 1.0, 1.0],
    47: [1.0, 0.0, 1.0],
    89: [0.6, 0.4, 0.2],
    23: [1.0, 0.5, 0.0],
    82: [0.5, 0.0, 0.5],
    9: [0.1, 0.8, 0.6],
    10: [0.5, 0.7, 0.3],
    11: [0.8, 0.12, 0.9]
}

center_colors = []
for label in cluster_labels:
    center_colors.append(colors.get(label, [0.5, 0.5, 0.5]))

spheres = []
for center, color in zip(cluster_center_points.points, center_colors):
    sphere = o3dpoint.geometry.TriangleMesh.create_sphere(radius=0.2)
    sphere.translate(center)
    sphere.paint_uniform_color(color)
    spheres.append(sphere)

o3dpoint.visualization.draw_geometries([pc] + spheres)

# 5. Alignment

In [25]:
# TXT_Loading
def load_camera_positions_with_ids(file_path, dimension=3):
    positions = []
    camera_ids = []
    try:
        with open(file_path, 'r') as file:
            for line in file:
                if 'Position:' in line:
                    cam_id_part, position_str = line.split('Position:')
                    cam_id = int(cam_id_part.split('Camera ID:')[1].strip().strip(','))
                    position = tuple(map(float, position_str.strip().strip('()').split(',')))
                    if len(position) < dimension:
                        position += (0,) * (dimension - len(position))
                    positions.append(position)
                    camera_ids.append(cam_id)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except Exception as e:
        print(f"An error occurred when loading data from {file_path}: {e}")
    return np.array(positions), camera_ids

In [26]:
# PCA，origin, Scaling
def create_point_cloud_from_positions(positions):
    if positions.size == 0:
        print("Empty position array received.")
        return None
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(positions)
    return pcd

def transform_point_cloud_with_pca(pcd):
    points = np.asarray(pcd.points)
    pca = PCA(n_components=3)
    pca.fit(points)
    transformed_points = np.dot(points - pca.mean_, pca.components_.T)
    new_pcd = o3d.geometry.PointCloud()
    new_pcd.points = o3d.utility.Vector3dVector(transformed_points)
    return new_pcd, pca.mean_, pca.components_

def compute_scale_factor(source, target):
    source_dist = np.linalg.norm(source, axis=1)
    target_dist = np.linalg.norm(target, axis=1)
    return np.mean(target_dist) / np.mean(source_dist)

def scale_point_cloud(pcd, scale_factor):
    points = np.asarray(pcd.points)
    scaled_points = points * scale_factor
    new_pcd = o3d.geometry.PointCloud()
    new_pcd.points = o3d.utility.Vector3dVector(scaled_points)
    return new_pcd

In [27]:
# Rotation and calculating the angle
def rotate_point_cloud(pcd, angle_degrees, axis='x'):
    radians = np.deg2rad(angle_degrees)
    if axis == 'z':
        rotation_matrix = np.array([[np.cos(radians), -np.sin(radians), 0],
                                    [np.sin(radians), np.cos(radians), 0],
                                    [0, 0, 1]])
    elif axis == 'y':
        rotation_matrix = np.array([[np.cos(radians), 0, np.sin(radians)],
                                    [0, 1, 0],
                                    [-np.sin(radians), 0, np.cos(radians)]])
    else:  # 'x'
        rotation_matrix = np.array([[1, 0, 0],
                                    [0, np.cos(radians), -np.sin(radians)],
                                    [0, np.sin(radians), np.cos(radians)]])
    
    points = np.asarray(pcd.points)
    rotated_points = points.dot(rotation_matrix.T)
    rotated_pcd = o3d.geometry.PointCloud()
    rotated_pcd.points = o3d.utility.Vector3dVector(rotated_points)
    return rotated_pcd


def calculate_mean_distance(pcd1, pcd2):
    points1 = np.asarray(pcd1.points)
    points2 = np.asarray(pcd2.points)
    distances = np.linalg.norm(points1 - points2, axis=1)
    return np.mean(distances)

def find_best_alignment(source_pcd, target_pcd, step=5, axis='x'):
    min_distance = float('inf')
    best_angle = 0
    for angle in np.arange(0, 360, step):
        rotated_pcd = rotate_point_cloud(source_pcd, angle)
        current_distance = calculate_mean_distance(rotated_pcd, target_pcd)

        if current_distance < min_distance:
            min_distance = current_distance
            best_angle = angle

    best_aligned_pcd = rotate_point_cloud(source_pcd, best_angle)

    return best_angle, best_aligned_pcd

In [28]:
# generate coordinate axis
def visualize_pc_with_axes(pcds, colors, scale=1.0):
    axes_objects = []
    for pcd, color in zip(pcds, colors):
        pcd.paint_uniform_color(color)
        axes_objects.append(pcd)
    origin = np.array([0, 0, 0], dtype=np.float64)
    for i, color in zip(range(3), [[1, 0, 0], [0, 1, 0], [0, 0, 1]]):
        axis_points = np.array([origin, scale * np.eye(3)[i]], dtype=np.float64)
        lines = o3d.geometry.LineSet(
            points=o3d.utility.Vector3dVector(axis_points),
            lines=o3d.utility.Vector2iVector([[0, 1]]))
        lines.paint_uniform_color(color)
        axes_objects.append(lines)
    o3d.visualization.draw_geometries(axes_objects, window_name="3D Visualization with Aligned Axes")

In [29]:
# main Procedure
import numpy as np
import open3d as o3d
from sklearn.decomposition import PCA

def process_positions_with_pca_and_visualize(file_path1, file_path2):
    positions1, ids1 = load_camera_positions_with_ids(file_path1, 3)
    positions2, ids2 = load_camera_positions_with_ids(file_path2, 3)
    pcd1 = create_point_cloud_from_positions(positions1)
    pcd2 = create_point_cloud_from_positions(positions2)
    transformed_pcd1, _, _ = transform_point_cloud_with_pca(pcd1)
    transformed_pcd2, _, _ = transform_point_cloud_with_pca(pcd2)
    scale_factor = compute_scale_factor(np.asarray(transformed_pcd1.points), np.asarray(transformed_pcd2.points))
    scaled_pcd1 = scale_point_cloud(transformed_pcd1, scale_factor)
    best_angle, best_aligned_pcd1 = find_best_alignment(scaled_pcd1, transformed_pcd2, step=5, axis='x')
    visualize_pc_with_axes([best_aligned_pcd1, transformed_pcd2], [[1, 0, 0], [0, 1, 0]])
    return pcd1, pcd2, best_aligned_pcd1, transformed_pcd2

file_path1 = r'metashape_camera_positions.txt'
file_path2 = r'world_camera_positions.txt'
process_positions_with_pca_and_visualize(file_path1, file_path2)

(PointCloud with 150 points.,
 PointCloud with 150 points.,
 PointCloud with 150 points.,
 PointCloud with 150 points.)

# 6. Matrix_transformation

In [30]:
def compute_transformation_matrix(points, transformed_points):
    num_points = points.shape[0]

    points_homogeneous = np.hstack((points, np.ones((num_points, 1))))
    transformed_points_homogeneous = np.hstack((transformed_points, np.ones((num_points, 1))))

    transformation_matrix, _, _, _ = np.linalg.lstsq(points_homogeneous, transformed_points_homogeneous, rcond=None)
    transformation_matrix = transformation_matrix.T

    return transformation_matrix

def transform_matrix_on_pc(transformation_matrix, pcd):
    points = np.asarray(pcd.points)

    num_points = points.shape[0]
    points_homogeneous = np.hstack((points, np.ones((num_points, 1))))

    transformed_points_homogeneous = np.dot(points_homogeneous, transformation_matrix.T)

    transformed_points = transformed_points_homogeneous[:, :3]

    transformed_pcd = o3d.geometry.PointCloud()
    transformed_pcd.points = o3d.utility.Vector3dVector(transformed_points)

    if pcd.has_colors():
        transformed_pcd.colors = o3d.utility.Vector3dVector(np.asarray(pcd.colors))
    
    return transformed_pcd

def multiply_transformation_matrices(matrix1, matrix2):

    matrix1 = np.asarray(matrix1)
    matrix2 = np.asarray(matrix2)

    if matrix1.shape != (4, 4) or matrix2.shape != (4, 4):
        raise ValueError("Both matrices must be 4x4.")

    result_matrix = np.dot(matrix1, matrix2)

    return result_matrix

pcd1, pcd2, best_aligned_pcd1, transformed_pcd2 = process_positions_with_pca_and_visualize(file_path1, file_path2)

RC1_pc_tm=compute_transformation_matrix(np.asarray(pcd1.points),np.asarray(best_aligned_pcd1.points))
RC2_pc_tm=compute_transformation_matrix(np.asarray(pcd2.points),np.asarray(transformed_pcd2.points))
RC1_pc_tm_inv = np.linalg.inv(RC1_pc_tm)
final_tm=multiply_transformation_matrices(RC1_pc_tm_inv, RC2_pc_tm)
New_RC=transform_matrix_on_pc(final_tm, pcd2)

In [31]:
print("RC1_pc_tm:\n", RC1_pc_tm)
print("RC1_pc_tm Determinant:", np.linalg.det(RC1_pc_tm))
print("RC2_pc_tm:\n", RC2_pc_tm)
print("RC2_pc_tm Determinant:", np.linalg.det(RC2_pc_tm))
print("final_tm:\n", final_tm)
print("final_tm Determinant:", np.linalg.det(final_tm))

RC1_pc_tm:
 [[-5.51535904e-02 -4.22640181e-02 -2.05571955e+00  3.61469147e+01]
 [ 2.05613971e+00 -8.79030094e-03 -5.49841412e-02 -1.05621222e-01]
 [-7.65549721e-03 -2.05644050e+00  4.24842321e-02 -4.46149334e-01]
 [-5.54740289e-16  1.48914382e-15 -7.20517540e-16  1.00000000e+00]]
RC1_pc_tm 行列式: 8.702327897339552
RC2_pc_tm:
 [[-9.99510858e-01 -3.12737186e-02  0.00000000e+00  8.38524855e+05]
 [-3.12737185e-02  9.99510858e-01  0.00000000e+00 -2.42472922e+06]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00]
 [-2.51622074e-22 -4.65470176e-23  0.00000000e+00  1.00000000e+00]]
RC2_pc_tm 行列式: 0.0
final_tm:
 [[-2.16897469e-03  4.86161827e-01  0.00000000e+00 -1.18932929e+06]
 [ 1.00496690e-02 -1.76425942e-03  0.00000000e+00 -3.33853736e+03]
 [ 4.86061313e-01  2.20590058e-03  0.00000000e+00 -3.75903331e+05]
 [ 3.34046830e-16  2.73910132e-16  0.00000000e+00  9.99999999e-01]]
final_tm 行列式: 0.0


In [32]:
import numpy as np
import open3d as o3d

points1 = np.array([center for _, center in cluster_centers])

def transform_points(points, transformation_matrix):

    points_homogeneous = np.hstack([points, np.ones((points.shape[0], 1))])

    transformed_points_homogeneous = np.dot(points_homogeneous, transformation_matrix.T)

    return transformed_points_homogeneous[:, :3]

intermediate_points = transform_points(points1, RC1_pc_tm)

intermediate_points[:, 2] = 0

if np.linalg.det(RC2_pc_tm) != 0:
    inv_RC2_pc_tm = np.linalg.inv(RC2_pc_tm)
else:
    inv_RC2_pc_tm = np.linalg.pinv(RC2_pc_tm)

estimated_points = transform_points(intermediate_points, inv_RC2_pc_tm)

estimated_pcd = o3d.geometry.PointCloud()
estimated_pcd.points = o3d.utility.Vector3dVector(estimated_points)

o3d.visualization.draw_geometries([estimated_pcd])

for point, label in zip(estimated_points, cluster_labels):
    print(f"Point: {point}, Label: {label}")

Point: [ 762210.80051757 2449758.35585019       0.        ], Label: 98
Point: [ 762222.85415991 2449762.57498893       0.        ], Label: 47
Point: [ 762229.28389746 2449762.41865016       0.        ], Label: 103
Point: [ 762227.77638975 2449770.73500792       0.        ], Label: 103
Point: [ 762228.31682668 2449771.16379394       0.        ], Label: 89
Point: [ 762272.73618807 2449763.29409323       0.        ], Label: 103
Point: [ 762272.36869358 2449771.59428386       0.        ], Label: 103
Point: [ 762303.71395885 2449763.73273976       0.        ], Label: 16
Point: [ 762303.60141527 2449771.8883326        0.        ], Label: 16
Point: [ 762322.36080756 2449763.91637674       0.        ], Label: 47
Point: [ 762321.98756296 2449772.65948277       0.        ], Label: 89


In [33]:
# Result save

save_path = r'Results\estimated_points_with_labels.txt'

with open(save_path, 'w') as file:
    for point, label in zip(estimated_points, cluster_labels):

        point_str = " ".join(map(str, point))
        label_str = str(label)
        file.write(f"{point_str} {label_str}\n")

print(f"Points and labels saved to {save_path}")


Points and labels saved to Results\estimated_points_with_labels.txt


# 7. Visulization point cloud

In [34]:
import open3d as o3dpoint
import numpy as np

point_cloud_file2 = r'Results\final_classification_result.ply'

pc2 = o3dpoint.io.read_point_cloud(point_cloud_file2)

point_cloud2=np.asarray(pc2.points)
ds_points2=point_cloud2[::300]

o3dpoint.visualization.draw_geometries([pc2])