In [None]:
!mkdir -p /data/sets/nuscenes  # Make the directory to store the nuScenes dataset in.
!wget https://www.nuscenes.org/data/v1.0-mini.tgz  # Download the nuScenes mini split.
!tar -xf v1.0-mini.tgz -C /data/sets/nuscenes  # Uncompress the nuScenes mini split.
!pip install nuscenes-devkit &> /dev/null  # Install nuScenes.
!pip install pyyaml &> /dev/null
!pip uninstall nuscenes-devkit -y
!pip install nuscenes-devkit

In [None]:
import datetime
import uuid
import json
import numpy as np
import warnings
import yaml
import os

from nuscenes.nuscenes import NuScenes
from sklearn.cluster import KMeans
from collections import defaultdict

# Hide the UserWarning raised from scikit-learn's KMeans module (the n_init notice).
warnings.filterwarnings(
    action="ignore",
    category=UserWarning,
    module='sklearn.cluster._kmeans',
)

# Directory the mini split was extracted into, and the dataset handle built on it.
NUSCENES_ROOT = '/data/sets/nuscenes'
nusc = NuScenes(dataroot=NUSCENES_ROOT, version='v1.0-mini')

print("NuScenes dataset loaded successfully.")

Definition of ODD Parameters According to SES Components

In [None]:
# SES (System Entity Structure) layout used to describe an ODD.
# This is a fixed, hand-written structure that serves mainly as documentation;
# the cell computes nothing beyond building the literal.
ses_schema = {
    # Entities and the attributes listed for each one.
    "Entity": {
        "Environment": [
            "Weather",
            "Illumination",
            "SceneType",
            "ObjectCategory",
            "ObjectAttribute",
            "Visibility",
        ],
        "VehicleState": [
            "Position",
            "Orientation",
            "SensorType",
            "CalibrationDetails",
        ],
        "OperationalConditions": [
            "Traffic",
            "RoadType",
            "SpeedRange",
            "Route",
            "LogDetails",
        ],
    },
    # Multi-aspect decomposition: the sensor kinds.
    "Multi-Aspect": {
        "Sensors": [
            "Lidar",
            "Radar",
            "Camera",
        ],
    },
    # Specializations: the listed variants of selected attributes.
    "Specialization": {
        "Weather": [
            "Clear",
            "Rainy",
            "Snowy",
        ],
        "SceneType": [
            "City",
            "Highway",
            "Suburban",
        ],
        "ObjectCategory": [
            "Pedestrian",
            "Car",
            "Bicycle",
        ],
    },
}

print("SES Schema defined.")

Timestamp Generation for Each ODD

In [None]:
def generate_odd_timestamp():
    """Return a fresh ODD identifier of the form ``YYYYMMDDHHMMSS_xxxxxx``.

    The part before the underscore is the current UTC time formatted with
    ``%Y%m%d%H%M%S``; the part after it is the first six hexadecimal digits
    of a random ``uuid4``, so identifiers generated within the same second
    still differ.

    Returns:
        str: the identifier.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Asking for an aware
    # UTC "now" formats to exactly the same string.
    now = datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%d%H%M%S')
    unique_id = uuid.uuid4().hex[:6]
    return f"{now}_{unique_id}"

print(f"Example ODD Timestamp: {generate_odd_timestamp()}")

 Mapping Parameters to SES Components and Data Transformation Functions

In [None]:
# The KMeans warning filter is installed by the import/setup cell and
# generate_odd_timestamp() is defined in the timestamp cell, both earlier in
# this notebook. Repeating them here redefined the function under the same
# name and parsed the dataset a second time, so this cell now only makes sure
# that a dataset handle for the expected version and location is available.

# Adjust this path according to the location of your dataset in your Colab environment
NUSCENES_ROOT = '/data/sets/nuscenes'

try:
    # Reuse the handle from the setup cell when it points at the same data.
    dataset_is_loaded = nusc.version == 'v1.0-mini' and nusc.dataroot == NUSCENES_ROOT
except (NameError, AttributeError):
    # No usable handle yet (e.g. this cell was run on its own).
    dataset_is_loaded = False

if not dataset_is_loaded:
    nusc = NuScenes(version='v1.0-mini', dataroot=NUSCENES_ROOT)

print("NuScenes dataset loaded successfully.")


In [None]:

def map_ego_pose_to_vehicle_state(ego_pose_record):
    """
    Build the VehicleState SES component for one ego_pose record.

    The first three entries of ``ego_pose_record['translation']`` become
    'Position' (x, y, z) and the first four entries of
    ``ego_pose_record['rotation']`` become 'Orientation' (qw, qx, qy, qz).
    """
    translation = ego_pose_record['translation']
    position = {
        axis: translation[index]
        for index, axis in enumerate(('x', 'y', 'z'))
    }

    rotation = ego_pose_record['rotation']
    orientation = {
        component: rotation[index]
        for index, component in enumerate(('qw', 'qx', 'qy', 'qz'))
    }

    return {'Position': position, 'Orientation': orientation}

In [None]:
def map_sample_data_to_sensors(sample_data_record, calibrated_sensor_record):
    """
    Combine a sample_data record and its calibrated_sensor record into one
    dict holding 'SensorType' and 'CalibrationDetails'.

    'SensorType' is ``sample_data_record['sensor_modality']``. The calibration
    details copy 'translation' and 'rotation' from the calibrated_sensor record,
    plus 'camera_intrinsic' when that key is present (``None`` otherwise).
    """
    modality = sample_data_record['sensor_modality']

    calibration = {
        field: calibrated_sensor_record[field]
        for field in ('translation', 'rotation')
    }
    # Looked up with .get() so a record without the key yields None, not an error.
    calibration['camera_intrinsic'] = calibrated_sensor_record.get('camera_intrinsic')

    return {'SensorType': modality, 'CalibrationDetails': calibration}

In [None]:
def map_annotation_to_object_info(annotation_record):
    """
    Transforms a sample_annotation record into ObjectCategory, ObjectAttribute, Visibility, Position, and Orientation.

    Args:
        annotation_record: mapping with 'category_name', 'attribute_tokens',
            'translation' (at least 3 entries) and 'rotation' (at least 4
            entries). Visibility is read from 'visibility' when present,
            otherwise from 'visibility_token'.

    Returns:
        dict with the keys 'ObjectCategory', 'ObjectAttribute', 'Visibility',
        'Position' (x, y, z) and 'Orientation' (qw, qx, qy, qz). 'Visibility'
        is ``None`` when the record carries neither visibility key.
    """
    # nuScenes sample_annotation records expose the visibility as
    # 'visibility_token'; indexing a plain 'visibility' key raised KeyError on
    # them. An explicit 'visibility' entry still takes precedence so records
    # prepared by a caller keep working.
    if 'visibility' in annotation_record:
        visibility = annotation_record['visibility']
    else:
        visibility = annotation_record.get('visibility_token')

    object_info = {
        'ObjectCategory': annotation_record['category_name'],
        'ObjectAttribute': annotation_record['attribute_tokens'],
        'Visibility': visibility,
        'Position': {
            'x': annotation_record['translation'][0],
            'y': annotation_record['translation'][1],
            'z': annotation_record['translation'][2],
        },
        'Orientation': {
            'qw': annotation_record['rotation'][0],
            'qx': annotation_record['rotation'][1],
            'qy': annotation_record['rotation'][2],
            'qz': annotation_record['rotation'][3],
        },
    }
    return object_info

In [None]:
def get_scene_environment_info(scene_record):
    """
    Extracts environmental information from the scene record (currently limited, can be expanded).

    NuScenes has no dedicated weather or illumination field, so both are
    guessed from keywords found in the scene's 'name' and, when present, its
    free-text 'description'. This is a plain substring heuristic: any
    occurrence of "rain" is reported as 'Rainy', "night" as 'Night' and "day"
    as 'Day' ("night" wins when both appear).

    Args:
        scene_record: mapping with a 'name' entry and optionally a
            'description' entry.

    Returns:
        dict with 'SceneType' (the scene name), 'Weather' and 'Illumination';
        the last two are 'Unknown' when no keyword matches.
    """
    scene_name = scene_record['name']
    # 'description' may be missing or None on hand-built records.
    description = scene_record.get('description') or ''
    searchable_text = f"{scene_name} {description}".lower()

    environment_info = {
        'SceneType': scene_name, # This is typically inferred from the scene name, more sophisticated parsing may be needed.
        'Weather': 'Unknown', # This field usually requires external metadata or inference.
        'Illumination': 'Unknown' # This field also usually requires inference.
    }

    # Weather and illumination are independent properties, so they are tested
    # separately: a single if/elif chain left the illumination 'Unknown' for
    # every scene that matched "rain".
    if "rain" in searchable_text:
        environment_info['Weather'] = 'Rainy'

    if "night" in searchable_text:
        environment_info['Illumination'] = 'Night'
    elif "day" in searchable_text:
        environment_info['Illumination'] = 'Day'

    return environment_info

In [None]:
def get_operational_conditions(log_record):
    """
    Build the OperationalConditions component from a log record.

    Only 'Route' is filled in, from ``log_record['location']``. 'SpeedRange',
    'Traffic' and 'RoadType' are not available in the log record and are
    returned as the placeholder 'Unknown'.
    """
    # Location information from the log.
    conditions = {'Route': log_record['location']}

    # Fields that would need to be calculated from samples, inferred, or taken
    # from external data; they stay as placeholders here.
    for pending_field in ('SpeedRange', 'Traffic', 'RoadType'):
        conditions[pending_field] = 'Unknown'

    return conditions

In [None]:
# Main ODD creation function
def create_odd_from_sample(nusc, sample_token):
    """
    Creates an ODD structure from a given NuScenes sample_token.

    Args:
        nusc: loaded NuScenes handle; only its ``get(table, token)`` lookup is used.
        sample_token: token of the sample to describe.

    Returns:
        dict with the keys
        'ODD_ID' (fresh identifier from generate_odd_timestamp()),
        'Timestamp' (sample time as an ISO-8601 string in UTC, with offset),
        'VehicleState' (ego position/orientation plus a 'Sensors' dict),
        'Environment' (scene information plus an 'Objects' list with one
        entry per annotation of the sample) and
        'OperationalConditions'.
    """
    sample = nusc.get('sample', sample_token)
    odd_id = generate_odd_timestamp()

    # Ego Vehicle State
    # Use any sensor's ego_pose as they should be consistent for the vehicle at that timestamp.
    # The pose is reached through the sample_data record's 'ego_pose_token'
    # and not by passing the sample_data token itself to the ego_pose table.
    lidar_record = nusc.get('sample_data', sample['data']['LIDAR_TOP'])
    ego_pose_record = nusc.get('ego_pose', lidar_record['ego_pose_token'])
    vehicle_state = map_ego_pose_to_vehicle_state(ego_pose_record)

    # Sensor Information
    # NOTE(review): entries are keyed by modality, so when a sample has several
    # sensors of the same modality only the last one visited is kept. Keying by
    # channel would keep them all but changes the output keys - confirm with
    # the consumers of 'Sensors' before changing.
    sensors = {}
    for sd_token in sample['data'].values():
        sample_data_record = nusc.get('sample_data', sd_token)
        calibrated_sensor_record = nusc.get('calibrated_sensor', sample_data_record['calibrated_sensor_token'])
        sensor_info = map_sample_data_to_sensors(sample_data_record, calibrated_sensor_record)
        sensors[sensor_info['SensorType']] = sensor_info

    vehicle_state['Sensors'] = sensors

    # Environmental Conditions
    scene_record = nusc.get('scene', sample['scene_token']) # Access scene_token from sample
    log_record = nusc.get('log', scene_record['log_token']) # Access log_token from scene_record
    environment_info = get_scene_environment_info(scene_record)

    # Object Information
    # The devkit lists a sample's annotation tokens under 'anns'; the
    # previously checked 'annotations' key is not part of a sample record, so
    # the object list always stayed empty. 'annotations' is still honoured as a
    # fallback for hand-built sample dicts.
    annotation_tokens = sample.get('anns', sample.get('annotations', []))
    objects = []
    for ann_token in annotation_tokens:
        # Work on a shallow copy so the dataset's own record is not modified.
        annotation_record = dict(nusc.get('sample_annotation', ann_token))
        # Annotation records carry 'visibility_token'; expose it under the
        # 'visibility' key that the mapping function reads.
        annotation_record.setdefault('visibility', annotation_record.get('visibility_token'))
        objects.append(map_annotation_to_object_info(annotation_record))

    environment_info['Objects'] = objects

    # Operational Conditions
    operational_conditions = get_operational_conditions(log_record)

    # sample['timestamp'] is divided by 1e6 to obtain seconds. Converting with
    # an explicit UTC zone makes the string independent of the machine's local
    # time zone and consistent with the UTC-based ODD_ID.
    sample_time = datetime.datetime.fromtimestamp(
        sample['timestamp'] / 1000000, tz=datetime.timezone.utc
    )

    odd = {
        'ODD_ID': odd_id,
        'Timestamp': sample_time.isoformat(),
        'VehicleState': vehicle_state,
        'Environment': environment_info,
        'OperationalConditions': operational_conditions
    }

    return odd

print("Data transformation functions defined.")

# Create an example ODD:
# take the token of the first record in nusc.sample, build its ODD with
# create_odd_from_sample(), and print the result as JSON indented by two spaces.
# `sample_odd` stays available as a notebook variable afterwards.
first_sample_token = nusc.sample[0]['token']
sample_odd = create_odd_from_sample(nusc, first_sample_token)
print("\nExample ODD created:")
print(json.dumps(sample_odd, indent=2))

YAML ODD file generation

In [None]:
# Write every ODD in sample_odds_list to its own YAML file, named after its ODD_ID.
output_dir = '/content/generated_ODDs'
os.makedirs(output_dir, exist_ok=True)

# No cell visible before this one defines sample_odds_list, so a fresh
# "Restart & Run All" stopped here with a NameError. When the list does not
# exist yet it is built from every sample of the loaded dataset; a list
# prepared by another cell is used unchanged.
try:
    sample_odds_list
except NameError:
    sample_odds_list = [
        create_odd_from_sample(nusc, sample_record['token'])
        for sample_record in nusc.sample
    ]

for i, odd in enumerate(sample_odds_list):
    # Fall back to a positional name for entries without an ODD_ID.
    odd_id = odd.get('ODD_ID', f'odd_{i}')
    file_path = os.path.join(output_dir, f'{odd_id}.yaml')
    with open(file_path, 'w') as f:
        # Block style (one key per line), not inline flow style.
        yaml.dump(odd, f, default_flow_style=False)

print(f"Successfully saved {len(sample_odds_list)} ODD data in YAML format to the '{output_dir}' directory.")