# read tracks

In [1]:
import json
import pandas as pd
import numpy as np

class Label:
    """
    Represent an atomic label
    """

    def __init__(self, vehicle_id, type_str):
        self._veihicle_id = int(vehicle_id)
        self._type = type_str

    def to_dict(self):
        return {
            'vehicle_id': self._veihicle_id,
            'type': self._type
        }

    def __json__(self):
        # return self.to_dict()
        return self.__dict__

class LabelSystem:
    """
    Contain [1-35] labels
    """

    def __init__(self, labels):
        self._label_system = labels

    def to_dict(self):
        return self._label_system

    def __json__(self):
        # return self.to_dict()
        return self.__dict__

class VehicleFrameLabel:
    """
    Represent a specific time frame for an Vehicle/RU
    """
    def __init__(self, vehicle_id, timestamp, label_system):
        self._vehicle_id = int(vehicle_id)
        self._timestamp = timestamp
        # self._frame = frame
        assert isinstance(label_system, LabelSystem), "[VehicleFrameLabel] label_system must be a type of LabelSystem"
        self._labels = label_system

    def get_vehicle_id(self):
        return self._vehicle_id

    # def update_label(self, id, label):
        # assert isinstance(id, int), "[VehicleFrame] label id must be an integer"
        assert 1 <= id <= 35, "[VehicleFrame] label id must be between 1 and 35"
        # assert isinstance(label, Label), "[VehicleFrame] label must be a Label type"
        # self._labels[id] = label

    def to_dict(self):
        return {
            'vehicle_id': self._vehicle_id,
            'timestamp': self._timestamp,
            'label_system': self._labels
        }

    def __json__(self):
        # return json.dump(self.to_dict(), indent=4)
        return self.__dict__

class VehicleTrack:
    """
    Represent a vehcle track
    """
    def __init__(self, vehicle_id, frames):
        self._vehicle_id = vehicle_id
        self._frames = frames # [VehicleFrameLabel]

    def to_dict(self):
        return {
            'vehicle_id': self._vehicle_id,
            'frames': [_frame.to_dict() for _frame in self._frames]
        }

    def __json__(self):
        # return json.dump(self.to_dict(), indent=4)
        return self.__dict__

class TimeframeVehicles:
    def __init__(self, timestamp):
        assert isinstance(timestamp, type(np.int64(1))), "[TimeframeVehicles] timestamp must be a np.int64 but got {}".format(type(timestamp))
        self._timestamp = int(timestamp) # serializable
        self._vehicles = {}

    def add_vehicle(self, vehicle_id, vehicle_frame_label):
        assert isinstance(vehicle_frame_label, VehicleFrameLabel), "[TimeframeVehicles] vehicle_frame_label must be a VehicleFrameLabel"
        # assert int(vehicle_id) == vehicle_frame_label.get_vehicle_id(), "[TimeframeVehicles] vehicle_id and vehicle_frame_label must for same vehicle"

        self._vehicles[int(vehicle_id)] = vehicle_frame_label

    def get_timestamp(self):
        return self._timestamp

    def to_dict(self):
        return {
            'timestamp': self._timestamp,
            'vehicles': self._vehicles
        }

    def __json__(self):
        # return json.dump(self.to_dict(), indent=4)
        return self.__dict__

class FullTimeframeVehicles:
    def __init__(self, start_ts, end_ts, ts_rate):
        self._start_timestamp = start_ts
        self._end_timestamp = end_ts
        self._timestamp_rate = ts_rate
        self._tf_vehicles = []

    def add_timeframe(self, timeframe_vehicles):
        assert isinstance(timeframe_vehicles, TimeframeVehicles), "[FullTimeframeVehicles] timeframe_vehicles must be TimeframeVehicles type"
        curr_timestamp = timeframe_vehicles.get_timestamp()
        # assert self._start_timestamp <= curr_timestamp & self._end_timestamp >= curr_timestamp, "[FullTimeframeVehicles] timestamp not in between"

        self._tf_vehicles.append(timeframe_vehicles)

    def to_dict(self):
        return {
            'start_timestamp': self._start_timestamp,
            'end_timestamp': self._end_timestamp,
            'timestamp_rate': self._timestamp_rate,
            'timestamp_vehicles': [_ts_vehicle.to_dict() for _ts_vehicle in self._tf_vehicles]
        }

class TracksByVehicle:
    def __init__(self):
        self._tracks = []

    def add_track(self, track):
        assert isinstance(track, VehicleTrack), "[Tracks] track must be a type of VehicleTrack"
        self._tracks.append(track)

    def to_dict(self):
        return {
            'tracks': [_track.to_dict() for _track in self._tracks]
        }

    def __json__(self):
        # return json.dumps(self.to_dict(), indent=4)
        return self.__dict__

In [2]:
RU = ["Car", "SUV", "Truck", "Special_truck"]
VRU = ["tricycle", "bicycle", "motorbike"]

df_tracks = pd.read_csv('./tracks_result.csv')
df_gp = df_tracks.groupby('frame')
first_gp_name = next(iter(df_gp.groups))
first_gp = df_gp.get_group(first_gp_name)

# print(first_gp)
# first_gp.columns

In [3]:
for gp_name, df_frame in df_tracks.groupby('frame'):

    # print("Frame: {}".format(gp_name))
    # print(df_frame)

    # Each time frame
    for index, row in df_frame.iterrows():

        # For each vehicle id, compute it's surrounding labels
        pass
        break

In [4]:
def check_RU_same_direction(frame_df, ru_id):
    """
    Get the labels for ru_id with RU in the same direction

    Return: [Labels]
    """
    # NOTE: dummy return
    label = Label(111, "car")
    return [label]

def check_RU_verticle(frame_df, ru_id):
    """
    Get the labels for ru_id with RU in the verticle direction

    Return: [Labels]
    """
    # NOTE: dummy return
    label = Label(111, "car")
    return [label]

def check_VRU(frame_df, ru_id):
    """
    Get the VRU for ru_id

    Return: [Labels]
    """
    # NOTE: dummy return
    label = Label(111, "car")
    return [label]

def get_frame_labels_for_RU(frame_df, ru_id):
    """
    Get [1-35] labels for a specific RU

    Return: VehicleFrame for specific ru_id
    """
    _same_dir_labels = check_RU_same_direction(frame_df, ru_id)
    _virticle_labels = check_RU_verticle(frame_df, ru_id)
    _VRU_labels = check_VRU(frame_df, ru_id)

    _all_labels = _same_dir_labels + _virticle_labels + _VRU_labels

    _label_system = LabelSystem(_all_labels)

    # NOTE  an object that contains all timestamp frame, '
    #       for each frame, contains a list of vehicle_id,
    #       for each vehicle_id, contains an object contains [1-35] labels
    vehicle_frame_labels = VehicleFrameLabel(ru_id, "timestamp", _label_system)

    return vehicle_frame_labels

def process_vehicles_by_frame(df):
    """
    Processes vehicles in each frame of the DataFrame.

    Args:
        df: pandas DataFrame with columns:
            - 'frame'
            - 'trackId'
            - 'timestamp'
            - 'vehicle_id'
            - 'class_str'
            - 'xCenter'
            - 'yCenter'
            - 'length'
            - 'width'
            - 'height'
            - 'xVelocity'
            - 'yVelocity'
            - 'roadId'
            - 'laneId'
            - 'angle'

    Returns:
    """
    _full_timeframe_vehicles = FullTimeframeVehicles("start_ts", "end_ts", "ts_rate")

    for _frame, frame_df in df.groupby('frame'):
        unique_vehicle_ids = frame_df['vehicle_id'].unique()
        _curr_timestamp = frame_df['timestamp'].iloc[0]

        _curr_timeframe_vehicles = TimeframeVehicles(_curr_timestamp)

        for _vehicle_id in unique_vehicle_ids:
            # vehicle_data = frame_df[frame_df['vehicle_id'] == vehicle_id]

            # check for RU
            _curr_vehicle_frame = get_frame_labels_for_RU(frame_df, _vehicle_id)
            assert isinstance(_curr_vehicle_frame, VehicleFrameLabel), "AssertionError: _curr_vehicle_frame has to be VehicleFrameLabel type"

            _curr_timeframe_vehicles.add_vehicle(_vehicle_id, _curr_vehicle_frame)

        _full_timeframe_vehicles.add_timeframe(_curr_timeframe_vehicles)

    return _full_timeframe_vehicles

# Example usage (assuming you have defined run_function1 and run_function2)

# processed_df = process_vehicles_by_frame(None)

In [5]:
full_timeframe_vehicles = process_vehicles_by_frame(df_tracks)

In [6]:
def full_timeframe_vehicles_default(obj):
    if isinstance(obj, FullTimeframeVehicles):
        return obj.to_dict()
    elif isinstance(obj, VehicleFrameLabel):
        return obj.to_dict()
    elif isinstance(obj, LabelSystem):
        return obj.to_dict()
    elif isinstance(obj, Label):
        return obj.to_dict()

    raise TypeError(f"Type {type(obj)} not serializable")

In [7]:
print(full_timeframe_vehicles.to_dict()['timestamp_vehicles'])

[{'timestamp': 1706928726990468352, 'vehicles': {1: <__main__.VehicleFrameLabel object at 0x10da5f8c0>, 354695: <__main__.VehicleFrameLabel object at 0x10dadf610>, 386711: <__main__.VehicleFrameLabel object at 0x10dadf890>, 390316: <__main__.VehicleFrameLabel object at 0x10dacb950>, 395774: <__main__.VehicleFrameLabel object at 0x10dacba80>, 396827: <__main__.VehicleFrameLabel object at 0x10db3d130>, 398015: <__main__.VehicleFrameLabel object at 0x10db25f20>, 402579: <__main__.VehicleFrameLabel object at 0x10db26140>, 403803: <__main__.VehicleFrameLabel object at 0x10db13050>, 403922: <__main__.VehicleFrameLabel object at 0x10db13250>}}, {'timestamp': 1706928727000468352, 'vehicles': {1: <__main__.VehicleFrameLabel object at 0x10db1f200>, 354695: <__main__.VehicleFrameLabel object at 0x10db1f3e0>, 386711: <__main__.VehicleFrameLabel object at 0x10dc60750>, 390316: <__main__.VehicleFrameLabel object at 0x10dc602f0>, 395774: <__main__.VehicleFrameLabel object at 0x105d101f0>, 396827: <__

In [8]:
with open("json_output.json", "w") as f:
    json.dump(full_timeframe_vehicles, f, default=full_timeframe_vehicles_default, indent=4)

# Lane Sorting

In [9]:

def label_vehicles_by_lane_with_ego_at_zero(df, ego_id, xvel, yvel, num_lanes=5, ego_lane_idx=2):
  """
  Labels vehicles on different lanes from left to right,
  sorts vehicles within each lane from front to back,
  and distinguishes between front and behind the ego vehicle.
  Ego vehicle is always at index 0 within its lane.

  Args:
    df: pandas DataFrame containing vehicle data with columns:
      - 'x': x-coordinate of the vehicle
      - 'y': y-coordinate of the vehicle
      - 'lane_id': lane ID of the vehicle
      - 'id': unique identifier for each vehicle
    ego_id: unique identifier of the ego vehicle
    xvel: x-component of ego's velocity
    yvel: y-component of ego's velocity
    num_lanes: Total number of lanes (default: 5)
    ego_lane_idx: Index of the ego vehicle's lane (0-indexed, default: 2)

  Returns:
    pandas DataFrame with an additional column:
      - 'relative_position':
          - 'front' if the vehicle is in front of the ego vehicle
          - 'behind' if the vehicle is behind the ego vehicle
          - 'ego' if the vehicle is the ego vehicle
          - 'other_lane' if the vehicle is on a different lane
  """

  df['relative_position'] = 'other_lane'

  # Calculate lane direction vector
  lane_direction = np.array([xvel, yvel])

  # Calculate relative positions to ego vehicle
  ego_vehicle = df[df['id'] == ego_id]
  if ego_vehicle.empty:
    raise ValueError("Ego vehicle not found in the DataFrame.")
  ego_position = ego_vehicle[['x', 'y']].values[0]
  df['rel_x'] = df['x'] - ego_position[0]
  df['rel_y'] = df['y'] - ego_position[1]

  # Project relative positions onto lane direction
  df['projection'] = np.dot(df[['rel_x', 'rel_y']].values, lane_direction)

  # Group vehicles by lane
  for i in range(num_lanes):
    lane_vehicles = df[df['lane_id'] == i]
    if lane_vehicles.empty:
      continue

    # Sort vehicles within each lane
    sorted_lane_vehicles = lane_vehicles.sort_values(by='projection')

    # Reindex to put ego vehicle at index 0
    ego_index = sorted_lane_vehicles.index[sorted_lane_vehicles['id'] == ego_id].tolist()
    if ego_index:
      ego_index = ego_index[0]
      sorted_lane_vehicles = pd.concat([sorted_lane_vehicles.iloc[ego_index:], sorted_lane_vehicles.iloc[:ego_index]])

    # Label vehicles relative to the ego vehicle
    sorted_lane_vehicles.loc[0, 'relative_position'] = 'ego'
    sorted_lane_vehicles.loc[1:, 'relative_position'] = 'front'
    sorted_lane_vehicles.loc[:0, 'relative_position'] = 'behind'

    # Update the original DataFrame
    df.loc[lane_vehicles.index, 'relative_position'] = sorted_lane_vehicles['relative_position']

  return df

# Example usage
data = {'id': ['v1', 'v2', 'ego', 'v4', 'v5', 'v6', 'v7'],
        'x': [10, 20, 30, 40, 50, 15, 25],
        'y': [1, 1, 1, 1, 1, 2, 2],
        'lane_id': [0, 0, 0, 1, 1, 2, 2]}  # Assuming 3 lanes (0, 1, 2)
df = pd.DataFrame(data)

labeled_df = label_vehicles_by_lane_with_ego_at_zero(df, 'ego', 1, 0, num_lanes=3, ego_lane_idx=0)
print(labeled_df)

KeyError: 1