In [48]:
import zarr
import numpy as np
import pandas as pd
import re
import csv
import os

In [49]:
def full_path(folder_path,file_name):
    """Return `file_name` joined onto `folder_path` with the OS path separator."""
    return os.path.join(folder_path, file_name)
# NOTE(review): both locations below are absolute paths on one machine; the
# notebook will not run elsewhere until they are adjusted.
# Zarr store that the loader functions open with zarr.open().
dataset_path="H:\\Desktop\\Q1\\python\\project\\TIL-Group15\\dataset\\CarFollowing\\trainHA.zarr\\"
# Folder that every CSV in this notebook is written to and read back from.
folder_path = "H:\\Desktop\\Q1\\python\\project\\TIL-Group15\\dataset\\split_data\\HA\\"
# NOTE(review): filename_HA is not referenced anywhere in the visible code.
filename_HA='trainHA.zarr'
# Per-sample trajectory export (ID, positions, velocities, accelerations, timestamps).
filename_multi_data='multi_data.csv'
# Per-case vehicle size export (ID, size_lead, size_follow).
filename_multi_size='multi_size.csv'
# Per-sample distance export (ID, A_distance, B_distance).
filename_distance='distance.csv'
# Existing regime table that is read (not written) by the later cells.
filename_regimeHA='regimes_list_HA_train.csv'

In [51]:
# Define a function that loads the centroid position, velocity and acceleration of the lead and follow vehicles for one case of the dataset.
# The number of cases to export is read from user input further down.
def load_single_data(i):
    """Load the trajectory samples of case `i` from the zarr dataset.

    Parameters
    ----------
    i : int
        Index into `index_range`, which gives the (start, end) sample range
        of the case.

    Returns
    -------
    numpy.ndarray
        One row per sample with the columns
        ID, x_lead, v_lead, a_lead, x_follow, v_follow, a_follow, timestamps.
    """
    # Nothing is written here, so open read-only: mode='a' would silently
    # create an empty store if the path were wrong and could modify the data.
    data = zarr.open(dataset_path, mode='r')
    start, end = data.index_range[i]
    # get timestamps
    timestamps = data.timestamp[start:end]
    # get position, speed, and acceleration of the lead vehicle
    x_lead = data.lead_centroid[start:end]
    v_lead = data.lead_velocity[start:end]
    a_lead = data.lead_acceleration[start:end]
    # ... and of the follow vehicle
    x_follow = data.follow_centroid[start:end]
    v_follow = data.follow_velocity[start:end]
    a_follow = data.follow_acceleration[start:end]
    # Repeat the case ID once per sample so it can be stacked as a column.
    case_id = np.full(x_lead.shape[0], i, dtype=float)
    array = np.vstack((case_id, x_lead, v_lead, a_lead, x_follow, v_follow, a_follow, timestamps))
    # vstack builds one row per quantity; transpose to one row per sample.
    return array.T
#Define a function to export the size of the vehicles in dataset
def load_size(i):
    """Return the vehicle sizes of case `i` as a single row.

    Parameters
    ----------
    i : int
        Index into the dataset's `follow_size` array.

    Returns
    -------
    numpy.ndarray
        Array with one row holding ID, size_lead, size_follow.
    """
    # Nothing is written here, so open read-only: mode='a' would silently
    # create an empty store if the path were wrong and could modify the data.
    data = zarr.open(dataset_path, mode='r')
    size_lead = 4.85  # size of the lead vehicles; for HA there is only one size
    size_follow = data.follow_size[i]  # size of the follow vehicle of this case
    case_id = np.ones(1) * i  # ID
    array = np.vstack((case_id, size_lead, size_follow))
    return array.T
#Organize all data into a csv file
def load_multi_size(i):
    """Stack the size rows of cases 0..i (inclusive) into one array.

    Parameters
    ----------
    i : int
        Highest case index to include.

    Returns
    -------
    numpy.ndarray or None
        The rows returned by `load_size` for each case, stacked vertically;
        None when `i` is negative (no case requested).
    """
    # Collect first and stack once: stacking inside the loop re-copies the
    # accumulated array on every iteration.
    rows = [load_size(x) for x in range(i + 1)]
    if not rows:
        return None
    return np.vstack(rows)
def load_multi_data(i):
    """Stack the trajectory samples of cases 0..i (inclusive) into one array.

    Parameters
    ----------
    i : int
        Highest case index to include.

    Returns
    -------
    numpy.ndarray or None
        The arrays returned by `load_single_data` for each case, stacked
        vertically; None when `i` is negative (no case requested).
    """
    # Collect first and stack once: stacking inside the loop re-copies the
    # accumulated array on every iteration.
    cases = [load_single_data(x) for x in range(i + 1)]
    if not cases:
        return None
    return np.vstack(cases)
# Highest case index to export, read interactively; cases 0..p (inclusive) are written.
p = int(input('type in the number you want'))

# Per-sample trajectory data of every exported case.
np.savetxt(
    fname=full_path(folder_path, filename_multi_data),
    X=load_multi_data(p),
    delimiter=",",
    header="ID,x_lead,v_lead,a_lead,x_follow,v_follow,a_follow,timestamps",
    comments='',
)
# Vehicle sizes, one row per exported case.
np.savetxt(
    fname=full_path(folder_path, filename_multi_size),
    X=load_multi_size(p),
    delimiter=",",
    header="ID,size_lead,size_follow",
    comments='',
)

In [52]:
# Read the exported CSVs back in, skipping the header row.
# ndmin=2 keeps the result two-dimensional even when a file has a single data
# row (e.g. the size file when only case 0 was exported); without it loadtxt
# returns a 1-D array and the [:, 0] column indexing used later fails.
data = np.loadtxt(full_path(folder_path, filename_multi_data), delimiter=",", skiprows=1, ndmin=2)
data_size = np.loadtxt(full_path(folder_path, filename_multi_size), delimiter=",", skiprows=1, ndmin=2)
# Column layout of data_size: ID, size_lead, size_follow
# Column layout of data: ID, x_lead, v_lead, a_lead, x_follow, v_follow, a_follow, timestamps


In [53]:
# A_distance is the vehicle distance from the tail of the lead vehicle to the head of the follow vehicle.
# B_distance is the vehicle distance from the head of the lead vehicle to the head of the follow vehicle.

def distance(x):
    """Compute the two inter-vehicle distances for every sample of case `x`.

    Reads the module-level arrays `data` (trajectory samples) and `data_size`
    (vehicle sizes) and selects the rows whose first column equals `x`.

    Parameters
    ----------
    x : int
        Case ID to evaluate.

    Returns
    -------
    numpy.ndarray
        Array with three rows: the case ID, A_distance (tail of lead vehicle
        to head of follow vehicle) and B_distance (head of lead vehicle to
        head of follow vehicle). The first sample of the case is not included.

    Raises
    ------
    IndexError
        If `data_size` has no row for case `x`.
    """
    case_rows = data[data[:, 0] == x]
    size_rows = data_size[data_size[:, 0] == x]

    # NOTE(review): as in the original loop, which started at index 1, the
    # first sample of the case is skipped, so the result has one column fewer
    # than the case has samples. Confirm this is intended.
    lead_pos = case_rows[1:, 1]    # x_lead column
    follow_pos = case_rows[1:, 4]  # x_follow column
    lead_len = size_rows[0, 1]     # size_lead
    follow_len = size_rows[0, 2]   # size_follow

    # Positions are vehicle centroids, so half of each vehicle size is used
    # to move from centroid to the tail/head of the vehicle.
    centre_gap = lead_pos - follow_pos
    A_distance = centre_gap - 0.5 * (lead_len + follow_len)  # A_distance
    B_distance = centre_gap - 0.5 * (follow_len - lead_len)  # B_distance

    case_ids = np.full(A_distance.shape, x, dtype=float)
    return np.vstack((case_ids, A_distance, B_distance))
def load_multi_distance(i):
    """Stack the distance rows of cases 0..i (inclusive) into one array.

    Parameters
    ----------
    i : int
        Highest case ID to include.

    Returns
    -------
    numpy.ndarray or None
        The transposed results of `distance` (columns ID, A_distance,
        B_distance) for each case, stacked vertically; None when `i` is
        negative (no case requested).
    """
    # Collect first and stack once: stacking inside the loop re-copies the
    # accumulated array on every iteration.
    per_case = [distance(x).T for x in range(i + 1)]
    if not per_case:
        return None
    return np.vstack(per_case)
# Write the distances of cases 0..p (inclusive) to the distance CSV.
np.savetxt(
    fname=full_path(folder_path, filename_distance),
    X=load_multi_distance(p),
    delimiter=",",
    header="ID,A_distance,B_distance",
    comments='',
)


In [54]:
# Read the regime table with its header row as field names. The encoding is
# given explicitly so text columns come back as str on every NumPy version
# (the implicit bytes default is deprecated and no longer applies in newer
# releases, where calling .decode() on the values would fail).
data3 = np.genfromtxt(full_path(folder_path,filename_regimeHA), delimiter=',',  dtype=None, names=True,
                      encoding='utf-8')

# define the list of regimes; the two-letter names come first so that
# "Fa"/"Fd" are not split into "F" plus a stray lowercase letter
pattern = re.compile(r'Fa|Fd|A|D|F|C|S')
output_list = []  # one dict per row of the regime table: regime name -> value of that regime's column
# atleast_1d: a table with a single data row is returned as a 0-d array,
# which cannot be iterated directly.
for row in np.atleast_1d(data3):
    regime = row['regime_comb']
    matches = pattern.findall(regime)  # use re to cut regime_comb into regime names
    row_dict = {}
    for module in matches:
        # NOTE(review): a regime that occurs more than once in regime_comb
        # gets a single entry, placed where it first occurred.
        row_dict[module] = row[module]

    # export the data into the list
    output_list.append(row_dict)
data = np.loadtxt(full_path(folder_path,filename_multi_data), delimiter=",",skiprows=1)

patterns = ["A", "D", "F", "Fa", "Fd", "C", "S"]  # the list of regimes

# make a CSV file for each regime
for pattern_name in patterns:
    with open(full_path(folder_path,f'{pattern_name}.csv'), 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        # write in the title for each line
        writer.writerow(['ID', 'x_lead', 'v_lead', 'a_lead', 'x_follow', 'v_follow', 'a_follow', 'timestamps'])

        # NOTE(review): the position of a row in the regime table is used as
        # the case ID in multi_data - confirm the two files are aligned.
        for i, pattern_times in enumerate(output_list):
            all_data = data[data[:, 0] == i, 1:]  # load the data except ID
            # presumably the per-regime values are durations; each regime
            # receives a share of the samples proportional to its value
            total_time = sum(pattern_times.values())
            start_index = 0
            for regime_name, duration in pattern_times.items():
                length = int(duration * len(all_data) / total_time)
                if regime_name == pattern_name:
                    # save the result in CSV file
                    for row_data in all_data[start_index:start_index + length]:
                        writer.writerow([i] + list(row_data))
                # Advance past EVERY regime, not only the one being written;
                # otherwise each regime's slice would start at sample 0.
                start_index += length


  data3 = np.genfromtxt(full_path(folder_path,filename_regimeHA), delimiter=',',  dtype=None, names=True)


In [None]:

# Load both tables as object arrays (values are not converted to numbers),
# skipping the header row. ndmin=2 keeps a file with a single data row
# two-dimensional so that iterating over the array still yields whole rows.
data = np.loadtxt(full_path(folder_path, filename_regimeHA), delimiter=",", skiprows=1, dtype=object, ndmin=2)
multi_data = np.loadtxt(full_path(folder_path, filename_multi_data), delimiter=",", skiprows=1, dtype=object, ndmin=2)

def count_upper_except_adf(regime_str):
    """Count the upper-case letters of `regime_str` with each 'ADF' counted once.

    Every occurrence of 'ADF' is first collapsed to a single 'A', then the
    ASCII upper-case letters of the result are counted. Strings that do not
    contain 'ADF' yield 0.
    """
    if 'ADF' not in regime_str:
        return 0
    # replace 'ADF' with 'A' so the whole sequence contributes one letter
    collapsed = regime_str.replace('ADF', 'A')
    # count the upper-case letters A-Z
    return sum(1 for ch in collapsed if 'A' <= ch <= 'Z')

# categories: case IDs grouped by how many regimes accompany 'ADF' in the
# case's regime string ('ADF' alone, or 'ADF+<n>' for n additional ones)
categories = {
    "ADF": [],
    "ADF+1": [],
    "ADF+2": [],
    "ADF+3": [],
    "ADF+4": []
}

for row in data:
    regime_comb = row[6]  # column of regime_comb; original note: HH:5 HA:6
    regime_id = float(row[0])  # Assuming the ID is in the first column and is a float
    if 'ADF' in regime_comb:
        # The helper counts the collapsed 'ADF' as one letter; subtract it to
        # get the number of additional regimes.
        regime_count = int(count_upper_except_adf(regime_comb))-1
        key = f'ADF+{regime_count}' if regime_count else 'ADF'
        # setdefault: a case with more than four additional regimes gets its
        # own group instead of raising KeyError.
        categories.setdefault(key, []).append(regime_id)

# get the data from multi_data, and save one csv file per category.
for key, ids in categories.items():
    id_set = set(ids)  # constant-time membership test for every multi_data row
    filtered_data = [row for row in multi_data if float(row[0]) in id_set]
    output_path = full_path(folder_path, f'{key}_multi_data.csv')
    # using %s to save the values as strings, adjust if necessary
    np.savetxt(output_path, filtered_data, delimiter=",", fmt='%s',header="ID,x_lead,v_lead,a_lead,x_follow,v_follow,a_follow,timestamps", comments='')
    # using %s to save as string, adjust if necessary
