<a href="https://colab.research.google.com/github/TechnoPolizzz/safety_doors/blob/main/SafetyDoors_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
!pip install open3d

In [2]:
import open3d as o3d
import cv2
import numpy as np

In [3]:
import os
from google.colab.patches import cv2_imshow # Позволяет выводить изображения

In [4]:
import matplotlib.pyplot as plt
%matplotlib inline
import plotly.graph_objects as go

### Вспомогательные функции

In [5]:
def joinPath(path_name):
  return os.path.join(path_name["path"], path_name["name"])

In [6]:
# Объединение списков без повторений
def mergeLists(list1, list2):
  return list(set(list1 + list2))

### Разархивируем датасеты

In [None]:
!unzip /content/drive/MyDrive/technoPolizzz/point_cloud_train.zip

In [None]:
!mkdir -p pcd/clouds_tof
!unzip "point_cloud_train/clouds_tof/*.zip" -d pcd/clouds_tof

In [None]:
!mkdir -p pcd/clouds_stereo
!unzip "point_cloud_train/clouds_stereo/*.zip" -d pcd/clouds_stereo

### Функции для визуализации

In [7]:
# Создаем mesh из обрамляющего прямоугольника
def createMeshFromBB(bb):
  bbcloud = o3d.geometry.PointCloud()
  bbcloud.points = bb.get_box_points()
  bb.color = bb.color
  hull, _= bbcloud.compute_convex_hull()
  return hull

In [8]:
def createMeshData(mesh, opacity=0.2):
  verts = np.asarray(mesh.vertices)
  triangs = np.asarray(mesh.triangles)

  mdata = go.Mesh3d(
        x=verts[:,0],
        y=verts[:,1],
        z=verts[:,2],
        i = triangs[:,0],
        j = triangs[:,1],
        k = triangs[:,2],
        opacity=opacity,
    )
  return mdata

In [9]:
def createScatter3dData(pcd):
  points = np.asarray(pcd.points)
  colors = np.asarray(pcd.colors)
  scdata = go.Scatter3d(
            x=points[:,0], 
            y=points[:,1], 
            z=points[:,2], 
            mode='markers',
            marker=dict(size=1, color=colors))
  return scdata

In [10]:
def drawGeometry(geometry, width=800, height=600, title=""):
  mydata = []
  for g in geometry:
    if type(g) is o3d.geometry.PointCloud:
      mydata.append(createScatter3dData(g))
    elif type(g) is o3d.geometry.AxisAlignedBoundingBox or type(g) is o3d.geometry.OrientedBoundingBox:
      m = createMeshFromBB(g)
      mydata.append(createMeshData(m))
    elif type(g) is o3d.geometry.TriangleMesh:
      mydata.append(createMeshData(g))
  
  fig = go.Figure(
    data=mydata,
    layout=dict(
        width = width,
        height = height,
        scene=dict(
            xaxis=dict(visible=True),
            yaxis=dict(visible=True),
            zaxis=dict(visible=True)
        ),
        title=title,
    )
  )
  fig.show()

In [11]:
def display_inlier_outlier(cloud, ind, in_color=None, out_color=[1, 0, 0], title=""):
    inlier_cloud = cloud.select_by_index(ind)
    outlier_cloud = cloud.select_by_index(ind, invert=True)

    print("Showing outliers (red) and inliers (gray): ")
    if not out_color is None:
      outlier_cloud.paint_uniform_color(out_color)
    if not in_color is None:
      inlier_cloud.paint_uniform_color(in_color)
    drawGeometry([inlier_cloud, outlier_cloud], title=title)

### Загрузка облаков

Получаем массивы имен облаков

In [12]:
def getNamePathDict(root_dir):
  # Список пар: путь к архиву, имя архива
  files = {}
  for file in os.listdir(root_dir):
    files[file] = []
    sub_dir = os.path.join(root_dir, file)
    if os.path.isdir(sub_dir):
      for subfile in os.listdir(sub_dir):
        file_dict = {}
        file_dict["path"] = sub_dir
        file_dict["name"] = subfile
        files[file].append(file_dict)
  return files

In [None]:
# Папка с датасетом
root_dir = "/content/pcd"

# Список пар: путь к архиву, имя архива
pcd_files = {}
for file in os.listdir(root_dir):
  pcd_files[file] = []
  sub_dir = os.path.join(root_dir, file)
  for subfile in os.listdir(sub_dir):
    file_dict = {}
    file_dict["path"] = sub_dir
    file_dict["name"] = subfile
    pcd_files[file].append(file_dict)

In [None]:
print(pcd_files.keys())

dict_keys(['clouds_stereo', 'clouds_tof'])


### Обработчики облаков точек и кластеров

Класс для обработки облака точек

In [13]:
class CloudProcessor:
  def __init__(self, cloud):
    self.cloud = cloud

  def downsample(self, voxel_size=0.03):
    self.cloud = self.cloud.voxel_down_sample(voxel_size=voxel_size)
    return self

  def alignPlaneWithZ(self, plane):
    # Параметры плоскости
    [a,b,c,d] = plane
    # Ось вращения для совмещения нормали к плоскости с осью z
    axis = np.array([b,a,0])
    # Угол поворота
    angle = np.arccos(c)
    # Приводим ось вращения в формат вектора Родриго
    axis = (axis / np.linalg.norm(axis)) * angle
    # Получаем матрицу поворота вокруг вектора на заданный угол
    R = self.cloud.get_rotation_matrix_from_axis_angle(axis)
    # Поворачиваем облако
    self.cloud.rotate(R, center=(0,0,0))
    # Переносим облако для совпадения плоскостей
    self.cloud.translate((0,0,d))
    return self
  
  def statisticalFiltration(self, nb_neighbors=20, std_ratio=0.01):
    self.cloud, ind_f1 = self.cloud.remove_statistical_outlier(nb_neighbors=nb_neighbors,
                                                      std_ratio=std_ratio)
    return self

  def radialFiltration(self, nb_points=15, radius=0.15):
    self.cloud, ind_f2 = self.cloud.remove_radius_outlier(nb_points=nb_points, radius=radius)
    return self
  
  def removePlane(self, distance_threshold=0.04, ransac_n=3, num_iterations=1000):
    plane_model, inliers = self.cloud.segment_plane(distance_threshold=0.04,
                                          ransac_n=3, num_iterations=1000)
    self.cutPoints(inliers, invert=True)
    return self

  def cutPoints(self, indices, invert=False):
    self.cloud = self.cloud.select_by_index(indices, invert=invert)
    return self

  # Кластеризация с использованием алгоритма DBSCAN
  def DbscanClusterization(self, eps=0.1, min_points=10, verbose=False):
    # with o3d.utility.VerbosityContextManager(o3d.utility.VerbosityLevel.Debug) as cm:
    labels = np.array(self.cloud.cluster_dbscan(eps=eps, min_points=min_points, print_progress=verbose))

    max_label = labels.max()
    clusters = []
    for i in range(max_label + 1):
      cluster = o3d.geometry.PointCloud()
      cluster.points = o3d.utility.Vector3dVector(np.asarray(self.cloud.points)[labels == i])
      cluster.colors = o3d.utility.Vector3dVector(np.asarray(self.cloud.colors)[labels == i])
      clusters.append(cluster)

    return clusters


Класс для обработки кластеров облаков точек

In [14]:
class ClustersProcessor:
  def __init__(self, clusters):
    self.clusters = clusters

  def filterByNumPoints(self, min_points_num = 80):
    filtered_clusters = []
    for cl in self.clusters:
      if len(cl.points) > min_points_num:
        filtered_clusters.append(cl)
    self.clusters = filtered_clusters
    return self
    
  def getAABB(self, color = (1, 0, 0)):
    bb = []
    for cl in self.clusters:
      aabb = cl.get_axis_aligned_bounding_box()
      aabb.color = color
      bb.append(aabb)
    return bb

  def getOBB(self, color = (1, 0, 0)):
    bb = []
    for cl in self.clusters:
      obb = cl.get_oriented_bounding_box()
      obb.color = color
      bb.append(obb)
    return bb

  def createMeshes(self, alpha=0.2):
    meshes = []
    for cluster in self.clusters:
      mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_alpha_shape(cluster, alpha=0.2)
      mesh.compute_vertex_normals()
      meshes.append(mesh)
    return meshes

Функции для опеределения пересечения обрамляющим параллелепипедом плоскости

In [15]:
def line_plane_intersect(pointA, pointB, plane_coeff):
  # Вычисление коэффициентов прямой
  A = pointB[0]-pointA[0]
  B = pointB[1]-pointA[1]
  C = pointB[2]-pointA[2]
  # Свободные коэффициенты
  D=(B*pointA[0])-(A*pointA[1])
  D2=(C*pointA[0])-(A*pointA[2])
  # Матрицы
  M1 = np.array([[B, -A, 0], [C, 0 ,-A], [plane_coeff[0], plane_coeff[1], plane_coeff[2]]])
  M2 = np.array([[D], [D2], [-plane_coeff[3]]])
  # Координата точки пересечения
  point_intersect = np.linalg.solve(M1, M2)
  return point_intersect

In [16]:
def euclideanDistance(pointA, pointB):
  dist = np.sqrt((pointB[0]-pointA[0])**2+(pointB[1]-pointA[1])**2+(pointB[2]-pointA[2])**2)
  return dist

In [17]:
# Определяем пересечение ограничивающего прямоугольника с плоскостью
def bbox_plane_intersect(bbox_points, bbox_center, plane_coeff):
  for i in range(len(bbox_points)):
    # Получаем первую точку
    pointA = bbox_points[i]
    # Координата точки пересечения
    x = line_plane_intersect(pointA, bbox_center, plane_coeff)
    # Вычисление расстояния от точки пересечения до центра и до края bbox'а
    delta1 = euclideanDistance(x, bbox_center)
    delta2 = euclideanDistance(pointA, bbox_center)
    if delta1 < delta2:
      return True
  return False

### Детектирование наличия объектов в области портала двери

In [18]:
def classifyClusters(clusters_obb, height=-1.5):
  classes = []
  for obb in clusters_obb:
    center  = obb.get_center()
    if center[2] < height:
      classes.append("human")
    else:
      classes.append("other")
  return classes

In [19]:
def genAnnotation(path_to_pcd, verbose=False):
  # Путь к облаку точек
  # path_to_pcd = joinPath(pcd_files['clouds_tof'][index])
  # Открываем облако точек
  pcd = o3d.io.read_point_cloud(path_to_pcd)
  # Выводим количество точек в облаке
  if verbose:
    print(pcd)
  # Flip it, otherwise the pointcloud will be upside down
  # pcd.transform([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]])
  # Создаем обработчик облака точек
  tof_processor = CloudProcessor(pcd)
  # Сжимаем
  tof_processor.downsample(voxel_size=0.03)
  if verbose:
    drawGeometry([tof_processor.cloud])
  # Статистическая фильтрация
  tof_processor.statisticalFiltration(nb_neighbors=20, std_ratio=0.01)
  # Радиальная фильтрация
  tof_processor.radialFiltration(nb_points=15, radius=0.2)
  # Удаляем наибольшую плоскость
  tof_processor.removePlane(distance_threshold=0.04, ransac_n=3, num_iterations=1000)
  # Находим следующую наибольшую плоскость (предположительно дверь)
  door_plane, inliers = tof_processor.cloud.segment_plane(distance_threshold=0.005, ransac_n=3, num_iterations=1000)
  # Выделяем дверь из облака точек
  door_cloud = tof_processor.cloud.select_by_index(inliers)
  # Запоминаем обрамляющий прямоугольник двери
  door_bb = door_cloud.get_axis_aligned_bounding_box()
  # Удаляем задетектированную дверь
  tof_processor.cutPoints(inliers, invert=True)
  # Детектируем кластеры
  tof_clusters = ClustersProcessor(tof_processor.DbscanClusterization(eps=0.1, min_points=10, verbose=False))
  # Фильтруем кластеры по количеству точек в них
  tof_clusters = tof_clusters.filterByNumPoints()
  # Создаем mesh'и кластеров
  clusters_meshes = tof_clusters.createMeshes(alpha=0.1)
  # Создаем mesh портала двери
  door_mesh = createMeshFromBB(door_bb)
  # Вычисляем количество пересечений объектов с порталом двери
  intersections = [mesh.is_intersecting(door_mesh) for mesh in clusters_meshes]
  # Выводим массив пересечений с порталом
  if verbose:
    print(intersections)
  # Отображаем кластеры и портал двери
  if verbose:
    drawGeometry(clusters_meshes + [door_mesh])
  return tof_clusters.getOBB()

### Генерирование аннотаций к облакам

In [20]:
# Checks if a matrix is a valid rotation matrix.
import math
def isRotationMatrix(R) :
    Rt = np.transpose(R)
    shouldBeIdentity = np.dot(Rt, R)
    I = np.identity(3, dtype = R.dtype)
    n = np.linalg.norm(I - shouldBeIdentity)
    return n < 1e-6
# Calculates rotation matrix to euler angles
# The result is the same as MATLAB except the order
# of the euler angles ( x and z are swapped ).
def rotationMatrixToEulerAngles(R) :
    assert(isRotationMatrix(R))
    sy = math.sqrt(R[0,0] * R[0,0] +  R[1,0] * R[1,0])
    singular = sy < 1e-6
    if  not singular :
        x = math.atan2(R[2,1] , R[2,2])
        y = math.atan2(-R[2,0], sy)
        z = math.atan2(R[1,0], R[0,0])
    else :
        x = math.atan2(-R[1,2], R[1,1])
        y = math.atan2(-R[2,0], sy)
        z = 0
    return np.array([x, y, z])

In [21]:
# Генерирует аннотацию к кластерам
def obb2json(obbs, classes):
  json_data = {'figures': []}
  for i, obb in enumerate(obbs):
    pos_x, pos_y, pos_z = obb.get_center()
    rot_x, rot_y, rot_z = rotationMatrixToEulerAngles(obb.R)
    dim_x, dim_y, dim_z = obb.extent
    pos = {'x': pos_x, 'y': pos_y, 'z': pos_z}
    rot = {'x': rot_x, 'y': rot_y, 'z': rot_z}
    dim = {'x': dim_x, 'y': dim_y, 'z': dim_z}
    geometry = {'position': pos, 'rotation': rot, 'dimensions': dim}
    json_data['figures'].append({'object': classes[i], 'geometry': geometry})
  return json_data

In [22]:
dir = "annotations/clouds_tof"
for i in range(len(pcd_files['clouds_tof'])):
  data = str(obb2json(genAnnotation(i)))
  file_name = os.path.join(dir, pcd_files['clouds_tof'][i]["name"] + ".json")
  with open (file_name, "w") as myfile:
    myfile.write(data)

NameError: ignored

### Работа с тестовым датасетом

Распаковываем архив и облака точек в нем

In [None]:
!unzip /content/drive/MyDrive/technoPolizzz/point_end.zip

In [None]:
!unzip "point_end/clouds_tof/*.zip" -d point_end/clouds_tof/
!unzip "point_end/clouds_stereo/*.zip" -d point_end/clouds_stereo/

Удаляем zip-файлы

In [13]:
!rm point_end/clouds_tof/*.zip
!rm point_end/clouds_stereo/*.zip

rm: cannot remove 'point_end/clouds_tof/*.zip': No such file or directory


Получаем имена всех файлов в тестовом датасете и пути к ним

In [23]:
pcd_files = getNamePathDict("/content/point_end")
print(pcd_files.keys())

dict_keys(['clouds_tof_ann', 'clouds_stereo', 'img', 'clouds_stereo_ann', 'key_id_map.json', 'meta.json', 'clouds_tof', 'door_state.txt'])


Генерируем аннотацию к данным

In [24]:
!mkdir -p annotations_test/clouds_stereo

In [None]:
clouds_type = "clouds_stereo"
dir = "annotations_test/" + clouds_type
for i in range(len(pcd_files[clouds_type])):
  try:
    print(i)
    # Путь к облаку
    path_to_pcd = joinPath(pcd_files[clouds_type][i])
    # Получаем ориентированные обрамляющие прямоугольники
    clusters_obb = genAnnotation(path_to_pcd, verbose=False)
    # Классифицируем кластеры
    classes = classifyClusters(clusters_obb, height=4)
    # Генерируем json-аннотацию
    jsondata = obb2json(clusters_obb, classes)
    data = str(jsondata)
    # Формируем путь к файлу аннотации
    file_name = os.path.join(dir, pcd_files[clouds_type][i]["name"] + ".json")
    # Записываем json-аннотацию в файл
    with open (file_name, "w") as myfile:
      myfile.write(data)
  except:
    print("Error on: ", i)
    pass

Заменяем в аннотации одинарные кавычки на двойные

In [27]:
!sed -i "s/'/\"/g"  /content/annotations_test/clouds_stereo/*.json

Сохраняем сгенерированную аннотацию в архив

In [None]:
!zip -r annotations_test_unflipped.zip annotations_test/