In [1]:
# Run the tutorial package's notebook setup helper before anything else.
# NOTE(review): presumably prepares the kernel/environment for the cells below —
# confirm what it does in tutorials.utils.tutorial_utils.
from tutorials.utils.tutorial_utils import setup_notebook
setup_notebook()
import os
# Dataset locations and map version. Each value is taken from the environment
# variable of the same name when it is set, otherwise from the default below.
NUPLAN_DATA_ROOT = os.environ.get('NUPLAN_DATA_ROOT', '../../nuplan/dataset')
NUPLAN_MAPS_ROOT = os.environ.get('NUPLAN_MAPS_ROOT', '../../nuplan/dataset/maps')
NUPLAN_DB_FILES = os.environ.get('NUPLAN_DB_FILES', '../../nuplan/dataset/nuplan-v1.1/splits/mini')
NUPLAN_MAP_VERSION = os.environ.get('NUPLAN_MAP_VERSION', 'nuplan-maps-v1.0')

## Direct SQLite queries

In [2]:
from nuplan.database.nuplan_db.query_session import execute_one, execute_many

# Log to inspect; its database file is "<log_db_name>.db" under NUPLAN_DB_FILES.
log_db_name = "2021.05.12.22.00.38_veh-35_01008_01518"

# Count the rows of the lidar_pc table; the count comes back under the alias "cnt".
query = """
SELECT COUNT(*) AS cnt
FROM lidar_pc;
"""

# The query takes no parameters, hence the empty tuple.
log_db_path = os.path.join(NUPLAN_DB_FILES, f"{log_db_name}.db")
result = execute_one(query, (), log_db_path)
print(f"The number of lidar_pcs in this log files is {result['cnt']}.")

The number of lidar_pcs in this log files is 10200.


In [3]:
# Hex-encoded token of the lidar_pc row whose ego pose is looked up.
example_token = "e1e4ee25d1ff58f2"

# Join ego_pose to lidar_pc and keep the row matching the requested lidar_pc token.
query = """
SELECT ep.x AS ep_x,
       ep.y AS ep_y,
       ep.z AS ep_z,
       lp.token AS token
FROM ego_pose AS ep
INNER JOIN lidar_pc AS lp
  ON lp.ego_pose_token = ep.token
WHERE lp.token = ?
"""

# The token is bound as raw bytes (decoded from hex); the returned token is
# turned back into hex with .hex() when printed.
query_args = (bytearray.fromhex(example_token),)
db_file = os.path.join(NUPLAN_DB_FILES, f"{log_db_name}.db")
result = execute_one(query, query_args, db_file)

print(f"Lidar_pc token: {result['token'].hex()}.")
print(f"Ego pose: <{result['ep_x']}, {result['ep_y']}, {result['ep_z']}>.")

Lidar_pc token: e1e4ee25d1ff58f2.
Ego pose: <664654.2126382107, 3999264.214758526, 606.4578471006269>.


For one scenario, collect the following information (the names match the keys stored in `LQRData.data` below):
- current_ego_pos_gb
- current_ego_pos
- future_ego_pos
- current_neighbor_pos
- future_neighbor_pos
- current_lane

In [4]:
from nuplan.planning.script.builders.scenario_building_builder import build_scenario_builder
from nuplan.planning.script.builders.scenario_filter_builder import build_scenario_filter
from nuplan.planning.script.builders.worker_pool_builder import build_worker

### Set up the Hydra config. To change which scenarios are loaded, edit the
### config named CONFIG_NAME inside CONFIG_PATH rather than this cell. ###
import hydra
# NOTE(review): Hydra resolves a relative config_path against the calling file;
# confirm this path is correct for where the notebook lives.
CONFIG_PATH = "../nuplan/planning/script/config/common/myconfigs"
CONFIG_NAME = "scenario_access"
# Clear the global Hydra instance first so this cell can be re-run in the same kernel.
hydra.core.global_hydra.GlobalHydra.instance().clear()
hydra.initialize(config_path=CONFIG_PATH)
# Compose the config without any command-line style overrides.
cfg = hydra.compose(config_name=CONFIG_NAME, overrides=[])
###

### Create all scenario objects as specified in the config file. ###
scenario_builder = build_scenario_builder(cfg)
scenario_filter = build_scenario_filter(cfg.scenario_filter)
worker = build_worker(cfg)
# The worker is handed to get_scenarios together with the filter.
scenarios = scenario_builder.get_scenarios(scenario_filter, worker) # List[AbstractScenario]

In [5]:
from math import sqrt
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.actor_state.state_representation import TimeDuration

class LQRData():
    """Gather ego and neighbor positions of one scenario for use by LQR.

    Every position is a plain ``(x, y)`` tuple read from the scenario's ego
    states and tracked objects. ``populate_data`` stores all results in
    ``self.data``; the individual getters can also be called on their own.
    """

    def __init__(self, scenario, closest=5):
        """
        :param scenario: scenario object to read from (one element of the list
            returned by ``scenario_builder.get_scenarios``).
        :param closest: number of nearest vehicles to follow as neighbors.
        """
        self.s = scenario

        self.data = {}  # to be used for LQR

        self.duration = self.s.duration_s  # TimeDuration object; seconds via .time_s
        self.num_frames = self.s.get_number_of_iterations()
        self.scenario_type = self.s.scenario_type

        self.n = closest
        # Track tokens of the n nearest vehicles, nearest first.
        # Filled by get_current_neighbor_pos.
        self.closest_neighbors_id = []

    def populate_data(self):
        """Fill ``self.data`` with every quantity this class extracts."""
        self.data['current_ego_pos_gb'] = self.get_current_ego_pos_gb()
        self.data['current_ego_pos'] = self.get_current_ego_pos()
        self.data['future_ego_pos'] = self.get_future_ego_pos()
        self.data['current_neighbor_pos'] = self.get_current_neighbor_pos()
        self.data['future_neighbor_pos'] = self.get_future_neighbor_pos_id_consistent()
        self.data['current_lane'] = self.get_current_lane()

    def get_current_ego_pos_gb(self):
        """Return the ``(x, y)`` of the scenario's initial ego state."""
        return self.get_xy_from_egostate(self.s.initial_ego_state)

    def get_current_ego_pos(self):
        """Return the constant ``(0, 0)``.

        NOTE(review): presumably the ego position in an ego-centred frame,
        whereas the other getters return coordinates as stored in the scenario
        — confirm the frames are meant to differ.
        """
        return (0, 0)

    def get_future_ego_pos(self):
        """Return the ``(x, y)`` of each ego state on the future trajectory.

        The trajectory is requested from iteration 0 over the full scenario
        duration.
        """
        return [
            self.get_xy_from_egostate(ego)
            for ego in self.s.get_ego_future_trajectory(0, self.duration.time_s)
        ]

    def get_current_neighbor_pos(self):
        """Return the ``(x, y)`` of the ``n`` vehicles nearest to the ego.

        Also records their track tokens, nearest first, in
        ``self.closest_neighbors_id``.

        :raises AssertionError: if the initial frame has fewer than ``n`` vehicles.
        """
        # Read the ego position directly instead of from self.data, so this
        # method does not depend on populate_data having run first.
        ego_xy = self.get_current_ego_pos_gb()

        vehicles = self.s.initial_tracked_objects.tracked_objects.get_tracked_objects_of_type(
            TrackedObjectType.VEHICLE
        )
        assert self.n <= len(vehicles), (
            f"requested {self.n} neighbors but only {len(vehicles)} vehicles are tracked"
        )

        by_distance = sorted(
            vehicles, key=lambda ag: self.dist(ego_xy, self.get_xy_from_agent(ag))
        )
        near_neighbors = by_distance[:self.n]
        self.closest_neighbors_id = [ag.track_token for ag in near_neighbors]
        return [self.get_xy_from_agent(ag) for ag in near_neighbors]

    def get_future_neighbor_pos_id_consistent(self):
        """Return, per future frame, the ``(x, y)`` of the chosen neighbors.

        Each frame lists the neighbors in the order of
        ``self.closest_neighbors_id``, so entry ``i`` is the same agent in
        every frame.

        :raises AssertionError: if a chosen neighbor is absent from a frame.
        """
        # Select the neighbors on demand so this method also works when it is
        # called before get_current_neighbor_pos.
        if not self.closest_neighbors_id and self.n > 0:
            self.get_current_neighbor_pos()

        wanted = set(self.closest_neighbors_id)
        frames = []

        for detection_track in self.s.get_future_tracked_objects(0, self.duration.time_s):
            xy_by_token = {
                obj.track_token: self.get_xy_from_agent(obj)
                for obj in detection_track.tracked_objects
                if obj.track_token in wanted
            }

            missing = [tok for tok in self.closest_neighbors_id if tok not in xy_by_token]
            assert not missing, f"neighbors missing from a future frame: {missing}"

            # Emit in the stored token order, not the frame's iteration order.
            frames.append([xy_by_token[tok] for tok in self.closest_neighbors_id])

        return frames

    def get_current_lane(self):
        """Placeholder: lane lookup is not implemented, returns an empty string."""
        return ''

    def get_xy_from_agent(self, agent):
        """Return the ``(x, y)`` of a tracked object's center."""
        return (agent.center.x, agent.center.y)

    def get_xy_from_egostate(self, ego):
        """Return the ``(x, y)`` of the center of an ego state's waypoint box."""
        # Use the public oriented_box property instead of the private attribute.
        center = ego.waypoint.oriented_box.center
        return center.x, center.y

    def dist(self, a, b):
        """Return the Euclidean distance between two ``(x, y)`` points."""
        return sqrt((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2)

In [6]:
# Extract the LQR inputs for the first scenario returned by the scenario builder.
first_scenario = scenarios[0]
s0 = LQRData(first_scenario)
s0.populate_data()