In [None]:
import math
import copy
import random
import numpy as np
import pickle
import heapq
import matplotlib.pyplot as plt
import open3d as o3d
import re

## Helper functions

### Visualize

In [None]:
def gridToImage(grid, color_dict):
    grid = copy.deepcopy(grid)
    for row in grid:
        for i in range(len(row)):
            row[i] = color_dict.get(row[i], [0, row[i], 0])
    plt.imshow(grid)
    plt.show()

def showPath(start, end, grid, path):
    grid = copy.deepcopy(grid)
    for n in path:
        grid[n[1]][n[0]] = 7

    grid[start[1]][start[0]] = 'S'
    grid[end[1]][end[0]] = 'E'

    color_dict = {
        0 : [255, 255, 255],
        1 : [0, 0, 0],
        7 : [0, 255, 0],
        'S' : [0, 0, 255],
        'E' : [255, 0, 0]
    }

    gridToImage(grid, color_dict)

def showValues(start, end, grid):
    grid = copy.deepcopy(grid)
    max_value = max([max(row) for row in grid])
    for row in grid:
        for i in range(len(row)):
            if isinstance(row[i], int) and row[i] == 1:
                row[i] = 'O'
            elif isinstance(row[i], float):
                row[i] /= max_value

    grid[start[1]][start[0]] = 'S'
    grid[end[1]][end[0]] = 'E'

    color_dict = {
        0 : [255, 255, 255],
        'O' : [0, 0, 0],
        7 : [0, 255, 0],
        'S' : [0, 0, 255],
        'E' : [255, 0, 0]
    }

    gridToImage(grid, color_dict)

In [None]:
def get_points(s):
    pattern = re.compile(r'user_message\|([0-9\.\,\-e]*,)')
    new_points = []
    for i in re.findall(pattern, s):
        i = i.split(',')[:-1]
        if len(i) == 0:
            continue
        new_points.append([float(x) for x in i])
    new_points = np.array(new_points)
    return new_points

def visualize_pointcloud(pcd, show_draw=False):
    pcd = copy.deepcopy(pcd)
    o3d.visualization.draw_plotly([pcd])
    if show_draw:
        R = pcd.get_rotation_matrix_from_xyz((-np.pi / 2, 0, np.pi / 2))
        pcd.rotate(R, center=(0, 0, 0))
        o3d.visualization.draw([pcd], show_ui=True, point_size=5)

In [None]:
def binary_array_to_points(binary_array):
    points = []
    for z in range(len(binary_array)):
        for y in range(len(binary_array[z])):
            for x in range(len(binary_array[z][y])):
                if binary_array[z][y][x] == 1:
                    points.append((x, y, z))
    return points

### A star helpers

In [None]:
def create_cost_grid(grid, check_range):
    new_grid = copy.deepcopy(grid)

    for l in range(len(grid)):
        for r in range(len(grid[0])):
            for c in range(len(grid[0][0])):
                if grid[l][r][c] == 1:
                    if c == 0 or grid[l][r][c - 1] == 1:
                        left_check = 0
                    else:
                        left_check = -check_range

                    if c == len(grid[0][0]) - 1 or grid[l][r][c + 1] == 1:
                        right_check = 0
                    else:
                        right_check = check_range

                    if r == len(grid[0]) - 1 or grid[l][r + 1][c] == 1:
                        up_check = 0
                    else:
                        up_check = check_range

                    if r == 0 or grid[l][r - 1][c] == 1:
                        down_check = 0
                    else:
                        down_check = -check_range

                    layer_down_chcek = -check_range
                    layer_up_check = check_range
                    for dx in range(left_check, right_check + 1):
                        for dy in range(down_check, up_check + 1):
                            for dz in range(layer_down_chcek, layer_up_check + 1):
                                x, y, z = c + dx, r + dy, l + dz
                                if 0 <= x < len(grid[0][0]) and 0 <= y < len(grid[0]) and 0 <= z < len(grid) and grid[z][y][x] != 1:
                                    cost = min((check_range**2), (check_range**2) / math.sqrt(dx**2 + dy**2 + dz**2))
                                    new_grid[z][y][x] = max(new_grid[z][y][x], cost)

    return new_grid

In [None]:
def get_directions():
    directions = []
    for x in [-1, 0, 1]:
        for y in [-1, 0, 1]:
            for z in [-1, 0, 1]:
                if (x, y, z) != (0, 0, 0):  # Exclude the zero vector
                    directions.append((x, y, z))
    return directions

In [None]:
def get_distance(start, end, scales):
    return math.hypot((start[0] - end[0]) * scales[0], (start[1] - end[1]) * scales[1], (start[2] - end[2]) * scales[2])

## astar function

In [None]:
def astar(start, end, graph, scales, cost_grid=[]):
    """
    Returns the path traversing graph using a* search

    Parameters
    ----------
    start : tuple or list
        (x, y)
    end : tuple or list
        (x, y)
    graph : 3d list
        graph to traverse
    cost_grid: 3d list
        adds a penalty

    Returns
    -------
    list of (x, y, z) or False
        path from start to end if found
    """
    
    if graph[start[2]][start[1]][start[0]] == 1 or graph[end[2]][end[1]][end[0]] == 1:
        return []

    open_list = []
    closed_list = set()
    open_set = {}

    # f_score. g_score, pos
    start_node = (0, 0, start)
    index = 0
    open_list.append((0, index, start_node))
    heapq.heapify(open_list)

    parent = {}

    while open_list:
        start_f_score, start_g_score, start_pos = heapq.heappop(open_list)[-1]
        closed_list.add(start_pos)

        if start_pos[0] == end[0] and start_pos[1] == end[1] and start_pos[2] == end[2]:
            pos = start_pos
            path = []
            while pos in parent:
                path.append(pos)
                pos = parent[pos]

            path = path[::-1]
            return path, [node[2] for _, _, node in open_list], closed_list

#        for dir in get_directions():
        for dir in [(-1, -1, -1), (-1, -1, 0), (-1, -1, 1), (-1, 0, -1), (-1, 0, 0), (-1, 0, 1), (-1, 1, -1), (-1, 1, 0), (-1, 1, 1), (0, -1, -1), (0, -1, 0), (0, -1, 1), (0, 0, -1), (0, 0, 1), (0, 1, -1), (0, 1, 0), (0, 1, 1), (1, -1, -1), (1, -1, 0), (1, -1, 1), (1, 0, -1), (1, 0, 0), (1, 0, 1), (1, 1, -1), (1, 1, 0), (1, 1, 1)]:
            x = dir[0]
            y = dir[1]
            z = dir[2]

            pos = (start_pos[0] + x, start_pos[1] + y, start_pos[2] + z)

            if pos in closed_list:
                continue

            if pos[0] < 0:
                continue
            if pos[0] >= len(graph[0][0]):
                continue
            if pos[1] < 0:
                continue
            if pos[1] >= len(graph[0]):
                continue
            if pos[2] < 0:
                continue
            if pos[2] >= len(graph):
                continue

            if graph[pos[2]][pos[1]][pos[0]] == 1:
                continue

            g_score = start_g_score + get_distance(start_pos, pos, scales)
            h_score = get_distance(end, pos, scales)
            f_score = g_score + h_score

            if len(cost_grid) != 0:
                f_score += cost_grid[pos[2]][pos[1]][pos[0]]

            if pos in open_set:
                if open_set[pos] <= g_score:
                    continue

            open_set[pos] = g_score
            parent[pos] = start_pos
            index += 1
            heapq.heappush(open_list, (f_score, index, (f_score, g_score, pos)))

    return []

## Occupancy map functions

In [None]:
def point_cloud_to_2d_occupancy_map(points, grid_size, scale, z_range):
  (z_min, z_max) = z_range
  mask = (points[:, 2] >= z_min) & (points[:, 2] <= z_max)
  points = points[mask, :2]

  binary_grid = np.zeros(grid_size, dtype=float)
  for point in points:
    x, y = point
    grid_x = int(grid_size[0] / 2 + x / scale[0])
    grid_y = int(grid_size[1] / 2 + y / scale[1])
    if 0 <= grid_x < grid_size[0] and 0 <= grid_y < grid_size[1]:
      binary_grid[grid_x, grid_y] = 1

  binary_grid = binary_grid.T

  return binary_grid

def point_cloud_to_3d_occupancy_map(points, grid_size, scale, z_start):
    """
    Convert point cloud to a 3d occupancy map

    Parameters
    ----------
    points : list of points
        [(x, y, z), ...]
    grid_size : size of grid to be returned
        (x, y, z)
    scale : how each dimension is scaled
        (x, y, z)
    z_start : the bottom of z planes

    Returns
    -------
    3d binary grid
    """

    grid = []
    for i in range(grid_size[2]):
        z_min = z_start + scale[2] * i
        z_max = z_start + scale[2] * (i + 1)
        new_grid = point_cloud_to_2d_occupancy_map(points, (grid_size[0], grid_size[1]), (scale[0], scale[1]), (z_min, z_max))
        grid.append(new_grid)

    return grid

## Testing

Full pipeline

In [None]:
with open('../lidar/points.txt') as f:
    s = f.read()

points = get_points(s)

grid = point_cloud_to_3d_occupancy_map(points, (500, 500, 30), (1, 1, 1), -2)
grid = np.array(grid)

pos = binary_array_to_points(grid)
cost_grid = create_cost_grid(grid, 5)

pcl = o3d.geometry.PointCloud()
pcl.points = o3d.utility.Vector3dVector(pos)

visualize_pointcloud(pcl, False)

In [None]:
path, open_list, closed_list = astar((250, 250, 1), (350, 250, 1), grid, (1, 1, 1), cost_grid)
print(len(path))
print(len(open_list))
print(len(closed_list))
#path = closed_list

pcl = o3d.geometry.PointCloud()
pcl.points = o3d.utility.Vector3dVector(pos)

pcl.points.extend(o3d.utility.Vector3dVector(path))

colors = [(0, 0, 0)] * len(pos)
colors.extend([(255, 0, 0) for i in range(len(path))])
pcl.colors = o3d.utility.Vector3dVector(colors)

visualize_pointcloud(pcl, False)

In [None]:
data = path

pcl = o3d.geometry.PointCloud()
pcl.points = o3d.utility.Vector3dVector(pos)

pcl.points.extend(o3d.utility.Vector3dVector(data))

m =217
ma =348
colors = [(0, (ma - y)/ m, 0) for x, y, z in pos]
colors = np.array(colors)
pos = np.array(pos)
colors[pos[:, 2] == 0] = (0, 0, 0)
colors = list(colors)
colors.extend([(255, 0, 0) for i in range(len(data))])
pcl.colors = o3d.utility.Vector3dVector(colors)

visualize_pointcloud(pcl, True)

### Profiling

In [None]:
%load_ext line_profiler
%lprun -f function_to_profile function_to_profile(arguments)

In [None]:
%lprun -f astar astar((250, 250, 0), (450, 250, 0), grid, (0.5, 0.5, 0.5), cost_grid)

In [None]:
%lprun -f create_cost_grid create_cost_grid(grid, 5)