In [1]:
import Driving
from Driving import NavigationDemoLogParser, FrameParsers, driveUtilities
from Driving.NavigationDemoLogParser import NavigationDemoLogParser

import matplotlib.pyplot as plt
import numpy as np

import itertools

This is a legacy parser for demo files recorded before January 2020
No scene structure files
No plotly


Helper functions to load the data

In [2]:
def load_log_data(filenames):
    """
        Helper function for loading data.
        Returns a list of NavigationDemoLogParser objects corresponding to each
        file in filenames.  
    """
    # Load your simulation data using the NavigationDemoLogParser
    navigation_parsers = [NavigationDemoLogParser(filename, autoParse=True) for filename in filenames]
    return navigation_parsers

Going through the data in the parser and converting it into a matrix representing the trajectory. The states will be:
1. Ego position
2. Ego rotation
3. Ego speed
4. Nearest k_vehicle vehicle positions (relative to ego)
5. nearest k_vehicle vehicle rotations 
6. nearest k_vehicle speeds 
7. nearest k_pedestrian positions (relative to ego)
8. nearest k_pedestrian speeds



In [3]:
def select_closest(position, other_positions, other_rotations, other_speeds, k):
    """
         Select the k elements of other_positions, other_rotations, and other_speeds where 
         the element of other_positions is closest to position. 
    """
    distances = [np.linalg.norm(position - other_position) for other_position in other_positions]
    sorted_zipped = sorted(zip(distances, other_positions, other_rotations, other_speeds), key=lambda pair: pair[0])
    sorted_positions = [x for _, x, _, _ in sorted_zipped]
    sorted_rotations = [x for _, _, x, _ in sorted_zipped]
    sorted_speeds = [x for _, _, _, x in sorted_zipped]

    return sorted_positions[0:k], sorted_rotations[0:k], sorted_speeds[0:k]

In [4]:
def gather_controls(parser):
    # Set up the control matrix
    num_points = parser.playerPosition.shape[0]
    controls = np.zeros((num_points, 3))

    # Now pull out data on the controls
    throttles, brakes, steering_wheels, gears = parser.throttle, parser.brake, parser.steering, parser.gear

    # Fill in the matrix
    controls[:, 0] = throttles - brakes
    controls[:, 1] = steering_wheels

    return controls


def gather_states(parser, k_closest_vehicles=5, k_closest_pedestrians=5):
    """

    """ 
    delta_T = 1/parser.FPS

    # Pull out all the data from the parser. start with the ego vehicle 
    positions = parser.playerPosition
    rotations = parser.playerRotation
    speeds = parser.speed

    # Now pull out data for the other vehicles 
    vehicle_position_map = parser.vehiclePosition 
    vehicle_rotation_map = parser.vehicleRotation      

    # Now pull out data for the pedestrians 
    pedestrian_position_map = parser.pedestrianPosition
    pedestrian_rotation_map = parser.pedestrianRotation

    # Now pull out data on the controls 
    throttles, brakes, steering_wheels, gears = parser.throttle, parser.brake, parser.steering, parser.gear

    # Create the state and control matrices
    num_points = positions.shape[0]
    states = np.zeros((num_points, 7 + 7*k_closest_vehicles + 7*k_closest_pedestrians))
    controls = np.zeros((num_points, 3))

    # Fill in the state and control matrix indices that we can 
    states[:, 0:3] = positions
    states[:, 3:6] = rotations
    states[:, 6] = speeds

    # assert that if throttles is non-zero then brakes must be 0 and vice versa
    assert np.all(np.logical_or(throttles == 0, brakes == 0))

    for i in tqdm(range(num_points)):
        position, rotation, speed = positions[i, :], rotations[i, :], speeds[i]

        # Gather the vehicle states 
        vehicle_positions = []
        vehicle_rotations = []
        vehicle_speeds = [] # estimate these based off of previous moment 
        for vehicle in vehicle_position_map.keys():
            vehicle_exists = not np.isnan(vehicle_position_map[vehicle][i, 0])
            if vehicle_exists:
                vehicle_positions.append(vehicle_position_map[vehicle][i, :] - position) # relative to ego vehicle
                vehicle_rotations.append(vehicle_rotation_map[vehicle][i, :])

                # Estimate and add speed as well 
                speed_estimate = 0 if i == 0 else (vehicle_position_map[vehicle][i, :] - vehicle_position_map[vehicle][i, :]) / delta_T
                vehicle_speeds.append(speed_estimate)
            
        # Select only the closest vehicles to retain 
        vehicle_positions, vehicle_rotations, vehicle_speeds = select_closest(position, vehicle_positions, vehicle_rotations, vehicle_speeds, k_closest_vehicles)
        
        # Gather the pedestrian states
        pedestrian_positions = [] 
        pedestrian_rotations = []
        pedestrian_speeds = []
        pedestrian_exists = []
        for pedestrian in pedestrian_position_map.keys():
            pedestrian_exists = not np.isnan(pedestrian_position_map[pedestrian][i, 0])
            if pedestrian_exists:
                pedestrian_positions.append(pedestrian_position_map[pedestrian][i, :] - position) # relative to ego vehicle 
                pedestrian_rotations.append(pedestrian_rotation_map[pedestrian][i, :])

                # Estimate and add speed as well 
                speed_estimate = 0 if i == 0 else (pedestrian_position_map[pedestrian][i, :] - pedestrian_position_map[pedestrian][i, :]) / delta_T
                pedestrian_speeds.append(speed_estimate)

        # Select only the closest pedestrians to retain 
        pedestrian_positions, pedestrian_rotations, pedestrian_speeds = select_closest(position, pedestrian_positions, pedestrian_rotations, pedestrian_speeds, k_closest_pedestrians)
        
        # Fill in the state matrix appropriately 
        states[i, 7:7+3*k_closest_vehicles] = np.concatenate(vehicle_positions)
        states[i, 7+3*k_closest_vehicles:7+6*k_closest_vehicles] = np.concatenate(vehicle_rotations)
        states[i, 7+6*k_closest_vehicles:7+7*k_closest_vehicles] = np.concatenate(vehicle_speeds)

        states[i, 7+7*k_closest_vehicles:7+7*k_closest_vehicles+3*k_closest_pedestrians] = np.concatenate(pedestrian_positions)
        states[i, 7+7*k_closest_vehicles+3*k_closest_pedestrians:7+7*k_closest_vehicles+6*k_closest_pedestrians] = np.concatenate(pedestrian_rotations)
        states[i, 7+7*k_closest_vehicles+6*k_closest_pedestrians:7+7*k_closest_vehicles+7*k_closest_pedestrians] = np.concatenate(pedestrian_speeds)


    return states

def load_scenario_label(filename):
    data = np.load(filename)
    timestamps = data['timestamps']
    labels = data['labels']
    return timestamps, labels
    

Set the files we'd like to load for train, test, and validation. Then, load the trajectories.

In [5]:
# Specify the files for your train, validation, and test sets
train_filenames = ["./Data/LogAndLabelData/20210126SP_13-52-24_positions.xml"]
validation_filenames = ["./Data/LogAndLabelData/20210126SP_14-08-22_positions.xml"]
test_filenames = ["./Data/LogAndLabelData/20210126SP_14-23-30_positions.xml"]

# Load the appropriate parsers for each of the files
train_parsers = load_log_data(train_filenames)
validation_parsers = load_log_data(validation_filenames)
test_parsers = load_log_data(test_filenames)

# Gather the state and control matrices for each of the files
train_trajectories = [(gather_states(parser), gather_controls(parser)) for parser in train_parsers]
validation_trajectories = [(gather_states(parser), gather_controls(parser)) for parser in validation_parsers]
test_trajectories = [(gather_states(parser), gather_controls(parser)) for parser in test_parsers]

# Also load the scenario labels 
train_labels = [load_scenario_label(filename) for filename in train_filenames]
validation_labels = [load_scenario_label(filename) for filename in validation_filenames]
test_labels = [load_scenario_label(filename) for filename in test_filenames]

Now, we'd like to pull out only the various straight segments from the trajectories. 

In [None]:
def video_time_to_log_sample(parser, video_time):
    """ 
        Given a video time, return the corresponding log sample. 
    """
    # The time that data starts at. 
    start_time = parser.firstTRFrame / parser.FPS
    # The index will be given by the time elapsed since data started being captured
    # scaled by the data rate (parser.FPS)
    return (video_time - start_time) * parser.FPS

def get_straight_demonstrations(parsers, trajectories, labels):
    """ 
        Given a list of trajectories, use the labels to pull out separate 
        sub-trajectories corresponding to each straight segment. 
    """

    sub_trajectories = []
    for (parser, trajectory, (timestamps, labels)) in zip(parsers, trajectories, labels):
        for (i, label) in enumerate(labels):
            if label == 'straight':
                start_time = timestamps[i]
                end_time = timestamps[i+1]

                # Find the indices corresponding to the start and end times. 
                # note that these are start and end times from the video. 
                start_index = video_time_to_log_sample(parser, start_time)
                end_index = video_time_to_log_sample(parser, end_time)

                # index into the states and controls, and then append to our growing list 
                # of sub-trajectories
                sub_states = trajectory[0][start_index:end_index, :]
                sub_controls = trajectory[1][start_index:end_index, :]
                sub_trajectories.append((sub_states, sub_controls))

    return sub_trajectories

In [None]:
# Use the helper function above to load each straight segment of the trajectories as a separate 
# demonstration
train_demonstrations = get_straight_demonstrations(train_parsers, train_trajectories, train_labels)
validation_demonstrations = get_straight_demonstrations(validation_parsers, validation_trajectories, validation_labels)
test_demonstrations = get_straight_demonstrations(test_parsers, test_trajectories, test_labels)

In [None]:
# Save the demonstrations to file
np.save("./Data/train_demonstrations.npy", train_demonstrations)
np.save("./Data/validation_demonstrations.npy", validation_demonstrations)
np.save("./Data/test_demonstrations.npy", test_demonstrations)