# Create Video

Cloud point data collect of autopilot car have LiDAR sensor and generic traffic surround.

Get the cloud point data -> process and store all the frames -> create video from the frames

In [57]:
import os
import numpy as np
import open3d as o3d
from sklearn.cluster import DBSCAN
from scipy.spatial import KDTree
import cv2

from constants import *

class PointCloudProcessor:
    """
    Processes point cloud data frames, performs clustering, tracking, and vehicle classification.
    """
    def __init__(self, cloud_point_data, eps=EPS, min_samples=MIN_SAMPLES):
        self.cloud_point_data = cloud_point_data
        self.num_frames = len(cloud_point_data)
        self.eps = eps
        self.min_samples = min_samples

        # Initialize tracking history and IDs
        self.cluster_history = []
        self.tracking_id = 0
        self.cluster_color_map = {}
        self.previous_centroids_map = {}
        self.previous_directions_map = {}
        self.vehicle_type_cluster = {}

    def process_frames(self):
        """Processes each frame to perform clustering, tracking, and vehicle classification."""
        previous_centroids = []
        previous_cluster_ids = []
        for frame_idx in range(self.num_frames):
            current_frame = self.cloud_point_data[frame_idx]
            # Perform clustering
            clustering = DBSCAN(eps=self.eps, min_samples=self.min_samples).fit(current_frame)
            labels = clustering.labels_

            current_centroids = []
            cluster_points = []

            # Extract clusters and centroids
            for cluster_label in set(labels):
                if cluster_label == -1:
                    continue  # Skip noise points
                
                cluster_data = current_frame[labels == cluster_label]
  
                if np.any(cluster_data[:, 2] >= 2):
                    continue
                
                clus_points = current_frame[labels == cluster_label]
                cluster_points.append(clus_points)

                cluster_pcd = o3d.geometry.PointCloud()
                cluster_pcd.points = o3d.utility.Vector3dVector(clus_points)

                bbox = cluster_pcd.get_axis_aligned_bounding_box()
                bbox_center = bbox.get_center()
                current_centroids.append(bbox_center)

            current_centroids = np.array(current_centroids)

            # Track clusters
            if frame_idx == 0:
                # For the first frame, initialize the cluster IDs
                cluster_ids = np.arange(len(current_centroids))
                self.tracking_id = len(current_centroids)
            else:
                cluster_ids, self.tracking_id = self.track_clusters(current_centroids, previous_centroids, previous_cluster_ids)

            # Save the cluster history for visualization
            self.cluster_history.append((cluster_points, current_centroids, cluster_ids))

            # Update previous centroids and cluster IDs
            previous_centroids = current_centroids
            previous_cluster_ids = cluster_ids

            # Assign colors to clusters
            for cluster_id in cluster_ids:
                self.assign_color(cluster_id)
        
    def track_clusters(self, current_centroids, previous_centroids, prev_cluster_ids, threshold=DIST_THRESHOLD_TRACKING):
        """Matches current centroids with previous centroids using nearest neighbor approach."""
        if len(previous_centroids) == 0:
            return np.arange(len(current_centroids)) + self.tracking_id, self.tracking_id + len(current_centroids)
        
        # Create a KDTree for fast nearest neighbor lookup
        tree = KDTree(previous_centroids)
        
        # Assign new tracking IDs based on nearest neighbors from the previous frame
        assigned_ids = np.full(len(current_centroids), -1)  # -1 for new clusters
        distances, indices = tree.query(current_centroids)
        
        # Assign tracking IDs based on distance threshold
        for i, (dist, idx) in enumerate(zip(distances, indices)):
            if dist < threshold:
                assigned_ids[i] = prev_cluster_ids[idx]  # Match with existing cluster ID
        
        # Assign new IDs to unmatched clusters
        new_clusters = (assigned_ids == -1)
        assigned_ids[new_clusters] = np.arange(self.tracking_id, self.tracking_id + np.sum(new_clusters))
        
        return assigned_ids, self.tracking_id + np.sum(new_clusters)
    
    def classify_vehicle(self, bbox):
        """Classifies the object as a car, truck, or motorcycle based on the bounding box dimensions."""
        dimensions = bbox.get_extent()
        dimensions = sorted(dimensions, reverse=True)
        d0, d1, d2 = dimensions[0], dimensions[1], dimensions[2]
    
        for vehicle_type, size_range in VEHICLE_SIZE_RANGES.items():
            if (size_range['0'][0] <= d0 <= size_range['0'][1] and
                size_range['1'][0] <= d1 <= size_range['1'][1] and
                size_range['2'][0] <= d2 <= size_range['2'][1]):
                return vehicle_type
        return None

    def assign_color(self, cluster_id):
        """Assigns a color to a cluster if not already assigned."""
        if cluster_id not in self.cluster_color_map:
            color_index = len(self.cluster_color_map) % len(COLOR_NAMES)  # Cycle through the color list
            self.cluster_color_map[cluster_id] = (COLOR_NAMES[color_index], COLOR_VALUES[COLOR_NAMES[color_index]])
        
    def create_arrow(self, start, end, bbox, previous_direction=None, scale_factor=ARROW_SCALE_FACTOR):
        """
        Creates an arrow mesh representing movement direction with a scaled length based on the magnitude of the vector.
        Uses Exponential Moving Average (EMA) for smoothing the direction.
        """
        kmh = self.find_velocity_kmh(end, start)
        
        if kmh > VELOCITY_THRESHOLD:
            # Calculate the direction vector
            new_direction = end - start
            new_direction[2] = 0  # Ignore Z-axis movement for 2D plane movement visualization
    
            # If there is a previous direction, update it using exponential moving average
            if previous_direction is not None:
                updated_direction = 0.7 * previous_direction + 0.3 * new_direction  # Apply EMA
            else:
                updated_direction = new_direction
    
            # Calculate the magnitude (length) of the direction vector
            length = np.linalg.norm(updated_direction)
    
            if length == 0:
                return None  # No arrow needed if there's no movement
    
            # Scale the arrow length by a given factor
            scaled_length = length * scale_factor
    
            # Normalize the updated direction vector
            updated_direction_normalized = updated_direction / length
    
            # Create an arrow mesh with length proportional to the scaled movement
            arrow1 = o3d.geometry.TriangleMesh.create_arrow(cylinder_radius=0.1, cone_radius=0.2,
                                                           cylinder_height=scaled_length * 0.3, cone_height=scaled_length * 0.2)
            arrow2 = o3d.geometry.TriangleMesh.create_arrow(cylinder_radius=0.1, cone_radius=0.2,
                                                           cylinder_height=scaled_length * 0.3, cone_height=scaled_length * 0.2)
    
            # Align the arrow with the updated direction vector
            z_unit_vector = np.array([0, 0, 1])  # Upward vector
            axis = np.cross(z_unit_vector, updated_direction_normalized)
            axis_length = np.linalg.norm(axis)
    
            if axis_length != 0:
                axis /= axis_length  # Normalize the axis of rotation
                angle = np.arccos(np.clip(np.dot(z_unit_vector, updated_direction_normalized), -1.0, 1.0))
                R = o3d.geometry.get_rotation_matrix_from_axis_angle(axis * angle)
                arrow1.rotate(R, center=(0, 0, 0))
                arrow2.rotate(R, center=(0, 0, 0))

            front_top_center, back_top_center = self.find_top_edge_centers(bbox)
            # Translate the arrow to start from the previous centroid (start position)
            arrow1.translate(front_top_center)
            arrow2.translate(back_top_center)
            arrow1.paint_uniform_color([1, 0, 1])  # Set arrow color to purple
            arrow2.paint_uniform_color([1, 0, 1])  # Set arrow color to purple
    
            return arrow1, arrow2
        
        return None

    def find_velocity_kmh(self, start, end, time=1/20):
        """Computes velocity in km/h between two points."""
        v = np.linalg.norm(end - start) / time
        kmh = int(3.6 * v)
        return kmh
    
    def find_top_edge_centers(self, bbox):
        bbox_corners = np.asarray(bbox.get_box_points())
        top_face_points = bbox_corners[bbox_corners[:, 2] == np.max(bbox_corners[:, 2])]
        edges = {
            ('0', '1'): np.linalg.norm(top_face_points[0] - top_face_points[1]),
            ('0', '2'): np.linalg.norm(top_face_points[0] - top_face_points[2]),
            ('0', '3'): np.linalg.norm(top_face_points[0] - top_face_points[3]),
            ('1', '2'): np.linalg.norm(top_face_points[1] - top_face_points[2]),
            ('1', '3'): np.linalg.norm(top_face_points[1] - top_face_points[3]),
            ('2', '3'): np.linalg.norm(top_face_points[2] - top_face_points[3])
        }

        sorted_edges = sorted(edges.items(), key=lambda x: x[1], reverse=True)
        a, b = sorted_edges[4][0]
        c, d = sorted_edges[5][0]
        front = (top_face_points[int(a)]+top_face_points[int(b)])/2
        front[2] = top_face_points[int(a)][2]
        back = (top_face_points[int(c)]+top_face_points[int(d)])/2
        back[2] = top_face_points[int(c)][2]
        return front, back


class Visualizer:
    """
    Handles visualization of point cloud data, bounding boxes, centroids, and movement arrows.
    Saves frames as images.
    """
    def __init__(self, output_dir, output_video='output.mov', frame_rate=FRAME_RATE):
        self.output_dir = output_dir
        self.output_video = output_video
        self.frame_rate = frame_rate

        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

        self.vis = self.setup_visualizer()

    def setup_visualizer(self):
        """Sets up Open3D visualizer with camera parameters."""
        vis = o3d.visualization.Visualizer()
        vis.create_window(visible=False)
        return vis

    def visualize(self, processor):
        """Visualizes each frame using the processed data from PointCloudProcessor."""
        for frame_idx, (cluster_points, centroids, cluster_ids) in enumerate(processor.cluster_history):
            all_points = []
            all_colors = []
            bounding_boxes = []
            centroids_spheres = []
            arrows = []  # To store arrows between previous and current centroids
            previous_centroids_spheres = []  # To store previous centroids as spheres

            for i, cluster in enumerate(cluster_points):
                # Add all points to the list for visualization
                all_points.append(cluster)
                
                # Create an Open3D point cloud for the current cluster
                cluster_pcd = o3d.geometry.PointCloud()
                cluster_pcd.points = o3d.utility.Vector3dVector(cluster)
                
                # Compute the bounding box for the cluster
                bbox = cluster_pcd.get_axis_aligned_bounding_box()
                bbox_center = bbox.get_center()

                # Ensure that the color is assigned
                processor.assign_color(cluster_ids[i])
                
                vehicle_type = processor.classify_vehicle(bbox)
                
                if (cluster_ids[i] in processor.vehicle_type_cluster) and ((vehicle_type == None) or vehicle_type == processor.vehicle_type_cluster[cluster_ids[i]]):
                    vehicle_type = processor.vehicle_type_cluster[cluster_ids[i]]
                elif (cluster_ids[i] in processor.vehicle_type_cluster) and (vehicle_type != None) and vehicle_type != processor.vehicle_type_cluster[cluster_ids[i]]:
                    processor.vehicle_type_cluster[cluster_ids[i]] = vehicle_type

                if vehicle_type:
                    # Add the bounding box to the list for visualization
                    bbox.color = (0, 1, 0)  # Set bounding box color (e.g., green)
                    bounding_boxes.append(bbox)

                    centroid_color = VEHICLE_CENTROID_COLORS[vehicle_type]  # Color based on vehicle type

                    # Set the color of the entire point cloud cluster to match the centroid color
                    all_colors.append(np.tile(centroid_color, (len(cluster), 1)))  # Apply centroid color to the points

                    # Check if we have a previous centroid for this cluster to draw an arrow
                    if cluster_ids[i] in processor.previous_centroids_map:
                        prev_centroid = processor.previous_centroids_map[cluster_ids[i]]
                        
                        # Get the previous direction if it exists
                        previous_direction = processor.previous_directions_map.get(cluster_ids[i], None)

                        # Create and append the arrow with EMA
                        arrow_result = processor.create_arrow(prev_centroid, bbox_center, bbox, previous_direction)
                        if arrow_result is not None:
                            arrow1, arrow2 = arrow_result
                            arrows.append(arrow1)
                            arrows.append(arrow2)

                        
                        # Store the new direction for future EMA updates
                        new_direction = bbox_center - prev_centroid
                        processor.previous_directions_map[cluster_ids[i]] = new_direction  # Update the direction map

                        # Create a sphere at the previous centroid (grey color)
                        prev_sphere = o3d.geometry.TriangleMesh.create_sphere(radius=0.05)
                        prev_sphere.translate(prev_centroid)
                        prev_sphere.paint_uniform_color([0.5, 0.5, 0.5])  # Grey color for previous centroids
                        previous_centroids_spheres.append(prev_sphere)
                    
                    # Update the previous centroid for this cluster
                    processor.previous_centroids_map[cluster_ids[i]] = bbox_center
                    
                else:
                    centroid_color = [0, 0, 1]  # Default blue if no vehicle classification
                    all_colors.append(np.tile(centroid_color, (len(cluster), 1)))  # Apply default color to points
                
                # Create a sphere at the centroid with the appropriate vehicle type color
                sphere = o3d.geometry.TriangleMesh.create_sphere(radius=0.1)
                sphere.translate(bbox_center)
                sphere.paint_uniform_color(centroid_color)  # Set centroid color based on vehicle type
                centroids_spheres.append(sphere)

            if len(all_points) > 0:
                # Flatten the points and colors arrays for visualization
                all_points = np.vstack(all_points)
                all_colors = np.vstack(all_colors)
                
                # Create an Open3D point cloud object for visualization
                pcd = o3d.geometry.PointCloud()
                pcd.points = o3d.utility.Vector3dVector(all_points)
                pcd.colors = o3d.utility.Vector3dVector(all_colors)
            
                # Visualize all points with bounding boxes, tracked cluster colors, current and previous centroids, and arrows
                self.visualize_frame(pcd, bounding_boxes, arrows, frame_idx)

    def visualize_frame(self, cloud, bounding_boxes, arrows, frame_idx):
        """Renders the current frame with the visualizer and saves it as an image."""
        self.vis.clear_geometries()
        entities_to_draw = [cloud] + bounding_boxes + arrows

        for entity in entities_to_draw:
            self.vis.add_geometry(entity)

        self.vis.poll_events()
        self.vis.update_renderer()

        frame_image = np.array(self.vis.capture_screen_float_buffer(True)) * 255
        frame_image = frame_image.astype(np.uint8)

        frame_filename = os.path.join(self.output_dir, f"frame_{frame_idx:04d}.png")
        cv2.imwrite(frame_filename, frame_image)
        
    def close(self):
        self.vis.destroy_window()
        

class VideoCreator:
    """
    Creates a video from the saved frame images.
    """
    def __init__(self, output_dir, output_video='output.mov', frame_rate=FRAME_RATE):
        self.output_dir = output_dir
        self.output_video = output_video
        self.frame_rate = frame_rate
        
    def get_frame_count(self):
        """Counts the number of frame images in the output directory."""
        files = [f for f in os.listdir(self.output_dir) if os.path.isfile(os.path.join(self.output_dir, f)) and f.startswith('frame_') and f.endswith('.png')]
        return len(files)


    def create_video_from_frames(self):
        """Combines saved frame images into a video."""
        frame_filename = os.path.join(self.output_dir, "frame_0000.png")
        frame = cv2.imread(frame_filename)

        if frame is None:
            print("Error: Could not load the first frame.")
            return
        
        num_frames = self.get_frame_count()
        
        height, width, _ = frame.shape
        fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
        video_writer = cv2.VideoWriter(
            self.output_video, fourcc, self.frame_rate, (width, height))

        for frame_idx in range(num_frames):
            frame_filename = os.path.join(self.output_dir, f"frame_{frame_idx:04d}.png")
            frame = cv2.imread(frame_filename)
            if frame is not None:
                video_writer.write(frame)
            else:
                print(f"Frame {frame_idx} not found or couldn't be loaded.")

        video_writer.release()
        print(f"Video saved as {self.output_video}")


def main():
    try:
        cloud_point_data = np.load('/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/23-10-2024/data/cloud_point_data.pkl', allow_pickle=True)

        output_dir = '/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/23-10-2024/frames'
        output_video = '/Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/23-10-2024/videos/output2.mov'
        processor = PointCloudProcessor(cloud_point_data)
        processor.process_frames()
        visualizer = Visualizer(output_dir, output_video=output_video)
        visualizer.visualize(processor)
        
        video_creator = VideoCreator(output_dir, output_video=output_video)
        video_creator.create_video_from_frames()
        visualizer.close()
    finally:
        print("Success")
        
main()



OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.
OpenCV: AVF: waiting to write video data.


Video saved as /Users/apple/Documents/UniversityDocument/Thesis/automative_self_driving_car/23-10-2024/videos/output2.mov
Success


