# Argoverse Dataset Extract

In [None]:
"""
------------
|# start   |
------------
"""


In [None]:
# import cell
from argparse import Namespace
from pathlib import Path
from enum import Enum, unique
from random import choices
import click
import argparse
from typing import List, Final
import os
import math
import multiprocess
import scipy
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
from rich.progress import track
from shapely.geometry import LineString, MultiPolygon, MultiPoint, Point, Polygon
from shapely.ops import unary_union
import csv
import random
import shapely
import time
import seaborn as sns
from numpy import trapz
import matplotlib.font_manager as fm
from scipy.signal import savgol_filter
import pickle
import pandas as pd
from mpl_toolkits.axes_grid1 import make_axes_locatable
import av2.geometry.polyline_utils as polyline_utils
import av2.rendering.vector as vector_plotting_utils
from av2.datasets.sensor.av2_sensor_dataloader import AV2SensorDataLoader
from av2.map.map_api import ArgoverseStaticMap, LaneSegment
from av2.datasets.motion_forecasting.scenario_serialization import load_argoverse_scenario_parquet
from av2.datasets.motion_forecasting import scenario_serialization
from av2.map.map_api import ArgoverseStaticMap

In [None]:
# 改变plt字体
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
plt.rcParams['font.sans-serif'] = 'SimHei'
plt.rcParams['axes.unicode_minus'] = False
%matplotlib inline

In [None]:
"""
Function Cell
-----------------------
# 函数：
类定义
交互判断
交互提取
角度修正
转向轨迹搜索
-----------------------
"""
class Track_info:
    def __init__(self):
        self.object_type = None
        self.track_id = None
        self.timestep = []
        self.position = []
        self.velocity = []
        self.heading = []
        self.observed = []
        self.category = None
        self.track_type = None

def interaction_judgement(ego_track,agent_track):

    PET = 0
    interaction_flag = False
    intersection_position = np.array([0,0])
    agent_time_to_c_idx = 0
    ego_time_to_c_idx = 0
    ego_position = np.array(ego_track.position)
    agent_position = np.array(agent_track.position)

    line_ego = LineString(list(ego_position))
    line_agent = LineString(list(agent_position))
    intersection = line_ego.intersection(line_agent)
    if intersection.is_empty == False:
        
        # 有交点
#         print('have intersection')
        # ego车往上运行 通过y判断
        # print(f'{ego_track.track_id},{agent_track.track_id}')
        if intersection.type == 'Point':
            # print(f'{ego_track.track_id},{agent_track.track_id}')
            intersection_position = np.array([intersection.xy[0][0],
                                              intersection.xy[1][0]])
            intersection_position = np.expand_dims(intersection_position,-1).transpose()

            ego_dist = scipy.spatial.distance.cdist(intersection_position,
                                                ego_position)
            agent_dist = scipy.spatial.distance.cdist(intersection_position,
                                                agent_position)
            ego_time_to_c_idx = np.where(ego_dist[0] == 
                                         np.min(ego_dist))[0][0]
            agent_time_to_c_idx =np.where(agent_dist[0] == 
                                          np.min(agent_dist))[0][0]
            ego_time_to_c = ego_track.timestep[ego_time_to_c_idx]
            agent_time_to_c = agent_track.timestep[agent_time_to_c_idx]
            PET = agent_time_to_c - ego_time_to_c
            interaction_flag = True

#         print(f'ego_timestep={ego_time_to_c},'
#               f'agent_timestep={agent_time_to_c},'
#               f'PET ={PET}')
    else:
        
        # 无交点
        # 需要考虑因为场景时间长度受限，轨迹没有交叉，但存在交互...
        # 即交互某一方的轨迹不完全
        # 1.判断末端在交叉口内部
        # 2.判断是否是转向
        pass
    return interaction_flag, PET , ego_time_to_c_idx,\
            agent_time_to_c_idx, intersection_position

def search_AV_HV(ego_track,agents_track,scenario_ids):
    # HV left-turn and AV straight throught
    pi = math.pi
    res=[]
    if ego_track.track_type == 'straight':
        ego_heading = np.array(ego_track.heading)
        for idx,agent_track in enumerate(agents_track):
            if (agent_track.track_type == 'left_turn'):
                agent_heading = np.array(agent_track.heading)
                if (-pi/6 + pi) <= abs(agent_heading[0] - ego_heading[0] ) \
                    <= (pi/6 + pi):
                    # -----interaction judgment-----
#                     print(idx)
                    interaction_flag, PET,_,_,_= interaction_judgement(ego_track,agent_track)
                    if interaction_flag:
                        res.append([scenario_ids, agent_track.track_id, PET])
    # AV left-turn and HV straight throught
    elif ego_track.track_type == 'left_turn':
        ego_heading = np.array(ego_track.heading)
        for agent_track in agents_track:
            if (agent_track.track_type == 'straight'):
                agent_heading = np.array(agent_track.heading)
                if (-pi/6 + pi) <= abs(agent_heading[0] - ego_heading[0] ) \
                    <= (pi/6 + pi):
                    # -----interaction judgment-----
                    interaction_flag, PET,_,_,_= interaction_judgement(ego_track,agent_track)
                    if interaction_flag:
                        
                        res.append([scenario_ids,agent_track.track_id, PET])
    return res

def search_HV_HV(agents_track,scenario_ids):
    pi = math.pi
    res=[]
    # HV left-turn and HV straight throught
    for agent_track in agents_track:
        if (agent_track.track_type == 'left_turn'):
            leftturn_agent_track = agent_track
            leftturn_agent_heading = np.array(leftturn_agent_track.heading)
            for agent_track in agents_track:
                if (agent_track.track_type == 'straight'):
                    straight_agent_track = agent_track
                    straight_agent_heading = np.array(straight_agent_track.heading)
                    if (-pi/6 + pi) <= abs(straight_agent_heading[0] - 
                                           leftturn_agent_heading[0]) <= (pi/6 + pi):
                        interaction_flag, PET,_,_,_= interaction_judgement(leftturn_agent_track, straight_agent_track)
                        if interaction_flag:
                            res.append([scenario_ids, leftturn_agent_track.track_id,straight_agent_track.track_id, PET])
    return res

def heading_fix_inside(heading_np,str2judge):
    # 把heading正坐标方向旋转，x轴正方向取决于heading数组的初始值

    pi=math.pi
    heading_np_fixed = np.zeros((len(heading_np),))
    if str2judge == "lane":
        heading_start = heading_np[0]
    else:
        heading_start = (heading_np[0:5]).sum()/5

    for heading_idx in range(len(heading_np)):
        heading_np_fixed[heading_idx] = heading_np[heading_idx] - heading_start
        if pi < heading_np_fixed[heading_idx] <= 2*pi:
            heading_np_fixed[heading_idx] = heading_np_fixed[heading_idx] - 2*pi
        elif -2*pi <= heading_np_fixed[heading_idx] < - pi:
            heading_np_fixed[heading_idx] = heading_np_fixed[heading_idx] + 2 * pi
    
#     print(heading_np_fixed)
    return heading_np_fixed

def search_left_turn_track(track_info,str2judge):
    
    pi = math.pi
    heading_np = np.array(track_info.heading)
    heading_np_fixed = heading_fix_inside(heading_np,str2judge)
    heading_dif_np = heading_np_fixed - heading_np_fixed[0]

    # 全程转向角变化 [1/3 * pi,2/3 * pi]

    if  ((1/6 * pi) <= heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)]).all() and \
        ((heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)])<= (2/3 * pi)).all():
        
#         heading_everydif_np = heading_dif_np[1:]-heading_dif_np[0:-1]
#         # heading_everydif_np>0.01的比例大于0.1？
#         if ((heading_everydif_np>0.01).sum()/len(heading_everydif_np)) >= 0.1:
        track_type = 'left_turn'
    elif ((-1/6 * pi) < heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)]).all() and \
        ((heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)])< (1/6 * pi)).all():
        track_type = 'straight'
    elif (-(2/3 * pi) <= heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)]).all() and \
        ((heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)])< (-1/4 * pi)).all():
        track_type = 'right_turn'
    # 后添加-报错可删
    elif ((1/6 * pi) < heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)]).all() and \
        ((heading_dif_np[
        len(heading_dif_np)-3:len(heading_dif_np)])<=  pi).all():
        track_type = 'maybe_left_turn'
    else:
        # 小角度的转向轨迹，因为场景没有采集完全
        # 根据section判断
        track_type = 'unknown'
    return track_type

In [None]:
"""
Function Cell
-----------------------
# 函数
map绘制
场景：map+tracks
交互场景：map+交互对tracks
-----------------------
"""

def single_log_teaser(args: argparse.Namespace) -> None:
    """
    For a single log, render all local crosswalks in green, and pedestrian crossings in purple,
    in a bird's eye view.
    """
    log_map_dirpath = Path(args.dataroot) / args.log_id
    avm = ArgoverseStaticMap.from_map_dir(log_map_dirpath, build_raster=False)

    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot()
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.spines['bottom'].set_visible(False)
    for _, ls in avm.vector_lane_segments.items():
        
        if ls.is_intersection:
            # right_ln_bnd
            # left_ln_bnd
            vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="gray", linewidth=0.5)
            vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="gray", alpha=0.2)
        else:
            vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="gray", linewidth=0.5)
            vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="gray", alpha=0.2)
    # plot all pedestrian crossings
#     for _, pc in avm.vector_pedestrian_crossings.items():
#         vector_plotting_utils.draw_polygon_mpl(ax, pc.polygon, color="m", linewidth=0.5)
#         vector_plotting_utils.plot_polygon_patch_mpl(pc.polygon, ax, color="m", alpha=0.2)

def single_log_teaser_intersection(args: argparse.Namespace) -> None:
    """
    For a single log, render all local crosswalks in green, and pedestrian crossings in purple,
    in a bird's eye view.
    """
    log_map_dirpath = Path(args.dataroot) / args.log_id
    avm = ArgoverseStaticMap.from_map_dir(log_map_dirpath, build_raster=False)

    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot()
#     ax.spines['right'].set_visible(False)
#     ax.spines['top'].set_visible(False)
#     ax.spines['left'].set_visible(False)
#     ax.spines['bottom'].set_visible(False)
    for _, ls in avm.vector_lane_segments.items():
        
        if ls.is_intersection:
            # right_ln_bnd
            # left_ln_bnd
            vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="gray", linewidth=0.5)
            vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="gray", alpha=0.2)
#         else:
#             vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="gray", linewidth=0.5)
#             vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="gray", alpha=0.2)
            
def plt_mapNtracks(args,ego_track,agents_track,idx):
    log_map_dirpath = Path(args.dataroot) / args.log_id
    avm = ArgoverseStaticMap.from_map_dir(log_map_dirpath, build_raster=False)

    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot()
    
    
    for _, ls in avm.vector_lane_segments.items():
        
        if ls.is_intersection:
            # right_ln_bnd
            # left_ln_bnd
            vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="g", linewidth=0.5)
            vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="g", alpha=0.2)
        else:
            vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="g", linewidth=0.5)
            vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="g", alpha=0.2)

    # plot all pedestrian crossings
    for _, pc in avm.vector_pedestrian_crossings.items():
        vector_plotting_utils.draw_polygon_mpl(ax, pc.polygon, color="m", linewidth=0.5)
        vector_plotting_utils.plot_polygon_patch_mpl(pc.polygon, ax, color="m", alpha=0.2)
    
    ego_position_np = np.array(ego_track.position)
    plt.plot(ego_position_np[:, 0],ego_position_np[:, 1],'r.-',zorder=2)
    for agent_track in (agents_track):
        agent_position_np = np.array(agent_track.position)
        dist = np.hypot(*(agent_position_np[0] - agent_position_np[-1]))
        if dist>10:
            plt.plot(agent_position_np[:, 0], 
                        agent_position_np[:, 1],'b.-',zorder=1)
    plt.text(0.1, 0.8, s=f"Sceanrio {idx}", transform=ax.transAxes)
    plt.show()


def tracks_from_res(dataroot, res):
    scenario_ids = res[0]
    args = Namespace(**{"dataroot": Path(dataroot), "log_id": Path(scenario_ids)})
    scenario_path = Path(args.dataroot) / args.log_id / f"scenario_{args.log_id}.parquet"
    ArgoverseScenario = load_argoverse_scenario_parquet(scenario_path)
    leftturn_track = Track_info()
    straight_track = Track_info()

    if type(res[3]) == str:

        ego_track = Track_info()
        ego_track.category = 5
        ego_track.track_id = ArgoverseScenario.tracks[-1].track_id
        ego_track.object_type = ArgoverseScenario.tracks[-1].object_type[0:]
        for ego_state_idx in range(len(ArgoverseScenario.tracks[-1].object_states)):
            ego_track.timestep.append(ArgoverseScenario.tracks[-1].
                                      object_states[ego_state_idx].timestep)
            ego_track.position.append(ArgoverseScenario.tracks[-1].
                                      object_states[ego_state_idx].position)
            ego_track.velocity.append(ArgoverseScenario.tracks[-1].
                                      object_states[ego_state_idx].velocity)
            ego_track.heading.append(ArgoverseScenario.tracks[-1].
                                     object_states[ego_state_idx].heading)
            ego_track.observed.append(ArgoverseScenario.tracks[-1].
                                      object_states[ego_state_idx].observed)
        ego_track.track_type = search_left_turn_track(ego_track,'veh')
        agent_track = Track_info()
        for track_idx in range(len(ArgoverseScenario.tracks)-1):
            if ArgoverseScenario.tracks[track_idx].track_id == str(res[1]):
                agent_track.object_type = ArgoverseScenario.tracks[track_idx].object_type[0:]
                agent_track.category = ArgoverseScenario.tracks[track_idx].category.value
                agent_track.track_id = ArgoverseScenario.tracks[track_idx].track_id
                for state_idx in range(len(ArgoverseScenario.tracks[track_idx].object_states)):
                    agent_track.timestep.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].timestep)
                    agent_track.position.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].position[0:])
                    agent_track.velocity.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].velocity[0:])
                    agent_track.heading.append(ArgoverseScenario.tracks[track_idx].
                                               object_states[state_idx].heading)
                    agent_track.observed.append(ArgoverseScenario.tracks[track_idx].
                                               object_states[state_idx].observed)
        if ego_track.track_type == 'left_turn' or (res[2]==0 and res[4]==0):
            leftturn_track = ego_track
            straight_track = agent_track

        else:
            leftturn_track = agent_track
            straight_track = ego_track
#         plt.plot(range(0,len(leftturn_track.heading)),leftturn_track.heading)
#         plt.show()
    elif type(res[3]) == int:
        for track_idx in range(len(ArgoverseScenario.tracks)-1):
            if ArgoverseScenario.tracks[track_idx].track_id == str(res[1]):
                leftturn_track.object_type = ArgoverseScenario.tracks[track_idx].object_type[0:]
                leftturn_track.category = ArgoverseScenario.tracks[track_idx].category.value
                leftturn_track.track_id = ArgoverseScenario.tracks[track_idx].track_id
                for state_idx in range(len(ArgoverseScenario.tracks[track_idx].object_states)):
                    leftturn_track.timestep.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].timestep)
                    leftturn_track.position.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].position[0:])
                    leftturn_track.velocity.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].velocity[0:])
                    leftturn_track.heading.append(ArgoverseScenario.tracks[track_idx].
                                               object_states[state_idx].heading)
                    leftturn_track.observed.append(ArgoverseScenario.tracks[track_idx].
                                               object_states[state_idx].observed)
                leftturn_track.track_type = 'left_turn'
            elif ArgoverseScenario.tracks[track_idx].track_id == str(res[2]):
                straight_track.object_type = ArgoverseScenario.tracks[track_idx].object_type[0:]
                straight_track.category = ArgoverseScenario.tracks[track_idx].category.value
                straight_track.track_id = ArgoverseScenario.tracks[track_idx].track_id
                for state_idx in range(len(ArgoverseScenario.tracks[track_idx].object_states)):
                    straight_track.timestep.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].timestep)
                    straight_track.position.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].position[0:])
                    straight_track.velocity.append(ArgoverseScenario.tracks[track_idx].
                                                object_states[state_idx].velocity[0:])
                    straight_track.heading.append(ArgoverseScenario.tracks[track_idx].
                                               object_states[state_idx].heading)
                    straight_track.observed.append(ArgoverseScenario.tracks[track_idx].
                                               object_states[state_idx].observed)
                leftturn_track.track_type = 'straight'
    return leftturn_track,straight_track

def plt_res(dataroot, res,idx):
    scenario_ids = res[0]
    args = Namespace(**{"dataroot": Path(dataroot), "log_id": Path(scenario_ids)})
    leftturn_track, straight_track = tracks_from_res(dataroot,res)
    
    log_map_dirpath = Path(args.dataroot) / args.log_id
    avm = ArgoverseStaticMap.from_map_dir(log_map_dirpath, build_raster=False)
    
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot()
    
    for _, ls in avm.vector_lane_segments.items():
        
#         if ls.is_intersection:
#             # right_ln_bnd
#             # left_ln_bnd
#             vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="g", linewidth=0.5)
#             vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="g", alpha=0.2)
#         else:
            vector_plotting_utils.draw_polygon_mpl(ax, ls.polygon_boundary, color="gray", linewidth=0.5)
            vector_plotting_utils.plot_polygon_patch_mpl(ls.polygon_boundary, ax, color="gray", alpha=0.2)

    # plot all pedestrian crossings
#     for _, pc in avm.vector_pedestrian_crossings.items():
#         vector_plotting_utils.draw_polygon_mpl(ax, pc.polygon, color="m", linewidth=0.5)
#         vector_plotting_utils.plot_polygon_patch_mpl(pc.polygon, ax, color="m", alpha=0.2)
    
    leftturn_position_np = np.array(leftturn_track.position)

    plt.plot(leftturn_position_np[0:95, 0], leftturn_position_np[0:95, 1],'r.-',zorder=2,label='AV左转让行')
    straight_position_np = np.array(straight_track.position)
    plt.plot(straight_position_np[0:95, 0], straight_position_np[0:95, 1],'b.-',zorder=1,label='HV直行先行')
    plt.text(0.5, 0.8, s=f"场景编号\n{idx}", transform=ax.transAxes,fontsize=15)
#     plt.show()

In [None]:
"""
Function Cell
-----------------------
# 函数：
交叉口提取
路段线型判断
空间关系判断
-----------------------
"""

def optimizePoints(inPoint):
    """
        5点1阶段轨迹平滑算法
        input
            inPoint list点序列 [{'lng':0.0, 'lat':0.0},...]
        output
            inPoint 滤波后的list点序列 [{'lng':0.0, 'lat':0.0},...]
    """
    size = len(inPoint)
    
    outinPoint = copy.deepcopy(inPoint)
    
    if (size < 5):
        return inPoint
    else:
        # Latitude
        outinPoint[0]['lat'] = (3.0 * inPoint[0]['lat'] + 2.0 * inPoint[1]['lat'] + inPoint[2]['lat'] - inPoint[4]['lat']) / 5.0
        outinPoint[1]['lat'] = (4.0 * inPoint[0]['lat'] + 3.0 * inPoint[1]['lat'] + 2 * inPoint[2]['lat'] + inPoint[3]['lat']) / 10.0
        for i in range(2, size-3):
            outinPoint[i]['lat'] = (inPoint[i-2]['lat'] + inPoint[i-1]['lat'] + inPoint[i]['lat'] + inPoint[i+1]['lat'] + inPoint[i+2]['lat']) / 5.0
        outinPoint[size-2]['lat'] = (4.0 * inPoint[size-1]['lat'] + 3.0 * inPoint[size-2]['lat'] + 2 * inPoint[size-3]['lat'] + inPoint[size-4]['lat']) / 10.0
        outinPoint[size-1]['lat'] = (3.0 * inPoint[size-1]['lat'] + 2.0 * inPoint[size-2]['lat'] + inPoint[size-3]['lat'] - inPoint[size-5]['lat']) / 5.0
        
        # Longitude
        outinPoint[0]['lng'] = (3.0 * inPoint[0]['lng'] + 2.0 * inPoint[1]['lng'] + inPoint[2]['lng'] - inPoint[4]['lng']) / 5.0
        outinPoint[1]['lng'] = (4.0 * inPoint[0]['lng'] + 3.0 * inPoint[1]['lng'] + 2 * inPoint[2]['lng'] + inPoint[3]['lng']) / 10.0
        for i in range(2, size-3):
            outinPoint[i]['lng'] = (inPoint[i-2]['lng'] + inPoint[i-1]['lng'] + inPoint[i]['lng'] + inPoint[i+1]['lng'] + inPoint[i+2]['lng']) / 5.0
        outinPoint[size - 2]['lng'] = (4.0 * inPoint[size-1]['lng'] + 3.0 * inPoint[size-2]['lng'] + 2 * inPoint[size-3]['lng'] + inPoint[size-4]['lng']) / 10.0
        outinPoint[size - 1]['lng'] = (3.0 * inPoint[size-1]['lng'] + 2.0 * inPoint[size-2]['lng'] + inPoint[size-3]['lng'] - inPoint[size-5]['lng']) / 5.0
    
    return outinPoint


def lane_type_in_map(ls):
    # 求intersection内部路段的航向角，判断是不是左转车道
    right_lane_heading = []
    right_lane_boundary =ls.right_lane_boundary.xyz
    right_lane_boundary = np.delete(right_lane_boundary,2,axis=1)
    for i in range(len(right_lane_boundary)-1):
        a = right_lane_boundary[i+1]-right_lane_boundary[i]
        arctan2_ = np.arctan2(a[1], a[0])
        right_lane_heading.append(arctan2_)
#     print(right_lane_heading)
    right_lane_track = Track_info()
    right_lane_track.heading = right_lane_heading
#     print(right_lane_heading)
    if search_left_turn_track(right_lane_track,'lane') == 'left_turn' or\
            search_left_turn_track(right_lane_track,'lane') == 'maybe_left_turn':
        lane_flag = 'left_turn'
    elif search_left_turn_track(right_lane_track,'lane') == 'right_turn':
        lane_flag = 'right_turn'
    else:
        lane_flag = None
    
    return lane_flag


def intersection_extract(args):
    
    log_map_dirpath = Path(args.dataroot) / args.log_id
    avm = ArgoverseStaticMap.from_map_dir(log_map_dirpath, build_raster=False)
    mu_polygons = []

    for idx, ls in avm.vector_lane_segments.items():
        if ls.is_intersection and lane_type_in_map(ls) != 'right_turn':
            intersection_bounds_xy = ls.polygon_boundary
#             plt.scatter(intersection_bounds_xy[:,0],intersection_bounds_xy[:,1])
            polygon = Polygon(intersection_bounds_xy)
            mu_polygons.append(polygon)
            
    mu_polygons = unary_union(MultiPolygon(mu_polygons))
    
    return mu_polygons

def in_which_intersection(point,mu_polygons):
    intersection_idx = None
    if mu_polygons.geom_type == 'Polygon':
        if point.within(mu_polygons.convex_hull): # polygon
            intersection_idx = 0
    else:
        if len(mu_polygons.geoms)>0:
#             print(point.xy)
            for idx, polygon in enumerate(mu_polygons.geoms):
#                 print(point.within(polygon))
                if point.within(polygon.convex_hull): # polygon
                    intersection_idx = idx
                    break

    return intersection_idx

def av_left_interaction_for_acc(dataroot,cp_position,ego_track,
                                agent_track,res,ego_idx2cp,agent_idx2cp):
    # 进一步筛选交互事件
    # 通过交互双方与交叉口的距离判断
    
    ego_start_idx = None
    scenario_ids = res[0]
    args = Namespace(**{"dataroot": Path(dataroot), "log_id": Path(scenario_ids)})
    
    mu_polygons = intersection_extract(args)
    
    point = Point(cp_position[0])
    intersection_idx = in_which_intersection(point,mu_polygons)
    
    if intersection_idx == None:
        ego_start_idx = None
    else:
        if mu_polygons.geom_type == 'Polygon':
            polygon = mu_polygons   # 我认为这里不需要.convex_hull
        else:
            polygon = mu_polygons.geoms[intersection_idx]# 我认为这里不需要.convex_hull
#         if res[2] >= 0: # AV left 先通过
        for idx in range(len(ego_track.timestep)):
            timestep = ego_track.timestep[idx]
            if (timestep < ego_track.timestep[ego_idx2cp]  and
                timestep < agent_track.timestep[agent_idx2cp] and
                agent_track.timestep[0] <= timestep):
#                 print(f'timestep={timestep}')
#                 print(f'agent_timestep={agent_track.timestep[agent_idx2cp]}')
                point_ego = Point(ego_track.position[idx])
                agent_time_idx = np.where( np.array(agent_track.timestep) 
                                          == timestep)[0][0]
                point_agent = Point(agent_track.position[agent_time_idx])

                distance_ego2intersection = point_ego.distance(polygon)

                distance_agent2intersection = point_agent.distance(polygon)
        #                 print(f"distance_av2intersection={distance_av2intersection}")
                if distance_ego2intersection <= 5 and distance_agent2intersection <=5:
                    ego_start_idx = idx
        #                     print(f'ego_start_idx={ego_start_idx}')
                    break
    return ego_start_idx

In [None]:
"""
Function Cell
-----------------------
# 函数
协作加速度(acc_c)计算函数
-----------------------
"""
def smooth_traj(traj_list:list):
    smooth_traj_x = savgol_filter(np.array(traj_list)[:,0],10,3)
    smooth_traj_y = savgol_filter(np.array(traj_list)[:,1],10,3)
    traj_smoothed =np.hstack((np.array([smooth_traj_x]).T,np.array([smooth_traj_y]).T))
    velocity_cal = np.zeros((len(traj_smoothed),2))

    for idx in range(len(traj_smoothed)):
        if idx == 0 :
            continue
        elif idx == 1:
            velocity_cal[idx-1,:] = (traj_smoothed[idx] - traj_smoothed[idx-1])/0.1
            velocity_cal[idx,:] = (traj_smoothed[idx] - traj_smoothed[idx-1])/0.1
        else:
            velocity_cal[idx,:] = (traj_smoothed[idx] - traj_smoothed[idx-1])/0.1
    return traj_smoothed, velocity_cal
                    
def acc_c_func(d_e, d_a, v_e, v_a):
    if v_a < 0.01:
        v_a = 0.01
    t_a = d_a/v_a
    if d_e > v_e * t_a:
        acc_c = 2*(d_e-v_e*t_a)/(t_a * t_a)
    elif d_e <= v_e * t_a:
        acc_c = -(v_e*v_e)/(2*d_e)
    return acc_c

def acc_c_cal(ego_track,agent_track,ego_time_idx,
              ego_idx_to_cp,agent_idx_to_cp,key):
    
    cal_flag = False
    acc_c_e = 0
    acc_c_a = 0
    deta_TTCP = 0
    timestep = ego_track.timestep[ego_time_idx]
    if (
        (agent_track.timestep[0] <= timestep) and
        (timestep < agent_track.timestep[agent_idx_to_cp-1]) and
        (timestep < ego_track.timestep[ego_idx_to_cp-1])):
        # 减x就提前0.x秒
        
        
        agent_timestep_np = np.array(agent_track.timestep)
        agent_time_idx = np.where( agent_timestep_np == timestep)[0][0]
        
#         print(f'ego_time_idx={ego_time_idx}\n'
#              f'timestep={timestep}\n'
#              f'agent_timestep_np={agent_timestep_np}\n'
#              f'agent_time_idx={agent_time_idx}\n')
        
            
        if key == "None":
            agent_position = np.array(agent_track.position)
            agent_velocity = np.array(agent_track.velocity)
            v_a = np.linalg.norm(agent_velocity[agent_time_idx])
        else:
            agent_position, agent_velocity = smooth_traj(agent_track.position)
            agent_velocity_norm = savgol_filter(np.linalg.norm(np.array(agent_velocity), axis=1),10,3)
            v_a = agent_velocity_norm[agent_time_idx]
            
        agent_position_cp = agent_position[agent_time_idx:agent_idx_to_cp,:]
        
#         print(f'agent_position_cp={agent_position_cp}')
        if key == "AV" or "None":
            ego_position = np.array(ego_track.position)
            ego_velocity = np.array(ego_track.velocity)
            v_e = np.linalg.norm(ego_velocity[ego_time_idx])
        else:
            ego_position, ego_velocity = smooth_traj(ego_track.position)
            ego_velocity_norm =savgol_filter(np.linalg.norm(np.array(ego_velocity), axis=1),10,3)
            v_e = ego_velocity_norm[ego_time_idx]
            
        ego_position_cp = ego_position[ego_time_idx:ego_idx_to_cp,:]
#         print(f'ego_position_cp={ego_position_cp}')

        line_agent_cp = LineString(list(agent_position_cp))
        d_a = line_agent_cp.length
        line_ego_cp = LineString(list(ego_position_cp))
        d_e = line_ego_cp.length
        
        acc_c_e = acc_c_func(d_e, d_a, v_e, v_a)
        acc_c_a = acc_c_func(d_a, d_e, v_a, v_e)
        cal_flag = True
        
#         print(f'v_e={v_e},v_a={v_a}')
#         print(f'd_e={d_e},d_a={d_a}')

#         print(f'acc_c_e={acc_c_e}')
#         print(f'acc_c_a={acc_c_a}')
    return cal_flag, acc_c_e , acc_c_a

def acc_and_TTCP_cal(ego_track,agent_track,ego_time_idx,
              ego_idx_to_cp,agent_idx_to_cp,key):
    
    cal_flag = False
    acc_c_e = 0
    acc_c_a = 0
    deta_TTCP = 0
    deta_v = 0
    timestep = ego_track.timestep[ego_time_idx]
    if (
        (agent_track.timestep[0] <= timestep) and
        (timestep < agent_track.timestep[agent_idx_to_cp-1]) and
        (timestep < ego_track.timestep[ego_idx_to_cp-1])):
        # 减x就提前0.x秒
        
        
        agent_timestep_np = np.array(agent_track.timestep)
        agent_time_idx = np.where( agent_timestep_np == timestep)[0][0]
        
#         print(f'ego_time_idx={ego_time_idx}\n'
#              f'timestep={timestep}\n'
#              f'agent_timestep_np={agent_timestep_np}\n'
#              f'agent_time_idx={agent_time_idx}\n')
        
            
        if key == "None":
            agent_position = np.array(agent_track.position)
            agent_velocity = np.array(agent_track.velocity)
            v_a = np.linalg.norm(agent_velocity[agent_time_idx])
        else:
            agent_position, agent_velocity = smooth_traj(agent_track.position)
            agent_velocity_norm = savgol_filter(np.linalg.norm(np.array(agent_velocity), axis=1),10,3)
            v_a = agent_velocity_norm[agent_time_idx]
            
        agent_position_cp = agent_position[agent_time_idx:agent_idx_to_cp,:]
        
#         print(f'agent_position_cp={agent_position_cp}')
        if key == "AV" or "None":
            ego_position = np.array(ego_track.position)
            ego_velocity = np.array(ego_track.velocity)
            v_e = np.linalg.norm(ego_velocity[ego_time_idx])
        else:
            ego_position, ego_velocity = smooth_traj(ego_track.position)
            ego_velocity_norm =savgol_filter(np.linalg.norm(np.array(ego_velocity), axis=1),10,3)
            v_e = ego_velocity_norm[ego_time_idx]
            
        ego_position_cp = ego_position[ego_time_idx:ego_idx_to_cp,:]
#         print(f'ego_position_cp={ego_position_cp}')

        line_agent_cp = LineString(list(agent_position_cp))
        d_a = line_agent_cp.length
        
        line_ego_cp = LineString(list(ego_position_cp))
        d_e = line_ego_cp.length


        
        acc_c_e = acc_c_func(d_e, d_a, v_e, v_a)
        acc_c_a = acc_c_func(d_a, d_e, v_a, v_e)
        
        cal_flag = True
        if v_e < 0.5:
            v_e = 0.5
        if v_a < 0.5:
            v_a = 0.5
        TTCP_e = d_e/v_e
        TTCP_a = d_a/v_a
        deta_TTCP = TTCP_e - TTCP_a
        deta_v = v_e - v_a
#         print(f'v_e={v_e},v_a={v_a}')
#         print(f'd_e={d_e},d_a={d_a}')
#         print(f'acc_c_e={acc_c_e}')
#         print(f'acc_c_a={acc_c_a}')
    return cal_flag, acc_c_e , acc_c_a, deta_TTCP, deta_v

In [None]:
"""
-------------------------------
# 运行程序cell
交互轨迹提取程序
-------------------------------
"""
if __name__=="__main__":
    pi = math.pi

    # Results of AV-HV interaction behavior
    res_AV_HV = []
    PET_AV_HV_5 = 0
    PET_AV_HV_4_5 = 0
    PET_AV_HV_3_4 = 0
    PET_AV_HV_2_3 = 0
    PET_AV_HV_2 = 0
    
    # Results of HV-HV interaction behavior
    res_HV_HV = []
    PET_HV_HV_5 = 0
    PET_HV_HV_4_5 = 0
    PET_HV_HV_3_4 = 0
    PET_HV_HV_2_3 = 0
    PET_HV_HV_2 = 0
    
    dataroot = "E:/ResearchData/Argoverse/val"
    Scenario_ids_list = os.listdir(dataroot)
    
    Scenario_ids_list_temp = [
                              Scenario_ids_list[18],
                              Scenario_ids_list[34],
                              Scenario_ids_list[58],
                              Scenario_ids_list[192],
                              Scenario_ids_list[206],
                             Scenario_ids_list[328]]
    
    for idx,scenario_ids in enumerate(Scenario_ids_list_temp):
        args = Namespace(**{"dataroot": Path(dataroot), "log_id": Path(scenario_ids)})
        scenario_path = Path(args.dataroot) / args.log_id / f"scenario_{args.log_id}.parquet"
        ArgoverseScenario = load_argoverse_scenario_parquet(scenario_path)

        # ego vehicle track info
        if ArgoverseScenario.tracks[-1].track_id != "AV":
            print(f'Scenario {idx} last track is not AV!')
            continue
        else:
            # ego track info
            ego_track = Track_info()
            ego_track.category = 5
            ego_track.track_id = ArgoverseScenario.tracks[-1].track_id
            ego_track.object_type = ArgoverseScenario.tracks[-1].object_type[0:]
            for ego_state_idx in range(len(
                ArgoverseScenario.tracks[-1].object_states)):
                ego_track.timestep.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].timestep)
                ego_track.position.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].position)
                ego_track.velocity.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].velocity)
                ego_track.heading.append(ArgoverseScenario.tracks[-1].
                                         object_states[ego_state_idx].heading)
                ego_track.observed.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].observed)

            ego_track.track_type = search_left_turn_track(ego_track,'veh')

            # plot ego and map
            ego_position_np = np.array(ego_track.position)
#             plt.figure(idx)
#             single_log_teaser(args)
#             plt.plot(ego_position_np[:, 0],ego_position_np[:, 1],'r.-',zorder=2)

            # all agents tracks info
            agents_track: List[Track_info] = []
            for track_idx in range(len(ArgoverseScenario.tracks)-1):
                if ArgoverseScenario.tracks[track_idx].object_type[0:] == 'vehicle':
                    agent_track = Track_info()
                    agent_track.object_type = ArgoverseScenario.tracks[track_idx].object_type[0:]
                    agent_track.category = ArgoverseScenario.tracks[track_idx].category.value
                    agent_track.track_id = ArgoverseScenario.tracks[track_idx].track_id
                    for state_idx in range(len(
                        ArgoverseScenario.tracks[track_idx].object_states)):
                        agent_track.timestep.append(ArgoverseScenario.tracks[track_idx].
                                                    object_states[state_idx].timestep)
                        agent_track.position.append(ArgoverseScenario.tracks[track_idx].
                                                    object_states[state_idx].position[0:])
                        agent_track.velocity.append(ArgoverseScenario.tracks[track_idx].
                                                    object_states[state_idx].velocity[0:])
                        agent_track.heading.append(ArgoverseScenario.tracks[track_idx].
                                                   object_states[state_idx].heading)
                        agent_track.observed.append(ArgoverseScenario.tracks[track_idx].
                                                   object_states[state_idx].observed)

                    agent_position_np = np.array(agent_track.position)
                    if (agent_track.category == 0 or 1 or 2 or 3) :
                        dist = np.hypot(*(agent_position_np[0] - agent_position_np[-1]))
                        if dist>10:
                            agent_track.track_type = search_left_turn_track(
                                                                agent_track,'veh')

                            # plot agents tracks
#                             plt.plot(agent_position_np[:, 0], 
#                                      agent_position_np[:, 1],'b.-',zorder=1)
                    agents_track.append(agent_track)

            # search interaction scenario
            res_AV_HV_temp = search_AV_HV(ego_track,agents_track,scenario_ids)
            res_HV_HV_temp = search_HV_HV(agents_track,scenario_ids)
            if len(res_AV_HV_temp)>0:
                
                for res_AV_HV_s in res_AV_HV_temp:
                    res_AV_HV.append(res_AV_HV_s)
                    pet_AV_HV_temp = abs(res_AV_HV_s[2])
                    if  pet_AV_HV_temp <=20 :
                        PET_AV_HV_2 = PET_AV_HV_2+1
                    elif 20<pet_AV_HV_temp <=30 :
                        PET_AV_HV_2_3 = PET_AV_HV_2_3 + 1
                    elif 30<pet_AV_HV_temp <=40 :
                        PET_AV_HV_3_4 = PET_AV_HV_3_4 + 1
                    elif 40<pet_AV_HV_temp <=50 :
                        PET_AV_HV_4_5 = PET_AV_HV_4_5 + 1
                    elif 50<pet_AV_HV_temp :
                        PET_AV_HV_5 = PET_AV_HV_5 + 1
            if len(res_HV_HV_temp)>0:
                
                for res_HV_HV_s in res_HV_HV_temp:
                    res_HV_HV.append(res_HV_HV_s)
                    pet_HV_HV_temp = abs(res_HV_HV_s[3])
                    if  pet_HV_HV_temp <=20 :
                        PET_HV_HV_2 = PET_HV_HV_2+1
                    elif 20<pet_HV_HV_temp <=30 :
                        PET_HV_HV_2_3 = PET_HV_HV_2_3 + 1
                    elif 30<pet_HV_HV_temp <=40 :
                        PET_HV_HV_3_4 = PET_HV_HV_3_4 + 1
                    elif 40<pet_HV_HV_temp <=50 :
                        PET_HV_HV_4_5 = PET_HV_HV_4_5 + 1
                    elif 50<pet_HV_HV_temp :
                        PET_HV_HV_5 = PET_HV_HV_5 + 1
#             if idx % 200 == 0:
            print(f'Scearching in {idx}th Scenario: \n'
                 f'Totally found {len(res_AV_HV)} AV-HV interactive scenarios \n'
                 f'PET distribution: 0~2={PET_AV_HV_2} ; 2~3={PET_AV_HV_2_3} ; '
                f'3~4={PET_AV_HV_3_4} ; 4~5={PET_AV_HV_4_5} ; >5={PET_AV_HV_5} \n'
                 f'Totally found {len(res_HV_HV)} HV-HV interactive scenarios \n'
                 f'PET distribution: 0~2={PET_HV_HV_2} ; 2~3={PET_HV_HV_2_3} ; '
                f'3~4={PET_HV_HV_3_4} ; 4~5={PET_HV_HV_4_5} ; >5={PET_HV_HV_5} \n')
            
        plt_mapNtracks(args,ego_track,agents_track,idx)
#         if idx == 0:
#             break

#         ax = fig.add_subplot() # 这句话有问题，需要运行一遍错误的程序...
#         plt.text(0.1, 0.7, s=f"Sceanrio {idx}", transform=ax.transAxes)
#         scenario_dir = f'E:/ResearchData/Argoverse/val/{args.log_id}'
#         output_dir = 'E:/ResearchData/Argoverse/visualization'
#         generate_scenario_visualizations(Path(scenario_dir),Path(output_dir),1, "FIRST")
#         print(args.log_id)
#     if idx == 0:
#         break

In [None]:
# 判断AV是转向还是直行 (一次性)
"""
--------------------
# 判断AV是转向还是直行
--------------------
"""
#     for idx,res in enumerate(res_AV_HV_list):
#         scenario_ids = res[0]
#         args = Namespace(**{"dataroot": Path(dataroot), "log_id": Path(scenario_ids)})
#         scenario_path = Path(args.dataroot) / args.log_id / f"scenario_{args.log_id}.parquet"
#         ArgoverseScenario = load_argoverse_scenario_parquet(scenario_path)

#         ego_track = Track_info()
#         ego_track.category = 5
#         ego_track.track_id = ArgoverseScenario.tracks[-1].track_id
#         ego_track.object_type = ArgoverseScenario.tracks[-1].object_type[0:]
#         for ego_state_idx in range(len(ArgoverseScenario.tracks[-1].object_states)):
#             ego_track.timestep.append(ArgoverseScenario.tracks[-1].
#                                       object_states[ego_state_idx].timestep)
#             ego_track.position.append(ArgoverseScenario.tracks[-1].
#                                       object_states[ego_state_idx].position)
#             ego_track.velocity.append(ArgoverseScenario.tracks[-1].
#                                       object_states[ego_state_idx].velocity)
#             ego_track.heading.append(ArgoverseScenario.tracks[-1].
#                                      object_states[ego_state_idx].heading)
#             ego_track.observed.append(ArgoverseScenario.tracks[-1].
#                                       object_states[ego_state_idx].observed)
#         ego_track.track_type = search_left_turn_track(ego_track,'veh')
#         if ego_track.track_type == 'left_turn':
#             res_AV_HV[idx].append('AV')
#         else:
#             res_AV_HV[idx].append('HV')


In [None]:
# 结果保存成CSV（一次性）
"""
-------------------------------
# 结果保存成CSV
-------------------------------
"""

# filePath_AV_HV = './result_AV_HV.csv'

# with open(filePath_AV_HV, "w", newline='') as f:
#     writer = csv.writer(f)
#     for row in res_AV_HV:
#         writer.writerow(row)

# filePath_HV_HV = './result_HV_HV.csv'

# with open(filePath_HV_HV, "w", newline='') as f:
#     writer = csv.writer(f)
#     for row in res_HV_HV:
#         writer.writerow(row)

# print(f"------ Saved to CSV, extract interactive scene in total------\n")

In [None]:
for test_idx,agent_track in enumerate(agents_track):
    if agent_track.track_type == 'left_turn':
        print(f'left_idx{test_idx}, {agent_track.track_id}')
    elif agent_track.track_type == 'straight':
        print(f'straight_idx{test_idx}, {agent_track.track_id}')
    elif agent_track.track_type == 'unknown':
        print(f'unknown_idx{test_idx}, {agent_track.track_id}')

In [None]:
# 从保存的CSV中读取数据

"""
-------------------------------
# 运行程序cell
读取CSV
-------------------------------
"""
if __name__== "__main__" :
    filePath_AV_HV = './result/result_AV_HV.csv'
    col_types_1 = [str,int,int,str]
    col_types_2 = [str,int,int,int]
    res_AV_HV_list = []
    with open(filePath_AV_HV, "r") as file:
            data = csv.reader(file)
            for row in data:
                row = list(tuple(convert(value) for convert, value in zip(col_types_1,row)))
                res_AV_HV_list.append(row)

    filePath_HV_HV = './result/result_HV_HV.csv'
    res_HV_HV_list = []
    with open(filePath_HV_HV, "r") as file:
            data = csv.reader(file)
            for row in data:
                row = list(tuple(convert(value) for convert, value in zip(col_types_2,row)))
                res_HV_HV_list.append(row)

    PET_AV_HV = []
    PET_HV_HV = []
    PET_AV_HV_all = []
    PET_HV_HV_all = []
    for res_AV_HV_single in res_AV_HV_list:
        PET_AV_HV_all.append(res_AV_HV_single[2])
        if -50 <= res_AV_HV_single[2] <=50:
            PET_AV_HV.append(res_AV_HV_single[2])
    for res_HV_HV_single in res_HV_HV_list:
        PET_HV_HV_all.append(res_HV_HV_single[3])
        if -50 <= res_HV_HV_single[3] <=50:
            PET_HV_HV.append(res_HV_HV_single[3])

## 根据最初的提取结果，进一步修正

In [None]:
# 根据res_AV_HV_list, res_HV_HV_list，进一步筛选交互事件，生成新的List

dataroot = "E:/ResearchData/Argoverse/train"
av_left_preempt_list=[]
av_left_yield_list=[]

for idx,res in enumerate(res_AV_HV_list):
    
    if res[3]=='AV' :
        
        ego_track,agent_track = tracks_from_res(dataroot,res)
        _, _, ego_idx2cp,agent_idx2cp,cp_position = interaction_judgement(
                                                    ego_track,agent_track)

        ego_start_idx = av_left_interaction_for_acc(dataroot,cp_position,
                                                    ego_track,agent_track,res,
                                                   ego_idx2cp,agent_idx2cp)
        if (ego_start_idx != None) :
            res_c = []
            res_c = res
            if res[2] >= 0:
                res_c = res_c +[ego_idx2cp]
                res_c = res_c +[agent_idx2cp]
                res_c = res_c +[ego_start_idx]
                av_left_preempt_list.append(res_c)
                
            elif res[2] < 0:
                res_c = res_c +[ego_idx2cp]
                res_c = res_c +[agent_idx2cp]
                res_c = res_c +[ego_start_idx]
                av_left_yield_list.append(res_c)
#     if idx==4:
#         break
    if idx%200==0:
        print(f'搜索AV-HV场景-{idx}-, 还剩={len(res_AV_HV_list)-idx}=')


hv_left_preempt_list=[]
hv_left_yield_list=[]
for idx,res in enumerate(res_HV_HV_list):
    
    ego_track,agent_track = tracks_from_res(dataroot,res)
    _, _, ego_idx2cp,agent_idx2cp,cp_position = interaction_judgement(
                                                ego_track,agent_track)

    ego_start_idx = av_left_interaction_for_acc(dataroot,cp_position,
                                                ego_track,agent_track,res,
                                               ego_idx2cp,agent_idx2cp)
    if (ego_start_idx != None) :
        res_c = []
        res_c = res
        if res[3] >= 0:
            res_c = res_c +[ego_idx2cp]
            res_c = res_c +[agent_idx2cp]
            res_c = res_c +[ego_start_idx]
            hv_left_preempt_list.append(res_c)
        elif res[3] < 0:
            res_c = res_c +[ego_idx2cp]
            res_c = res_c +[agent_idx2cp]
            res_c = res_c +[ego_start_idx]
            hv_left_yield_list.append(res_c)
    if idx%200==0:
        print(f'搜索HV-HV场景-{idx}-, 还剩={len(res_HV_HV_list)-idx}=')

In [None]:
# 求AV 和 HV 在 转向让行的场景中协作加速度和速度的分布
dataroot = "E:/ResearchData/Argoverse/train"
av_yield_acc_list=[]
hv_yield_acc_list=[]
av_yield_v_np = np.array([])
hv_yield_v_np = np.array([])

for idx,res in enumerate(av_left_yield_list):
    
    if res[3]=='AV' :
        acc_c = []
        ego_track,agent_track = tracks_from_res(dataroot,res)
        ego_idx2cp, agent_idx2cp, ego_start_idx = res[4:]
        
        v_np = np.array(ego_track.velocity)
        v_np = np.linalg.norm(v_np,axis=1)
        av_yield_v_np = np.append(av_yield_v_np,v_np)
        for time_idx in range(ego_start_idx, len(ego_track.timestep)):
            cal_flag, acc_c_e,_ = acc_c_cal(ego_track,agent_track,
                                          time_idx,ego_idx2cp,agent_idx2cp,'AV')

            if cal_flag :
                acc_c.append(acc_c_e)
            else:
                continue
        acc_c.reverse()
        av_yield_acc_list.append(acc_c)
    if idx%100==0:
        print(f'搜索HV-HV场景-{idx}-, 还剩={len(av_left_yield_list)-idx}=')


for idx,res in enumerate(hv_left_yield_list):

        acc_c = []
        ego_track,agent_track = tracks_from_res(dataroot,res)
        ego_idx2cp, agent_idx2cp, ego_start_idx = res[4:]

        v_np = np.array(ego_track.velocity)
        v_np = np.linalg.norm(v_np,axis=1)
        hv_yield_v_np = np.append(hv_yield_v_np,v_np)
        for time_idx in range(ego_start_idx, len(ego_track.timestep)):
            cal_flag, acc_c_e,_ = acc_c_cal(ego_track,agent_track,
                                          time_idx,ego_idx2cp,agent_idx2cp,'HV')

            if cal_flag :
                acc_c.append(acc_c_e)
            else:
                continue
        acc_c.reverse()
        hv_yield_acc_list.append(acc_c)
        if idx%100==0:
            print(f'搜索HV-HV场景-{idx}-, 还剩={len(hv_left_yield_list)-idx}=')

In [None]:
# 保存 list
# av_left_preempt_list
# av_left_yield_list
# hv_left_preempt_list
# hv_left_yield_list

filePath = './result_data/av_left_preempt_list.csv'
with open(filePath, "w", newline='') as f:
    writer = csv.writer(f)
    for row in av_left_preempt_list:
        writer.writerow(row)

filePath = './result_data/av_left_yield_list.csv'
with open(filePath, "w", newline='') as f:
    writer = csv.writer(f)
    for row in av_left_yield_list:
        writer.writerow(row)

filePath = './result_data/hv_left_preempt_list.csv'
with open(filePath, "w", newline='') as f:
    writer = csv.writer(f)
    for row in hv_left_preempt_list:
        writer.writerow(row)

filePath = './result_data/hv_left_yield_list.csv'
with open(filePath, "w", newline='') as f:
    writer = csv.writer(f)
    for row in hv_left_yield_list:
        writer.writerow(row)


In [None]:
# 加载list

filePath_AV = './result_data/av_left_yield_list.csv'
col_types_1 = [str,int,int,str,int,int,int]
av_left_yield_list = []
with open(filePath_AV, "r") as file:
        data = csv.reader(file)
        for row in data:
            row = list(tuple(convert(value) for convert, value in zip(col_types_1,row)))
            av_left_yield_list.append(row)

filePath_AV = './result_data/av_left_preempt_list.csv'
col_types_1 = [str,int,int,str,int,int,int]
av_left_preempt_list = []
with open(filePath_AV, "r") as file:
        data = csv.reader(file)
        for row in data:
            row = list(tuple(convert(value) for convert, value in zip(col_types_1,row)))
            av_left_preempt_list.append(row)

filePath_HV = './result_data/hv_left_preempt_list.csv'
col_types_1 = [str,int,int,str,int,int,int]
hv_left_preempt_list = []
with open(filePath_HV, "r") as file:
        data = csv.reader(file)
        for row in data:
            row = list(tuple(convert(value) for convert, value in zip(col_types_1,row)))
            hv_left_preempt_list.append(row)

filePath_HV = './result_data/hv_left_yield_list.csv'
col_types_1 = [str,int,int,str,int,int,int]
hv_left_yield_list = []
with open(filePath_HV, "r") as file:
        data = csv.reader(file)
        for row in data:
            row = list(tuple(convert(value) for convert, value in zip(col_types_1,row)))
            hv_left_yield_list.append(row)


In [None]:
# 筛选转向车辆在路口内部没有产生轨迹交点的等待事件！

if __name__=="__main__":
    pi = math.pi

    res_AV_HV_NC = []
    
    dataroot = "E:/ResearchData/Argoverse/train"
    Scenario_ids_list = os.listdir(dataroot)
    Scenario_ids_list_temp = [
                          Scenario_ids_list[18],
                          Scenario_ids_list[34],
                          Scenario_ids_list[58],
                          Scenario_ids_list[192],
                          Scenario_ids_list[206],
                         Scenario_ids_list[328]]
    t0 = time.time()
    
    for idx,scenario_ids in enumerate(Scenario_ids_list):
#         if idx<16361:
#             continue
        if idx%500 == 0:
        
            t1=time.time()
            print(f'Scearching in {idx}th Scenario: \n'
                 f'Totally found {len(res_AV_HV_NC)} AV-HV-NC interactive scenarios \n')
            print(f'该500组场景耗时{t1-t0}s......')
            t0=time.time()
            
        args = Namespace(**{"dataroot": Path(dataroot), "log_id": Path(scenario_ids)})
        scenario_path = Path(args.dataroot) / args.log_id / f"scenario_{args.log_id}.parquet"
        ArgoverseScenario = load_argoverse_scenario_parquet(scenario_path)

        # ego vehicle track info
        if ArgoverseScenario.tracks[-1].track_id != "AV":
            print(f'Scenario {idx} last track is not AV!')
            continue
        else:
            
#             print(f'搜索第{idx}个场景......')
            
            # ego track info
            ego_track = Track_info()
            ego_track.category = 5
            ego_track.track_id = ArgoverseScenario.tracks[-1].track_id
            ego_track.object_type = ArgoverseScenario.tracks[-1].object_type[0:]
            for ego_state_idx in range(len(
                ArgoverseScenario.tracks[-1].object_states)):
                ego_track.timestep.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].timestep)
                ego_track.position.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].position)
                ego_track.velocity.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].velocity)
                ego_track.heading.append(ArgoverseScenario.tracks[-1].
                                         object_states[ego_state_idx].heading)
                ego_track.observed.append(ArgoverseScenario.tracks[-1].
                                          object_states[ego_state_idx].observed)

            ego_track.track_type = search_left_turn_track(ego_track,'veh')
            ego_position_np = np.array(ego_track.position)
            
            # 转向角的变化-微变即可
            ego_heading_np = np.array(ego_track.heading)

            ego_heading_np_fixed =heading_fix_inside(ego_heading_np,'veh')
            if (ego_heading_np_fixed[-1] - ego_heading_np_fixed[0]) < (5*pi/180) :
                continue
            else:
                pass
            
            # 判断ego在哪个路口，轨迹最后一个点没有在交叉口内部则直接continue
            mu_polygons = intersection_extract(args)
            traj_lastpoint = Point(ego_position_np[-1])
            intersection_idx = in_which_intersection(traj_lastpoint,mu_polygons)
            
            if intersection_idx == None:
                continue
            else:
                if mu_polygons.geom_type == 'Polygon':
                    polygon = mu_polygons.convex_hull
                else:
                    polygon = mu_polygons.geoms[intersection_idx].convex_hull
            
            # 交叉口内轨迹截取，且长度>20
            for idx_point in range(len(ego_position_np)-1,-1,-1):
                point_temp = Point(ego_position_np[idx_point])
                if point_temp.within(polygon) and idx_point > 0:
#                     print(f'111:{idx_point}')
                    continue
                elif point_temp.within(polygon) and idx_point == 0:
                    traj_start_idx = idx_point
#                     print(f'222:{idx_point}')
                    break
                elif point_temp.within(polygon) == False and idx_point >= 0:
                    traj_start_idx = idx_point + 1
#                     print(f'333:{idx_point}')
                    break
            
            if len(ego_position_np) - traj_start_idx < 20 :
                continue
            else:
                pass
            
            traj_points = MultiPoint(ego_position_np[traj_start_idx:])
            
            # 车道中心线与左转车道的确定
            log_map_dirpath = Path(args.dataroot) / args.log_id
            avm = ArgoverseStaticMap.from_map_dir(log_map_dirpath, build_raster=False)
            left_lane, midline_leftlane,judg_flag = in_which_leftturnlane(traj_points,
                                                                          avm, polygon)

            if judg_flag == False:
                continue
            else:
                pass
            
#             print(f'444')
            # all agents tracks info
            agents_track: List[Track_info] = []
            for track_idx in range(len(ArgoverseScenario.tracks)-1):
                if ArgoverseScenario.tracks[track_idx].object_type[0:] == 'vehicle':
                    agent_track = Track_info()
                    agent_track.object_type = ArgoverseScenario.tracks[track_idx].object_type[0:]
                    agent_track.category = ArgoverseScenario.tracks[track_idx].category.value
                    agent_track.track_id = ArgoverseScenario.tracks[track_idx].track_id
                    for state_idx in range(len(
                        ArgoverseScenario.tracks[track_idx].object_states)):
                        agent_track.timestep.append(ArgoverseScenario.tracks[track_idx].
                                                    object_states[state_idx].timestep)
                        agent_track.position.append(ArgoverseScenario.tracks[track_idx].
                                                    object_states[state_idx].position[0:])
                        agent_track.velocity.append(ArgoverseScenario.tracks[track_idx].
                                                    object_states[state_idx].velocity[0:])
                        agent_track.heading.append(ArgoverseScenario.tracks[track_idx].
                                                   object_states[state_idx].heading)
                        agent_track.observed.append(ArgoverseScenario.tracks[track_idx].
                                                   object_states[state_idx].observed)

                    agent_position_np = np.array(agent_track.position)
                    if (agent_track.category == 0 or 1 or 2 or 3) :
                        dist = np.hypot(*(agent_position_np[0] - agent_position_np[-1]))
                        if dist>10:
                            agent_track.track_type = search_left_turn_track(
                                                                agent_track,'veh')


                    agents_track.append(agent_track)
            
            
            # 对象直行车与左转车道左边界线有交点，且与左转轨迹没有交点
            
            
            for agent_track in agents_track:
                if (agent_track.track_type == 'straight'):
                    
                    agent_heading_np = np.array(agent_track.heading)
                    
                    agent_position_np = np.array(agent_track.position)
#                     print(np.sum(agent_heading_np[0:3])/3 - np.sum(ego_heading_np[0:3])/3)
                    if (-pi/6 + pi) <= abs(np.sum(agent_heading_np[0:3])/3 - 
                                           np.sum(ego_heading_np[0:3])/3)<= (pi/6 + pi):
#                         print('555')
                        line_ego_traj = LineString(ego_position_np[traj_start_idx:])
#                         line_turn_lane = LineString(left_lane.left_lane_boundary.xyz[:,0:2])
                        line_agent_traj = LineString(agent_position_np)
                        intersec_point_ego = line_agent_traj.intersection(line_ego_traj)
                        intersec_point_lane = line_agent_traj.intersection(midline_leftlane)
                        
                        # -----interaction judgment-----
                        if intersec_point_ego.is_empty == True:
                            if intersec_point_lane.is_empty == False:
                                if intersec_point_lane.type == 'Point':
                                    intersection_position = np.array([intersec_point_lane.xy[0][0],
                                              intersec_point_lane.xy[1][0]])
                                    intersection_position = np.expand_dims(intersection_position,-1).transpose()

                                    agent_dist = scipy.spatial.distance.cdist(intersection_position,
                                                                        agent_position_np)
                                    
                                    agent_time_to_c_idx =np.where(agent_dist[0] == 
                                                                  np.min(agent_dist))[0][0]
                                    
                                    agent_time_to_c = agent_track.timestep[agent_time_to_c_idx]
                                    
                                    if agent_time_to_c > ego_track.timestep[traj_start_idx] + 20:
                                        res_temp = []
                                        res_temp = [scenario_ids,int(agent_track.track_id), 0,
                                                        'AV',0,agent_time_to_c_idx,traj_start_idx]
                                        res_AV_HV_NC.append(res_temp)

In [None]:
# 场景拆分，单个转向车对多个直行车时，TTCP计算会有问题，本函数为了解决这一问题
def interaction_split(res_list,dataroot):
    res_list_new = []
    num_splitted = 0
    if res_list[0][3] == 'AV':
        
        for idx in range(0,len(res_list)):
            if idx == num_splitted:
                search_end_idx = 0
                start_idx = idx

                if idx<=len(res_list)-10:
                    end_idx = idx+10
                else:
                    end_idx = len(res_list)
                for i_idx in range(start_idx+1,end_idx):
                    if res_list[i_idx][0] == res_list[start_idx][0]:
                        search_end_idx = i_idx
                        continue
                    else:
                        break
                if search_end_idx !=0:
#                     print(search_end_idx)
                    res_matrix = res_list[start_idx:search_end_idx+1]
                    df = pd.DataFrame(res_matrix)
                    res_matrix = df.values
                    for idx_temp,row_temp in enumerate(res_matrix):
                        ego_track,agent_track = tracks_from_res(dataroot,row_temp)
                        timestep_egostart = ego_track.timestep[row_temp[6]]
                        timestep_agent2cp = agent_track.timestep[row_temp[5]]
                        if idx_temp == 0:
                            colume2add = np.array([timestep_agent2cp,timestep_egostart])
                        else:
                            colume2add = np.vstack((colume2add,
                                        np.array([timestep_agent2cp,timestep_egostart])))
                    res_matrix = np.hstack((res_matrix,colume2add))
                    res_matrix = res_matrix[res_matrix[:,7].argsort()]
                    for ii_idx in range(1,len(res_matrix)):
                        if res_matrix[ii_idx,8] < res_matrix[ii_idx-1,7]:
#                             print(ii_idx)
                            res_matrix[ii_idx,6] = (res_matrix[ii_idx,6]+
                                                    res_matrix[ii_idx-1,7] - 
                                                    res_matrix[ii_idx,8])
                    res_matrix = np.delete(res_matrix, [7,8], 1)
                    res_m2list = res_matrix.tolist()
                    num_splitted = num_splitted + len(res_m2list)
                    for res_row in res_m2list:
                        res_list_new.append(res_row)
                else:
                    res_m2list = res_list[start_idx]
                    num_splitted = num_splitted + 1
                    res_list_new.append(res_m2list)
            else:
                continue
            
    else:
        for idx in range(0,len(res_list)):
            if idx == num_splitted:
                search_end_idx = 0
                start_idx = idx

                if idx<=len(res_list)-10:
                    end_idx = idx+10
                else:
                    end_idx = len(res_list)
                for i_idx in range(start_idx+1,end_idx):
                    if res_list[i_idx][0] == res_list[start_idx][0] and\
                    res_list[i_idx][1] == res_list[start_idx][1]:
                        search_end_idx = i_idx
                        continue
                    else:
                        break
                if search_end_idx !=0:
                    res_matrix = res_list[start_idx:search_end_idx+1]
                    df = pd.DataFrame(res_matrix)
                    res_matrix = df.values
                    for idx_temp,row_temp in enumerate(res_matrix):
                        ego_track,agent_track = tracks_from_res(dataroot,row_temp)
                        timestep_egostart = ego_track.timestep[row_temp[6]]
                        timestep_agent2cp = agent_track.timestep[row_temp[5]]
                        if idx_temp == 0:
                            colume2add = np.array([timestep_agent2cp,timestep_egostart])
                        else:
                            colume2add = np.vstack((colume2add,
                                        np.array([timestep_agent2cp,timestep_egostart])))
                    res_matrix = np.hstack((res_matrix,colume2add))
                    res_matrix = res_matrix[res_matrix[:,7].argsort()]
                    
                    for ii_idx in range(1,len(res_matrix)):
                        if res_matrix[ii_idx,8] < res_matrix[ii_idx-1,7]:
#                             print(ii_idx)
                            res_matrix[ii_idx,6] = (res_matrix[ii_idx,6]+
                                                    res_matrix[ii_idx-1,7] - 
                                                    res_matrix[ii_idx,8])
                    res_matrix = np.delete(res_matrix, [7,8], 1)
                    res_m2list = res_matrix.tolist()
                    num_splitted = num_splitted + len(res_m2list)
                    for res_row in res_m2list:
                        res_list_new.append(res_row)
                else:
                    res_m2list = res_list[start_idx]
                    num_splitted = num_splitted + 1
                    res_list_new.append(res_m2list)
            else:
                continue

    return res_list_new

In [None]:
av_left_yield_list_new = interaction_split(av_left_yield_list,dataroot)
hv_left_yield_list_new = interaction_split(hv_left_yield_list,dataroot)

In [None]:
# av_left_yield_list_new,hv_left_yield_list_new保存
with open('./result_data/av_left_yield_list_new.data','wb') as filehandle:
    pickle.dump(av_left_yield_list_new,filehandle)
with open('./result_data/hv_left_yield_list_new.data','wb') as filehandle:
    pickle.dump(hv_left_yield_list_new,filehandle)

In [None]:
# 读取
av_left_yield_list_new = []
with open ('./result_data/av_left_yield_list_new.data','rb') as filehandle:
    av_left_yield_list_new = pickle.load(filehandle)
hv_left_yield_list_new = []
with open ('./result_data/hv_left_yield_list_new.data','rb') as filehandle:
    hv_left_yield_list_new = pickle.load(filehandle)

In [None]:
# 求AV 和 HV 在 转向让行的场景中协作加速度和速度的分布
# 上面也有一个cell，这个cell是基于场景拆分结果重新计算的

dataroot = "E:/ResearchData/Argoverse/train"

av_yield_acc_list_new=[]
t0 = time.time()
for idx,res in enumerate(av_left_yield_list_new):
    acc_c = []
    if res[3]=='AV' and len(av_yield_acc_list[idx])>0 and len(av_SL_list[idx][0])>0:
        ego_track,agent_track = tracks_from_res(dataroot,res)
        ego_idx2cp, agent_idx2cp, ego_start_idx = res[4:]
        timestep_egostart = ego_track.timestep[ego_start_idx]
        timestep_agent2cp = agent_track.timestep[agent_idx2cp]
        
        if timestep_egostart < timestep_agent2cp:
            for time_idx in range(ego_start_idx, len(ego_track.timestep)):
                cal_flag, acc_c_e,_, deta_TTCP,deta_v = acc_and_TTCP_cal(ego_track,agent_track,
                                              time_idx,ego_idx2cp,agent_idx2cp,'None')

                if cal_flag :
#                     acc_c.append([acc_c_e])
                    acc_c.append([acc_c_e, deta_TTCP,deta_v])
                else:
                    continue
        else:
            pass
    acc_c.reverse()
    av_yield_acc_list_new.append(acc_c)
    if idx%200==0:
        t1 = time.time()
        print(f'搜索AV-HV场景-{idx}-, 还剩={len(av_left_yield_list_new)-idx}=')
        print(f'耗时{t1-t0}s......')
        t0 = time.time()

hv_yield_acc_list_new=[]
t0 = time.time()
for idx,res in enumerate(hv_left_yield_list_new):
    
    acc_c = []
    if  len(hv_SL_list[idx][0])>0 :
        
        ego_track,agent_track = tracks_from_res(dataroot,res)
        ego_idx2cp, agent_idx2cp, ego_start_idx = res[4:]
        timestep_egostart = ego_track.timestep[ego_start_idx]
        timestep_agent2cp = agent_track.timestep[agent_idx2cp]
        
        if timestep_egostart < timestep_agent2cp:
            for time_idx in range(ego_start_idx, len(ego_track.timestep)):
                cal_flag, acc_c_e,_,deta_TTCP,deta_v = acc_and_TTCP_cal(ego_track,agent_track,
                                              time_idx,ego_idx2cp,agent_idx2cp,'HV')

                if cal_flag :
                    acc_c.append([acc_c_e,deta_TTCP,deta_v])
                else:
                    continue
        else:
            pass
    
    acc_c.reverse()
    hv_yield_acc_list_new.append(acc_c)
    if idx%500==0:
        t1 = time.time()
        print(f'搜索HV-HV场景-{idx}-, 还剩={len(hv_left_yield_list_new)-idx}=')
        print(f'耗时{t1-t0}s......')
        t0 = time.time()

In [None]:
# av_yield_acc_list_new, hv_yield_acc_list_new 保存
with open('./result_data/av_yield_acc_list_new.data','wb') as filehandle:
    pickle.dump(av_yield_acc_list_new,filehandle)
with open('./result_data/hv_yield_acc_list_new.data','wb') as filehandle:
    pickle.dump(hv_yield_acc_list_new,filehandle)

In [None]:
# 读取
av_yield_acc_list_new = []
with open ('./result_data/av_yield_acc_list_new.data','rb') as filehandle:
    av_yield_acc_list_new = pickle.load(filehandle)
hv_yield_acc_list_new = []
with open ('./result_data/hv_yield_acc_list_new.data','rb') as filehandle:
    hv_yield_acc_list_new = pickle.load(filehandle)