In [229]:
import pandas as pd
import json
import os
import numpy as np
import pickle
import json

# Silence every Python warning for the rest of the notebook session.
# NOTE(review): the blanket 'ignore' filter also hides pandas deprecation and
# chained-assignment warnings raised by the cells below; consider narrowing it while debugging.
import warnings
warnings.filterwarnings('ignore')

# --------- Functions ----------

In [35]:
# read folder with all csv files and create one df from it (one per intersection)
def read_folder(current_intersection, configs, trac, direc):
    """Read every CSV file of one intersection and return a single cleaned count frame.

    Parameters
    ----------
    current_intersection : str
        Name of the intersection. It is used as the sub-folder of
        ``configs['data_folder']``, as the key into the sensor configuration and
        as the name of the CSV column that holds the timestamps.
    configs : dict
        Parsed configuration. ``configs['data_folder']`` is the data root and
        ``configs['trajectories'][trac][direc][current_intersection]`` is the list
        of sensor columns to sum.
    trac, direc : str
        Trajectory and direction keys used to look up the sensor list.

    Returns
    -------
    pandas.DataFrame
        Columns ``timestamp`` and ``cars``, sorted by timestamp, restricted to
        2014-12-31 < timestamp < 2020-05-31 and then passed through ``check_hours``.
    """

    print("Starting intersection: " + str(current_intersection))  # note which intersection its working on
    path = os.path.join(configs['data_folder'], current_intersection)  # folder holding the csv files
    print(path)

    # The sensor list does not change between files, so look it up once.
    sensors = configs['trajectories'][trac][direc][current_intersection]
    cols = sensors + [current_intersection]  # sensor columns + the column that carries the dates

    # Collect the per-file frames and concatenate once at the end; growing a frame
    # with pd.concat inside the loop copies all previous rows on every iteration.
    monthly_frames = []
    for file in os.listdir(path):
        if not file.endswith(".csv"):
            continue

        current_month = pd.read_csv(os.path.join(path, file), delimiter=";")
        current_month = current_month[cols]  # only keep interesting columns
        current_month = current_month[:-1]  # last row is totals
        current_month = current_month.fillna(0)  # fill NA values with 0

        # remove sensor errors
        # NOTE(review): the thresholds below (600, lag of 4 rows, -1/401) are unexplained in
        # the original notebook ("Q: why keep range 0 to 600?", "Q: why shift 4?") — confirm.
        for sensor in sensors:
            readings = current_month[sensor]
            readings = readings.where(readings <= 600, 0)  # readings above 600 are replaced by 0
            # a reading identical to the one 4 rows earlier becomes NaN (skipped by the sum below)
            current_month[sensor] = readings.where(readings.shift(4) != readings)

        # clip every sensor to the closed range [-1, 401]
        current_month[sensors] = current_month[sensors].clip(-1, 401)

        # sum all sensors (NaN entries are skipped by DataFrame.sum)
        current_month['cars'] = current_month[sensors].sum(axis=1)
        current_month = current_month[[current_intersection, "cars"]]  # only keep dates and total
        current_month.columns = ['timestamp', 'cars']  # rename to the general format

        monthly_frames.append(current_month)

    if monthly_frames:
        df = pd.concat(monthly_frames)
    else:
        # no csv file found: keep returning an empty frame with the expected columns
        df = pd.DataFrame(columns=['timestamp', 'cars'])

    df['timestamp'] = pd.to_datetime(df['timestamp'])  # format as dt
    max_cars = len(sensors) * 150  # upper bound: 150 cars per configured sensor
    df['cars'] = df['cars'].clip(-1, max_cars)
    df = df.sort_values(by='timestamp')  # sort by timestamp
    df = df.dropna()  # extra check to drop na values
    df = df.loc[(df['timestamp'] > '2014-12-31') & (df['timestamp'] < '2020-05-31')]  # delete faulty datapoints outside scope
    df = df.reset_index(drop=True)
    df = check_hours(df)  # drop hours that are not complete
    return df



In [156]:
# function to save the processed data for GNN training in h5 format
def save_GNN_processed_data(raw_data,save_path):
    """Join the per-intersection count frames on timestamp and write them to an HDF5 file.

    ``raw_data`` maps an intersection name to a frame with ``timestamp`` and ``cars``
    columns. Every frame's ``cars`` column is renamed to the intersection name and
    inner-joined on ``timestamp``, so only timestamps present for every intersection
    remain. The result is indexed by timestamp (index name removed) and stored under
    key ``'df'`` at ``save_path``, overwriting an existing file.
    """
    # seed the join with the timestamps of the first intersection
    seed_key = list(raw_data.keys())[0]
    merged = pd.DataFrame(raw_data[seed_key]['timestamp'])

    for name, frame in raw_data.items():
        counts = pd.DataFrame(frame).rename(columns={'cars': name})
        merged = pd.merge(merged, counts, on='timestamp', how='inner')

    # make sure the timestamps are datetimes, stored as a plain array
    merged['timestamp'] = np.array(pd.to_datetime(merged['timestamp']))

    # timestamp becomes the (unnamed) index
    merged = merged.set_index('timestamp')
    merged.index.name = None

    # write the joined table to the h5 file
    merged.to_hdf(save_path, key='df', mode='w')

In [157]:
# function to check if the data is complete
def check_hours(df):
    """Drop every clock hour that does not contain exactly 12 rows.

    Twelve rows correspond to the twelve 5-minute intervals of one hour. The rows of
    an hour with any other count are removed from ``df`` **in place**, and the same
    (mutated) frame is returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have a datetime64 ``timestamp`` column.

    Returns
    -------
    pandas.DataFrame
        ``df`` itself, without the rows of incomplete hours.

    Notes
    -----
    Removing hours that contain many zero counts was considered in the original
    notebook but is not implemented.
    """
    # Label each row with the start of its clock hour and count the rows per label in a
    # single pass, instead of re-filtering the whole frame for every day and every hour.
    hour_start = df['timestamp'].dt.floor('h')
    rows_in_hour = hour_start.map(hour_start.value_counts())

    # Rows without a valid timestamp get no count and are left untouched.
    incomplete = (rows_in_hour.notna() & (rows_in_hour != 12)).to_numpy()
    df.drop(df.index[incomplete], inplace=True)
    return df

In [158]:
# function to aggregate data into fpds per hour, a new probability column is added with the probability of a car passing through the intersection per 5 minutes
def fpd(df, hours=1):
    """Attach, to every row, the total count of its time window and the row's share of it.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs a datetime ``timestamp`` column and a numeric ``cars`` column.
        The input frame is not modified.
    hours : int, default 1
        Window length in hours; windows are built with ``pd.Grouper(freq=f"{hours}h")``.

    Returns
    -------
    pandas.DataFrame
        New frame (fresh 0..n-1 index) with columns ``timestamp``, ``cars``,
        ``total`` and ``prob`` where

        * ``cars``  - the input count with negative values replaced by 0,
        * ``total`` - sum of ``cars`` over the row's window, replaced by 1 when it
          is not positive so that the division below is always defined,
        * ``prob``  - ``cars / total``.
    """
    freq = str(hours) + "h"  # lowercase alias: the uppercase "H" is deprecated in recent pandas

    result = df[['timestamp', 'cars']].reset_index(drop=True)

    # Clamp negative counts *before* summing, so the shares inside a window add up to 1.
    # (Uses clip instead of chained assignment, which does not write through under
    # pandas Copy-on-Write.)
    result['cars'] = result['cars'].clip(lower=0)

    # transform('sum') hands every row the total of its own window. The previous
    # merge + forward-fill only matched rows lying exactly on a window start, so a window
    # whose first timestamp was missing inherited the total of the preceding window.
    window_total = result.groupby(pd.Grouper(key='timestamp', freq=freq))['cars'].transform('sum')
    result['total'] = window_total.mask(window_total <= 0, 1)

    result['prob'] = result['cars'] / result['total']  # calculate probability
    return result

In [159]:
# weeks 7,
# hours 24,
# [ data, timesteps ]
# data -> example: FPDs (list of 12 points) of all mondays 00:00 to 01:00 for 4 years with the list of probabilities for each FPD
# timesteps -> example: date associated with each FPD of all mondays 00:00 to 01:00 for 4 years
def create_timeslot_array(data, window=12):
    """Arrange the per-interval probabilities into one matrix per (weekday, hour) slot.

    Parameters
    ----------
    data : pandas.DataFrame
        Needs a ``timestamp`` column (datetime) and a ``prob`` column. The frame is
        not modified.
    window : int, default 12
        Number of rows that make up one FPD, i.e. one clock hour (60 min / 5 min = 12).

    Returns
    -------
    list
        ``data_array[weekday][hour] == [x, dates]`` for weekday 0-6 (Monday = 0) and
        hour 0-23. ``x`` is a numpy array of shape ``(n_samples, window)`` with one
        row per observed hour, ``dates`` the sorted list of the matching hourly
        timestamps. All 7 x 24 slots are always present; a slot without data holds an
        array of shape ``(0, window)`` and an empty list.

    Raises
    ------
    ValueError
        If ``window`` is not a positive integer.

    Notes
    -----
    Clock hours that do not contain exactly ``window`` rows (missing or duplicated
    timestamps) are left out. Previously such hours either made the reshape fail -
    the error was printed and the slot skipped, which shifted every later hour of that
    weekday to a wrong position - or were silently reshaped into rows that mixed
    different hours. The original note mentions that the result feeds the
    Bhattacharyya matrices.
    """
    if isinstance(window, bool) or not isinstance(window, (int, np.integer)) or window <= 0:
        raise ValueError("window must be a positive integer, got " + repr(window))

    stamps = pd.to_datetime(data['timestamp'])
    hour_start = stamps.dt.floor('h')
    rows_per_hour = hour_start.map(hour_start.value_counts())

    # Work on a private frame so the caller's data does not grow helper columns.
    slots = pd.DataFrame({
        'timestamp': stamps,
        'hour_start': hour_start,
        'prob': data['prob'],
    })
    # Keep complete hours only and order them in time, so that every run of `window`
    # consecutive rows inside a slot belongs to exactly one hour.
    slots = slots[rows_per_hour == window].sort_values('timestamp', kind='mergesort')
    weekdays = slots['timestamp'].dt.weekday
    hours = slots['timestamp'].dt.hour

    data_array = []
    for day in range(7):
        timeslots = []
        for hour in range(24):
            datapoint = slots[(weekdays == day) & (hours == hour)]
            x = datapoint['prob'].to_numpy()
            x = x.reshape(len(x) // window, window)
            dates = sorted(set(datapoint['hour_start']))  # hourly timestamp of each row of x
            timeslots.append([x, dates])
        data_array.append(timeslots)
    # output structure: data_array[7weekdays][24hours]; e.g. data_array[0][9] is data for monday mornings 9 am.
    return data_array

## -------- MAIN ----------

In [161]:
with open(r"../utils/configs.json", 'r') as f:
    configs = json.load(f)

# final_results[trajectory][direction][intersection] -> cleaned frame returned by read_folder
final_results = {}

for trajectory in configs['trajectories']:
    final_results[trajectory] = {}
    for direction in configs['trajectories'][trajectory].keys():

        # 1. read the monthly csv files of every intersection on this trajectory/direction
        #    and collect the cleaned frames in a dictionary keyed by intersection name:
        raw_data = {}
        for intersection in configs['trajectories'][trajectory][direction]:
            raw_data[intersection] = read_folder(intersection, configs, trajectory, direction)
            # save_path = f"../data/hauge/processed/GNN_raw_data_{trajectory}.h5" # path to save the processed raw data to a h5 file for GNNs
            # save_GNN_processed_data(raw_data, save_path) # save the raw data to a h5 file

        # Keep the frames of this direction. `raw_data` is rebound on every iteration, so
        # without this only the last trajectory/direction would survive the loop and
        # `final_results` would stay a dictionary of empty dictionaries.
        final_results[trajectory][direction] = raw_data

        # # 2. create FPDs from dictionary 
        # fpds = {}
        # fpd_hour = 1 # interval in hours to aggregate the data
        # for intersection in raw_data:
        #     fpds[intersection] = fpd(raw_data[intersection], fpd_hour)
        #     break

        # # 3. create timeslot arrays for further processing:
        # # NOTE(review): the second argument of create_timeslot_array is `window` (rows per FPD),
        # # not the intersection name - drop it or pass 12 when re-enabling this step.
        # fpds_processed = [[]]  # array with shape (weeks, hours) containing the FPDs for each hour of each week for all days from 2018 to 2022 
        # for intersection in fpds:
        #     fpds_processed = create_timeslot_array(fpds[intersection], intersection)
        #     break




Starting intersection: K502
../data/hauge/K502
Starting intersection: K504
../data/hauge/K504
Starting intersection: K503
../data/hauge/K503
Starting intersection: K263
../data/hauge/K263
Starting intersection: K556
../data/hauge/K556
Starting intersection: K557
../data/hauge/K557
Starting intersection: K559
../data/hauge/K559
Starting intersection: K561
../data/hauge/K561
Starting intersection: K198
../data/hauge/K198
Starting intersection: K502
../data/hauge/K502
Starting intersection: K504
../data/hauge/K504
Starting intersection: K503
../data/hauge/K503
Starting intersection: K263
../data/hauge/K263
Starting intersection: K556
../data/hauge/K556
Starting intersection: K557
../data/hauge/K557
Starting intersection: K559
../data/hauge/K559
Starting intersection: K561
../data/hauge/K561
Starting intersection: K198
../data/hauge/K198
Starting intersection: K704
../data/hauge/K704
Starting intersection: K702
../data/hauge/K702
Starting intersection: K703
../data/hauge/K703
Starting inte

In [206]:
# Show the cleaned timestamp/cars frame of intersection K074.
# NOTE(review): relies on `raw_data` as left behind by the main loop, which rebinds it for every
# trajectory/direction - K074 must belong to the last one processed for this lookup to succeed.
raw_data['K074']

Unnamed: 0,timestamp,cars
0,2018-01-01 00:00:00,3.0
1,2018-01-01 00:05:00,2.0
2,2018-01-01 00:10:00,0.0
3,2018-01-01 00:15:00,2.0
4,2018-01-01 00:20:00,4.0
...,...,...
233963,2020-03-31 23:35:00,3.0
233964,2020-03-31 23:40:00,3.0
233965,2020-03-31 23:45:00,3.0
233966,2020-03-31 23:50:00,0.0


In [67]:
# NOTE(review): `fpds_processed` is only assigned in the commented-out step 3 of the main cell,
# so this cell fails on a fresh kernel unless that step is re-enabled.
print(fpds_processed)
# Layout produced by create_timeslot_array: fpds_processed[weekday][hour] == [fpd_matrix, hour_timestamps]
# with 7 weekdays and 24 hours. fpd_matrix has one row (one FPD) per observed hour and
# `window` (default 12) probabilities per row, one per 5-minute interval.
# The original note says the data covers "2018 to 2022 / 4 years"; the printed sample starts
# on 2018-01-01 - the actual end date is not verified here.

[[[array([[0.0617284 , 0.00617284, 0.01851852, ..., 0.15432099, 0.16666667,
        0.09876543],
       [0.16666667, 0.11904762, 0.11904762, ..., 0.0952381 , 0.04761905,
        0.        ],
       [0.05084746, 0.03389831, 0.13559322, ..., 0.11864407, 0.06779661,
        0.01694915],
       ...,
       [0.18604651, 0.09302326, 0.04651163, ..., 0.        , 0.06976744,
        0.04651163],
       [0.13636364, 0.18181818, 0.18181818, ..., 0.04545455, 0.09090909,
        0.09090909],
       [0.02941176, 0.11764706, 0.08823529, ..., 0.05882353, 0.02941176,
        0.05882353]]), [Timestamp('2018-01-01 00:00:00'), Timestamp('2018-01-08 00:00:00'), Timestamp('2018-01-15 00:00:00'), Timestamp('2018-01-22 00:00:00'), Timestamp('2018-01-29 00:00:00'), Timestamp('2018-02-05 00:00:00'), Timestamp('2018-02-12 00:00:00'), Timestamp('2018-02-19 00:00:00'), Timestamp('2018-02-26 00:00:00'), Timestamp('2018-03-05 00:00:00'), Timestamp('2018-03-12 00:00:00'), Timestamp('2018-03-19 00:00:00'), Timestamp(

In [75]:
# todo: apply earth mover distance to the FPDs

## ========== EXTRA =========

## =============== 1. METR-LA and PEMS-BAY data processing =================

In [200]:
# Preview the first rows of the loaded traffic frame (one column per sensor, timestamps as index).
# NOTE(review): `df` is only assigned by the `pd.read_hdf` cells further down, so this cell
# depends on out-of-order execution and fails on Restart & Run All.

df.head()

Unnamed: 0,773869,767541,767542,717447,717446,717445,773062,767620,737529,717816,...,772167,769372,774204,769806,717590,717592,717595,772168,718141,769373
2012-03-01 00:00:00,64.375,67.625,67.125,61.5,66.875,68.75,65.125,67.125,59.625,62.75,...,45.625,65.5,64.5,66.428571,66.875,59.375,69.0,59.25,69.0,61.875
2012-03-01 00:05:00,62.666667,68.555556,65.444444,62.444444,64.444444,68.111111,65.0,65.0,57.444444,63.333333,...,50.666667,69.875,66.666667,58.555556,62.0,61.111111,64.444444,55.888889,68.444444,62.875
2012-03-01 00:10:00,64.0,63.75,60.0,59.0,66.5,66.25,64.5,64.25,63.875,65.375,...,44.125,69.0,56.5,59.25,68.125,62.5,65.625,61.375,69.857143,62.0
2012-03-01 00:15:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2012-03-01 00:20:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [215]:
# function to read the data from the h5 file and convert to dictionary with each column name as key
def read_h5(df):
    """Split a wide sensor frame into one ``timestamp``/``cars`` frame per sensor column.

    Parameters
    ----------
    df : pandas.DataFrame
        Wide frame as loaded from the h5 file: a datetime index and one column per
        sensor. The frame is not modified.

    Returns
    -------
    dict
        Maps every column label of ``df`` to a new DataFrame with a 0..n-1 index and
        two columns: ``timestamp`` (the index of ``df`` as naive datetimes truncated
        to whole seconds) and ``cars`` (the values of that column).

    Notes
    -----
    The previous version replaced the index of the caller's frame, added a
    ``timestamp`` column to it and reset its index in place, so a second call on the
    same frame failed. It also iterated over that added column, which put a bogus
    ``'timestamp'`` entry (whose ``cars`` values were timestamps) into the result.
    """
    # Same result as rebuilding the datetime from its year/month/day/hour/minute/second
    # fields: wall-clock time without timezone, truncated to whole seconds.
    stamps = pd.DatetimeIndex(df.index)
    if stamps.tz is not None:
        stamps = stamps.tz_localize(None)
    stamps = stamps.floor('s')

    data_dict = {}  # sensor label -> frame in the timestamp/cars format
    for column in df.columns:
        if isinstance(column, str) and column == 'timestamp':
            continue  # a timestamp column is not a sensor
        data_dict[column] = pd.DataFrame({
            'timestamp': stamps,
            'cars': df[column].to_numpy(),
        })

    return data_dict

In [228]:
# Load the METR-LA frame stored under key 'df' of the h5 file and preview its first rows.
# `load_path` / `save_path` are module-level names that the following save cell reads.
load_path = '../data/METR-LA/metr-la.h5'
save_path = '../data/METR-LA/processed/OWRI_df_format.pickle'
df = pd.read_hdf(load_path, 'df')
df.head()

Unnamed: 0,773869,767541,767542,717447,717446,717445,773062,767620,737529,717816,...,772167,769372,774204,769806,717590,717592,717595,772168,718141,769373
2012-03-01 00:00:00,64.375,67.625,67.125,61.5,66.875,68.75,65.125,67.125,59.625,62.75,...,45.625,65.5,64.5,66.428571,66.875,59.375,69.0,59.25,69.0,61.875
2012-03-01 00:05:00,62.666667,68.555556,65.444444,62.444444,64.444444,68.111111,65.0,65.0,57.444444,63.333333,...,50.666667,69.875,66.666667,58.555556,62.0,61.111111,64.444444,55.888889,68.444444,62.875
2012-03-01 00:10:00,64.0,63.75,60.0,59.0,66.5,66.25,64.5,64.25,63.875,65.375,...,44.125,69.0,56.5,59.25,68.125,62.5,65.625,61.375,69.857143,62.0
2012-03-01 00:15:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2012-03-01 00:20:00,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [230]:
# convert the wide frame to a dictionary with one timestamp/cars frame per column
OWRI_df_format = read_h5(df)
# save the dictionary as a pickle file at `save_path` (not h5, despite the source format)
with open(save_path, 'wb') as f:
    pickle.dump(OWRI_df_format, f)

In [235]:
# Load the PEMS-BAY frame from the h5 file and preview its first rows.
# This rebinds `load_path`, `save_path` and `df` that were used for METR-LA above.
load_path = '../data/PEMS-BAY/pems-bay.h5'
save_path = '../data/PEMS-BAY/processed/OWRI_df_format.pickle'
df = pd.read_hdf(load_path)
df.head()

sensor_id,400001,400017,400030,400040,400045,400052,400057,400059,400065,400069,...,409525,409526,409528,409529,413026,413845,413877,413878,414284,414694
2017-01-01 00:00:00,71.4,67.8,70.5,67.4,68.8,66.6,66.8,68.0,66.8,69.0,...,68.8,67.9,68.8,68.0,69.2,68.9,70.4,68.8,71.1,68.0
2017-01-01 00:05:00,71.6,67.5,70.6,67.5,68.7,66.6,66.8,67.8,66.5,68.2,...,68.4,67.3,68.4,67.6,70.4,68.8,70.1,68.4,70.8,67.4
2017-01-01 00:10:00,71.6,67.6,70.2,67.4,68.7,66.1,66.8,67.8,66.2,67.8,...,68.4,67.4,68.4,67.5,70.2,68.3,69.8,68.4,70.5,67.9
2017-01-01 00:15:00,71.1,67.5,70.3,68.0,68.5,66.7,66.6,67.7,65.9,67.8,...,68.5,67.5,68.5,67.5,70.4,68.7,70.2,68.4,70.8,67.6
2017-01-01 00:20:00,71.7,67.8,70.2,68.1,68.4,66.9,66.1,67.7,66.1,67.8,...,68.5,67.7,68.5,67.4,69.6,69.1,70.0,68.4,71.0,67.9


In [236]:
# convert the wide frame to a dictionary with one timestamp/cars frame per column
OWRI_df_format = read_h5(df)
# save the dictionary as a pickle file at `save_path` (not h5, despite the source format)
with open(save_path, 'wb') as f:
    pickle.dump(OWRI_df_format, f)