In [36]:
import numpy as np
import open3d as o3d
import os
import numpy as np
from tqdm import tqdm

In [37]:
class LZW:
    @staticmethod
    def compress(uncompressed: str) -> list:
        dictionary = {}
        word = ""
        result = []
        dict_size = 256

        for i in range(dict_size):
            dictionary[str(chr(i))] = i

        for index in range(len(uncompressed)):
            current_char = str(uncompressed[index])
            word_and_symbol = word + current_char

            if word_and_symbol in dictionary:
                word = word_and_symbol
            else:
                try:
                    result.append(dictionary[word])
                except:
                    print(index)
                    print(word)
                    print("-------------")
                dictionary[word_and_symbol] = dict_size
                dict_size += 1
                word = str(current_char)

        if word != "":
            result.append(dictionary[word])

        return result

    @staticmethod
    def decompress(compressed: list) -> str:
        dictionary = {}
        dict_size = 256

        for i in range(dict_size):
            dictionary[i] = str(chr(i))

        word = str(chr(compressed[0]))
        result = word

        for i in range(1, len(compressed)):
            temp = compressed[i]

            if temp in dictionary:
                entry = dictionary[temp]
            else:
                if temp == dict_size:
                    entry = word + str(word[0])
                else:
                    return None

            result += entry
            dictionary[dict_size] = word + str(entry[0])
            dict_size += 1
            word = entry

        return result

In [38]:
class FIC:
    @staticmethod
    def compress(input_list_of_integers: list) -> list:
        result = []
        for number in input_list_of_integers:
            if number < (1 << 7):
                result.append((number).to_bytes(1, byteorder="little"))
            elif number < (1 << 14):
                result.append(((number & 0x7F) | 0x80).to_bytes(1, byteorder="little"))
                result.append((number >> 7).to_bytes(1, byteorder="little"))
            elif number < (1 << 21):
                result.append(((number & 0x7F) | 0x80).to_bytes(1, byteorder="little"))
                result.append((((number >> 7) & 0x7F) | 0x80).to_bytes(
                    1, byteorder="little"
                ))
                result.append((number >> 14).to_bytes(1, byteorder="little"))
            elif number < (1 << 28):
                result.append(((number & 0x7F) | 0x80).to_bytes(1, byteorder="little"))
                result.append((((number >> 7) & 0x7F) | 0x80)).to_bytes(
                    1, byteorder="little"
                )
                result.append((((number >> 14) & 0x7F) | 0x80).to_bytes(
                    1, byteorder="little"
                ))
                result.append((number >> 21).to_bytes(1, byteorder="little"))
            else:
                result.append(((number & 0x7F) | 0x80).to_bytes(1, byteorder="little"))
                result.append((((number >> 7) & 0x7F) | 0x80).to_bytes(
                    1, byteorder="little"
                ))
                result.append((((number >> 14) & 0x7F) | 0x80).to_bytes(
                    1, byteorder="little"
                ))
                result.append((((number >> 21) & 0x7F) | 0x80).to_bytes(
                    1, byteorder="little"
                ))
                result.append((number >> 28).to_bytes(1, byteorder="little"))

        return result

    @staticmethod
    def decompress(input_byte_array: list) -> list:
        result = []
        position = 0

        while len(input_byte_array) > position:
            byte_to_int = int.from_bytes(
                input_byte_array[position:position + 1], signed=True, byteorder="little"
            )
            position += 1
            temp_int = byte_to_int & 0x7F
            if byte_to_int >= 0:
                result.append(temp_int)
                continue
            byte_to_int = int.from_bytes(
                input_byte_array[position:position + 1], signed=True, byteorder="little"
            )
            position += 1
            temp_int |= (byte_to_int & 0x7F) << 7
            if byte_to_int >= 0:
                result.append(temp_int)
                continue
            byte_to_int = int.from_bytes(
                input_byte_array[position:position + 1], signed=True, byteorder="little"
            )
            position += 1
            temp_int |= (byte_to_int & 0x7F) << 14
            if byte_to_int >= 0:
                result.append(temp_int)
                continue
            byte_to_int = int.from_bytes(
                input_byte_array[position:position + 1], signed=True, byteorder="little"
            )
            position += 1
            temp_int |= (byte_to_int & 0x7F) << 21
            if byte_to_int >= 0:
                result.append(temp_int)
                continue
            byte_to_int = int.from_bytes(
                input_byte_array[position:position + 1], signed=True, byteorder="little"
            )
            position += 1
            temp_int |= byte_to_int << 28
            result.append(temp_int)

        return result


In [39]:
 def load_labels(path) -> np.array:
        with open(path, "rb") as label_file:
            data = bytearray(label_file.read())
            labels_string = LZW.decompress(FIC.decompress(data))
            labels_string = labels_string[1:-1]  # skip []
            return np.asarray(list(map(lambda x: int(x), labels_string.split(","))), dtype=int)

In [61]:
path_to_annot = 'test_750_1000_20cm.pcd.labels'
labels = load_labels(path_to_annot)

In [62]:
import open3d as o3d
pcd = o3d.io.read_point_cloud('test_750_1000_20cm.pcd')

In [97]:
import matplotlib.pyplot as plt

def visualize_labels(pcd, labels):
    geometries = []
    points = np.asarray(pcd.points)
    N_planes = int(np.max(labels) + 1)
    print(N_planes)
    
    cmap = plt.cm.get_cmap('gist_ncar')
    
    for i in range(N_planes):
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(points[np.where(labels == i)])
        if i == 0:
            pcd.paint_uniform_color([0.5, 0.5, 0.5])
        else:
            pcd.paint_uniform_color(cmap(i / N_planes)[:3])
        geometries.append(pcd)

    o3d.visualization.draw_geometries(geometries)

In [64]:
visualize_labels(pcd, labels)

100


In [44]:
def get_matrix_from_kitti_file(line: str) -> np.ndarray:
    matrix = np.eye(4)
    matrix[:3, :4] = np.array(list(map(float, line.rstrip().split(" ")))).reshape(3, 4)

    return matrix


def get_calibration_matrix_from_calib_file(path_to_calibration_file: str) -> np.ndarray:
    with open(path_to_calibration_file) as file:
        return get_matrix_from_kitti_file(file.readlines()[4][4:])


def get_position_matrices_from_poses_file(path_to_poses_file: str) -> list:
    with open(path_to_poses_file) as file:
        lines = file.readlines()
        pose_matrices = []

        for frame_number in range(len(lines)):
            pose_matrices.append(get_matrix_from_kitti_file(lines[frame_number]))

        return pose_matrices
    
def get_point_cloud_from_bin_file(path_to_bin_file: str) -> o3d.geometry.PointCloud:
    """
    Function allows you to get a point cloud from a file with lidar data
    """
    point_cloud_numpy = np.fromfile(path_to_bin_file, dtype=np.float32).reshape(-1, 4)
    point_cloud = o3d.geometry.PointCloud()
    point_cloud.points = o3d.utility.Vector3dVector(point_cloud_numpy[:, :3])
    return point_cloud

def transform_positions_in_point_cloud(
    calib_matrix: np.ndarray,
    pose_matrix: np.ndarray,
    point_cloud: o3d.geometry.PointCloud
) -> o3d.geometry.PointCloud:
    left_camera_matrix = pose_matrix @ calib_matrix

    return point_cloud.transform(left_camera_matrix)

In [69]:
path_to_bin_dir = "/home/anasyasiia/Downloads/SemanticKITTI_00/"
path_to_poses_file = "/home/anasyasiia/Downloads/rss2018_frame2frame/00.txt"
path_to_calib_file = "/home/anasyasiia/Downloads/data_odometry_calib/dataset/sequences/00/calib.txt"

point_cloud = o3d.geometry.PointCloud()
calib_matrix = get_calibration_matrix_from_calib_file(path_to_calib_file)
poses_matrices = get_position_matrices_from_poses_file(path_to_poses_file)

index = 800
path_to_current_point_cloud = os.path.join(
        path_to_bin_dir, str(index).zfill(6) + ".bin"
    )
temp_point_cloud = get_point_cloud_from_bin_file(path_to_current_point_cloud)
temp_point_cloud = transform_positions_in_point_cloud(
    calib_matrix, poses_matrices[index], temp_point_cloud
    )

obs_points = np.asarray(temp_point_cloud.points)
temp_point_cloud.paint_uniform_color([1, 0, 0])
o3d.visualization.draw_geometries([temp_point_cloud])

In [67]:
def remap_labels(pc_obs, pc_map, labels_map):
    cmap = plt.cm.get_cmap('gist_ncar')
    N_planes = np.max(labels_map) + 1
    map_tree = o3d.geometry.KDTreeFlann(pc_map)
    obs_points = np.asarray(pc_obs.points)
    pc_obs.paint_uniform_color([0.5, 0.5, 0.5])

    labels_obs = np.zeros(obs_points.shape[0])
    for point_idx in range(obs_points.shape[0]):
        [k, idx, _] = map_tree.search_knn_vector_3d(obs_points[point_idx], 1)
        if labels_map[idx[0]] == 0:
            np.asarray(pc_obs.colors)[point_idx] = [0.5, 0.5, 0.5]
        else:
            np.asarray(pc_obs.colors)[point_idx] = cmap(labels_map[idx[0]] / N_planes)[:3]
        labels_obs[point_idx] = labels_map[idx[0]]
        
    o3d.visualization.draw_geometries([pc_obs]) #, pc_map])
    return labels_obs

In [70]:
remap_labels(temp_point_cloud, pcd, labels)

array([27., 27., 27., ...,  0.,  0.,  0.])

In [172]:
pc1 = o3d.io.read_point_cloud('pcds/test_100_20cm.pcd')
pc2 = o3d.io.read_point_cloud('pcds/test_150_250_20cm.pcd')
pc3 = o3d.io.read_point_cloud('pcds/test_350_500_20cm.pcd')
pc4 = o3d.io.read_point_cloud('pcds/test_500_750_20cm.pcd')
pc5 = o3d.io.read_point_cloud('pcds/test_750_1000_20cm.pcd')

labels1 = load_labels('labels/Облака/test_100_20cm.pcd.labels')
labels2 = load_labels('labels/Облака/test_150_250_20cm.pcd.labels')
labels3 = load_labels('labels/Облака/test_350_500_20cm.pcd.labels')
labels4 = load_labels('labels/Облака/test_500_750_20cm.pcd.labels')
labels5 = load_labels('labels/Облака/test_750_1000_20cm.pcd.labels')

o3d.visualization.draw_geometries([pc1, pc2, pc3, pc4, pc5])

In [82]:
def color_planar_points(pcd, labels):
    planar_indices = np.where(labels != 0)
    print(planar_indices)
    pcd.paint_uniform_color([0.5, 0.5, 0.5])
    np.asarray(pcd.colors)[planar_indices] = [1, 0, 0]
    
#     return pcd

In [165]:
color_planar_points(pc1, labels1)
color_planar_points(pc2, labels2)
color_planar_points(pc3, labels3)
color_planar_points(pc4, labels4)
color_planar_points(pc5, labels5)

o3d.visualization.draw_geometries([pc5])

(array([     5,    105,    210, ..., 208358, 208360, 208363]),)
(array([   182,    183,    232, ..., 384106, 384107, 384108]),)
(array([   584,    648,    649, ..., 259060, 259061, 259062]),)
(array([   192,    288,    289, ..., 370351, 370355, 370363]),)
(array([   198,    229,    230, ..., 442571, 442574, 442579]),)


In [173]:
mp_labels = np.hstack((labels1, labels2, labels3, labels4, labels5))
mp_pcd = pc1 + pc2 + pc3 + pc4 + pc5

array([ 0,  0,  0, ..., 16,  0,  0])

In [160]:
index = 800
path_to_current_point_cloud = os.path.join(
        path_to_bin_dir, str(index).zfill(6) + ".bin"
    )
temp_point_cloud = get_point_cloud_from_bin_file(path_to_current_point_cloud)
temp_point_cloud = transform_positions_in_point_cloud(
    calib_matrix, poses_matrices[index], temp_point_cloud
    )

In [174]:
def remap_binary_labels(pc_obs, pc_map, labels_map):
    map_tree = o3d.geometry.KDTreeFlann(pc_map)
    obs_points = np.asarray(pc_obs.points)

    labels_obs = np.zeros(obs_points.shape[0])
    for point_idx in range(obs_points.shape[0]):
        [k, idx, _] = map_tree.search_radius_vector_3d(obs_points[point_idx], 0.15)
#         print(np.any(labels_map[idx != 0]))
        if np.any(labels_map[idx] != 0):
            labels_obs[point_idx] = 1
        
    return labels_obs

In [175]:
from tqdm import tqdm

for index in tqdm(range(0, 1000)):
    path_to_current_point_cloud = os.path.join(
            path_to_bin_dir, str(index).zfill(6) + ".bin"
        )
    temp_point_cloud = get_point_cloud_from_bin_file(path_to_current_point_cloud)
    temp_point_cloud = transform_positions_in_point_cloud(
        calib_matrix, poses_matrices[index], temp_point_cloud
        )

    labels_bin = remap_binary_labels(temp_point_cloud, mp_pcd, mp_labels)

    np.save('labeled_00/' + str(index).zfill(6) + '.npy', labels_bin)

100%|██████████| 1000/1000 [56:38<00:00,  3.40s/it] 


In [164]:
visualize_labels(temp_point_cloud, labels_800)

2
