In [7]:
import matplotlib.pyplot as plt
import numpy as np
import open3d as o3d
from sklearn.cluster import DBSCAN
from scipy.spatial import KDTree
from PIL import Image
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas


import sys
sys.path.append("/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance")

from constants import *
from helpers.utils import * 

import pickle
# Load the pickle file
with open('/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/data/cloud_point_data4k.pkl', 'rb') as f:
    cloud_point_data = pickle.load(f)
    
print(f"Loaded {len(cloud_point_data)} point clouds")
cloud_point_data = cloud_point_data[1000:1004]
# cloud_point_data = cloud_point_data[500:503]
# cloud_point_data = cloud_point_data[520:540]
print(f"Loaded {len(cloud_point_data)} point clouds")

def generate_3d_circle_points(radius, height, num_points=1000):
    r = radius * np.sqrt(np.random.rand(num_points))
    theta = np.random.rand(num_points) * 2 * np.pi
    x = r * np.cos(theta)
    y = r * np.sin(theta)
    z = np.full(num_points, height)  # z = 0 for a circle in the xy-plane
    circle_points = np.vstack((x, y, z)).T
    return circle_points

def generate_3d_rectangle_points(a, b, height, num_points=5000):
    x = np.random.uniform(-3*a/10, 7*a/10, num_points)
    y = np.random.uniform(-b/2, b/2, num_points)
    z = np.full(num_points, height)
    rectangle_points = np.vstack((x, y, z)).T
    return rectangle_points

def filter_road(pcd):
    z_values = [point[2] for point in pcd]
    unique_z_values = sorted(set(z_values))
    # print(unique_z_values)
    average_height = np.sum(unique_z_values[:100]) / 100
    # road_height = np.ceil(average_height * 10) / 10
    road_height = np.ceil(unique_z_values[0] * 10) / 10
    # print(road_height)
    height_tolerance = 0.05  # Define a tolerance level for "closeness"
    height_filter = np.isclose(pcd[:, 2], road_height, atol=height_tolerance)
    combined_points = pcd[height_filter]

    # height_filter = pcd[:, 2] <= road_height
    # combined_points = pcd[height_filter]
    circle_points = generate_3d_circle_points(radius=4, height=road_height, num_points=500)
    rectangle_points = generate_3d_rectangle_points(a=15, b=5, height=road_height, num_points=500)
    combined_points = np.vstack((combined_points, circle_points, rectangle_points))
    return combined_points, road_height

def build_road_boundary_map(road_points):
    # Initialize the boundary maps
    x_boundary_map = defaultdict(lambda: {'min': float('inf'), 'max': -float('inf')})
    y_boundary_map = defaultdict(lambda: {'min': float('inf'), 'max': -float('inf')})
    
    # Populate the maps with min/max x and y values for each y and x, respectively
    for point in road_points:
        x, y, _ = np.round(point)  # We ignore the z value

        x_boundary_map[x]['min'] = min(x_boundary_map[x]['min'], y)
        x_boundary_map[x]['max'] = max(x_boundary_map[x]['max'], y)
        
        # Update y map with the min/max x values
        y_boundary_map[y]['min'] = min(y_boundary_map[y]['min'], x)
        y_boundary_map[y]['max'] = max(y_boundary_map[y]['max'], x)
    
    return x_boundary_map, y_boundary_map


def is_point_on_road(x_boundary_map, y_boundary_map, point, road_height):
    x, y , z = point
    x_r, y_r, _ = np.round(point)
    _, _, z_r = np.floor(point * 10) / 10
    if ((x_boundary_map[x_r]['min'] <= y <= x_boundary_map[x_r]['max']) and (y_boundary_map[y_r]['min'] <= x <= y_boundary_map[y_r]['max']) and z_r > road_height):
        return True

    return False

def find_vehicle_on_road_per_frame(frames):
    road_points, road_height = filter_road(frames)
    x_boundary_map, y_boundary_map = build_road_boundary_map(road_points)
    vehicle_points = []
    for i, point in enumerate(frames):
        if is_point_on_road(x_boundary_map, y_boundary_map, point, road_height):
            vehicle_points.append(point)
    return vehicle_points

# def plot_road_boundary_with_fill(x_boundary_map, y_boundary_map, frame_idx, frames_path):
#     plt.figure(figsize=(6, 6))

#     # Extract and sort x and y boundaries, swap roles for rotation
#     y_values = sorted(x_boundary_map.keys())
#     x_min_values = [x_boundary_map[y]['min'] for y in y_values]
#     x_max_values = [x_boundary_map[y]['max'] for y in y_values]

#     plt.fill_betweenx(y_values, x_min_values, x_max_values, color='green', alpha=0.4, label='Road Area')
    
#     plt.fill_betweenx(y_values, min(x_min_values) - 10, x_min_values, color='red', alpha=0.4, label='Outside Road Area (Left)')
#     plt.fill_betweenx(y_values, x_max_values, max(x_max_values) + 10, color='red', alpha=0.4, label='Outside Road Area (Right)')
    
#     teal_bbox_x = [-1.1, 1.1, 1.1, -1.1, -1.1]  # X-coordinates (in swapped axes)
#     teal_bbox_y = [-2.4, -2.4, 2.4, 2.4, -2.4]  # Y-coordinates (in swapped axes)
#     plt.plot(teal_bbox_x, teal_bbox_y, color='teal', lw=2, label='Fixed Teal BBox')
    
#     # plt.title('Rotated Road Boundary and Filled Area Visualization')
#     # plt.xlabel('Y-axis')
#     # plt.ylabel('X-axis')
#     plt.axis('equal')  # Ensure equal scaling

#     plt.minorticks_on()
#     plt.grid(which='major', linestyle='-', linewidth=0.5, color='black')
    
#     # Set major and minor ticks to 0.2 interval
#     plt.gca().xaxis.set_major_locator(plt.MultipleLocator(1))
#     plt.gca().yaxis.set_major_locator(plt.MultipleLocator(1))
    
#     plt.gca().xaxis.set_ticklabels([])
#     plt.gca().yaxis.set_ticklabels([])
        
#     plt.grid(True)
#     plt.xlim(-30, 30)  # Adjust limits as needed for visualization
#     plt.ylim(-30, 30)
#     plt.gca().invert_yaxis()  
#     plt.gca().set_aspect('equal', adjustable='box')
    
#     # plt.show()
#     # if not os.path.exists(frames_path):
#     #     os.makedirs(frames_path)
    
#     # frame_filename = os.path.join(frames_path, f"frame_{frame_idx:04d}.png")
#     # plt.savefig(frame_filename, bbox_inches='tight')
#     # plt.close()


def plot_road_boundary_with_fill(x_boundary_map, y_boundary_map, frame_idx, frames_path):
    fig, ax = plt.subplots(figsize=(6, 6))
    canvas = FigureCanvas(fig)

    # Extract and sort x and y boundaries, swap roles for rotation
    y_values = sorted(x_boundary_map.keys())
    x_min_values = [x_boundary_map[y]['min'] for y in y_values]
    x_max_values = [x_boundary_map[y]['max'] for y in y_values]

    ax.fill_betweenx(y_values, x_min_values, x_max_values, color='green', label='Road Area')
    ax.fill_betweenx(y_values, min(x_min_values) - 10, x_min_values, color='red', label='Outside Road Area (Left)')
    ax.fill_betweenx(y_values, x_max_values, max(x_max_values) + 10, color='red', label='Outside Road Area (Right)')
    
    teal_bbox_x = [-1.1, 1.1, 1.1, -1.1, -1.1]  # X-coordinates (in swapped axes)
    teal_bbox_y = [-2.4, -2.4, 2.4, 2.4, -2.4]  # Y-coordinates (in swapped axes)
    ax.plot(teal_bbox_x, teal_bbox_y, color='teal', lw=2, label='Fixed Teal BBox')
    
    ax.axis('equal')  # Ensure equal scaling

    # ax.minorticks_on()
    # ax.grid(which='major', linestyle='-', linewidth=0.5, color='black')
    
    # Set major and minor ticks to 0.2 interval
    # ax.xaxis.set_major_locator(plt.MultipleLocator(1))
    # ax.yaxis.set_major_locator(plt.MultipleLocator(1))
    
    ax.xaxis.set_ticklabels([])
    ax.yaxis.set_ticklabels([])
        
    # ax.grid(True)
    ax.set_xlim(-30, 30)  # Adjust limits as needed for visualization
    ax.set_ylim(-30, 30)
    ax.invert_yaxis()  
    ax.set_aspect('equal', adjustable='box')

    # Render the plot to a NumPy array
    canvas.draw()
    width, height = fig.get_size_inches() * fig.get_dpi()
    image = np.frombuffer(canvas.buffer_rgba(), dtype='uint8')
    image = image.reshape(int(height), int(width), 4)[:,:,:3]
    # Convert NumPy array to PIL Image and display it
    # image = Image.fromarray(image)
    # image.show()

    plt.close(fig)
    return image



class PointCloudProcessor:
    def __init__(self, output_dir=None, store_frames=True, visualize=True, eps=EPS, min_samples=MIN_SAMPLES):
        self.eps = eps
        self.min_samples = min_samples
        self.visualize = visualize
        self.output_dir = output_dir
        self.store_frames = store_frames
        
        self.previous_centroids = []
        self.previous_cluster_ids = []

        # Initialize tracking history and IDs
        self.cluster_history = []
        self.tracking_id = 0
        self.cluster_color_map = {}
        self.previous_centroids_map = {}
        self.previous_directions_map = {}
        self.vehicle_type_cluster = {}
        
        if self.visualize == True:
            self.vis = o3d.visualization.Visualizer()
            self.vis.create_window(visible=visualize)
        
        self.frame_count = 0
    
    def process_frame(self, current_frame):
        vehicle_points = find_vehicle_on_road_per_frame(current_frame)
        vehicle_points = np.array(vehicle_points)
        clustering = DBSCAN(eps=self.eps, min_samples=self.min_samples).fit(vehicle_points)
        labels = clustering.labels_
        
        current_clusters_points = []
        current_centroids = []
        current_bboxs = []
        
        for cluster_label in set(labels):
            if cluster_label == -1:
                continue  
            
            cluster_points = vehicle_points[labels == cluster_label]
            
            if np.any(cluster_points[:, 2] >= 1):
                continue
            
            if np.any(cluster_points[:, 2] >= -1):
                cluster_pcd = o3d.geometry.PointCloud()
                cluster_pcd.points = o3d.utility.Vector3dVector(cluster_points)
                bbox = cluster_pcd.get_axis_aligned_bounding_box()
                point = np.asarray(bbox.get_box_points())
                d_height = np.linalg.norm(point[0] - point[3])
                d0 = np.linalg.norm(point[0] - point[1])
                d1 = np.linalg.norm(point[0] - point[2])
                d_sorted = sorted(np.asarray([d0, d1]), reverse=True)
                
                # d0, d1, d2 = bbox.get_extent()
                # if (d0 * d1 * d2 > 2): 
                if d_height > 1 and d1 > 1:
                # if (d0 * d1 > 2) and d_sorted[0]/d_height < 4 : 
                    current_clusters_points.append(cluster_points)
                    current_centroids.append(bbox.get_center())
                    current_bboxs.append(bbox)
            
        current_centroids = np.array(current_centroids)
        
        if self.frame_count == 0:
            cluster_ids = np.arange(len(current_centroids))
            self.tracking_id = len(current_centroids)
        else:
            cluster_ids, self.tracking_id = self.track_clusters(current_centroids, self.previous_centroids, self.previous_cluster_ids)
        
        if self.visualize == True:
            self.visualize_frame(current_clusters_points, current_bboxs, cluster_ids)
        
        self.previous_centroids = current_centroids
        self.previous_cluster_ids = cluster_ids
        
        self.frame_count += 1
            
            
    def track_clusters(self, current_centroids, previous_centroids, prev_cluster_ids, threshold=DIST_THRESHOLD_TRACKING):
        if len(previous_centroids) == 0:
            return np.arange(len(current_centroids)) + self.tracking_id, self.tracking_id + len(current_centroids)
        
        if len(current_centroids) == 0:
            return np.array([]), self.tracking_id
        
        tree = KDTree(previous_centroids)
        assigned_ids = np.full(len(current_centroids), -1)
        distances, indices = tree.query(current_centroids)
        
        # Assign tracking IDs based on distance threshold
        for i, (dist, idx) in enumerate(zip(distances, indices)):
            if dist < threshold:
                assigned_ids[i] = prev_cluster_ids[idx]
                
        new_clusters = (assigned_ids == -1)
        assigned_ids[new_clusters] = np.arange(self.tracking_id, self.tracking_id + np.sum(new_clusters))
        
        return assigned_ids, self.tracking_id + np.sum(new_clusters)
    

    def visualize_frame(self, cluster_points, bboxs, cluster_ids):
        all_points = []
        all_colors = []
        bounding_boxes = []
        arrows = []
        arrows_2D = []
        
        for i, cluster in enumerate(cluster_points):
            cluster_pcd = o3d.geometry.PointCloud()
            cluster_pcd.points = o3d.utility.Vector3dVector(cluster)
            
            bbox = bboxs[i]
            bbox_center = bbox.get_center()
            
            bbox.color = (0, 1, 0)
            centroid_color = [0.5, 0.4, 0.4]
            
            if cluster_ids[i] in self.previous_centroids_map:
                prev_centroid = self.previous_centroids_map[cluster_ids[i]]
                previous_direction = self.previous_directions_map.get(cluster_ids[i], None)
            
                arrow_result = create_arrows(prev_centroid, bbox_center, bbox, previous_direction)

                if arrow_result is not None:
                    lines = [np.asarray(line.points) for line in arrow_result]
                    arrows_2D.append(lines)

                    for arrow in arrow_result:
                        arrows.append(arrow)

                new_direction = bbox_center - prev_centroid
                self.previous_directions_map[cluster_ids[i]] = new_direction  
                
                prev_sphere = o3d.geometry.TriangleMesh.create_sphere(radius=0.05)
                prev_sphere.translate(prev_centroid)
                prev_sphere.paint_uniform_color([0.5, 0.5, 0.5])  
            
                
            self.previous_centroids_map[cluster_ids[i]] = bbox_center
            
            bounding_boxes.append(bbox)
            all_colors.append(np.tile(centroid_color, (len(cluster), 1))) 
            all_points.append(cluster)
            
        if len(all_points) > 0:
            all_points = np.vstack(all_points)
            all_colors = np.vstack(all_colors)
            
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(all_points)
            pcd.colors = o3d.utility.Vector3dVector(all_colors)
            
            self.vis.clear_geometries()
            entities_to_draw = [pcd] + bounding_boxes + arrows

            for entity in entities_to_draw:
                self.vis.add_geometry(entity)
            
            view_control = self.vis.get_view_control() 
            view_control.set_lookat([0,0,0])
            view_control.set_up([-1,0,0])
            
            # view_control.set_zoom(0.5)
            
            self.vis.poll_events()
            self.vis.update_renderer()
            
            if self.store_frames == True and self.output_dir != None:
                frame_image = np.array(self.vis.capture_screen_float_buffer(True)) * 255
                frame_image = frame_image.astype(np.uint8)
                
                if not os.path.exists(self.output_dir):
                    os.makedirs(self.output_dir)

                frame_filename = os.path.join(self.output_dir, f"frame_{self.frame_count:04d}.png")
                visualize_highest_points(pcd, bounding_boxes, arrows_2D, frame_filename)
                # cv2.imwrite(frame_filename, frame_image)

    
    def close_vis(self):
        self.vis.destroy_window()
     
frames_path = "/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/frames"
output_path = "/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/output_videos/test.mov"

# processor = PointCloudProcessor()
image = None
# Example usage:
for frame_idx in range(len(cloud_point_data)):
    current_frame = cloud_point_data[frame_idx]
    
    road_points, road_height = filter_road(current_frame)
    x_boundary_map, y_boundary_map = build_road_boundary_map(road_points)
    
    image = plot_road_boundary_with_fill(x_boundary_map, y_boundary_map, frame_idx, frames_path)
    
    
    # pcd = o3d.geometry.PointCloud()
    # pcd.points = o3d.utility.Vector3dVector(cloud_point_data[frame_idx])
    # vis = o3d.visualization.Visualizer()
    # vis.create_window()
    # vis.add_geometry(pcd)
    # vis.run()  # Blocks until the window is closed
    # vis.destroy_window()

# create_video_from_frames("/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/frames", "/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/output_videos/test.mov", frame_rate=FRAME_RATE)

Loaded 4625 point clouds
Loaded 4 point clouds


In [46]:
create_video_from_frames("/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/frames", "/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/output_videos/test.mov", frame_rate=FRAME_RATE)

OpenCV: FFMPEG: tag 0x47504a4d/'MJPG' is not supported with codec id 7 and format 'mov / QuickTime / MOV'
OpenCV: FFMPEG: fallback to use tag 0x6765706a/'jpeg'


Video saved as /Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/Autonomous-Navigation-and-Collision-Avoidance-Sytem-for-car/Collision_Avoidance/output_videos/test.mov


In [8]:

# print(image.shape)
img = image.copy()
print(img.shape)
# image_pil = Image.fromarray(img)
# image_pil.show()

def crop_center_rectangle(image, target_height=440, target_width=440):
    h, w, c = image.shape  # Get the shape of the original image
    # Calculate the start and end indices for height and width
    start_y = (h - target_height) // 2
    start_x = (w - target_width) // 2
    end_y = start_y + target_height
    end_x = start_x + target_width

    # Crop the image
    cropped_image = image[start_y:end_y, start_x:end_x]
    return cropped_image

# Example usage
# Assuming `img` is a (600, 600, 3) NumPy array
cropped_image = crop_center_rectangle(img)
print(cropped_image.shape)
# cropped_image = np.squeeze(cropped_image, axis=2)
image_pil = Image.fromarray(cropped_image)
image_pil.show()

# def max_pooling_2d(image, pool_size=(3, 3), stride=4):
#     # Get the dimensions of the image
#     h, w, c = image.shape
#     pool_height, pool_width = pool_size

#     # Calculate the dimensions of the output after pooling
#     out_height = h // stride
#     out_width = w // stride

#     # Initialize the output matrix
#     pooled_image = np.zeros((out_height, out_width), dtype=image.dtype)

#     # Perform max pooling
#     for i in range(0, h - pool_height + 1, stride):
#         for j in range(0, w - pool_width + 1, stride):
#             window = image[i:i + pool_height, j:j + pool_width]
#             pooled_image[i // stride, j // stride] = np.max(window)

#     return pooled_image

# pooled_image = max_pooling_2d(cropped_image)
# print(pooled_image.shape) 
# image_pil = Image.fromarray(pooled_image)
# image_pil.show()


(600, 600, 3)
(440, 440, 3)


In [5]:
green_threshold = [0, 255, 0]

# Create a mask for green pixels
green_mask = (img[:, :, 0] == green_threshold[0]) & (img[:, :, 1] == green_threshold[1]) & (img[:, :, 2] == green_threshold[2])

# Convert the mask to a single-channel image (1 for green, 0 for others)
binary_image = green_mask.astype(np.uint8)

print(binary_image.shape)  # Should output (600, 600)

(600, 600)


In [49]:
print(binary_image)

[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]
