In [None]:
import matplotlib.pyplot as plt
import plotly.graph_objects as go

import numpy as np
import itertools
import cv2

In [None]:
def disturb_pc(pcData, types, levels, ROI, radar=False, lidar=False):
    """
    Apply a series of disturbances to a point cloud dataset.

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 3) or (n, 4).
    types (list of str): List of disturbance types to apply. Options include "add_points", "lose_points", 
                         "shifting", "cluster", "information_noise", and "None".
    levels (list of float): List of disturbance levels corresponding to each disturbance type.
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".
    radar (bool): If True, apply radar-specific processing in relevant disturbances.
    lidar (bool): If True, apply lidar-specific processing in relevant disturbances.

    Returns:
    numpy.ndarray: The disturbed point cloud after applying all specified disturbances.
    
    Raises:
    NotImplementedError: If a disturbance type is not implemented.
    """
    operations = {
        "add_points": add_points,
        "lose_points": lose_random_points,
        "shifting": shifting,
        "cluster": lambda data, level, ROI: cluster_radar(data, level, ROI) if \
                        radar else cluster_lidar(data, level, ROI),
        "information_noise": information_noise,
        "None": lambda data, level, ROI: data
    }

    for type, level in zip(types, levels):
        if type in operations:
            parameter = level / 4
            pcData = operations[type](pcData, parameter, ROI)
            pcData = filter_points_in_roi(pcData, ROI)  # Sicherstellen, dass Punkte innerhalb des ROI bleiben
        else:
            raise NotImplementedError(f"Operation '{type}' is not implemented.")
    
    return pcData

def information_noise(pcData, parameter, ROI):
    """
    Add noise to the additional information (intensity or magnitude) of each point in the point cloud.

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 4) or (n, 5).
    parameter (float): The disturbance parameter affecting the noise level.
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".

    Returns:
    numpy.ndarray: The point cloud with noisy additional information.
    """
    rand_range = 100 if pcData.shape[1] == 4 else 70
    rand_i = np.random.uniform(-rand_range * parameter, rand_range * parameter, pcData.shape[0])
    pcData[:, 3] += rand_i
    
    if pcData.shape[1] > 4:
        rand_v = np.random.uniform(-5 * parameter, 5 * parameter, pcData.shape[0])
        pcData[:, 4] += rand_v
    
    return pcData

def add_points(pcData, parameter, ROI):
    """
    Add random points to the point cloud within the specified Region of Interest (ROI).

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 3) or (n, 4).
    parameter (float): The disturbance parameter determining the number of points to add.
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".

    Returns:
    numpy.ndarray: The point cloud with additional random points added.
    """
    num_points = int(parameter * pcData.shape[0])
    rand_x = np.random.uniform(ROI["xmin"], ROI["xmax"], num_points)
    rand_y = np.random.uniform(ROI["ymin"], ROI["ymax"], num_points)
    rand_z = np.random.uniform(ROI["zmin"], ROI["zmax"], num_points)
    
    if pcData.shape[1] == 4:
        rand_i = np.random.uniform(0, 100, num_points)
        rand_points = np.column_stack([rand_x, rand_y, rand_z, rand_i])
    else:
        rand_m = np.random.uniform(0, 70, num_points)
        rand_v = np.random.uniform(-5, 5, num_points)
        rand_points = np.column_stack([rand_x, rand_y, rand_z, rand_m, rand_v])
    
    return np.concatenate((pcData, rand_points), axis=0)

def shifting(pcData, parameter, ROI):
    """
    Apply random shifts to all points in the point cloud based on a random shift factor.

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 3).
    parameter (float): The disturbance parameter determining the magnitude of the shift.
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".

    Returns:
    numpy.ndarray: The shifted point cloud.
    """
    rand_shift_factor = np.random.uniform(-2 * parameter, 2 * parameter, pcData.shape[0])
    norm_factors = np.linalg.norm(pcData[:, 0:3], axis=1).reshape(-1, 1)
    pcData[:, 0:3] += pcData[:, 0:3] * (rand_shift_factor / norm_factors)
    return pcData

def cluster_lidar(pcData, parameter, ROI):
    """
    Add clusters of points to the point cloud, simulating local disturbances in the data.

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 3) or (n, 4).
    parameter (float): The disturbance parameter determining the size and number of clusters.
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".

    Returns:
    numpy.ndarray: The point cloud with additional clusters of points.
    """
    amount_cluster = np.random.randint(max(1, 16 * parameter**2 - 2), 16 * parameter**2 + 3) if parameter != 0 else 0

    for _ in range(amount_cluster):
        offset = np.array([np.random.uniform(ROI["xmin"], ROI["xmax"]),
                           np.random.uniform(ROI["ymin"], ROI["ymax"]),
                           np.random.uniform(ROI["zmin"], ROI["zmax"])])
        
        dist = np.linalg.norm(offset)
        if dist < 7:
            continue
        
        theta_length = max(1, int(round(16 - 3/10 * dist) - 3 * offset[2]))
        phi_length = max(1, int(round(50 - dist)))

        r = np.random.uniform(0.1 + parameter**2 * 2, 1 + parameter**2 * 2)
        theta_list = np.linspace(1, np.pi - 1, num=theta_length)
        phi_list = np.linspace(0.5, np.pi - 0.5, num=phi_length)

        sphere = np.array([[r * np.sin(theta) * np.cos(phi) + np.random.uniform(-0.1, 0.1),
                            -r * np.sin(theta) * np.sin(phi) + np.random.uniform(-0.1, 0.1),
                            r * np.cos(theta) + np.random.uniform(-0.1, 0.1)]
                           for theta, phi in itertools.product(theta_list, phi_list)])
        
        sphere = sphere[sphere[:, 2] > ROI["zmin"] + 0.2]
        
        phi_angle = -np.arctan2(offset[0], offset[1])
        theta_angle = -np.arcsin(offset[2] / dist)
        sphere = rotate_points(sphere, rt_matrix(phi_angle, theta_angle))

        sphere += offset

        rand_i = np.random.uniform(0, 100) + np.random.normal(0, 4, (len(sphere), 1))
        rand_cluster = np.column_stack((sphere, rand_i))

        pcData = np.concatenate((pcData, rand_cluster), axis=0)

    return pcData

def cluster_radar(pcData, parameter, ROI):
    """
    Add clusters of points to the point cloud, simulating disturbances caused by radar-specific effects.

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 4) or (n, 5).
    parameter (float): The disturbance parameter determining the size and number of clusters.
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".

    Returns:
    numpy.ndarray: The point cloud with additional clusters of points.
    """

    amount_cluster = np.random.randint(max(1, 32 * parameter**2 - 2), 32 * parameter**2 + 2)

    for _ in range(int(amount_cluster)):
        size_cluster = np.random.uniform(0.1 + parameter**2 * 2, 1 + parameter**2 * 2,2)
        points_cluster = np.random.randint(max(1, 100 * parameter**4), 100 * parameter**4 + 20)

        rand_x = np.random.uniform(0, size_cluster[0], points_cluster) + np.random.uniform(ROI["xmin"], ROI["xmax"])
        rand_y = np.random.uniform(0, size_cluster[1], points_cluster) + np.random.uniform(ROI["ymin"], ROI["ymax"])
        rand_z = np.random.uniform(0, 1, points_cluster) + np.random.uniform(ROI["zmin"], ROI["zmax"])

        rand_m = np.random.uniform(0, 70, points_cluster)
        rand_cluster = np.column_stack([rand_x, rand_y, rand_z, rand_m])

        pcData = np.concatenate((pcData, rand_cluster), axis=0)

    return pcData

def lose_random_points(pcData, parameter, ROI):
    """
    Randomly remove a portion of points from the point cloud.

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 3) or (n, 4).
    parameter (float): The disturbance parameter determining the proportion of points to remove.
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".

    Returns:
    numpy.ndarray: The point cloud with a portion of the points removed.
    """
    keep_indices = np.random.choice(pcData.shape[0], size=int(pcData.shape[0] * (1 - parameter)), replace=False)
    return pcData[keep_indices]

def rotate_points(points, rot_t):
    """
    Rotate the points in the point cloud using a given rotation matrix.

    Parameters:
    points (numpy.ndarray): The input points, typically in the shape (n, 3).
    rot_t (numpy.ndarray): A 3x3 rotation matrix.

    Returns:
    numpy.ndarray: The rotated point cloud.
    """
    return np.dot(points[:, 0:3], rot_t.T)

def rt_matrix(yaw=0, roll=0, pitch=0):
    """
    Generate a 3x3 rotation matrix from yaw, roll, and pitch angles.

    Parameters:
    yaw (float): Rotation angle around the z-axis (in radians).
    roll (float): Rotation angle around the x-axis (in radians).
    pitch (float): Rotation angle around the y-axis (in radians).

    Returns:
    numpy.ndarray: A 3x3 rotation matrix.
    """
    c_y, s_y = np.cos(yaw), np.sin(yaw)
    c_r, s_r = np.cos(roll), np.sin(roll)
    c_p, s_p = np.cos(pitch), np.sin(pitch)
    
    rotation_matrix = np.dot(np.dot(
        np.array([[c_y, -s_y, 0], [s_y, c_y, 0], [0, 0, 1]]),
        np.array([[c_p, 0, s_p], [0, 1, 0], [-s_p, 0, c_p]])
    ), np.array([[1, 0, 0], [0, c_r, -s_r], [0, s_r, c_r]]))
    
    return rotation_matrix

def filter_points_in_roi(pcData, ROI):
    """
    Filter the points in the point cloud to retain only those within a specified Region of Interest (ROI).

    Parameters:
    pcData (numpy.ndarray): The input point cloud data, typically in the shape (n, 3).
    ROI (dict): Dictionary specifying the Region of Interest (ROI) with keys "xmin", "xmax", "ymin", "ymax", 
                "zmin", and "zmax".

    Returns:
    numpy.ndarray: The point cloud with only the points inside the ROI.
    """
    mask = (
        (pcData[:, 0] >= ROI["xmin"]) & (pcData[:, 0] <= ROI["xmax"]) & 
        (pcData[:, 1] >= ROI["ymin"]) & (pcData[:, 1] <= ROI["ymax"]) & 
        (pcData[:, 2] >= ROI["zmin"]) & (pcData[:, 2] <= ROI["zmax"])
    )
    
    return pcData[mask]

# Example

In [None]:
class calib_astyx():
    """Calibration class"""
    def __init__(self, file):
        with open(file) as json_file:
            data = json.load(json_file)
            
        self.radar2ref = np.array(data["sensors"][0]["calib_data"]["T_to_ref_COS"])
        self.lidar2ref_cos = np.array(data["sensors"][1]["calib_data"]["T_to_ref_COS"])
        self.camera2ref = np.array(data["sensors"][2]["calib_data"]["T_to_ref_COS"])
        self.K = np.array(data["sensors"][2]["calib_data"]["K"])
        
        self.ref2radar = self.inv_trans(self.radar2ref)
        self.ref2lidar = self.inv_trans(self.lidar2ref_cos)
        self.ref2camera = self.inv_trans(self.camera2ref)
        self.K = np.array(data["sensors"][2]["calib_data"]["K"])

    @staticmethod
    def inv_trans(T):
        rotation = np.linalg.inv(T[0:3, 0:3])
        translation = T[0:3, 3]
        translation = -1 * np.dot(rotation, translation.T)
        translation = np.reshape(translation, (3, 1))
        Q = np.hstack((rotation, translation))

        return Q
    
    def lidar2ref(self, points):
        n = points.shape[0]
        
        points_hom = np.hstack((points, np.ones((n,1))))
        points_ref = np.dot(points_hom, np.transpose(self.lidar2ref_cos))
        
        return points_ref[:,0:3]

    def ref2Camera(self, points, img_size):
        obj_image = np.dot(self.ref2camera[0:3, 0:3], points.T)
        T = self.ref2camera[0:3, 3]
        obj_image = obj_image + T[:, np.newaxis]
        obj_image = np.dot(self.K, obj_image)
        obj_image = obj_image / obj_image[2]
        obj_image = np.delete(obj_image, 2, 0)
        mask = (obj_image[0,:] <= img_size[0]) & \
                (obj_image[1,:] <= img_size[1]) & \
                (obj_image[0,:] >= 0) & (obj_image[1,:] >= 0) & \
                (points[:,0] >= 0)
        return obj_image[:, mask], mask

In [None]:
ROI = {"xmin":0,
       "xmax":50,
       "ymin":-25,
       "ymax":25,
       "zmin":-2,
       "zmax":2}

In [None]:
idx = 21 
root = "your/path/to/dataset_astyx_hires2019"
img = cv2.imread(f"{root}/camera_front/{idx:06d}.jpg")
lidar = filter_points_in_roi(np.loadtxt(f"{root}/lidar_vlp16/{idx:06d}.txt", skiprows=1), ROI)
radar = filter_points_in_roi(np.loadtxt(f"{root}/radar_6455/{idx:06d}.txt", skiprows=2), ROI)
calib = calib_astyx(f"{root}/calibration/{idx:06d}.json")
lidar[:,0:3] = calib.lidar2ref(lidar[:,0:3])

In [None]:
data = [go.Scatter3d(x = lidar[:,0],
                     y = lidar[:,1],
                     z = lidar[:,2],
                    mode='markers', type='scatter3d',
                    marker={
                        'size': 1,
                        'color': lidar[:,3],
                        'colorscale':'rainbow',
                        'colorbar':{"thickness":20}
})]
layout = go.Layout(
    scene={
        'xaxis': {'range': [0, 50], 'rangemode': 'tozero', 'tick0': -5},
        'yaxis': {'range': [-25, 25], 'rangemode': 'tozero', 'tick0': -5},
        'zaxis': {'range': [-25, 25], 'rangemode': 'tozero'},
    },showlegend=False
)
go.Figure(data=data, layout=layout)

In [None]:
distortion = "cluster"
level = 4
lidar_dist = disturb_pc(lidar[:,:4], [distortion], [level], ROI, lidar = True)

In [None]:
data = [go.Scatter3d(x = lidar_dist[:,0],
                     y = lidar_dist[:,1],
                     z = lidar_dist[:,2],
                    mode='markers', type='scatter3d',
                    marker={
                        'size': 1,
                        'color': lidar_dist[:,3],
                        'colorscale':'rainbow',
                        'colorbar':{"thickness":20}
})]
layout = go.Layout(
    scene={
        'xaxis': {'range': [0, 50], 'rangemode': 'tozero', 'tick0': -5},
        'yaxis': {'range': [-25, 25], 'rangemode': 'tozero', 'tick0': -5},
        'zaxis': {'range': [-25, 25], 'rangemode': 'tozero'},
    },showlegend=False
)
go.Figure(data=data, layout=layout)