In [1]:
# Imports
import struct
import traceback
import math
import os
import plotly.express as px
import plotly.graph_objects as go
import numpy as np
import pandas as pd
import open3d as o3d
from scipy.spatial import Delaunay, cKDTree
from functools import reduce
import copy
# %matplotlib qt

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
# Constants
# Ethernet ----------------------------------------------------------------
SENSOR_FRONT_IP = "192.168.1.10"
SENSOR_RIGHT_IP = "192.168.1.11"
SENSOR_LEFT_IP = "192.168.1.12"
SENSOR_TOP_IP = "192.168.1.13"

# Scans -------------------------------------------------------------------
SCANS_DIRECTORY = r'C:\Users\victo\OneDrive\Desktop\Studies\2024.1\Projeto Integrador\Projeto\DAS5104_Projeto_LiDAR\pointcloud'
SCAN_DIRECTION = "ccw"

# Sensors -----------------------------------------------------------------
SENSOR_TOP_HEIGHT = 2400

# [sensor_x_offset, sensor_y_offset, sensor_z_offset]
SENSOR_RIGHT_TRANSLATION = (1070, 1130, 0)
SENSOR_LEFT_TRANSLATION = (1105, -1130, 0)

# Euler angles [x, y, z]
SENSOR_RIGHT_ROTATION = (0, 0, 0)
SENSOR_LEFT_ROTATION = (0, 0, 0)

BOUNDARIES_PROFILE_X_MIN = 100
BOUNDARIES_PROFILE_X_MAX = 2385
BOUNDARIES_PROFILE_Y_MIN = -1000
BOUNDARIES_PROFILE_Y_MAX = 1000
BOUNDARIES_PROFILE_Z_MIN = -2750
BOUNDARIES_PROFILE_Z_MAX = -500

BOUNDARIES_ZAXIS_X_MIN = -2300
BOUNDARIES_ZAXIS_X_MAX = -400
BOUNDARIES_ZAXIS_Y_MIN = -7000
BOUNDARIES_ZAXIS_Y_MAX = -100

# Output -----------------------------------------------------------------
OUTPUT_DIRECTORY = r'outputs'

TRUCK_VELOCITY = 10

In [3]:
def ntp64_to_seconds(integer):
    # Upper 32 bits for seconds
    seconds = integer >> 32

    # Lower 32 bits for fractional seconds
    fractional_seconds = integer & 0xFFFFFFFF
    fractional_seconds = fractional_seconds / 0x100000000

    return round(seconds + fractional_seconds, 3)

In [4]:
def polar_to_xy(distances: list, first_angle: int, angular_increment: int):
    first_angle /= 10000
    angular_increment /= 10000

    xy = list()

    for i, distance in enumerate(distances):
        # Invalid measurements return 0xFFFFFFFF
        if distance == 4_294_967_295:
            continue

        angle = (first_angle + i * angular_increment) * math.pi / 180.0

        x = round(distance * math.cos(angle))
        y = round(distance * math.sin(angle))

        xy.append((x, y))

    return xy

In [5]:
def process_binary_file(file_path: str):
    file = open(file_path, "rb")
    data = file.read()
    file.close()

    scans = dict()
    magic_byte = struct.pack("H", 0xa25c)

    for packet in data.split(magic_byte):

        if len(packet) <= 10:
            continue

        try:
            packet_size = struct.unpack("I", packet[2:6])[0] - len(magic_byte)
            header_size = struct.unpack("H", packet[6:8])[0] - len(magic_byte)
            scan_number = struct.unpack("H", packet[8:10])[0]
            timestamp_raw = struct.unpack("Q", packet[12:20])[0]
            first_angle = struct.unpack("i", packet[42:46])[0]
            angular_increment = struct.unpack("i", packet[46:50])[0]
        except Exception as e:
            print(f"Package from IP {os.path.basename(file_path).split('.bin')[0]} corrupted...")
            print(e)
            continue

        if len(packet) != packet_size:
            print(f"Package from IP {os.path.basename(file_path).split('.bin')[0]} corrupted")
            print('Expected packet size:', packet_size)
            print('Received packet size:', len(packet))
            continue

        if scan_number not in scans:
            scans[scan_number] = dict()
            scans[scan_number]["xy"] = list()
            scans[scan_number]["timestamp"] = ntp64_to_seconds(timestamp_raw)

        payload = packet[header_size:]  # list[uint32] - 4byte
        distances = struct.unpack(f"{len(payload) // 4}I", payload[:len(payload) // 4 * 4])

        scans[scan_number]["xy"].extend(polar_to_xy(distances, first_angle, angular_increment))

    return scans

In [6]:
def calculate_z_axis(scans_front: dict, x_min: int, x_max: int, y_min: int, y_max: int, velocity: int):
    z_axis = {}
    xyz_front = []

    for i, scan_key in enumerate(sorted(scans_front.keys())):
        z_axis[i] = y_min

        for xy in scans_front[scan_key]["xy"]:
            x = xy[0]
            y = xy[1]
            z = i * velocity

            if x <= x_min or x >= x_max or y <= y_min or y >= y_max:
                continue

            if y > z_axis[i]:
                z_axis[i] = y

            xyz_front.append((x, y, z))

    return z_axis, xyz_front

In [7]:
def reconstruct_z_axis(scans: dict, z_axis: dict) -> list[tuple[int, int, int]]:
    xyz = list()

    for key in zip(sorted(scans.keys()), sorted(z_axis.keys())):
        for xy in scans[key[0]]["xy"]:
            x = xy[0]
            y = xy[1]
            z = z_axis[key[1]]

            xyz.append((x, y, z))

    return xyz

In [8]:
def transform(points, rotation: tuple[int, int, int], translation: tuple[int, int, int]):
    try:
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(points)
    
        if rotation != (0, 0, 0):
            rotation_matrix = pcd.get_rotation_matrix_from_xyz(rotation)
            pcd.rotate(rotation_matrix, center=(0, 0, 0))

        if translation != (0, 0, 0):
            pcd.translate(translation)
    except Exception as e:
        print(f'Error in point {points}: {traceback.format_exc()}')

    return np.asarray(pcd.points)

In [9]:
def remove_boundaries(points, x_min: int, x_max: int, y_min: int, y_max: int):
  return [p for p in points if not (p[0] <= x_min or p[0] >= x_max or p[1] <= y_min or p[1] >= y_max)]

In [10]:
def plot_point_cloud(arr, title=''):
  df=pd.DataFrame(data=arr, columns=['x', 'y', 'z'])
  fig = px.scatter_3d(df, x='x', y='y', z='z', color='y', title=title, color_continuous_scale=px.colors.sequential.Cividis_r)
  fig.update_traces(marker_size=2)
  fig.show()

In [11]:
def filter_point_cloud(points, voxel_size, nb_neighbors, std_ratio, nb_points, radius):
  pcd = o3d.geometry.PointCloud()
  pcd.points = o3d.utility.Vector3dVector(points)

  # xyz_1 = pcd.voxel_down_sample(voxel_size)
  # xyz_1 = pcd.uniform_down_sample(every_k_points=2)
  xyz_2, _ = pcd.remove_statistical_outlier(nb_neighbors=nb_neighbors, std_ratio=std_ratio)
  xyz_3, _ = xyz_2.remove_radius_outlier(nb_points=nb_points, radius=radius)

  return np.asarray(xyz_3.points)

In [12]:
def get_point_cloud(obj, velocity):
  scans_front = process_binary_file(os.path.join(SCANS_DIRECTORY, obj, f'{SENSOR_FRONT_IP}.bin'))
  scans_right = process_binary_file(os.path.join(SCANS_DIRECTORY, obj, f'{SENSOR_RIGHT_IP}.bin'))
  scans_left = process_binary_file(os.path.join(SCANS_DIRECTORY, obj, f'{SENSOR_LEFT_IP}.bin'))
  scans_top = process_binary_file(os.path.join(SCANS_DIRECTORY, obj, f'{SENSOR_TOP_IP}.bin'))
  
  z_axis, _ = calculate_z_axis(
    scans_front,
    BOUNDARIES_ZAXIS_X_MIN,
    BOUNDARIES_ZAXIS_X_MAX,
    BOUNDARIES_ZAXIS_Y_MIN,
    BOUNDARIES_ZAXIS_Y_MAX,
    velocity
  )
  
  xyz_right = reconstruct_z_axis(scans_right, z_axis)
  # plot_point_cloud(xyz_right, 'Scans do sensor right com reconstrução do eixo Z')
  xyz_left = reconstruct_z_axis(scans_left, z_axis)
  xyz_top = reconstruct_z_axis(scans_top, z_axis)
  
  xyz_right = transform(xyz_right, SENSOR_RIGHT_ROTATION, SENSOR_RIGHT_TRANSLATION)
  # plot_point_cloud(xyz_right, 'Scans do sensor right após translação')
  xyz_left = transform(xyz_left, SENSOR_LEFT_ROTATION, SENSOR_LEFT_TRANSLATION)
  
  xyz_right = remove_boundaries(
    xyz_right,
    BOUNDARIES_PROFILE_X_MIN,
    BOUNDARIES_PROFILE_X_MAX,
    BOUNDARIES_PROFILE_Y_MIN,
    BOUNDARIES_PROFILE_Y_MAX,
  )
  
  # plot_point_cloud(xyz_right, 'Scans do sensor right sem dados fora dos limites')

  xyz_left = remove_boundaries(
    xyz_left,
    BOUNDARIES_PROFILE_X_MIN,
    BOUNDARIES_PROFILE_X_MAX,
    BOUNDARIES_PROFILE_Y_MIN,
    BOUNDARIES_PROFILE_Y_MAX,
  )

  xyz_top = remove_boundaries(
    xyz_top,
    BOUNDARIES_PROFILE_X_MIN,
    BOUNDARIES_PROFILE_X_MAX,
    BOUNDARIES_PROFILE_Y_MIN,
    BOUNDARIES_PROFILE_Y_MAX,
  )

  xyz_right = filter_point_cloud(xyz_right, 15, 40, 0.1, 25, 50)
  xyz_left = filter_point_cloud(xyz_left, 15, 40, 0.1, 25, 50)
  xyz_top = filter_point_cloud(xyz_top, 15, 60, 0.06, 25, 120)
  
  xyz = list()
  xyz.extend(xyz_right)
  xyz.extend(xyz_left)
  xyz.extend(xyz_top)

  xyz = transform(xyz, (0, 0, -math.pi/2), (0, SENSOR_TOP_HEIGHT, 0))
  # plot_point_cloud(xyz, 'Point cloud reconstruída')

  return xyz

In [18]:
def get_bucket_bottom_height(point_cloud):
  n_points = {}
  for p in point_cloud:
    height = round(p[1])
    if height in n_points:
      n_points[height] += 1
    else:
      n_points[height] = 1

  return max(n_points, key=n_points.get)


In [20]:
def get_min_coordinates(point_cloud):
  return min(point_cloud, key=lambda p: p[0])[0], min(point_cloud, key=lambda p: p[1])[1], min(point_cloud, key=lambda p: p[2])[2]

In [21]:
def get_max_coordinates(point_cloud):
  return max(point_cloud, key=lambda p: p[0])[0], max(point_cloud, key=lambda p: p[1])[1], max(point_cloud, key=lambda p: p[2])[2]

In [22]:
def preprocess_point_cloud(pcd, voxel_size, max_nn_normals, max_nn_fpfh):
    # print(":: Downsample with a voxel size %.3f." % voxel_size)
    pcd_down = pcd.voxel_down_sample(voxel_size)

    radius_normal = voxel_size * 2
    # print(":: Estimate normal with search radius %.3f." % radius_normal)
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=max_nn_normals))

    radius_feature = voxel_size * 5
    # print(":: Compute FPFH feature with search radius %.3f." % radius_feature)
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=max_nn_fpfh))
    return pcd_down, pcd_fpfh

In [23]:
def ransac_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size, max_iteration, confidence):
  distance_threshold = voxel_size * 1.5
  result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
    source_down, target_down, source_fpfh, target_fpfh, True,
    distance_threshold,
    o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
    3, [
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
            0.1),
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
            distance_threshold)
    ], o3d.pipelines.registration.RANSACConvergenceCriteria(max_iteration, confidence))
  
  return result

In [24]:
def icp_registration(source, target, trans_init, voxel_size, method='point_to_point', epsilon=1e-4, max_iteration=30):
  distance_threshold = voxel_size * 0.4
  target.estimate_normals()

  convergence_criteria = o3d.pipelines.registration.ICPConvergenceCriteria(
    relative_fitness=1e-6,
    relative_rmse=1e-6,
    max_iteration=max_iteration)
  if method == 'generalized':
    print(":: Generalized ICP registration is applied on original point")
    estimation_method = o3d.pipelines.registration.TransformationEstimationForGeneralizedICP(epsilon)
    result = o3d.pipelines.registration.registration_generalized_icp(
      source,
      target,
      distance_threshold,
      trans_init,
      estimation_method,
      convergence_criteria)
    return result
  if method == 'point_to_point':
    print(":: Point-to-point ICP registration is applied on original point")
    estimation_method = o3d.pipelines.registration.TransformationEstimationPointToPoint()
  elif method == 'point_to_plane':
    print(":: Point-to-plane ICP registration is applied on original point")
    estimation_method = o3d.pipelines.registration.TransformationEstimationPointToPlane()
  else:
    raise ValueError("Informed method not allowed. Available methods: 'point_to_point', 'point_to_plane', 'generalized'.")
  
  print("   clouds to refine the alignment. This time we use a strict")
  print("   distance threshold %.3f." % distance_threshold)

  result = o3d.pipelines.registration.registration_icp(
    source, target, distance_threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint(),
    convergence_criteria
  )
  
  return result

In [25]:
def align_truck_bucket_and_load(load, bucket, voxel_size, max_iteration_ransac, confidence, max_nn_normals, max_nn_fpfh, epsilon, max_iteration_icp, ransac_loop_size=5):
  result_ransac = None
  print(":: RANSAC registration on downsampled point clouds.")
  print("   Since the downsampling voxel size is %.3f," % voxel_size)
  print("   we use a liberal distance threshold %.3f." % (voxel_size*1.5))

  for _ in range(ransac_loop_size):
    source_down, source_fpfh = preprocess_point_cloud(load, voxel_size, max_nn_normals, max_nn_fpfh)
    target_down, target_fpfh = preprocess_point_cloud(bucket, voxel_size, max_nn_normals, max_nn_fpfh)
      
    result = ransac_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size, max_iteration_ransac, confidence)
    if not result_ransac or result.fitness > result_ransac.fitness:
      result_ransac = result
    
  print('RANSAC result:', result_ransac)
  print('RANSAC transformation:', result_ransac.transformation)
  result_icp = icp_registration(load, bucket, result_ransac.transformation, voxel_size, 'generalized', epsilon, max_iteration_icp)

  print('ICP result:', result_icp)
  print('ICP transformation:', result_icp.transformation)

  aligned = copy.deepcopy(load)
  aligned.transform(result_icp.transformation)
  
  return result_ransac, result_icp, aligned

In [26]:
def find_wall_planes(pcd, distance_threshold, y_threshold, ransac_n, n_iterations, max_iterations):
  remaining_cloud = copy.deepcopy(pcd)
  i = 0
  plane_models = []
  plane_clouds = []

  while len(plane_clouds) < 2:
      plane_model, inliers = remaining_cloud.segment_plane(distance_threshold=distance_threshold,
                                                          ransac_n=ransac_n,
                                                          num_iterations=n_iterations)
      [_, b, _, _] = plane_model
      if b <= y_threshold:
          inlier_cloud = remaining_cloud.select_by_index(inliers)
          remaining_cloud = remaining_cloud.select_by_index(inliers, invert=True)
          plane_models.append(plane_model)
          plane_clouds.append(inlier_cloud)

      i += 1
      if i == max_iterations:
        raise Exception('Could not find wall planes in max number of iterations.')

  return plane_models, plane_clouds

In [56]:
bucket_dir = 'caixa_vazia'
bucket_points = get_point_cloud(bucket_dir, 5)
bucket_bottom_y = get_bucket_bottom_height(bucket_points)
bucket_points = [p for p in bucket_points if -2750 < p[2] < -500 and p[1] >= bucket_bottom_y - 10]

load_dir = 'caixa_brita_pico'
load_points = get_point_cloud(load_dir, 5)
load_points = [p for p in load_points if -2750 < p[2] < -500 and p[1] >= bucket_bottom_y - 10]

Package from IP 192.168.1.12 corrupted...
unpack requires a buffer of 8 bytes
Package from IP 192.168.1.12 corrupted
Expected packet size: -2
Received packet size: 1385
Package from IP 192.168.1.12 corrupted
Expected packet size: 1402
Received packet size: 60
Package from IP 192.168.1.12 corrupted
Expected packet size: -2
Received packet size: 1340


In [57]:
load = o3d.geometry.PointCloud()
load.points = o3d.utility.Vector3dVector(load_points)
bucket = o3d.geometry.PointCloud()
bucket.points = o3d.utility.Vector3dVector(bucket_points)

voxel_size = 30
max_nn_normals = 30
max_nn_fpfh = 100
confidence = 1.0
max_iteration_ransac = 1000000
epsilon = 1e-6
max_iteration_icp = 50
ransac_loop_size = 5

result_ransac, result_icp, aligned = align_truck_bucket_and_load(load, bucket, voxel_size, max_iteration_ransac, confidence, max_nn_normals, max_nn_fpfh,
                            epsilon, max_iteration_icp, ransac_loop_size)

:: RANSAC registration on downsampled point clouds.
   Since the downsampling voxel size is 30.000,
   we use a liberal distance threshold 45.000.
RANSAC result: RegistrationResult with fitness=7.633880e-01, inlier_rmse=2.603471e+01, and correspondence_set size of 2794
Access transformation to get result.
RANSAC transformation: [[ 9.98683637e-01 -4.24461344e-02  2.87978886e-02  5.68069472e+01]
 [ 4.32953216e-02  9.98625671e-01 -2.95344495e-02 -1.14628033e+02]
 [-2.75046876e-02  3.07423853e-02  9.99148837e-01 -1.66661295e+01]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
:: Generalized ICP registration is applied on original point
ICP result: RegistrationResult with fitness=4.620347e-01, inlier_rmse=8.329529e+00, and correspondence_set size of 3438
Access transformation to get result.
ICP transformation: [[ 9.99781529e-01  1.34330936e-02  1.60139121e-02 -3.90001334e+01]
 [-1.31157691e-02  9.99718748e-01 -1.97585594e-02 -7.53624044e+01]
 [-1.62748267e-02  1.95442080

In [58]:
distance_threshold = 20
y_threshold = 0.1
ransac_n = 3
n_iterations = 1000
max_iterations = 50
num_planes = 3

plane_models, plane_clouds = find_wall_planes(load, distance_threshold, y_threshold, ransac_n, n_iterations, max_iterations)
for i, model in enumerate(plane_models):
  [a, b, c, d] = model
  print(f"Plane {i+1} equation: {a:.2f}x + {b:.2f}y + {c:.2f}z + {d:.2f} = 0")

Plane 1 equation: 1.00x + 0.00y + 0.02z + -416.79 = 0
Plane 2 equation: 1.00x + 0.05y + 0.02z + 347.87 = 0


In [63]:
ransac_aligned = copy.deepcopy(load)
ransac_aligned.transform(result_ransac.transformation)

updated_transformation = np.array(result_icp.transformation)

# Cancel x rotation
updated_transformation[1][0] = 0
updated_transformation[2][0] = 0
updated_transformation[2][1] = 0
updated_transformation[1][2] = 0
updated_transformation[1][1] = 1
updated_transformation[2][2] = 1


# Cancel y translation
updated_transformation[1][3] = 0

#Update z translation
updated_transformation[2][3] = updated_transformation[2][3] + 80
updated_load = copy.deepcopy(load)

updated_load.transform(updated_transformation)

x1 = [p[0] for p in bucket.points]
y1 = [p[1] for p in bucket.points]
z1 = [p[2] for p in bucket.points]

x2 = [p[0] for p in load.points]
y2 = [p[1] for p in load.points]
z2 = [p[2] for p in load.points]

x3 = [p[0] for p in ransac_aligned.points]
y3 = [p[1] for p in ransac_aligned.points]
z3 = [p[2] for p in ransac_aligned.points]

x4 = [p[0] for p in aligned.points]
y4 = [p[1] for p in aligned.points]
z4 = [p[2] for p in aligned.points]

x5 = [p[0] for p in updated_load.points]
y5 = [p[1] for p in updated_load.points]
z5 = [p[2] for p in updated_load.points]

trace1 = go.Scatter3d(
    x=x1, y=y1, z=z1,
    mode='markers',
    marker=dict(
        size=3,
        color='blue',
    ),
    name='Empty truck'
)

trace2 = go.Scatter3d(
    x=x2, y=y2, z=z2,
    mode='markers',
    marker=dict(
        size=3,
        color='red',
    ),
    name='Truck with load (not aligned)'
)

trace3 = go.Scatter3d(
    x=x3, y=y3, z=z3,
    mode='markers',
    marker=dict(
        size=3,
        color='green',
    ),
    name='Truck with load (RANSAC aligned)'
)

trace4 = go.Scatter3d(
    x=x4, y=y4, z=z4,
    mode='markers',
    marker=dict(
        size=3,
        color='purple',
    ),
    name='Truck with load (G-ICP aligned)'
)

trace5 = go.Scatter3d(
    x=x5, y=y5, z=z5,
    mode='markers',
    marker=dict(
        size=3,
        color='magenta',
    ),
    name='Truck with load ("filtered" transformation)'
)

fig = go.Figure()
fig.add_trace(trace1)
fig.add_trace(trace2)
fig.add_trace(trace3)
fig.add_trace(trace4)
fig.add_trace(trace5)

# Update the layout

fig.update_layout(
    legend=dict(
        x=0.8,  # Adjust x-coordinate
        y=0.8,  # Adjust y-coordinate
        traceorder="normal",
        font=dict(
            family="sans-serif",
            size=12,
            color="black"
        ),
        bgcolor="LightSteelBlue",
        bordercolor="Black",
        borderwidth=2
    ),
    margin=dict(l=0, r=0, b=0, t=0),
    scene=dict(
        xaxis=dict(title='X'),
        yaxis=dict(title='Y'),
        zaxis=dict(title='Z')
    )
)

# Show the figure
fig.show()

In [76]:
kd_tree = o3d.geometry.KDTreeFlann(bucket)
inner_load_points = []
threshold_distance = 20
nb_neighbors = 40
std_ratio = 1
nb_points = 20
radius = 100

for point in updated_load.points:
    [_, idx, _] = kd_tree.search_knn_vector_3d(point, 1)
    closest_point = bucket.points[idx[0]]
    if np.linalg.norm(np.array(point) - np.array(closest_point)) > threshold_distance:
        inner_load_points.append(point)

print(f"Size before:", len(updated_load.points))
print(f"Size after:", len(inner_load_points))

removed_points = o3d.geometry.PointCloud()
removed_points.points = o3d.utility.Vector3dVector(inner_load_points)

inner_load, _ = removed_points.remove_statistical_outlier(nb_neighbors=nb_neighbors, std_ratio=std_ratio)
inner_load, _ = inner_load.remove_radius_outlier(nb_points=nb_points, radius=radius)
print(f"Size after filter:", len(inner_load.points))

x1 = [p[0] for p in updated_load.points]
y1 = [p[1] for p in updated_load.points]
z1 = [p[2] for p in updated_load.points]

x2 = [p[0] for p in bucket.points]
y2 = [p[1] for p in bucket.points]
z2 = [p[2] for p in bucket.points]

x3 = [p[0] for p in removed_points.points]
y3 = [p[1] for p in removed_points.points]
z3 = [p[2] for p in removed_points.points]

x4 = [p[0] for p in inner_load.points]
y4 = [p[1] for p in inner_load.points]
z4 = [p[2] for p in inner_load.points]

trace1 = go.Scatter3d(
    x=x1, y=y1, z=z1,
    mode='markers',
    marker=dict(
        size=3,
        color='grey',
    ),
    name='Original load'
)

trace2 = go.Scatter3d(
    x=x2, y=y2, z=z2,
    mode='markers',
    marker=dict(
        size=3,
        color='black',
    ),
    name='Original bucket'
)

trace3 = go.Scatter3d(
    x=x3, y=y3, z=z3,
    mode='markers',
    marker=dict(
        size=3,
        color='red',
    ),
    name='Inner load'
)

trace4 = go.Scatter3d(
    x=x4, y=y4, z=z4,
    mode='markers',
    marker=dict(
        size=3,
        color='pink',
    ),
    name='Filtered inner load'
)

fig = go.Figure()
fig.add_trace(trace1)
fig.add_trace(trace2)
fig.add_trace(trace3)
fig.add_trace(trace4)

# Update the layout
fig.update_layout(
     legend=dict(
        x=0.8,  # Adjust x-coordinate
        y=0.8,  # Adjust y-coordinate
        traceorder="normal",
        font=dict(
            family="sans-serif",
            size=12,
            color="black"
        ),
        bgcolor="LightSteelBlue",
        bordercolor="Black",
        borderwidth=2
    ),
    margin=dict(l=0, r=0, b=0, t=0),
    scene=dict(
        xaxis=dict(title='X'),
        yaxis=dict(title='Y'),
        zaxis=dict(title='Z')
    )
)

# Show the figure
fig.show()

Size before: 7441
Size after: 2944
Size after filter: 2756


In [195]:
def point_to_ray_distance(point, ray_origin, ray_direction):
    ray_direction = ray_direction / np.linalg.norm(ray_direction)
    origin_to_point = point - ray_origin
    projection_length = np.dot(origin_to_point, ray_direction)
    projection_point = ray_origin + projection_length * ray_direction
    distance = np.linalg.norm(point - projection_point)
    return distance

def find_points_near_ray(point_cloud, ray_origin, ray_direction, threshold=0.02):
    near_points = []
    for point in point_cloud:
        distance = point_to_ray_distance(point, ray_origin, ray_direction)
        if distance <= threshold:
            for load_point in inner_load.points:
                distance = point_to_ray_distance(load_point, ray_origin, ray_direction)
                if distance <= threshold:
                    near_points.append(point)
                    break
    return near_points

def generate_rays_with_slope(angular_step, slope, radius):
    rays = []
    angles = np.arange(0, 360, angular_step)
    for angle in angles:
        rad = np.deg2rad(angle)
        x = np.cos(rad) * radius
        z = np.sin(rad) * radius
        y = -slope
        direction = np.array([x, y, z])
        rays.append(direction)
    return rays

# Define the ray origin and direction
ray_origins = [np.array([11.5, 1000, -1800]), np.array([11.5, 800, -1300]), np.array([11.5, 800, -2300])]
angular_step = 25
slope = 500
rays = []

rays += generate_rays_with_slope(angular_step, slope, radius=50)
rays += generate_rays_with_slope(angular_step, slope, radius=200)
rays += generate_rays_with_slope(angular_step, slope, radius=300)
rays += generate_rays_with_slope(angular_step, slope, radius=500)
near_points = []

# Find points near the ray
for ray_origin in ray_origins:
    for ray_direction in rays:
        near_points += find_points_near_ray(np.array(bucket.points), ray_origin, ray_direction, threshold=20)
    
near_pcd = o3d.geometry.PointCloud()
near_pcd.points = o3d.utility.Vector3dVector(near_points)
kd_tree = o3d.geometry.KDTreeFlann(near_pcd)
print(kd_tree)
inner_bucket_points = []
threshold_distance = 120
nb_neighbors = 40
std_ratio = 1
nb_points = 20
radius = 100

for point in bucket.points:
    [_, idx, _] = kd_tree.search_knn_vector_3d(point, 1)
    closest_point = near_pcd.points[idx[0]]
    if np.linalg.norm(np.array(point) - np.array(closest_point)) < threshold_distance:
        inner_bucket_points.append(point)

# Plot using Plotly
fig = go.Figure()

x1 = [p[0] for p in bucket.points]
y1 = [p[1] for p in bucket.points]
z1 = [p[2] for p in bucket.points]

x2 = [p[0] for p in inner_load.points]
y2 = [p[1] for p in inner_load.points]
z2 = [p[2] for p in inner_load.points]

x3 = [p[0] for p in near_points]
y3 = [p[1] for p in near_points]
z3 = [p[2] for p in near_points]

x4 = [p[0] for p in inner_bucket_points]
y4 = [p[1] for p in inner_bucket_points]
z4 = [p[2] for p in inner_bucket_points]

trace_1 = go.Scatter3d(
        x=x1,
        y=y1,
        z=z1,
        mode='markers',
        marker=dict(size=4, color='black'),
        name='Bucket point cloud'
    )

trace_2 = go.Scatter3d(
        x=x2,
        y=y2,
        z=z2,
        mode='markers',
        marker=dict(size=4, color='grey'),
        name='Load point cloud'
    )

trace_3 = go.Scatter3d(
        x=x3,
        y=y3,
        z=z3,
        mode='markers',
        marker=dict(size=4, color='green'),
        name='Points detected by raycast'
    )

trace_4 = go.Scatter3d(
        x=x4,
        y=y4,
        z=z4,
        mode='markers',
        marker=dict(size=4, color='blue'),
        name='Points deteced by KDTree'
    )

fig.add_trace(trace_1)
fig.add_trace(trace_2)
fig.add_trace(trace_3)
fig.add_trace(trace_4)

for ray_origin in ray_origins:
    for ray_direction in rays:
        ray_end = ray_origin + ray_direction
        fig.add_trace(go.Scatter3d(
            x=[ray_origin[0], ray_end[0]],
            y=[ray_origin[1], ray_end[1]],
            z=[ray_origin[2], ray_end[2]],
            mode='lines',
            line=dict(color='red', width=1),
            showlegend=False
        ))

# Update layout
fig.update_layout(
    scene=dict(
        xaxis_title='X',
        yaxis_title='Y',
        zaxis_title='Z',
        aspectratio=dict(x=1, y=1, z=1)
    ),
    title='Point Cloud and Ray Visualization'
)

fig.show()


<open3d.cpu.pybind.geometry.KDTreeFlann object at 0x00000170691E2AB0>


In [212]:
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(inner_bucket_points + list(inner_load.points))

# Plot using Plotly
fig = go.Figure()

trace_1 = go.Scatter3d(
        x=x1,
        y=y1,
        z=z1,
        mode='markers',
        marker=dict(size=4, color='blue'),
        name='Load point cloud'
    )

fig.add_trace(trace_1)

# Update layout
fig.update_layout(
    scene=dict(
        xaxis_title='X',
        yaxis_title='Y',
        zaxis_title='Z',
        aspectratio=dict(x=1, y=1, z=1)
    ),
    title='Point Cloud and Ray Visualization'
)

fig.show()

In [213]:
radius = 100
max_nn = 100
k = 5

pcd.estimate_normals(
    search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=radius, max_nn=max_nn))
pcd.orient_normals_consistent_tangent_plane(k)
# pcd.orient_normals_to_align_with_direction()
o3d.visualization.draw_geometries([pcd], point_show_normal=True)

In [214]:
mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(pcd)
bbox = pcd.get_axis_aligned_bounding_box()
mesh = mesh.crop(bbox)

# Refine the mesh
mesh = mesh.filter_smooth_simple(number_of_iterations=1)
mesh.paint_uniform_color([0.7, 0.7, 0.7])

# Save and visualize the mesh
o3d.io.write_triangle_mesh("load.ply", mesh)
o3d.visualization.draw_geometries([mesh], point_show_normal=False, mesh_show_back_face=True, mesh_show_wireframe=True)