In [1]:
import json
import pyarrow.parquet as pq
import numpy as np

Define a function to read Parquet data files.

In [2]:
def read_parquet(parquet_path):
    '''
    Read Parquet file, extract embedded metadata, and convert trajectory data back to dictionary format.
    
    :param parquet_path: the path to the Parquet file
    :return: restored_tracks (Dict), restored_meta (Dict)
    '''
    print("\n--- Reading back from Parquet ---")
    
    # 1. Read Parquet file
    table = pq.read_table(parquet_path)
    
    # 2. Extract and parse Metadata
    file_meta = table.schema.metadata
    
    if b'dataset_meta' in file_meta:
        restored_meta_json = file_meta[b'dataset_meta'].decode('utf-8')
        restored_meta = json.loads(restored_meta_json)
        
        print("\n[Success] Meta embedded in Parquet found:")
        # Print all meta info
        print(restored_meta)
        print(f"  - Location: {restored_meta.get('location_name')}")
        print(f"  - Lane Map (Dict): {restored_meta.get('lane_sequence_to_movement_map')}")
    else:
        print("\n[Warning] 'dataset_meta' key not found in Parquet header.")

    # 3. Convert back to DataFrame to view data
    df_read = table.to_pandas()
    
    print("\n[Success] Trajectory Data loaded:")
    print(f"  - Shape: {df_read.shape}")
    print(f"  - Columns: {list(df_read.columns)}")
    
    # Verify complex structure (pixel_corners)
    sample_corners = df_read.iloc[0]['pixel_corners']
    print(sample_corners)
    print(f"  - Sample pixel_corners type: {type(sample_corners)}")
    print(f"  - Sample pixel_corners shape (len): {len(sample_corners)} (should be 5)")

      # 4. Convert back to Dict 
    print("\n--- Converting DataFrame back to Dict ---")
    restored_tracks = {}
    # Convert DataFrame to record list
    records = df_read.to_dict(orient='records')
    
    for record in records:
        # Assume vehicle_id exists and is unique
        if 'vehicle_id' in record:
            vid = record['vehicle_id']
            del record['vehicle_id']
            restored_tracks[vid] = record
            
    print(f"[Success] Converted back to Dict. Total tracks: {len(restored_tracks)}")
    if restored_tracks:
        sample_vid = list(restored_tracks.keys())[0]
        print(f"  - Sample Vehicle ID: {sample_vid}")
        print(f"  - Sample Keys in Track Dict: {list(restored_tracks[sample_vid].keys())[:5]} ...")
    return restored_tracks, restored_meta

In [3]:
# file path
parquet_file_path = "data/Hurong_20220617_A1_F1_demo.parquet"

# read parquet file
tracks, meta = read_parquet(parquet_file_path)


--- Reading back from Parquet ---

[Success] Meta embedded in Parquet found:
{'data_file_name': 'Hurong_20220617_A1_F1_demo', 'location_id': 'A1', 'location_name': 'HurongFreeway-Nanjing-Jiangsu-China', 'frame_interval': 0.1, 'start_timestamp_ms': 1655420390457, 'total_duration': 299.8, 'timestamp_timezone': 'Asia/Shanghai', 'spatial_unit': 'm', 'dataset_version': '1.0.0', 'lane_sequence_to_movement_map': {}, 'total_vehicle_count': 1131, 'unique_lane_ids': [1, 2, 3, 4, 20, 21, 22, 23]}
  - Location: HurongFreeway-Nanjing-Jiangsu-China
  - Lane Map (Dict): {}

[Success] Trajectory Data loaded:
  - Shape: (1131, 20)
  - Columns: ['vehicle_id', 'vehicle_class', 'vehicle_width', 'vehicle_length', 'frame_index', 'frenet_s', 'frenet_d', 'frenet_s_speed', 'frenet_d_speed', 'frenet_s_accel', 'frenet_d_accel', 'lane_id', 'lane_sequence', 'pixel_x', 'pixel_y', 'ground_x', 'ground_y', 'pixel_corners', 'ground_corners', 'is_imputed']
[array([5236.27929688, 1596.27478027, 5233.13134766, 1616.84716

In [4]:
# show Metadata
print("Location:", meta.get('location_name', 'Unknown'))
print("Lane Map:", meta.get('lane_sequence_to_movement_map', 'Unknown'))

Location: HurongFreeway-Nanjing-Jiangsu-China
Lane Map: {}


In [5]:
#  show sample data
if tracks:
    sample_vid = list(tracks.keys())[0]
    print(f"Sample Vehicle ID: {sample_vid}")
    print(f"Keys: {list(tracks[sample_vid].keys())[:10]}")
    # print first 10 frame_index
    print(f"Frame Indices (first 10): {tracks[sample_vid].get('frame_index', [])[:10]}")

Sample Vehicle ID: 4
Keys: ['vehicle_class', 'vehicle_width', 'vehicle_length', 'frame_index', 'frenet_s', 'frenet_d', 'frenet_s_speed', 'frenet_d_speed', 'frenet_s_accel', 'frenet_d_accel']
Frame Indices (first 10): [0 1 2 3 4 5 6 7 8 9]


Plot space-time diagram

In [6]:
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
import os

In [None]:
def plot_trajectory_spacetime_diagram(trajectory_data, meta_data):
    '''
    Visualize trajectory data using Matplotlib.
    Plot time-space diagram for vehicles.
    Read unique_lane_ids from meta data and plot trajectory for each lane.
    Convert frenet_s to vehicle head center coordinate during plotting.
    Use frame_index and frame_interval from meta data to calculate time.
    Color the trajectory based on frenet_s_speed.
    Determine lane change positions based on lane_id.
    
    :param trajectory_data: the trajectory data in dictionary format
    :param meta_data: the meta data in dictionary format
    :return: None
    '''
    print("\n--- Plotting Spacetime Diagrams ---")
    
    unique_lane_ids = meta_data.get('unique_lane_ids', [])
    # Prioritize frame_interval, fallback to time_step (default 0.1)
    frame_interval = meta_data['frame_interval']
    
    # Create folder for saving figures
    save_folder = "fig"
    os.makedirs(save_folder, exist_ok=True)
    
    for target_lane_id in unique_lane_ids:
        # Skip invalid lane ID
        if target_lane_id == -1:
            continue
            
        print(f"Processing Lane {target_lane_id}...")
        
        fig, ax = plt.subplots(figsize=(20, 8))
        
        lines = []
        speeds = []
        # Store lane change points: LC (Lane Change)
        lc_points_x = []
        lc_points_y = []
        
        for vid, track in trajectory_data.items():
            # Extract data
            frames = np.array(track['frame_index'])
            s_coords = np.array(track['frenet_s'])
            s_speeds = np.array(track['frenet_s_speed'])
            lane_ids = np.array(track['lane_id'])
            
           
            vehicle_length = track.get('vehicle_length', 5.0)
            
            # Determine trajectory direction: increasing or decreasing
            # Use start and end points to determine overall trend
            if len(s_coords) > 1 and s_coords[-1] < s_coords[0]:
                direction_sign = -1
            else:
                direction_sign = 1

            # Calculate vehicle head coordinate
            # frenet_s is the center point
            # If increasing (direction_sign=1) add length, if decreasing (direction_sign=-1) subtract length
            head_s = s_coords + direction_sign * (vehicle_length / 2.0)
            
            # Calculate time: frame_index * frame_interval
            times = frames * frame_interval
            
            # Iterate through all frames of the vehicle to find segments in target_lane_id
            for i in range(len(frames) - 1):
                curr_lane = lane_ids[i]
                next_lane = lane_ids[i+1]
                
                # Case 1: Driving within the target lane
                if curr_lane == target_lane_id and next_lane == target_lane_id:
                    p1 = (times[i], head_s[i])
                    p2 = (times[i+1], head_s[i+1])
                    lines.append([p1, p2])
                    speeds.append(abs(s_speeds[i])) # Use absolute speed
                
                # Case 2: Lane change points (Cut-in or Cut-out)
                # Logic: Either this point or the next involves the target lane, and a lane change occurred
                
                # Cut-out: Currently in target, next frame not
                elif curr_lane == target_lane_id and next_lane != target_lane_id:
                    lc_points_x.append(times[i])
                    lc_points_y.append(head_s[i])
                    
                # Cut-in: Currently not in target, next frame is
                elif curr_lane != target_lane_id and next_lane == target_lane_id:
                    lc_points_x.append(times[i+1])
                    lc_points_y.append(head_s[i+1])

        # Skip plotting if no data for this lane
        if not lines:
            print(f"  No data for Lane {target_lane_id}")
            plt.close(fig)
            continue

        # Create LineCollection
        # Speed typically 0-35m/s (0-120km/h), use jet_r colormap
        lc = LineCollection(lines, array=np.array(speeds), cmap="jet_r", linewidths=1.0)
        lc.set_clim(vmin=0, vmax=35) # Set speed color range 0-35 m/s
        ax.add_collection(lc)
        
        # Add Colorbar
        cb = fig.colorbar(lc, ax=ax)
        cb.set_label('Speed [m/s]')
        
        # Plot lane change points
        if lc_points_x:
            ax.scatter(lc_points_x, lc_points_y, 
                       marker='o', s=20, 
                       color='k', linewidths=0.8, 
                       label='Lane Change', zorder=3)
            ax.legend(loc='upper right')
            
        ax.autoscale()
        ax.set_title(f'Lane {target_lane_id} Space-Time Diagram')
        ax.set_xlabel("Time [s]")
        ax.set_ylabel("Location [m]")
        plt.tight_layout()
        # Save figure
        # save_path = os.path.join(save_folder, f'lane_{target_lane_id}_spacetime.png')
        # plt.tight_layout()
        # plt.savefig(save_path, dpi=300)
        # plt.close(fig)
        # print(f"  Saved: {save_path}")
        # show img
        plt.show()

In [None]:
plot_trajectory_spacetime_diagram(tracks, meta)

Create an animation of vehicle motion in the XY-plane coordinate system.

In [9]:

import matplotlib.animation as animation
from matplotlib.collections import PolyCollection
from IPython.display import HTML
import numpy as np
import matplotlib.pyplot as plt

def animate_vehicle_contours(trajectory_data):
    """
    Animates vehicle contours (ground_corners) frame by frame.
    
    :param trajectory_data: dictionary of vehicle tracks
    """
    # 1. Reorganize data by frame
    frames_data = {}
    
    print("Reorganizing data for animation...")
    # To determine plot limits
    min_x, max_x = float('inf'), float('-inf')
    min_y, max_y = float('inf'), float('-inf')
    
    for vid, track in trajectory_data.items():
        if 'ground_corners' not in track or 'frame_index' not in track:
            continue
            
        frame_indices = track['frame_index']
        ground_corners = track['ground_corners']
        
        # Ensure lengths match
        if len(frame_indices) != len(ground_corners):
            continue
            
        for i, frame_idx in enumerate(frame_indices):
            corners = ground_corners[i]
            
            if frame_idx not in frames_data:
                frames_data[frame_idx] = []
            
            # Handle format: flat list/array or list of lists
            c_arr = np.array(corners)
            if c_arr.ndim == 1 and c_arr.size == 8:
                 c_arr = c_arr.reshape(4, 2)
            elif c_arr.ndim == 2 and c_arr.shape == (4, 2):
                 pass # already correct
            else:
                 # Try to interpret as best as possible or skip
                 continue
                 
            frames_data[frame_idx].append(c_arr)
            
            # Update bounds
            xs = c_arr[:, 0]
            ys = c_arr[:, 1]
            min_x = min(min_x, np.min(xs))
            max_x = max(max_x, np.max(xs))
            min_y = min(min_y, np.min(ys))
            max_y = max(max_y, np.max(ys))

    sorted_frames = sorted(frames_data.keys())
    if not sorted_frames:
        print("No valid frame data found.")
        return None

    print(f"Animation range: Frame {sorted_frames[0]} to {sorted_frames[-1]}")
    print(f"X range: {min_x:.2f} to {max_x:.2f}")
    print(f"Y range: {min_y:.2f} to {max_y:.2f}")

    # Create figure
    fig, ax = plt.subplots(figsize=(15, 10))
    
    # Initial setup
    polys = PolyCollection([], edgecolors='black', facecolors='cyan', alpha=0.6)
    ax.add_collection(polys)
    
    # Set axis limits with some margin
    margin_x = (max_x - min_x) * 0.05
    margin_y = (max_y - min_y) * 0.05
    if margin_x == 0: margin_x = 10
    if margin_y == 0: margin_y = 10
    
    ax.set_xlim(min_x - margin_x, max_x + margin_x)
    ax.set_ylim(min_y - margin_y, max_y + margin_y)
    ax.set_aspect('equal')
    ax.set_xlabel("Ground X (m)")
    ax.set_ylabel("Ground Y (m)")
    
    title_text = ax.set_title("")
    
    def update(frame_idx):
        verts = frames_data.get(frame_idx, [])
        polys.set_paths(verts)
        title_text.set_text(f"Frame: {frame_idx}")
        return polys, title_text
        
    # Create animation
    # Displaying a subset of frames if too many for faster rendering test? 
    # Or just all frames.
    ani = animation.FuncAnimation(fig, update, frames=sorted_frames, interval=100, blit=True)
    
    plt.close(fig) # Prevent static plot from showing up
    
    print("Generating animation...")
    return HTML(ani.to_jshtml())


In [None]:
# Run animation (might take a while for large datasets)
animate_vehicle_contours(tracks)