#### From time predictions to location predictions

We will try predicting the road condition in 1 meter intervals.

The issue before have been we have only used intervals, which just had the length of whatever far one could travel in 1 second. 

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib import cm
import h5py
from scipy.interpolate import CubicSpline
from copy import deepcopy
from tqdm import tqdm
import re
import csv


In [2]:
parameter_dict = {
        'acc_long':     {'bstar': 198,      'rstar': 1,     'b': 198,   'r': 0.05   },
        'acc_trans':    {'bstar': 32768,    'rstar': 1,     'b': 32768, 'r': 0.04   },
        'acc_yaw':      {'bstar': 2047,     'rstar': 1,     'b': 2047,  'r': 0.1    },
        'brk_trq_elec': {'bstar': 4096,     'rstar': -1,    'b': 4098,  'r': -1     },
        'whl_trq_est':  {'bstar': 12800,    'rstar': 0.5,   'b': 12700, 'r': 1      },
        'trac_cons':    {'bstar': 80,       'rstar': 1,     'b': 79,    'r': 1      },
        'trip_cons':    {'bstar': 0,        'rstar': 0.1,   'b': 0,     'r': 1      }
    }

def convertdata(data, parameter):
    bstar = parameter['bstar']
    rstar = parameter['rstar']
    b = parameter['b']
    r = parameter['r']
    # We only convert data in the second column at idx 1 (wrt. 0-indexing), as the first column is time
    col0 = data[:,0]
    col1 = ((data[:,1]-bstar*rstar)-b)*r
    data = np.column_stack((col0, col1))
    return data


def unpack_hdf5(hdf5_file, convert: bool = False):
    with h5py.File(hdf5_file, 'r') as f:
        print(f)
        data = unpack_hdf5_(f, convert)
    return data


def unpack_hdf5_(group, convert: bool = False):
    data = {}
    for key in group.keys():
        if isinstance(group[key], h5py.Group):
            data[key] = unpack_hdf5_(group[key])
        else:
            if convert and key in parameter_dict:
                data[key] = convertdata(group[key][()], parameter_dict[key])
            else:
                d = group[key][()]
                if isinstance(d, bytes):
                    data[key] = d.decode('utf-8')
                else:
                    data[key] = group[key][()]
    return data


def find_best_start_and_end_indeces_by_lonlat(trip: np.ndarray, section: np.ndarray):
    # Find the start and end indeces of the section data that are closest to the trip data
    lon_a, lat_a = trip[:,0], trip[:,1]
    lon_b, lat_b = section[:,0], section[:,1]
    
    start_index = np.argmin(np.linalg.norm(np.column_stack((lon_a, lat_a)) - np.array([lon_b[0], lat_b[0]]), axis=1))
    end_index = np.argmin(np.linalg.norm(np.column_stack((lon_a, lat_a)) - np.array([lon_b[-1], lat_b[-1]]), axis=1))

    return start_index, end_index


def time_to_drive_X_meters(speed_kmh, distance_m=10):
    # Convert speed from km/h to m/s
    speed_ms = speed_kmh * (1000 / 3600)

    # Calculate time in seconds
    time_seconds = distance_m / speed_ms
    
    return time_seconds

### Note: We are only doing it for the right side (hh) here

In [3]:
# Right side
autopi_hh = unpack_hdf5('../data/raw/AutoPi_CAN/platoon_CPH1_HH.hdf5')
gm_data = autopi_hh['GM']['16006']['pass_1'] # TODO choose a one true route. 
p79_hh = pd.read_csv('../data/raw/ref_data/cph1_zp_hh.csv', sep=';', encoding='unicode_escape')

segments = h5py.File('../data/processed/segments.hdf5', 'r')     

<HDF5 file "platoon_CPH1_HH.hdf5" (mode r)>


In [9]:
for segment in segments:
    for second in segments[segment]:
        for key in segments[segment][second]:
            print(segments[segment][second].keys())

<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopro', 'p79']>
<KeysViewHDF5 ['aran', 'gm', 'gopr

In [182]:
# HH 
# Extract every 10th item starting from idx[0] to idx[1]+1, to get one location for each meter
lon_zp_hh = p79_hh['Lon']
lat_zp_hh = p79_hh['Lat']
idx_hh = find_best_start_and_end_indeces_by_lonlat(p79_hh[['Lat', 'Lon']].to_numpy(), gm_data['gps'][:,1:]) # TODO is this really the best place to start?? 
loc_hh_lon = lon_zp_hh[idx_hh[0]:idx_hh[1]+1:10]
loc_hh_lat = lat_zp_hh[idx_hh[0]:idx_hh[1]+1:10]

# Combine lon and lat into a list of lists
loc_hh = [[lon, lat] for lon, lat in zip(loc_hh_lon, loc_hh_lat)]

# For demonstration, let's print the first few pairs to verify
print(len(loc_hh))
print(loc_hh[:5])

1226
[[12.53019946, 55.7111808], [12.53018721, 55.71117492], [12.53017498, 55.71116905], [12.53016273, 55.71116317], [12.53015047, 55.7111573]]


In [183]:
# Define all_trip_names and pass_lists
def natural_key(string):
    """A key to sort strings that contain numbers naturally."""
    return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', string)]

all_trip_names = ['16006', '16008', '16009', '16010', '16011']
# Initialize the dictionary with empty sets for each trip
pass_names_for_each_trip = {trip: set() for trip in all_trip_names}

# Populate the sets with pass names from the segments
for segment in segments.values():
    trip_name = segment.attrs["trip_name"]
    pass_name = segment.attrs["pass_name"]
    pass_names_for_each_trip[trip_name].add(pass_name)

# Convert sets to sorted lists using the natural key for sorting
for trip in pass_names_for_each_trip:
    pass_names_for_each_trip[trip] = sorted(pass_names_for_each_trip[trip], key=natural_key)

# Variable assignments for pass lists
for trip, passes in pass_names_for_each_trip.items():
    globals()[f'pass_list_{trip}'] = passes

# Displaying the sorted lists (this would typically be for debugging or checking, not part of production code)
pass_lists = {trip: sorted(passes, key=natural_key) for trip, passes in pass_names_for_each_trip.items()}
for trip in pass_lists:
    print(f"{trip}: {pass_lists[trip]}")
    

16006: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14', 'pass_15', 'pass_16', 'pass_17', 'pass_18', 'pass_19']
16008: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14', 'pass_15', 'pass_16', 'pass_17']
16009: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14']
16010: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14']
16011: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14']


In [184]:
# Initialize the mapping dictionary
mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip = {}

for index in range(len(loc_hh)):
    trip_data = {}
    for trip_name in all_trip_names:
        pass_data = {}
        for pass_name in pass_lists[trip_name]:
            pass_data[pass_name] = {
                "distance_segment_second_1": [100, 0, 0],
                "distance_segment_second_2": [200, 0, 0]
            }
        trip_data[trip_name] = pass_data
    mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index] = trip_data

# Accessing the structure for the first index
print(mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[0])
print(mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[0]['16011'])
print(mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[0]['16011']['pass_1'])
print(mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[0]['16011']['pass_1']['distance_segment_second_1'])


{'16006': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_4': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_5': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_6': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_7': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_8': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_9': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_10': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_11': {'distance_segmen

In [186]:
# Now fill out the dictionary with the correct values
for index, real_location in tqdm(enumerate(loc_hh), total=len(loc_hh), desc="Processing"):
    for segment in segments.keys():
        current_direction = segments[str(segment)].attrs["direction"]
        if current_direction == "vh": # Check that we are going in the right direction
            continue
        current_trip_name = segments[str(segment)].attrs["trip_name"]
        current_pass_name = segments[str(segment)].attrs["pass_name"]
        current_best_trip_1 = mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_1']
        current_best_trip_2 = mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_2']
        for second in segments[str(segment)].keys():
            current_second = segments[str(segment)][str(second)]
            current_second_lat = current_second["gm"][:,15]
            current_second_lon = current_second["gm"][:,16]
            current_second_locations = [[lon, lat] for lon, lat in zip(current_second_lon, current_second_lat)]
            closest_sample_arg = np.argmin(np.linalg.norm(np.column_stack((current_second_lon, current_second_lat)) - np.array([real_location[0], real_location[1]]), axis=1))
            best_at_second = current_second_locations[closest_sample_arg]
            distance = np.linalg.norm(np.array(best_at_second) - np.array(real_location))
            
            if distance < current_best_trip_1[0]:
                current_best_trip_2 = current_best_trip_1
                current_best_trip_1 = [distance, segment, second]
                mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_1'] = current_best_trip_1
                mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_2'] = current_best_trip_2
                
            elif distance < current_best_trip_2[0]:
                current_best_trip_2 = [distance, segment, second]
                mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_2'] = current_best_trip_2
            
            else: 
                continue

# Save the HH mapping to a CSV file
filename = "mapping_hh_time_to_location.csv"

# Write to CSV
with open(filename, mode='w', newline='') as file:
    writer = csv.writer(file)
    # Write the header
    writer.writerow(['Index', 'Trip Name', 'Pass Name', 'Distance Segment', 'Value1', 'Value2', 'Value3'])
    # Write the data
    for index, trips in mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip.items():
        for trip_name, passes in trips.items():
            for pass_name, segments in passes.items():
                for segment_name, values in segments.items():
                    # Prepare the row with all needed information
                    row = [index, trip_name, pass_name, segment_name] + values
                    writer.writerow(row)

print("CSV file has been created successfully.")


Processing: 100%|██████████| 1226/1226 [55:11<00:00,  2.70s/it]


CSV file has been created successfully.


In [None]:
# Save the HH mapping to a CSV file
filename = "mapping_hh_time_to_location.csv"

# Write to CSV
with open(filename, mode='w', newline='') as file:
    writer = csv.writer(file)
    # Write the header
    writer.writerow(['Index', 'Trip Name', 'Pass Name', 'Distance Segment', 'Value1', 'Value2', 'Value3'])
    # Write the data
    for index, trips in mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip.items():
        for trip_name, passes in trips.items():
            for pass_name, segments in passes.items():
                for segment_name, values in segments.items():
                    # Prepare the row with all needed information
                    row = [index, trip_name, pass_name, segment_name] + values
                    writer.writerow(row)

print("CSV file has been created successfully.")

In [175]:
mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[0]["16009"]

{'pass_1': {'distance_segment_second_1': [1.8639465280960233e-05, '38', '2'],
  'distance_segment_second_2': [4.218541846208998e-05, '38', '3']},
 'pass_2': {'distance_segment_second_1': [100, 0, 0],
  'distance_segment_second_2': [200, 0, 0]},
 'pass_3': {'distance_segment_second_1': [4.120648748128986e-05, '44', '2'],
  'distance_segment_second_2': [6.30595759780742e-05, '44', '3']},
 'pass_4': {'distance_segment_second_1': [100, 0, 0],
  'distance_segment_second_2': [200, 0, 0]},
 'pass_5': {'distance_segment_second_1': [2.508377363051862e-05, '47', '2'],
  'distance_segment_second_2': [2.6619532700902304e-05, '47', '1']},
 'pass_6': {'distance_segment_second_1': [100, 0, 0],
  'distance_segment_second_2': [200, 0, 0]},
 'pass_7': {'distance_segment_second_1': [0.00012054763207898648, '49', '0'],
  'distance_segment_second_2': [0.0001305584327341958, '49', '1']},
 'pass_8': {'distance_segment_second_1': [100, 0, 0],
  'distance_segment_second_2': [200, 0, 0]},
 'pass_9': {'distance_

In [129]:
# TODO plot segments for each car.  
for segment in segments.keys():
    current_trip_name = segments[str(segment)].attrs["trip_name"]
    first_second = segments[str(segment)][str(0)]
    
    for second in segments[str(segment)].keys():
        current_second = segments[str(segment)][str(second)]
        current_second_lat = current_second["gm"][:,15]
        current_second_lon = current_second["gm"][:,16]
        current_second_locations = [[lon, lat] for lon, lat in zip(current_second_lon, current_second_lat)]
        closest_sample_arg = np.argmin(np.linalg.norm(np.column_stack((current_second_lon, current_second_lat)) - np.array([real_location[0], real_location[1]]), axis=1))
        best_at_second = current_second_locations[closest_sample_arg]
        distance = np.linalg.norm(np.array(best_at_second) - np.array(real_location))
        
        if distance < current_best_trip_1[0]:
            current_best_trip_2 = current_best_trip_1
            current_best_trip_1 = [distance, segment, second]
            mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name]['distance_segment_second_1'] = current_best_trip_1
            mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name]['distance_segment_second_2'] = current_best_trip_2
            
        elif distance < current_best_trip_2[0]:
            current_best_trip_2 = [distance, segment, second]
            mapping_hh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name]['distance_segment_second_2'] = current_best_trip_2
        
        else: 
            continue

### Save as pd

In [131]:
hdf5_filename = 'trip_data_hh.hdf5'

# Create a new HDF5 file
with h5py.File(hdf5_filename, 'w') as hdf:
    for trip_id, segments in ex_mapping_to_the_two_best_seconds_for_each_trip.items():
        group = hdf.create_group(str(trip_id))
        for segment_id, segment_data in segments.items():
            # Convert each segment's data into a numpy array for easier handling
            ds_data = np.array([segment_data['distance_segment_second_1'],
                                segment_data['distance_segment_second_2']])
            # Each dataset is named after the segment ID
            group.create_dataset(segment_id, data=ds_data)

### Note: We are only doing it for the left side (vh) here

In [18]:
# Left side
autopi_vh = unpack_hdf5('../data/raw/AutoPi_CAN/platoon_CPH1_VH.hdf5')
gm_data = autopi_vh['GM']['16006']['pass_2'] # pass two is a VH route
p79_vh = pd.read_csv('../data/raw/ref_data/cph1_zp_vh.csv', sep=';', encoding='unicode_escape')

segments = h5py.File('../data/processed/segments.hdf5', 'r')     

<HDF5 file "platoon_CPH1_VH.hdf5" (mode r)>


In [19]:
# VH 
# Extract every 10th item starting from idx[0] to idx[1]+1, to get one location for each meter
lon_zp_vh = p79_vh['Lon']
lat_zp_vh = p79_vh['Lat']
idx_vh = find_best_start_and_end_indeces_by_lonlat(p79_vh[['Lat', 'Lon']].to_numpy(), gm_data['gps'][:,1:]) # TODO is this really the best place to start?? 
loc_vh_lon = lon_zp_vh[idx_vh[0]:idx_vh[1]+1:10]
loc_vh_lat = lat_zp_vh[idx_vh[0]:idx_vh[1]+1:10]

# Combine lon and lat into a list of lists
loc_vh = [[lon, lat] for lon, lat in zip(loc_vh_lon, loc_vh_lat)]

# For demonstration, let's print the first few pairs to verify
print(len(loc_vh))
print(loc_vh[:5])

940
[[12.52125164, 55.70485965], [12.52125497, 55.70486815], [12.52125829, 55.70487666], [12.52126164, 55.70488519], [12.52126493, 55.70489373]]


In [20]:
# Define all_trip_names and pass_lists
def natural_key(string):
    """A key to sort strings that contain numbers naturally."""
    return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', string)]

all_trip_names = ['16006', '16008', '16009', '16010', '16011']
# Initialize the dictionary with empty sets for each trip
pass_names_for_each_trip = {trip: set() for trip in all_trip_names}

# Populate the sets with pass names from the segments
for segment in segments.values():
    trip_name = segment.attrs["trip_name"]
    pass_name = segment.attrs["pass_name"]
    pass_names_for_each_trip[trip_name].add(pass_name)

# Convert sets to sorted lists using the natural key for sorting
for trip in pass_names_for_each_trip:
    pass_names_for_each_trip[trip] = sorted(pass_names_for_each_trip[trip], key=natural_key)

# Variable assignments for pass lists
for trip, passes in pass_names_for_each_trip.items():
    globals()[f'pass_list_{trip}'] = passes

# Displaying the sorted lists (this would typically be for debugging or checking, not part of production code)
pass_lists = {trip: sorted(passes, key=natural_key) for trip, passes in pass_names_for_each_trip.items()}
for trip in pass_lists:
    print(f"{trip}: {pass_lists[trip]}")
    

16006: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14', 'pass_15', 'pass_16', 'pass_17', 'pass_18', 'pass_19']
16008: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14', 'pass_15', 'pass_16', 'pass_17']
16009: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14']
16010: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14']
16011: ['pass_1', 'pass_2', 'pass_3', 'pass_4', 'pass_5', 'pass_6', 'pass_7', 'pass_8', 'pass_9', 'pass_10', 'pass_11', 'pass_12', 'pass_13', 'pass_14']


In [24]:
# Initialize the mapping dictionary
mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip = {}

for index in range(len(loc_vh)):
    trip_data = {}
    for trip_name in all_trip_names:
        pass_data = {}
        for pass_name in pass_lists[trip_name]:
            pass_data[pass_name] = {
                "distance_segment_second_1": [100, 0, 0],
                "distance_segment_second_2": [200, 0, 0]
            }
        trip_data[trip_name] = pass_data
    mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[index] = trip_data

# Accessing the structure for the first index
print(mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[0])
print(mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[0]['16011'])
print(mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[0]['16011']['pass_1'])
print(mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[0]['16011']['pass_1']['distance_segment_second_1'])


{'16006': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_4': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_5': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_6': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_7': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_8': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_9': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_10': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_11': {'distance_segmen

In [25]:
# Now fill out the dictionary with the correct values
for index, real_location in tqdm(enumerate(loc_vh), total=len(loc_vh), desc="Processing"):
    for segment in segments.keys():
        current_direction = segments[str(segment)].attrs["direction"]
        if current_direction == "hh": # Check that we are going in the right direction
            continue
        current_trip_name = segments[str(segment)].attrs["trip_name"]
        current_pass_name = segments[str(segment)].attrs["pass_name"]
        current_best_trip_1 = mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_1']
        current_best_trip_2 = mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_2']
        for second in segments[str(segment)].keys():
            current_second = segments[str(segment)][str(second)]
            current_second_lat = current_second["gm"][:,15]
            current_second_lon = current_second["gm"][:,16]
            current_second_locations = [[lon, lat] for lon, lat in zip(current_second_lon, current_second_lat)]
            closest_sample_arg = np.argmin(np.linalg.norm(np.column_stack((current_second_lon, current_second_lat)) - np.array([real_location[0], real_location[1]]), axis=1))
            best_at_second = current_second_locations[closest_sample_arg]
            distance = np.linalg.norm(np.array(best_at_second) - np.array(real_location))
            
            if distance < current_best_trip_1[0]:
                current_best_trip_2 = current_best_trip_1
                current_best_trip_1 = [distance, segment, second]
                mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_1'] = current_best_trip_1
                mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_2'] = current_best_trip_2
                
            elif distance < current_best_trip_2[0]:
                current_best_trip_2 = [distance, segment, second]
                mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip[index][current_trip_name][current_pass_name]['distance_segment_second_2'] = current_best_trip_2
            
            else: 
                continue

# Save the VH mapping to a CSV file
filename = "mapping_vh_time_to_location.csv"

# Write to CSV
with open(filename, mode='w', newline='') as file:
    writer = csv.writer(file)
    # Write the header
    writer.writerow(['Index', 'Trip Name', 'Pass Name', 'Distance Segment', 'Value1', 'Value2', 'Value3'])
    # Write the data
    for index, trips in mapping_vh_to_the_two_best_seconds_for_each_pass_in_each_trip.items():
        for trip_name, passes in trips.items():
            for pass_name, segments in passes.items():
                for segment_name, values in segments.items():
                    # Prepare the row with all needed information
                    row = [index, trip_name, pass_name, segment_name] + values
                    writer.writerow(row)

print("CSV file has been created successfully.")


Processing: 100%|██████████| 940/940 [38:49<00:00,  2.48s/it]


CSV file has been created successfully.


### END

#### After mapping each AOI to 2 time windows for each car, we will predict the KPI of the AOI

There are in total 1226 AOIs for HH and 940 for VH.

We have 10 windows for each AOI; 2 for each car and we have 5 cars.

In [50]:
# Use the speed to estimate the amount of sample points there are in the area of interest (AOI)
# The AOI is thought to be defined by an X meter long segment
A_speed = 50
B_speed = 30
C_speed = 40

sample_freq = 250  # Sample frequency in Hz
A_samples = time_to_drive_X_meters(A_speed) * sample_freq
B_samples = time_to_drive_X_meters(B_speed) * sample_freq
C_samples = time_to_drive_X_meters(C_speed) * sample_freq
total_samples = A_samples + B_samples + C_samples

A_weight = A_samples / total_samples
B_weight = B_samples / total_samples
C_weight = C_samples / total_samples


In [51]:
# We will now localise the point of interest in the time domains.
# It does not work to just take the first and last lon. and lat. and not the median either. 
# We need to be able to find the location fast, right now we will just 

# Should they all have a dictionary?
A_t1_data = [[1,2,3,4,5,6,7,8,9,10],
        [1,2,3,4,5,6,7,8,9,10],
        [1,2,3,4,5,6,7,8,9,10],
        [1,2,3,4,5,6,7,8,9,10],
        [1,2,3,4,5,6,7,8,9,10],
        [1,2,3,4,5,6,7,8,9,10]]
A_t1_pred = 5


In [44]:
x = 2

if x > 13:
    print("yes")
    
elif x > 1.5:
    print("more yes")

more yes


In [49]:
# TODO There is a bug where current_best_trip_1 == current_best_trip_2 sometimes
current_best_trip_1 = [100, 0, 0]
current_best_trip_2 = [200, 0, 0]

distance, segment, second = 50, 1, 1

best_1 = 1
best_2 = 2

if distance < current_best_trip_1[0]:
    current_best_trip_2 = current_best_trip_1
    current_best_trip_1 = [distance, segment, second]
    best_1 = current_best_trip_1
    best_2 = current_best_trip_2
    
elif distance < current_best_trip_2[0]:
    current_best_trip_2 = [distance, segment, second]
    best_2 = current_best_trip_2

if distance < current_best_trip_1[0]:
    current_best_trip_2 = current_best_trip_1
    current_best_trip_1 = [distance, segment, second]
    best_1 = current_best_trip_1
    best_2 = current_best_trip_2
    
elif distance < current_best_trip_2[0]:
    current_best_trip_2 = [distance, segment, second]
    best_2 = current_best_trip_2

print(f"distance: {distance}")
print(f"current_best_trip_1: {current_best_trip_1}")
print(f"current_best_trip_2: {current_best_trip_2}")
print(f"best_1: {best_1}")
print(f"best_2: {best_2}")


distance: 50
current_best_trip_1: [50, 1, 1]
current_best_trip_2: [50, 1, 1]
best_1: [50, 1, 1]
best_2: [50, 1, 1]


In [6]:
locations = 3
all_trip_names = ["16001", "16002", "16003"]
pass_lists = ["pass_1", "pass_2", "pass_3"]

mapping_to_the_two_best_seconds_for_each_pass_in_each_trip = {}
for index in range(locations):
    trip_data = {}
    for trip_name in all_trip_names:
        pass_data = {}
        for pass_name in pass_lists:
            pass_data[pass_name] = {
                "distance_segment_second_1": [100, 0, 0],
                "distance_segment_second_2": [200, 0, 0] # these values are just placeholders
            }
        trip_data[trip_name] = pass_data
    
    mapping_to_the_two_best_seconds_for_each_pass_in_each_trip[index] = trip_data

print(mapping_to_the_two_best_seconds_for_each_pass_in_each_trip)


{0: {'16001': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16002': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16003': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}}, 1: {'16001': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [

In [11]:
def filter_entries(data):
    # Define the threshold value
    threshold = 1.e-4
    
    # Function to filter the values within each pass
    def filter_pass(pass_data):
        filtered_pass_data = {}
        for segment, values in pass_data.items():
            distance = values[0]
            # Check if any value in the list exceeds the threshold
            if distance < threshold:
                filtered_pass_data[segment] = values
        return filtered_pass_data
    
    # Iterate through the main dictionary to filter the entries
    filtered_data = {}
    for key, trips in data.items():
        filtered_trips = {}
        for trip, passes in trips.items():
            filtered_passes = {}
            for pass_name, pass_data in passes.items():
                filtered_pass_data = filter_pass(pass_data)
                if filtered_pass_data:
                    filtered_passes[pass_name] = filtered_pass_data
            if filtered_passes:
                filtered_trips[trip] = filtered_passes
        if filtered_trips:
            filtered_data[key] = filtered_trips
            
    return filtered_data

# Example dictionary
data = {
    0: {'16001': {'pass_1': {'distance_segment_second_1': [0.000005, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16002': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16003': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}}, 
    1: {'16001': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16002': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16003': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [400, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}}, 
    2: {'16001': {'pass_1': {'distance_segment_second_1': [500, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [0, 0, 0]}, 'pass_3': {'distance_segment_second_1': [300, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16002': {'pass_1': {'distance_segment_second_1': [400, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}, '16003': {'pass_1': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_2': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}, 'pass_3': {'distance_segment_second_1': [100, 0, 0], 'distance_segment_second_2': [200, 0, 0]}}}
}

# Apply the filter function
filtered_data = filter_entries(data)

# Print the filtered data
print(filtered_data)


KeyboardInterrupt: 