In [None]:
# Импорт библиотек
import numpy as np
import open3d as o3d
import os
import matplotlib.pyplot as plt
import copy
from sklearn.cluster import DBSCAN


In [None]:
def draw_registration_result(source, target, transformation):
    """
    Визуализация метода ICP

    Args:
        source (numpy.ndarray[numpy.float64[N, 3]]): эталонное облако точек
        target (numpy.ndarray[numpy.float64[N, 3]]): облако точек, зафиксированное камерой, 
                                                    с которым будем сопоставлять source
        transformation (numpy.ndarray[numpy.float64[4, 4]]): трансформация одним из методов 
                                                    ICP из встроенной функции 
                                                    o3d.pipelines.registration.registration_icp()
    
    Returns:
        None
    """
    
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.0706, 0])
    source_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    o3d.visualization.draw_plotly([source_temp, target_temp])
    

In [None]:
def clusterization_labels(points_cloud, dbscan_eps, dbscan_min_samples):
    """
    Получение для каждой точки в облаке своей метки класса (кластеризация облака точек)

    Args:
        points_cloud (numpy.ndarray[numpy.float64[N, 3]]): облако точек, которое нужно кластеризовать
        dbscan_eps (float): максимальное расстояние между двумя точками, чтобы они считались соседними
        dbscan_min_samples (int): минимальное число точек, расположенных рядом, 
                                чтобы они относились к определённому классу, иначе - шум
    
    Returns:
        numpy.ndarray[numpy.float64[N, 3]]: номер класса для каждой точки облака точек
    """
    
    dbscan = DBSCAN(eps=dbscan_eps, min_samples=dbscan_min_samples)
    return dbscan.fit_predict(points_cloud)  # labels


In [None]:
def clusters_visualization(points_cloud, labels):
    """
    Визуализация облака точек после кластеризации

    Args:
        points_cloud (numpy.ndarray[numpy.float64[N, 3]]): облако точек, которое будем визуализировать 
        labels (numpy.ndarray[numpy.float64[N, 3]]): номер класса для каждой точки облака точек
    
    Returns:
        None
    """

    unique_labels = np.unique(labels)  # Уникальные значения меток классов
    colors = np.zeros((len(points_cloud), 3))
    cmap = plt.get_cmap("jet", len(unique_labels))
    for idx, label in enumerate(unique_labels):
        if label == -1:
            colors[labels == label] = [0, 0, 0]  # Шумовые точки (label == -1) отображаем черным.
        else:
            colors[labels == label] = cmap(idx)[:3]  # Используем первые 3 компоненты (RGB)
    
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points_cloud)
    pcd.colors = o3d.utility.Vector3dVector(colors)
    o3d.visualization.draw_geometries([pcd])
    

In [None]:
def clusterization_points_cloud(points_cloud, labels):
    """
    Кластеризация облако точек

    Args:
        points_cloud (numpy.ndarray[numpy.float64[N, 3]]): облако точек, которое будем визуализировать 
        labels (numpy.ndarray[numpy.float64[N, 3]]): номер класса для каждой точки облака точек
    
    Returns:
        clusters_no_noise (numpy.ndarray[numpy.float64[4, N, 3]]): кластеризованное облако точек, где M - количество кластеров
    """
    
    clusters_no_noise_list = [points_cloud[labels == label] for label in np.unique(labels) if label != -1]
    clusters_no_noise = np.array(clusters_no_noise_list, dtype=object)

    return clusters_no_noise


In [None]:
def lowest_points(points_cloud):
    """
    Поиск нижней точки для каждого кластера в кластеризованном облаке точек

    Args:
        points_cloud (numpy.ndarray[numpy.float64[4, N, 3]]): кластеризованное облако точек

    Returns:
        lowest_points (numpy.ndarray[numpy.float64[4, 3]]): координаты 4-х самых нижних точки кластеризованного облака точек
    """
    
    lowest_points = np.empty((4, 3))
    for i, elem in enumerate(points_cloud):
        max_y_index = np.argmax(elem[:, 1])
        lowest_points[i] = elem[max_y_index]
    
    return lowest_points


In [None]:
def points2ply(file_name, points_cloud):
    """
    Преобразование массива точек в формат .ply

    Args:
        file_name (str): имя файла, в который сохранится облако точек 
        points_cloud (numpy.ndarray[numpy.float64[N, 3]]): облако точек

    Returns:
        None
    """
    
    bubs_rotated = o3d.geometry.PointCloud()
    bubs_rotated.points = o3d.utility.Vector3dVector(points_cloud)
    o3d.io.write_point_cloud(file_name + ".ply", bubs_rotated)
    

In [None]:
def rotate_Y_and_bias(points_cloud, radians, arr_bias):
    """
    Смещение и поворот относительно оси Y облака точек. Используется для аугментации данных и проверки метода ICP

    Args:
        points_cloud (numpy.ndarray[numpy.float64[N, 3]]): облако точек
        radians (float): поворот на угол в градусах
        arr_bias (numpy.ndarray[numpy.float64[3]]): массив смещение по координатам X, Y, Z

    Returns:
        points_cloud_rotbias: изменённое облако точек
    """
    rad = np.radians(radians)
    
    # Матрица поворота на угол rad
    R = np.array([[np.cos(rad), 0, np.sin(rad)],
                [0, 1, 0],
                [-np.sin(rad), 0, np.cos(rad)]])

    t = np.array(arr_bias)  # Матрица смещения
    points_cloud_rotbias = (R @ points_cloud.T).T + t

    return   points_cloud_rotbias


In [None]:
def classify_4_points(lowest_points):
    """
    Определение положения точек в пространстве относительно друг друга с использованием медианы
    Args:
        lowest_points (numpy.ndarray[numpy.float64[4, 3]]): массив координат 4-х точек
    Returns:
        labels {index: class_id}: каждой координате точки присваивается класс/номер, где class_id:
         1 — дальняя правая
         2 — дальняя левая
         3 — ближняя левая
         4 — ближняя правая
    """
    
    # Проецируем в XZ-плоскость
    xz = lowest_points[:, [0, 2]]  # shape=(4,2)
    
    # Границы между «ближними» и «дальними» / «левыми» и «правыми»
    center_x, center_z = np.median(xz, axis=0)
    
    labels = {}
    for i, (x, z) in enumerate(xz):
        zone = 'near' if z <= center_z else 'far'
        side = 'left' if x <= center_x else 'right'
        
        if   zone=='far'  and side=='right': class_id = 1
        elif zone=='far'  and side=='left' : class_id = 2
        elif zone=='near' and side=='left' : class_id = 3
        elif zone=='near' and side=='right': class_id = 4
        
        labels[i] = class_id
    
    return labels


In [None]:
project_path = os.getcwd()  # Получение пути к корневой папке проекта


In [None]:
# Загрузка файлов .ply с облаком точек и сохранение его в переменную 

# source - облако точек, к которому будем сопоставлять каждый frame
bubs_source_ply = o3d.io.read_point_cloud(project_path + "\\data\\raw\\bubs_65.ply")  

# target - каждый раз новое облако точек, которое будем получать с камеры
bubs_target_ply = o3d.io.read_point_cloud(project_path + "\\data\\raw\\bubs_100.ply") 


In [None]:
# Визуализация облака точек 
o3d.visualization.draw_plotly([bubs_source_ply])
o3d.visualization.draw_plotly([bubs_target_ply])


In [None]:
# Преобразование обалака точек из формата .ply в массив
bubs_source_points = np.array(bubs_source_ply.points)
bubs_target_points = np.array(bubs_target_ply.points)


In [None]:
bubs_source_points_rotbiasY = rotate_Y_and_bias(bubs_source_points, -20, [0, 0, 0])
points2ply("bubs_source_points_rotbiasY", bubs_source_points_rotbiasY)
bubs_source_ply_rotbiasY_main = o3d.io.read_point_cloud("bubs_source_points_rotbiasY.ply")


In [None]:
o3d.visualization.draw_plotly([bubs_source_ply_rotbiasY_main])


In [None]:
bubs_target_points_rotbiasY = rotate_Y_and_bias(bubs_target_points, 55, [-380, 20, 120])
points2ply("bubs_target_points_rotbiasY", bubs_target_points_rotbiasY)
bubs_target_ply_rotbiasY_main = o3d.io.read_point_cloud("bubs_target_points_rotbiasY.ply")


In [None]:
o3d.visualization.draw_plotly([bubs_target_ply_rotbiasY_main])


In [None]:
source = bubs_source_ply_rotbiasY_main
target = bubs_target_ply_rotbiasY_main

treshhold = 100
reg_p2p = o3d.pipelines.registration.registration_icp(
    source, target, treshhold,
    init=np.eye(4),
    estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint()
)

print(reg_p2p)
print(reg_p2p.transformation)
draw_registration_result(source, target, reg_p2p.transformation)


In [None]:
bubs_target_points_rotbiasY_main = np.array(o3d.io.read_point_cloud("bubs_target_points_rotbiasY.ply").points)


In [None]:
labels = clusterization_labels(bubs_target_points_rotbiasY_main, 5, 50)
bubs_target_points_clusters = clusterization_points_cloud(bubs_target_points_rotbiasY_main, labels)
bubs_target_lowest_points_rotbiasY_main_clusters = lowest_points(bubs_target_points_clusters)


In [None]:
clusters_visualization(bubs_target_points_rotbiasY_main, labels)

In [None]:
bubs_target_lowest_points_rotbiasY_main_clusters


In [None]:
classify_4_points(bubs_target_lowest_points_rotbiasY_main_clusters)


In [None]:
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')

points_cloud = bubs_target_points_rotbiasY_main
low_points = bubs_target_lowest_points_rotbiasY_main_clusters

# Отображаем общее облако точек
ax.scatter(points_cloud[:, 2], points_cloud[:, 0], points_cloud[:, 1],
           s=2,  c='blue', label='Облако точек')

# Отображаем lowest_points
ax.scatter(low_points[:, 2], low_points[:, 0], low_points[:, 1],
           s=200, c='red', marker='o', label='Минимальные точки')

ax.legend()
ax.set_title("Визуализация облака точек и минимальных точек")
plt.show()
