In [1]:
# CARLA LiDAR Data Visualization and Video Creation
# ================================================

import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
import glob
from tqdm import tqdm
import open3d as o3d
import cv2
import re

# Set the figure style
plt.style.use('dark_background')

# Define a function to read PLY files
def read_ply_file(ply_path):
    """Read a PLY file and return the points and intensities."""
    with open(ply_path, 'r') as f:
        lines = f.readlines()
    
    # Find where the header ends
    header_end = 0
    for i, line in enumerate(lines):
        if line.strip() == "end_header":
            header_end = i
            break
    
    # Extract points and intensities
    points = []
    intensities = []
    for line in lines[header_end + 1:]:
        data = line.strip().split()
        if len(data) >= 4:
            points.append([float(data[0]), float(data[1]), float(data[2])])
            intensities.append(float(data[3]))
    
    return np.array(points), np.array(intensities)

# Function to extract timestamp from filename
def extract_timestamp(filename):
    match = re.search(r'lidar_(\d+\.\d+)\.ply', filename)
    if match:
        return float(match.group(1))
    return 0.0

# Function to visualize a single LiDAR point cloud
def visualize_lidar_point_cloud(points, intensities, ax=None, title=None, colormap='viridis', elev=35, azim=45):
    """Visualize a LiDAR point cloud with matplotlib."""
    if ax is None:
        fig = plt.figure(figsize=(12, 10))
        ax = fig.add_subplot(111, projection='3d')
    
    # Normalize intensities for color mapping
    if len(intensities) > 0:
        norm_intensities = (intensities - intensities.min()) / (intensities.max() - intensities.min() + 1e-10)
    else:
        norm_intensities = np.zeros(len(points))
    
    # Plot points
    scatter = ax.scatter(
        points[:, 0], points[:, 1], points[:, 2],
        c=norm_intensities,
        cmap=colormap,
        s=0.5,  # Point size
        alpha=0.7  # Transparency
    )
    
    # Set view angle
    ax.view_init(elev=elev, azim=azim)
    
    # Set labels and title
    ax.set_xlabel('X (m)')
    ax.set_ylabel('Y (m)')
    ax.set_zlabel('Z (m)')
    
    if title:
        ax.set_title(title, fontsize=14)
    
    # Set equal aspect ratios
    max_range = np.array([
        points[:, 0].max() - points[:, 0].min(),
        points[:, 1].max() - points[:, 1].min(),
        points[:, 2].max() - points[:, 2].min()
    ]).max() / 2.0
    
    mid_x = (points[:, 0].max() + points[:, 0].min()) * 0.5
    mid_y = (points[:, 1].max() + points[:, 1].min()) * 0.5
    mid_z = (points[:, 2].max() + points[:, 2].min()) * 0.5
    
    ax.set_xlim(mid_x - max_range, mid_x + max_range)
    ax.set_ylim(mid_y - max_range, mid_y + max_range)
    ax.set_zlim(mid_z - max_range, mid_z + max_range)
    
    # Add a color bar
    plt.colorbar(scatter, ax=ax, label='Intensity', shrink=0.6)
    
    return ax

# Function to create a 2D top-down view
def create_top_down_view(points, intensities, resolution=0.1, max_range=50):
    """Create a top-down view (bird's eye view) of the LiDAR data."""
    # Create a grid for the bird's eye view
    grid_size = int(2 * max_range / resolution)
    grid = np.zeros((grid_size, grid_size), dtype=np.float32)
    
    # Filter points within range
    mask = (np.abs(points[:, 0]) < max_range) & (np.abs(points[:, 1]) < max_range)
    filtered_points = points[mask]
    filtered_intensities = intensities[mask] if intensities.size > 0 else np.zeros(filtered_points.shape[0])
    
    if filtered_points.size == 0:
        return grid
    
    # Convert points to grid coordinates
    x_grid = ((filtered_points[:, 0] + max_range) / resolution).astype(int)
    y_grid = ((filtered_points[:, 1] + max_range) / resolution).astype(int)
    
    # Clip to ensure within grid bounds
    x_grid = np.clip(x_grid, 0, grid_size - 1)
    y_grid = np.clip(y_grid, 0, grid_size - 1)
    
    # Assign intensity values to grid
    for i in range(len(x_grid)):
        grid[y_grid[i], x_grid[i]] = max(grid[y_grid[i], x_grid[i]], filtered_intensities[i])
    
    return grid

# Function to create an Open3D visualization for a single frame
def visualize_with_open3d(points, intensities, save_path=None):
    """Visualize LiDAR data with Open3D and optionally save to an image."""
    # Create point cloud
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    
    # Normalize intensities for coloring
    if len(intensities) > 0:
        norm_intensities = (intensities - intensities.min()) / (intensities.max() - intensities.min() + 1e-10)
        # Create a colormap (jet-like)
        colors = np.zeros((len(norm_intensities), 3))
        for i, intensity in enumerate(norm_intensities):
            # Simple blue to red colormap
            colors[i] = [intensity, 0.2, 1.0 - intensity]
        pcd.colors = o3d.utility.Vector3dVector(colors)
    
    # Create a visualizer
    vis = o3d.visualization.Visualizer()
    vis.create_window(width=1280, height=720, visible=False)
    
    # Add the point cloud
    vis.add_geometry(pcd)
    
    # Set up the view
    view_control = vis.get_view_control()
    view_control.set_zoom(0.6)
    view_control.set_front([0, -1, 0])
    view_control.set_lookat([0, 0, 0])
    view_control.set_up([0, 0, 1])
    
    # Render
    vis.poll_events()
    vis.update_renderer()
    
    # Save image if path is provided
    if save_path:
        vis.capture_screen_image(save_path)
    
    # Close the window
    vis.destroy_window()
    
    return

# Main cell to process LiDAR data and create visualization video
def main(lidar_directory, output_video_path, fps=10, vis_type='open3d'):
    """
    Process all LiDAR PLY files in a directory and create a visualization video.
    
    Args:
        lidar_directory: Directory containing LiDAR PLY files
        output_video_path: Path to save the output video
        fps: Frames per second for the output video
        vis_type: Type of visualization ('matplotlib', 'top_down', or 'open3d')
    """
    # Get all PLY files
    ply_files = sorted(glob.glob(os.path.join(lidar_directory, "*.ply")), 
                      key=extract_timestamp)
    
    if not ply_files:
        print(f"No PLY files found in {lidar_directory}")
        return
    
    print(f"Found {len(ply_files)} PLY files")
    
    # Create a temporary directory for frames
    temp_dir = os.path.join(os.path.dirname(output_video_path), "temp_frames")
    os.makedirs(temp_dir, exist_ok=True)
    
    # Process each frame
    for i, ply_file in enumerate(tqdm(ply_files, desc="Processing frames")):
        # Extract timestamp for the frame
        timestamp = extract_timestamp(os.path.basename(ply_file))
        frame_path = os.path.join(temp_dir, f"frame_{i:06d}.png")
        
        # Read PLY file
        points, intensities = read_ply_file(ply_file)
        
        if points.shape[0] == 0:
            print(f"Warning: No points in {ply_file}, skipping")
            continue
        
        if vis_type == 'matplotlib':
            # Visualize with matplotlib
            fig = plt.figure(figsize=(12, 10))
            ax = fig.add_subplot(111, projection='3d')
            visualize_lidar_point_cloud(points, intensities, ax=ax, 
                                      title=f"LiDAR Data (t={timestamp:.2f}s)")
            plt.savefig(frame_path, dpi=100, bbox_inches='tight')
            plt.close(fig)
        
        elif vis_type == 'top_down':
            # Create top-down view
            grid = create_top_down_view(points, intensities)
            
            # Normalize for visualization
            if grid.max() > 0:
                grid = 255 * (grid / grid.max())
            
            # Use a colormap
            colored_grid = cv2.applyColorMap(grid.astype(np.uint8), cv2.COLORMAP_JET)
            
            # Add timestamp text
            cv2.putText(colored_grid, f"t={timestamp:.2f}s", (20, 30), 
                      cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
            
            # Save the frame
            cv2.imwrite(frame_path, colored_grid)
        
        elif vis_type == 'open3d':
            # Visualize with Open3D
            visualize_with_open3d(points, intensities, save_path=frame_path)
    
    # Create video from frames
    frame_size = None
    first_frame = os.path.join(temp_dir, f"frame_000000.png")
    if os.path.exists(first_frame):
        frame = cv2.imread(first_frame)
        if frame is not None:
            frame_size = (frame.shape[1], frame.shape[0])
    
    if frame_size:
        print(f"Creating video with frame size {frame_size} at {fps} fps")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # MPEG-4 codec
        video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, frame_size)
        
        # Add frames to video
        for i in range(len(ply_files)):
            frame_path = os.path.join(temp_dir, f"frame_{i:06d}.png")
            if os.path.exists(frame_path):
                frame = cv2.imread(frame_path)
                if frame is not None:
                    video_writer.write(frame)
        
        video_writer.release()
        print(f"Video saved to {output_video_path}")
    else:
        print("Error: Could not determine frame size")
    
    # Clean up temporary files (optional)
    # for file in os.listdir(temp_dir):
    #     os.remove(os.path.join(temp_dir, file))
    # os.rmdir(temp_dir)
    # print("Temporary files cleaned up")

# Configuration cell - Modify these parameters for your setup
lidar_directory = "./carla_data/lidar"  # Replace with your LiDAR data directory
output_video = "./carla_lidar_visualization.mp4"
visualization_type = 'open3d'  # Options: 'matplotlib', 'top_down', 'open3d'
video_fps = 10

# Run the main function
main(lidar_directory, output_video, fps=video_fps, vis_type=visualization_type)

# Cell for testing a single frame visualization
def test_single_frame():
    # Find a sample PLY file
    ply_files = glob.glob(os.path.join(lidar_directory, "*.ply"))
    if not ply_files:
        print(f"No PLY files found in {lidar_directory}")
        return
    
    sample_file = ply_files[0]
    print(f"Testing visualization with file: {sample_file}")
    
    # Read the file
    points, intensities = read_ply_file(sample_file)
    
    # Visualize with matplotlib
    fig = plt.figure(figsize=(15, 12))
    
    # First plot: 3D visualization
    ax1 = fig.add_subplot(121, projection='3d')
    visualize_lidar_point_cloud(points, intensities, ax=ax1, title="3D LiDAR Visualization")
    
    # Second plot: 2D top-down view
    ax2 = fig.add_subplot(122)
    grid = create_top_down_view(points, intensities)
    ax2.imshow(grid, cmap='jet', origin='lower')
    ax2.set_title("Top-Down View")
    ax2.set_xlabel("X (grid cells)")
    ax2.set_ylabel("Y (grid cells)")
    
    plt.tight_layout()
    plt.savefig("sample_lidar_visualization.png", dpi=150)
    plt.show()
    
    # Also test Open3D visualization
    visualize_with_open3d(points, intensities, save_path="sample_open3d_visualization.png")
    print("Open3D visualization saved to sample_open3d_visualization.png")

# Uncomment to test single frame visualization
# test_single_frame()

# To combine visualization with vehicle data (optional)
def combine_with_vehicle_data(ego_data_file):
    """Combine LiDAR visualization with vehicle data."""
    if not os.path.exists(ego_data_file):
        print(f"Ego data file not found: {ego_data_file}")
        return
    
    # Read ego vehicle data
    ego_data = []
    with open(ego_data_file, 'r') as f:
        lines = f.readlines()
        headers = lines[0].strip().split(',')
        for line in lines[1:]:
            data = line.strip().split(',')
            ego_data.append({headers[i]: float(data[i]) for i in range(len(headers))})
    
    # Get timestamps from ego data
    ego_timestamps = [data['timestamp'] for data in ego_data]
    
    # Match LiDAR files with vehicle data based on timestamps
    ply_files = sorted(glob.glob(os.path.join(lidar_directory, "*.ply")), 
                      key=extract_timestamp)
    
    # Create output directory
    output_dir = "combined_visualization"
    os.makedirs(output_dir, exist_ok=True)
    
    # Process each LiDAR frame and add vehicle data
    for ply_file in tqdm(ply_files, desc="Creating combined visualizations"):
        lidar_timestamp = extract_timestamp(os.path.basename(ply_file))
        
        # Find closest ego data by timestamp
        closest_idx = min(range(len(ego_timestamps)), 
                          key=lambda i: abs(ego_timestamps[i] - lidar_timestamp))
        vehicle_data = ego_data[closest_idx]
        
        # Read LiDAR data
        points, intensities = read_ply_file(ply_file)
        
        # Create visualization
        fig = plt.figure(figsize=(15, 10))
        
        # 3D LiDAR plot
        ax1 = fig.add_subplot(121, projection='3d')
        visualize_lidar_point_cloud(points, intensities, ax=ax1, 
                                   title=f"LiDAR Data (t={lidar_timestamp:.2f}s)")
        
        # Vehicle data plot
        ax2 = fig.add_subplot(122)
        ax2.text(0.1, 0.9, f"Vehicle Data (t={vehicle_data['timestamp']:.2f}s)", 
                transform=ax2.transAxes, fontsize=14)
        ax2.text(0.1, 0.8, f"Position: ({vehicle_data['x']:.2f}, {vehicle_data['y']:.2f}, {vehicle_data['z']:.2f})", 
                transform=ax2.transAxes)
        ax2.text(0.1, 0.7, f"Velocity: {np.sqrt(vehicle_data['velocity_x']**2 + vehicle_data['velocity_y']**2):.2f} m/s", 
                transform=ax2.transAxes)
        ax2.text(0.1, 0.6, f"Heading: {vehicle_data['heading']:.2f}°", 
                transform=ax2.transAxes)
        ax2.text(0.1, 0.5, f"Steering: {vehicle_data['steering']:.2f}", 
                transform=ax2.transAxes)
        ax2.text(0.1, 0.4, f"Throttle: {vehicle_data['throttle']:.2f}", 
                transform=ax2.transAxes)
        ax2.text(0.1, 0.3, f"Brake: {vehicle_data['brake']:.2f}", 
                transform=ax2.transAxes)
        
        # Hide axis for the text plot
        ax2.axis('off')
        
        # Save the figure
        output_path = os.path.join(output_dir, f"combined_{int(lidar_timestamp*100):06d}.png")
        plt.savefig(output_path, dpi=100, bbox_inches='tight')
        plt.close(fig)
    
    # Create video
    create_video_from_images(output_dir, "combined_visualization.mp4", fps=10)
    print(f"Combined visualization video saved to combined_visualization.mp4")

def create_video_from_images(image_dir, output_path, fps=10):
    """Create a video from a sequence of images."""
    images = sorted([img for img in os.listdir(image_dir) if img.endswith(".png")])
    if not images:
        print(f"No images found in {image_dir}")
        return
    
    # Get the dimensions of the first image
    frame = cv2.imread(os.path.join(image_dir, images[0]))
    height, width, _ = frame.shape
    
    # Create video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    # Add each image to the video
    for image in tqdm(images, desc="Creating video"):
        video_writer.write(cv2.imread(os.path.join(image_dir, image)))
    
    video_writer.release()
    print(f"Video saved to {output_path}")

# Example usage:
ego_data_file = "./carla_data/ego_data_20250505_190810.csv"
combine_with_vehicle_data(ego_data_file)

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Found 7174 PLY files


Processing frames:  38%|███▊      | 2756/7174 [32:58<52:52,  1.39it/s]  


KeyboardInterrupt: 