In [1]:
import pandas as pd
import numpy as np
import sys
import ast
from tqdm import tqdm
from pyspark.sql.utils import AnalysisException
from pyspark.sql import SparkSession
from shapely.geometry import Point, LineString
from shapely.ops import unary_union
sys.path.append(r'/mnt/vasp-got-da/scripts/team_data_analytics/josefin/adas_mtd/')
import src.db.db_helper_dsl3 as dbh

In [2]:
config_path = '../config.yaml'
from scripts.dataloader import OnlineSpark
session = OnlineSpark(config_path,size='large')
session.path_dict

{'sut': {'root': 's3a://vasp-got-da/data/spark/rwup/sut/',
  'flc_obj': 's3a://vasp-got-da/data/spark/rwup/sut/record_id*flc*R5_object*',
  'vehicle': 's3a://vasp-got-da/data/spark/rwup/sut/record_id*vehicle*'},
 'avl_reference': {'root': 's3a://vasp-got-da/data/spark/rwup/avl_reference/',
  'ego': 's3a://vasp-got-da/data/spark/rwup/avl_reference/record_id*_ego_df*',
  'line': 's3a://vasp-got-da/data/spark/rwup/avl_reference/record_id*_line_df*',
  'object': 's3a://vasp-got-da/data/spark/rwup/avl_reference/record_id*_object_df*'}}

In [3]:
def load_ego_yaw(recording_id: str, time_interval: list=None) -> pd.DataFrame :
    ego = session.loadParquet('avl_reference','ego',record_id=recording_id).select('ptp_time', 'Yaw', 'PositionX', 'PositionY')
    if time_interval is not None:
        sample_time_range = [pd.to_datetime(time_interval[0], unit='s'),
                    pd.to_datetime(time_interval[1], unit='s')]
        # print(sample_time_range)
        lc_sample = ego.filter(ego.ptp_time.between(time_interval[0],time_interval[1])).toPandas()
    else:        
        lc_sample = ego.toPandas()
    # lc_sample['ptp_time'] = pd.to_datetime(lc_sample['ptp_time'], unit='s')
    return lc_sample

In [4]:
def load_line(recording_id: str, time_interval: list):
    line = session.loadParquet('avl_reference','line',record_id=recording_id).select('ptp_time', 'BoundaryLineX', 'BoundaryLineY')
    line_sample = line.filter(line.ptp_time.between(time_interval[0],time_interval[1])).toPandas()
    return line_sample

In [5]:
def find_lc(df, minimum=30, maximum=70):
    matching_ptp_times = []

    i = 0
    while i < len(df):
        current_yaw = df.loc[i, 'Yaw']
        current_ptp_time = df.loc[i, 'ptp_time']
        found_matching_pair = False

        # Check the next maximum rows
        for j in range(i + 1, min(i + maximum + 1, len(df))):
            if (abs(df.loc[j, 'Yaw'] - current_yaw) <= 0.001) and \
                (abs(df.loc[round((i+j)/2), 'Yaw'] - current_yaw)>0.015) and \
                (abs(df.loc[round((i+j)/2), 'Yaw'] - current_yaw)<0.1):
                # Check if the matching row is within the range
                if j >= i + minimum:
                    matching_pair = (0.5*current_ptp_time-6+0.5*df.loc[j, 'ptp_time'], 0.5*current_ptp_time+6+0.5*df.loc[j, 'ptp_time'])
                    matching_ptp_times.append(matching_pair)
                    # print(matching_pair)
                    i = i + 41 
                    found_matching_pair = True
                    break  # If a match is found, exit the inner loop
                else:
                    i += 1
                    break

        if not found_matching_pair:
            i += 1   
    return matching_ptp_times

In [6]:
def if_cross(BoundaryX, BoundaryY, PositionX, PositionY, buffer_distance=0.1):
    line_points = [(x, y) for x, y in zip(BoundaryX, BoundaryY)]
    if len(line_points) < 2:
        return False
    boundary_line = LineString(line_points)
    buffered_line = boundary_line.buffer(buffer_distance)
    union_buffer = unary_union([buffered_line])
    is_in_buffer = union_buffer.contains(Point(PositionX, PositionY))
    return is_in_buffer

In [7]:
from shapely.geometry import Point, LineString
from shapely.ops import unary_union
def validate_lc(df, recording_id, lc_times, highway_time_interval):
    filtered_lc = []
    ego_sample = df
    line_sample = load_line(recording_id, highway_time_interval)
    for lc_time in lc_times:
        for index, ego_position in ego_sample[(ego_sample['ptp_time']>=lc_time[0])&(ego_sample['ptp_time']<=lc_time[1])].iterrows():
            PositionX = ego_position['PositionX']
            PositionY = ego_position['PositionY']
            ego_time = ego_position['ptp_time']
            for idx, line in line_sample[line_sample['ptp_time']==ego_time].iterrows():
                if if_cross(line['BoundaryLineX'], line['BoundaryLineY'], PositionX, PositionY):
                    filtered_lc.append(lc_time)
            break
    return filtered_lc

In [8]:
import ast
import pandas as pd
from tqdm import tqdm
from pyspark.sql.utils import AnalysisException

highway_df = pd.read_csv('/home/a494189/vasp-got-da/scripts/team_data_analytics/Lingbin_Bokuan/Bokuan/road_type_motorway_interval_non_empty_filtered.csv')

with tqdm(total=1571) as pbar_intervals:
    out = []
    for highway_index, highway_row in highway_df.iterrows():
        try:
            recording_id = highway_row[1]
            time_intervals = ast.literal_eval(highway_row[2])
            ego_df_whole = load_ego_yaw(recording_id)
            for time_interval in time_intervals:
                try:
                    ego_df = ego_df_whole[ego_df_whole['ptp_time'].between(time_interval[0],time_interval[1])].reset_index(drop=True)
                    lc1 = find_lc(ego_df)
                    lc2 = validate_lc(ego_df, recording_id, lc1, time_interval)
                    if len(lc2)>0:
                        out.append([recording_id, lc2])
                except Exception as e:
                    print(f"Skipping {recording_id} due to an error: {e}")
                pbar_intervals.update(1)
        except AnalysisException as e:
            print(f"Skipping {recording_id} due to missing path: {e}")


  0%|          | 0/1571 [00:00<?, ?it/s]

 18%|█▊        | 287/1571 [27:14<2:01:50,  5.69s/it]


KeyboardInterrupt: 

In [9]:
out

[['FM1073_20220606_140009', [(1654524227.41, 1654524239.41)]],
 ['FM1073_20220606_140009', [(1654526065.1100001, 1654526077.1100001)]],
 ['FM1073_20220315_130253', [(1647350154.3600001, 1647350166.3600001)]]]

In [None]:
df = pd.DataFrame(out, columns=['recording_id', 'lc'])

csv_file_path = '/home/a494189/vasp-got-da/scripts/team_data_analytics/Lingbin_Bokuan/Bokuan/lc.csv'

df.to_csv(csv_file_path, index=False)

NameError: name 'pd' is not defined

In [4]:
def load_ego_yaw(recording_id: str, time_interval: list=None):
    ego = session.loadParquet('avl_reference','ego',record_id=recording_id).select('ptp_time', 'Yaw', 'PositionX', 'PositionY')
    if time_interval is not None:
        ego = ego.filter((ego.ptp_time >= time_interval[0]) & (ego.ptp_time <= time_interval[1]))
    return ego

def load_line(recording_id: str, time_interval: list):
    line = session.loadParquet('avl_reference','line',record_id=recording_id).select('ptp_time', 'BoundaryLineX', 'BoundaryLineY')
    line = line.filter((line.ptp_time >= time_interval[0]) & (line.ptp_time <= time_interval[1]))
    return line

def find_lc(df, minimum=40, maximum=80):
    df_pd = df.toPandas()
    matching_ptp_times = []
    i = 0

    while i < len(df_pd):
        current_yaw = df_pd.loc[i, 'Yaw']
        current_ptp_time = df_pd.loc[i, 'ptp_time']
        found_matching_pair = False

        for j in range(i + 1, min(i + maximum + 1, len(df_pd))):
            if (abs(df_pd.loc[j, 'Yaw'] - current_yaw) <= 0.001) and \
               (abs(df_pd.loc[round((i+j)/2), 'Yaw'] - current_yaw) > 0.013) and \
               (abs(df_pd.loc[round((i+j)/2), 'Yaw'] - current_yaw) < 0.2):
                if j >= i + minimum:
                    matching_pair = (0.5 * current_ptp_time - 6 + 0.5 * df_pd.loc[j, 'ptp_time'], 0.5 * current_ptp_time + 6 + 0.5 * df_pd.loc[j, 'ptp_time'])
                    matching_ptp_times.append(matching_pair)
                    i = i + 41
                    found_matching_pair = True
                    break
                else:
                    i += 1
                    break

        if not found_matching_pair:
            i += 1

    return matching_ptp_times

def if_cross(BoundaryX, BoundaryY, PositionX, PositionY, buffer_distance=0.1):
    line_points = [(x, y) for x, y in zip(BoundaryX, BoundaryY)]
    if len(line_points) < 2:
        return False
    boundary_line = LineString(line_points)
    buffered_line = boundary_line.buffer(buffer_distance)
    union_buffer = unary_union([buffered_line])
    return union_buffer.contains(Point(PositionX, PositionY))

def validate_lc(df, recording_id, lc_times, highway_time_interval):
    filtered_lc = []
    line_sample = load_line(recording_id, highway_time_interval).toPandas()

    for lc_time in lc_times:
        ego_sample = df.filter((df.ptp_time >= lc_time[0]) & (df.ptp_time <= lc_time[1])).toPandas()
        for _, ego_position in ego_sample.iterrows():
            PositionX = ego_position['PositionX']
            PositionY = ego_position['PositionY']
            ego_time = ego_position['ptp_time']

            for _, line in line_sample[line_sample['ptp_time'] == ego_time].iterrows():
                if if_cross(line['BoundaryLineX'], line['BoundaryLineY'], PositionX, PositionY):
                    filtered_lc.append(lc_time)
            break

    return filtered_lc

highway_df = pd.read_csv('/home/a494189/vasp-got-da/scripts/team_data_analytics/Lingbin_Bokuan/Bokuan/road_type_motorway_interval_non_empty_filtered.csv')

with tqdm(total=1571) as pbar_intervals:
    out = []
    for _, highway_row in highway_df.iterrows():
        try:
            recording_id = highway_row[1]
            time_intervals = ast.literal_eval(highway_row[2])
            ego_df_whole = load_ego_yaw(recording_id)
            
            for time_interval in time_intervals:
                try:
                    ego_df = ego_df_whole.filter((ego_df_whole.ptp_time >= (time_interval[0]-10)) & (ego_df_whole.ptp_time <= (time_interval[1]+10)))
                    lc1 = find_lc(ego_df)
                    lc2 = validate_lc(ego_df, recording_id, lc1, time_interval)
                    
                    if lc2:
                        out.append([recording_id, lc2])
                except Exception as e:
                    print(f"Skipping {time_interval} in {recording_id} for: {e}")
                
                pbar_intervals.update(1)
        
        except AnalysisException as e:
            print(f"Skipping {recording_id} for: {e}")

100%|██████████| 1571/1571 [2:57:42<00:00,  6.79s/it]  
