In [1]:
from nuscenes.nuscenes import NuScenes
from nuscenes.map_expansion.map_api import NuScenesMap
from pyquaternion import Quaternion
from ultralytics import YOLO
import json
from tqdm import tqdm
from utils_drivelm import get_option, action_map

In [2]:
# Load the nuScenes v1.0-trainval tables once; `nusc` is the shared handle that the
# helper functions in this notebook query through `nusc.get(...)`.
nusc = NuScenes(version='v1.0-trainval', dataroot='/data2/common/xuanyang/nuscenes', verbose=True)

Loading NuScenes tables for version v1.0-trainval...


23 category,
8 attribute,
4 visibility,
64386 instance,
12 sensor,
10200 calibrated_sensor,
2631083 ego_pose,
68 log,
850 scene,
34149 sample,
2631083 sample_data,
1166187 sample_annotation,
4 map,
Done loading in 39.013 seconds.
Reverse indexing ...
Done reverse indexing in 11.4 seconds.


In [3]:
# Build one NuScenesMap per supported location, in a fixed order, and bind each to the
# module-level name that get_map_instance_from_frame looks up.
(
    map_singapore_onenorth,
    map_singapore_hollandvillage,
    map_boston_seaport,
    map_singapore_queenstown,
) = (
    NuScenesMap(dataroot='/data2/common/xuanyang/nuscenes', map_name=location)
    for location in (
        'singapore-onenorth',
        'singapore-hollandvillage',
        'boston-seaport',
        'singapore-queenstown',
    )
)

In [4]:
def get_map_instance_from_frame(scene_token):
    """Return the preloaded map object for the location where a scene was logged.

    Args:
        scene_token: Token of a nuScenes ``scene`` record.

    Returns:
        The module-level NuScenesMap whose name matches the ``location`` field
        of the scene's log record.

    Raises:
        ValueError: If the location is not one of the four preloaded maps.
    """
    scene_record = nusc.get('scene', scene_token)
    location = nusc.get('log', scene_record['log_token'])['location']
    # Dispatch table instead of an if/elif ladder.
    maps_by_location = {
        'singapore-onenorth': map_singapore_onenorth,
        'singapore-hollandvillage': map_singapore_hollandvillage,
        'boston-seaport': map_boston_seaport,
        'singapore-queenstown': map_singapore_queenstown,
    }
    if location not in maps_by_location:
        raise ValueError('Unsupported map name')
    return maps_by_location[location]


def get_ego_pose(frame_token):
    """Look up the ego pose attached to a sample's CAM_FRONT data record.

    Args:
        frame_token: Token of a nuScenes ``sample`` record.

    Returns:
        A ``(translation, rotation)`` pair taken from the ``ego_pose`` record
        referenced by the sample's ``CAM_FRONT`` sample_data.
    """
    front_cam_token = nusc.get('sample', frame_token)['data']['CAM_FRONT']
    pose_token = nusc.get('sample_data', front_cam_token)['ego_pose_token']
    pose = nusc.get('ego_pose', pose_token)
    return pose['translation'], pose['rotation']


def search_lane(map_instance, lane_token):
    """Find the lane record with the given token.

    Args:
        map_instance: Object exposing a ``lane`` iterable of dict records.
        lane_token: Value to match against each record's ``'token'`` key.

    Returns:
        The first matching record, or ``None`` when no record matches.
    """
    matching_lanes = (record for record in map_instance.lane if record['token'] == lane_token)
    return next(matching_lanes, None)


def get_nearby_lane_types(map_instance, scene_token, frame_token):
    """Collect map context at the ego position of a frame.

    Args:
        map_instance: Map object providing ``layers_on_point``,
            ``get_closest_lane`` and a ``lane`` list.
        scene_token: Accepted for call-site symmetry; not used by this function.
        frame_token: Sample token whose ego pose is looked up.

    Returns:
        ``(ego_x, ego_y, road_on_point, lane_info)``: the ego x/y position, the
        result of ``layers_on_point`` at that position, and the lane record for
        the closest lane found with ``radius=3`` (``None`` when the returned
        token is not present in ``map_instance.lane``).
    """
    translation, _rotation = get_ego_pose(frame_token)
    ego_x, ego_y, _ego_z = translation
    layers_here = map_instance.layers_on_point(ego_x, ego_y)
    nearest_lane_token = map_instance.get_closest_lane(ego_x, ego_y, radius=3)
    return ego_x, ego_y, layers_here, search_lane(map_instance, nearest_lane_token)


def get_node_info(map_instance, node_token):
    """Find the node record with the given token.

    Args:
        map_instance: Object exposing a ``node`` iterable of dict records.
        node_token: Value to match against each record's ``'token'`` key.

    Returns:
        The first matching record, or ``None`` when no record matches.
    """
    matching_nodes = (candidate for candidate in map_instance.node if candidate['token'] == node_token)
    return next(matching_nodes, None)


def distance_cal(x1,y1,x2,y2):
    """Return the Euclidean distance between points (x1, y1) and (x2, y2)."""
    delta_x = x1 - x2
    delta_y = y1 - y2
    squared_length = delta_x ** 2 + delta_y ** 2
    return squared_length ** 0.5


def get_divider_type(ego_x, ego_y, map_instance, divider_segment_info):
    """Pick the divider segment whose node lies closest to the ego position.

    Args:
        ego_x: Ego x coordinate.
        ego_y: Ego y coordinate.
        map_instance: Map object passed through to ``get_node_info``.
        divider_segment_info: Iterable of segment dicts, each carrying a
            ``'node_token'`` key.

    Returns:
        The segment dict with the smallest node-to-ego distance (the first one
        on ties), or ``None`` if no segment is closer than the 100000000
        starting threshold (which includes the empty-input case).
    """
    closest_segment = None
    best_distance = 100000000  # starting threshold; only strictly smaller distances win
    for segment in divider_segment_info:
        anchor = get_node_info(map_instance, segment['node_token'])
        gap = distance_cal(ego_x, ego_y, anchor['x'], anchor['y'])
        if not (gap < best_distance):
            continue
        closest_segment, best_distance = segment, gap
    return closest_segment


def condition_predicate_extractor(conv_path, question_path, detect_info_save_path):
    """Build per-sample condition/action predicate records and save them as JSON.

    For every conversation entry this runs the YOLO detector on the first three
    images, adds map-derived predicates (pedestrian crossing, stop line, type of
    the nearest left/right lane-divider segment) and attaches the action list
    derived from the first "behavior" QA pair of the matching key frame.

    Args:
        conv_path: JSON file holding a list of conversations. Each entry needs
            an ``id`` of the form ``<scene>_<frame>`` and an ``image`` list.
        question_path: JSON file indexed as
            ``[scene]["key_frames"][frame]["QA"]["behavior"][0]`` with ``Q``/``A`` keys.
        detect_info_save_path: Destination JSON file. It receives a list of
            ``{'image_id', 'classes', 'action'}`` records.

    Returns:
        None. The records are written to ``detect_info_save_path`` once, after
        the loop finishes. If the loop is interrupted by an exception, the
        records collected so far are still written before it propagates.
    """
    yolo = YOLO('best.pt')
    with open(conv_path, 'r') as f:
        conv = json.load(f)
    with open(question_path, 'r') as f:
        questions = json.load(f)

    all_detect_info = []
    try:
        for conversation in tqdm(conv):
            sample_id = conversation['id']
            id_parts = sample_id.split('_')
            scene_id, frame_id = id_parts[0], id_parts[1]

            # yolo_detection
            predicates = set()
            images = conversation['image'][:3]  # cam_front cam_front_right cam_front_left
            for img_path in images:
                for detection in yolo(img_path, verbose=False):
                    for box in detection.boxes:
                        predicates.add(yolo.names[int(box.cls)])

            # condition_predicate_extractor
            map_instance = get_map_instance_from_frame(scene_id)
            ego_x, ego_y, road_on_point, lane_info = get_nearby_lane_types(map_instance, scene_id, frame_id)
            if road_on_point['ped_crossing'] != '':
                predicates.add('pedestrianCrossing')
            if road_on_point['stop_line'] != '':
                predicates.add('stopLine')
            if lane_info:
                if lane_info['left_lane_divider_segments']:
                    left_min_node = get_divider_type(ego_x, ego_y, map_instance, lane_info['left_lane_divider_segments'])
                    predicates.add(left_min_node['segment_type'] + '_LEFT')
                if lane_info['right_lane_divider_segments']:
                    right_min_node = get_divider_type(ego_x, ego_y, map_instance, lane_info['right_lane_divider_segments'])
                    predicates.add(right_min_node['segment_type'] + '_RIGHT')

            # action_predicate
            behavior_qa = questions[scene_id]["key_frames"][frame_id]["QA"]["behavior"][0]
            option = get_option(behavior_qa["Q"], behavior_qa["A"])
            action_list = action_map(option)

            # save
            all_detect_info.append({
                'image_id': sample_id,
                # Sorted so the output is reproducible; set iteration order
                # varies between interpreter runs.
                'classes': sorted(predicates),
                'action': action_list,
            })
    finally:
        # Write once instead of re-serialising the growing list on every
        # iteration. Skipped when nothing was collected, so an early failure
        # does not overwrite an existing file with an empty list.
        if all_detect_info:
            with open(detect_info_save_path, 'w') as f:
                json.dump(all_detect_info, f)
            

        
    

In [5]:
# Alternative paths for the train split (disabled; swap with the assignments below to use them).
# conv_path = 'DriveLM_process/conversation_drivelm_train.json'
# question_path = 'DriveLM_process/train_eval.json'
# save_path = 'process_data_drivelm/train/train_detected_classes.json'

# Paths currently in use: val-split conversation/question files, output saved under test/.
conv_path = 'DriveLM_process/conversation_drivelm_val.json'
question_path = 'DriveLM_process/v1_1_val_nus_q_only.json'
save_path = 'process_data_drivelm/test/test_detected_classes.json'

# Run the extraction; the collected records are written as JSON to save_path.
condition_predicate_extractor(conv_path, question_path, save_path)

  ckpt = torch.load(file, map_location="cpu")
  0%|          | 0/799 [00:00<?, ?it/s]

100%|██████████| 799/799 [03:19<00:00,  4.01it/s]


In [2]:
# Load the previously extracted detection records and the matching conversations.
origin_path = 'process_data_drivelm/train/train_detected_classes.json'
conv_path = 'DriveLM_process/conversation_drivelm_train.json'
# Context managers close the files as soon as they are parsed, instead of
# leaving the handles open until garbage collection.
with open(origin_path, 'r') as origin_file:
    ori_data = json.load(origin_file)
with open(conv_path, 'r') as conv_file:
    conv_data = json.load(conv_file)

In [3]:
import re

def control_signal_extractor(cs_string):
    """Parse ``Name: [v1, v2, ...]`` fragments out of a text into a dict.

    Args:
        cs_string: Text containing zero or more ``<word>: [<items>]`` fragments,
            e.g. ``"Speed: [(4.54, 0.0), (5.34, 0.0)]"``.

    Returns:
        Dict mapping each name to the list of Python literals found between
        its square brackets. Empty when nothing matches.

    Raises:
        ValueError, SyntaxError: If the bracket content is not a sequence of
            plain Python literals.
    """
    import ast  # local import: only this function needs it

    pattern = r"(\w+): \[(.*?)\]"
    # NOTE: this used to call eval() on text read from a data file, which would
    # execute arbitrary expressions embedded in it. literal_eval accepts the
    # same numeric/tuple literals but refuses anything executable.
    return {
        name: ast.literal_eval(f"[{body}]")
        for name, body in re.findall(pattern, cs_string)
    }

In [4]:
import os
from openai import OpenAI
def gpt_map_cs(Speed, Course, client=None, model='gpt-4o'):
    """Ask a chat model for one velocity and one directional predicate.

    Args:
        Speed: Speed readings; formatted verbatim into the prompt.
        Course: Course readings; formatted verbatim into the prompt.
        client: Optional OpenAI client to reuse across calls. When omitted, a
            new client is created from the ``OPENAI_API_KEY`` environment
            variable, as before.
        model: Chat model name; defaults to the previously hard-coded value.

    Returns:
        The message content of the first completion choice (expected to look
        like ``"Fast, Straight"``).
    """
    if client is None:
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    system_prompt = "You are a helpful assistant"
    # The third few-shot example used to answer "Slowly", which is not one of
    # the allowed velocity predicates listed above it; it now says "Slow".
    prompt = """
    Given the current speed and course of the car, use one velocity predicate and one directional predicate to best describe the behavior of the car. 
    The velocity predicates are: Normal, Fast, Slow, Stop.
    The directional predicates are: Straight, Left, Right. 
    Output the predicates directly without any additional information.
    Here are some examples:
    # Speed: [(4.54, 0.0), (5.34, 0.0), (5.67, 0.0), (5.7, 0.0), (6.46, 0.0), (6.63, 0.0)]
    # Course: [(1.0, 0.0), (1.0, 0.0), (1.0, 0.0), (1.0, 0.0), (1.0, 0.0), (1.0, 0.0)]
    # Predicate: Fast, Straight
    # Speed: [(10.01, 0.0), (9.88, 0.0), (9.52, 0.0), (9.39, 0.0), (9.15, 0.0), (8.94, 0.0)]
    # Course: [(0.84, 0.0), (0.84, 0.0), (0.86, 0.0), (0.89, 0.0), (0.93, 0.0), (0.95, 0.0)]
    # Predicate: Fast, Right
    # Speed: [(2.51, 0.0), (2.49, 0.0), (2.45, 0.0), (2.43, 0.0), (2.43, 0.0), (2.37, 0.0)]
    # Course: [(0.85, 0.0), (0.85, 0.0), (0.86, 0.0), (0.85, 0.0), (0.82, 0.0), (0.75, 0.0)]
    # Predicate: Slow, Left
    # Speed: [(1.65, 0.0), (1.37, 0.0), (0.73, 0.0), (0.09, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0)]
    # Course: [(0.86, 0.0), (0.86, 0.0), (0.87, 0.0), (0.86, 0.0), (0.86, 0.0), (0.86, 0.0), (0.85, 0.0), (0.84, 0.0)]
    # Predicate: Stop, Straight
    # Speed: {speed}
    # Course: {course}
    # Predicate: """.format(speed=Speed, course=Course)
    messages=[{"role": "system", "content": system_prompt},
              {"role": "user", "content": prompt}]
    response = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0,
                frequency_penalty=0.0,
                presence_penalty=0.0,
            ).choices[0]
    return response.message.content

In [5]:
def update_action(action_list, action):
    """Return the first candidate that occurs in ``action``, ignoring case.

    Args:
        action_list: Candidate predicate names, checked in order.
        action: Free-form text to search.

    Returns:
        The first entry of ``action_list`` whose lowercase form is a substring
        of the lowercased ``action``, or ``None`` when none is.
    """
    hits = (candidate for candidate in action_list if candidate.lower() in action.lower())
    return next(hits, None)

In [6]:
from tqdm import tqdm
# Rename low-level speed actions to velocity predicate names; other actions
# pass through unchanged. Built once because it does not vary per item.
action_mapping = {
    'Keep': 'Normal',
    'Accelerate': 'Fast',
    'Decelerate': 'Slow',
    'Stop': 'Stop'
}

# Index the conversations by id once instead of rescanning conv_data for every
# item. setdefault keeps the first entry of a duplicated id, matching the
# earlier scan-and-break behaviour.
conv_by_id = {}
for conv in conv_data:
    conv_by_id.setdefault(conv['id'], conv)

new_items = []
for item in tqdm(ori_data):
    image_id = item['image_id']
    classes = item['classes']
    new_action_list = [action_mapping.get(action, action) for action in item['action']]

    conv = conv_by_id.get(image_id)
    if conv is None:
        # Without this guard the control signals of the previous item would be
        # silently reused (or a NameError raised on the very first item).
        print(f'error: no conversation found for {image_id}')
        velocity_predicate = direction_predicate = None
    else:
        cs_string = conv['conversations'][-2]['value']
        cs_info = control_signal_extractor(cs_string)
        answer = gpt_map_cs(cs_info['Speed'], cs_info['Orientation'])
        velocity_predicate = update_action(['Normal', 'Fast', 'Slow', 'Stop'], answer)
        direction_predicate = update_action(['Straight', 'Left', 'Right'], answer)
        if velocity_predicate is None or direction_predicate is None:
            print(f'error: could not read predicates for {image_id} from {answer!r}')

    new_items.append({
        'image_id': image_id,
        'classes': classes,
        'action': new_action_list,
        'velocity_predicate': velocity_predicate,
        'direction_predicate': direction_predicate
    })
    # Checkpoint after every item: each item costs one remote API call, so
    # partial results are worth keeping if the run is interrupted.
    # NOTE(review): the input loaded into ori_data comes from the train/ path,
    # but this writes under test/ -- confirm the intended destination.
    with open('process_data_drivelm/test/test_detected_classes_with_predicate.json', 'w') as f:
        json.dump(new_items, f)
        

  0%|          | 18/3982 [01:00<3:43:31,  3.38s/it]


KeyboardInterrupt: 