In [1]:
import pandas as pd
import numpy as np
import os
import collections
from collections import Counter
import matplotlib.pyplot as plt

from datetime import timedelta
from datetime import date

import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

#pd.set_option('display.max_columns', None)

In [2]:
%load_ext watermark

In [3]:
%watermark --python

Python implementation: CPython
Python version       : 3.11.5
IPython version      : 8.15.0



## 1. Calculate the statistic of each record

### Read traffic flow data of each site (sensor) - 974 sites

In [13]:
def read_df(filename):
    """Load one site's raw traffic-flow CSV.

    Parameters
    ----------
    filename : str or path-like
        Location of the CSV file; passed straight to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The file contents, unmodified.
    """
    return pd.read_csv(filename)

In [3]:
def read_df_2(site_df):
    """Keep the records with ``count > 1`` and attach site-level statistics.

    Parameters
    ----------
    site_df : pandas.DataFrame
        Raw records of one site; must contain ``count`` and ``lastUpdate``.

    Returns
    -------
    pandas.DataFrame
        The rows of ``site_df`` whose ``count`` exceeds 1, with three constant
        columns added:

        * ``startDate`` - smallest ``lastUpdate`` value of the whole site,
        * ``errorPercentage(%)`` - (sum of ``count`` over the kept rows minus
          the number of kept rows) divided by the sum of ``count`` over all
          rows, times 100,
        * ``allCount`` - sum of ``count`` over all rows.

    ``site_df`` itself is not modified.
    """
    total_count = site_df['count'].sum()

    # Work on an explicit copy: assigning columns onto a ``.loc`` slice raises
    # SettingWithCopyWarning and is not guaranteed to behave consistently.
    site_dup = site_df.loc[site_df['count'] > 1].copy()

    site_dup['startDate'] = site_df['lastUpdate'].min()
    site_dup['errorPercentage(%)'] = (site_dup['count'].sum() - len(site_dup)) / total_count * 100
    site_dup['allCount'] = total_count

    return site_dup

### Extract the essential information from raw traffic flow data 

In [4]:
def df_clean(site_dup):
    """Reduce the duplicated-record frame to the essential columns and split the timestamp.

    Parameters
    ----------
    site_dup : pandas.DataFrame
        Output of ``read_df_2``; must contain ``siteSiteid``, ``timestamp``,
        ``count``, ``errorPercentage(%)`` and ``startDate``.

    Returns
    -------
    pandas.DataFrame
        A copy of the five essential columns plus ``date`` (text before the
        first ``'T'`` of ``timestamp``) and ``time`` (text after it).

    Notes
    -----
    Hidden state: this function also reads the module-level ``site_df`` (the
    raw frame of the site currently being processed) and adds a ``date``
    column to it in place. ``get_count_each_day`` relies on that column, so
    ``site_df`` must be assigned before calling this function.
    """
    # clean and rearrange df; copy so the new columns never touch a slice
    new_cols = ['siteSiteid', 'timestamp', 'count', 'errorPercentage(%)', 'startDate']
    site_clean = site_dup[new_cols].copy()

    # extract date and time columns from timestamp
    # (``.str[...]`` also works when the frame is empty, unlike expand=True)
    timestamp_parts = site_clean['timestamp'].str.split('T', n=1)
    site_clean['date'] = timestamp_parts.str[0]
    site_clean['time'] = timestamp_parts.str[1]

    # side effect on the global raw frame (see Notes)
    site_df['date'] = site_df['timestamp'].str.split('T', n=1).str[0]

    return site_clean

### Get number of consecutive interpolation each day

In [5]:
def get_count_each_day(site_clean):
    """Sum ``count`` per day for the duplicated records and for all records.

    Parameters
    ----------
    site_clean : pandas.DataFrame
        Output of ``df_clean``; must contain ``date`` and ``count``.

    Returns
    -------
    pandas.DataFrame
        One row per distinct ``date`` of ``site_clean``, sorted by date, with
        columns ``errorDate``, ``reCount/Day`` (sum of ``count`` in
        ``site_clean`` for that day) and ``allCount/Day`` (sum of ``count`` in
        the raw frame for that day). The index is 0..n-1.

    Notes
    -----
    Hidden state: reads the module-level ``site_df``, which must already carry
    the ``date`` column added by ``df_clean``.
    """
    # One grouped pass per frame instead of a full-frame scan for every date.
    re_per_day = site_clean.groupby('date')['count'].sum()
    all_per_day = (
        site_df.groupby('date')['count'].sum()
        # a day absent from the raw frame sums to 0, like summing an empty selection
        .reindex(re_per_day.index, fill_value=0)
    )

    # groupby already returns the days in ascending order
    df_site_dateCount = pd.DataFrame({
        'errorDate': re_per_day.index.to_numpy(),
        'reCount/Day': re_per_day.to_numpy(),
        'allCount/Day': all_per_day.to_numpy(),
    })

    return df_site_dateCount

### Get the time of interpolation each day

In [6]:
def get_alltime_each_day(site_clean):
    """List the distinct times of the duplicated records for every day.

    Parameters
    ----------
    site_clean : pandas.DataFrame
        Output of ``df_clean``; must contain ``date`` and ``time``.

    Returns
    -------
    pandas.DataFrame
        One row per distinct ``date``, sorted by date, with columns
        ``errorDate`` and ``reTime/Day`` (sorted list of the unique ``time``
        values of that day). The index is 0..n-1.
    """
    error_dates = []
    times_per_day = []
    # A single grouped pass; groupby yields the days in ascending order.
    for day, day_times in site_clean.groupby('date')['time']:
        error_dates.append(day)
        times_per_day.append(sorted(set(day_times)))

    df_site_time = pd.DataFrame({'errorDate': error_dates, 'reTime/Day': times_per_day})

    return df_site_time

### Merge the two columns together

In [7]:
def merge_to_each_day(df_site_dateCount, df_site_time):
    """Join the per-day counts with the per-day times and add site-level columns.

    Parameters
    ----------
    df_site_dateCount : pandas.DataFrame
        Output of ``get_count_each_day``.
    df_site_time : pandas.DataFrame
        Output of ``get_alltime_each_day``.

    Returns
    -------
    pandas.DataFrame
        One row per ``errorDate`` with columns ``siteId``, ``errorDate``,
        ``reCount/Day``, ``reTime/Day``, ``errorPercentage/Day(%)``,
        ``errorPercentage(%)``, ``allCount`` and ``startDate``.

    Notes
    -----
    Hidden state: the site-level values come from the first row of the
    module-level frames ``site_clean`` and ``site_dup``.
    """
    per_day = df_site_dateCount.merge(df_site_time, how="left", on=["errorDate"])

    # Site-level constants are broadcast onto every daily row.
    per_day['siteId'] = site_clean['siteSiteid'].iloc[0]
    for site_col in ('errorPercentage(%)', 'startDate', 'allCount'):
        per_day[site_col] = site_dup[site_col].iloc[0]

    # NOTE(review): the daily figure subtracts 1 per day, whereas the site-level
    # figure in read_df_2 subtracts the number of duplicated rows - confirm intent.
    per_day['errorPercentage/Day(%)'] = (
        (per_day['reCount/Day'] - 1) / per_day['allCount/Day'] * 100
    )

    ordered_cols = [
        'siteId', 'errorDate', 'reCount/Day', 'reTime/Day',
        'errorPercentage/Day(%)', 'errorPercentage(%)', 'allCount', 'startDate',
    ]
    return per_day[ordered_cols]

### Merge each site into a single row

In [8]:
def merge_to_each_site(site_date_time):
    """Collapse a site's per-day frame into a single summary row.

    Parameters
    ----------
    site_date_time : pandas.DataFrame
        Output of ``merge_to_each_day``; the first row supplies every value.

    Returns
    -------
    pandas.DataFrame
        One row with columns ``siteId``, ``errorPercentage(%)``, ``countAll``
        (taken from ``allCount``) and ``startDate``.
    """
    source_cols = ['siteId', 'errorPercentage(%)', 'allCount', 'startDate']
    first_values = [site_date_time[col].iloc[0] for col in source_cols]

    # 'allCount' is published under the name 'countAll' in the summary
    summary_cols = ['siteId', 'errorPercentage(%)', 'countAll', 'startDate']
    return pd.DataFrame([first_values], columns=summary_cols)

## - Implement

In [None]:
# Folder that holds one raw CSV per site.
data_dir = 'data'

all_sites = []
all_sites_date = []
# sorted() makes the processing order reproducible across runs and platforms
for filename in sorted(os.listdir(data_dir)):
    if not filename.endswith(".csv"):
        continue

    # os.listdir returns bare file names, so the folder has to be joined back
    # on; otherwise the file is looked up in the working directory.
    # The names site_df / site_dup / site_clean must stay exactly as they are:
    # df_clean, get_count_each_day and merge_to_each_day read them as globals.
    site_df = read_df(os.path.join(data_dir, filename))
    site_dup = read_df_2(site_df)
    site_clean = df_clean(site_dup)
    df_site_dateCount = get_count_each_day(site_clean)
    df_site_time = get_alltime_each_day(site_clean)
    site_date_time = merge_to_each_day(df_site_dateCount, df_site_time)

    each_site = merge_to_each_site(site_date_time)

    all_sites.append(each_site)
    all_sites_date.append(site_date_time)


# one summary row per site
all_statistic = pd.concat(all_sites)
all_statistic = all_statistic.reset_index(drop=True)

# one row per site and day
all_sites_day_statistic = pd.concat(all_sites_date)
all_sites_day_statistic = all_sites_day_statistic.reset_index(drop=True)

## 2. Clean the records via statistics

### Get number of records

In [None]:
# For every distinct total record count: how many sites have it, and which ones.
# A single groupby replaces the per-value rescan of all_statistic (and the
# discarded OrderedDict expression, which had no effect).
df_counts = (
    all_statistic.groupby('countAll')['siteId']
    .agg(list)
    .rename('sites')
    .reset_index()
)
df_counts.insert(1, 'countNum', df_counts['sites'].map(len))

# groupby returns the rows sorted by countAll with a fresh 0..n-1 index
df_counts

### Tips 1: Calculate the correct number of records

In [3]:
from datetime import date

# Expected number of records for a complete site over the study period:
# 4 records per hour (one every 15 minutes), 24 hours per day.
d0, d1 = date(2019, 10, 1), date(2023, 10, 1)
delta = d1 - d0
records_per_day = 24 * 4
print(delta.days * records_per_day)

140256


### Get start date

In [None]:
# For every distinct start date: how many sites start then, and which ones.
# A single groupby replaces the per-value rescan of all_statistic and avoids
# reusing the counts_sta name from the record-count cell.
df_dates = (
    all_statistic.groupby('startDate')['siteId']
    .agg(list)
    .rename('sites')
    .reset_index()
)
df_dates.insert(1, 'dateNumber', df_dates['sites'].map(len))

# groupby returns the rows sorted by startDate with a fresh 0..n-1 index
df_dates

### Get the error percentage (interpolation percentage)

In [None]:
# Sites whose interpolation share exceeds 1 %, keeping the first two columns.
over_one_percent = all_statistic['errorPercentage(%)'] > 1
df_error = all_statistic[over_one_percent].iloc[:, :2]

# Worst sites first; the original label slice 0..10 is inclusive, i.e. 11 rows.
(
    df_error
    .sort_values(by=['errorPercentage(%)'], ascending=False)
    .reset_index(drop=True)
    .head(11)
)

### Get the error sites based on above statistic: 
    countAll < 140000
    startDate != 2019-10-01
    errorPercentage > 3%

In [57]:
# Each of the first two selections is a list of site-id lists; the third is flat.
count_error = df_counts.loc[df_counts['countAll'] < 140000, 'sites'].tolist()
date_error = df_dates.loc[df_dates['startDate'] != '2019-10-01T00:00:00.4458636Z', 'sites'].tolist()
error_error = df_error.loc[df_error['errorPercentage(%)'] > 3, 'siteId'].tolist()

# Flatten the grouped lists (count criterion first, then start date) and
# append the sites that fail the error-percentage criterion.
error_list = [
    site
    for site_group in count_error + date_error
    for site in site_group
] + error_error

In [None]:
# De-duplicate the flagged sites. Sorting makes the row order reproducible:
# plain list(set(...)) changes order between kernel restarts because string
# hashing is randomised per process.
error_sites = sorted(set(error_list))
error_sites_df = pd.DataFrame({'errorSites': error_sites})
error_sites_df

### Clean the error sites - 922 sites

In [76]:
# Row labels of all_statistic that belong to a flagged site.
# The original looked these up in `raw_df`, a name that is never defined in
# this notebook. Because the labels are dropped from all_statistic, they are
# taken from all_statistic itself.
# NOTE(review): assumes raw_df was an earlier name for all_statistic - confirm.
is_error_site = all_statistic['siteId'].isin(error_sites_df['errorSites'])
error_index = all_statistic.index[is_error_site].tolist()

all_useful_sites = all_statistic.drop(error_index)
all_useful_sites = all_useful_sites.sort_values(by=['siteId'])
all_useful_sites = all_useful_sites.reset_index(drop=True)

# detector_922 is the same object as all_useful_sites (no copy is made)
detector_922 = all_useful_sites

## 3. Clean the records via spatial location - 913 sites
The distance between each sensor and its closest road is calculated via ArcGIS

In [None]:
# Distances between the recorded sites and their closest road.
# NOTE(review): absolute path - adjust to the local location of the file.
distance_df = pd.read_csv('/Map matching/closest_road_to_detector_100.csv')
# keep only the sites with NEAR_DIST below 15
distance_df_15 = distance_df[distance_df['NEAR_DIST'] < 15]

# IN_FID is matched against the row position of detector_922
# (this adds the column to detector_922 in place).
detector_922['IN_FID'] = detector_922.index
detector_913 = distance_df_15.merge(detector_922, how="left", on=["IN_FID"])

## 4. Clean the dataset via zero percentage - 724 sites

### Calculate the zero percentage of each sensor

In [None]:
# Count the zero-flow records of every sensor and express them as a share of
# that sensor's records.
zero_quantity_by_site = {}
zero_percentage_by_site = {}

for i in detector_913.siteId.tolist():
    if i in zero_quantity_by_site:
        continue  # same site listed twice: the file was already read

    site_df = pd.read_csv('data/' + i + '&2019_10_01-2023_09_30.csv')

    # vectorised count instead of a Python loop over every row
    zero_quantity = int((site_df['flow'] == 0).sum())
    zero_quantity_by_site[i] = zero_quantity
    # an empty file has no defined share; NaN keeps it out of the <= 90 filter
    zero_percentage_by_site[i] = (
        zero_quantity / len(site_df) * 100 if len(site_df) else float('nan')
    )

# Build both columns in one go. Filling an integer-initialised column with
# floats row by row triggers pandas' incompatible-dtype warning.
detector_913['zeroQuantity'] = detector_913['siteId'].map(zero_quantity_by_site)
detector_913['zeroPercentage%'] = detector_913['siteId'].map(zero_percentage_by_site)

### Clean the data
zeroPercentage% > 90

In [None]:
# Keep the sensors whose share of zero-flow records is at most 90 %.
at_most_90_percent_zero = detector_913['zeroPercentage%'] <= 90
detector_724 = detector_913[at_most_90_percent_zero]

## 5. Clean the dataset via consecutive zero - 470 sites

### Delete all zero from original data

In [79]:
def drop_zero_flow(data2):
    """Remove the zero-flow records and order the rest chronologically.

    Parameters
    ----------
    data2 : pandas.DataFrame
        Site records; must contain ``flow`` and ``lastUpdate``.

    Returns
    -------
    pandas.DataFrame
        Rows with ``flow`` different from 0, sorted by ``lastUpdate``, with a
        fresh 0..n-1 index. The input frame is left untouched.
    """
    nonzero = data2.loc[data2['flow'].ne(0)]
    return nonzero.sort_values(by=['lastUpdate']).reset_index(drop=True)

### Check the consecutive dates after drop all the zero

In [80]:
def get_check_df(data3, siteId):
    """Summarise how many calendar days a site covers after zero-flow removal.

    Parameters
    ----------
    data3 : pandas.DataFrame
        Output of ``drop_zero_flow`` (sorted by ``lastUpdate``). The day of
        each record is read from a ``date`` column when present; otherwise it
        is taken as the text before ``'T'`` in ``lastUpdate``.
    siteId : str
        Identifier stored in the result.

    Returns
    -------
    pandas.DataFrame
        One row (index 0) with ``siteId``, ``start_date``, ``end_date``,
        ``full_dates`` (number of calendar days from first to last record,
        inclusive) and ``practical_dates`` (number of distinct days that have
        records). For an empty input the dates are ``'none'`` and both
        counters are 0.
    """
    if len(data3) == 0:
        return pd.DataFrame({'siteId': siteId,
                'start_date': 'none',
                'end_date': 'none',
                'full_dates': 0,
                'practical_dates': 0}, index=[0])

    # The caller passes frames read straight from CSV, which this notebook
    # never gives a 'date' column, so derive the day from lastUpdate if needed.
    if 'date' in data3.columns:
        record_days = data3['date']
    else:
        record_days = data3['lastUpdate'].str.split('T').str[0]

    start_date = record_days.iloc[0]
    end_date = record_days.iloc[-1]
    full_dates = pd.date_range(start_date, end_date)  # every calendar day in the span

    zero_check_df = pd.DataFrame({'siteId': siteId,
            'start_date': start_date,
            'end_date': end_date,
            'full_dates': len(full_dates),
            'practical_dates': record_days.nunique()}, index=[0])

    return zero_check_df

### Implement: Select the sites with the most consecutive flow

In [None]:
# One summary row per remaining site: read its file, strip the zero-flow
# records and measure the day coverage.
df_list = [
    get_check_df(
        drop_zero_flow(pd.read_csv('data/'+ i + '&2019_10_01-2023_09_30.csv')),
        i,
    )
    for i in detector_724.siteId.tolist()
]

df_check_consecutive = (
    pd.concat(df_list)
    .sort_values(by=['siteId'])
    .reset_index(drop=True)
)
df_check_consecutive

In [96]:
# Select the 470 most useful sites: those with records on exactly 1411 days.
# NOTE(review): 1411 presumably equals the 1461 days of the study period minus
# the 50 days reported missing further down - confirm.
has_1411_days = df_check_consecutive['practical_dates'] == 1411
detector_470 = (
    df_check_consecutive[has_1411_days]
    .sort_values(by=['siteId'])
    .reset_index(drop=True)
)

## 6. Get the missing dates

### Tips 2: Get the full list of days

In [2]:
def day_list(start_year, start_month, start_day, end_year, end_month, end_day):
    """Return every day from the start date up to, but excluding, the end date.

    Parameters
    ----------
    start_year, start_month, start_day : int
        First day of the list.
    end_year, end_month, end_day : int
        Day after the last entry of the list.

    Returns
    -------
    list of str
        Consecutive days formatted as ``YYYY-MM-DD``.
    """
    first_day = date(start_year, start_month, start_day)
    n_days = (date(end_year, end_month, end_day) - first_day).days

    return [str(first_day + timedelta(days=offset)) for offset in range(n_days)]

In [6]:
# Every day of the study period; the end date itself is not included.
full_days = day_list(
    start_year=2019, start_month=10, start_day=1,
    end_year=2023, end_month=10, end_day=1,
)
len(full_days)

1461

In [10]:
def get_missing_days(sites_df, path, file_suffix, full_days):
    """Count how many sites share each pattern of missing days.

    Parameters
    ----------
    sites_df : pandas.DataFrame
        Must contain ``siteId``; one CSV is read per entry.
    path : str
        Prefix put in front of the site id to build the file name.
    file_suffix : str
        Text appended after the site id to build the file name.
    full_days : iterable of str
        The days (``YYYY-MM-DD``) every site is expected to cover.

    Returns
    -------
    collections.Counter
        Maps a sorted tuple of missing days to the number of sites that miss
        exactly those days.
    """
    expected_days = set(full_days)

    missing_patterns = Counter()
    for site_id in sites_df.siteId.tolist():
        records = pd.read_csv(path + site_id + file_suffix)
        # the day is the part of lastUpdate before 'T'
        observed_days = set(records['lastUpdate'].str.split('T').str[0].unique())
        missing_patterns[tuple(sorted(expected_days - observed_days))] += 1

    return missing_patterns

In [None]:
# Missing-day patterns of the 470 selected sites.
missing_days = get_missing_days(
    sites_df=detector_470,
    path='data/',
    file_suffix='&2019_10_01-2023_09_30.csv',
    full_days=full_days,
)
missing_days

In [38]:
# Concatenate the days of every distinct missing-day pattern
# (iterating the Counter visits each pattern once).
missing_list = [day for pattern in missing_days for day in pattern]
len(missing_list)

50

### In conclusion, there are 50 missing days across the 470 sensors from 2019-10-01 to 2023-09-30

Which includes 2019-10-17 to 2019-11-19 (34d); 2022-4-9, 2022-4-10; 2023-4-23, and 2023-7-13 to 2023-7-25 (13d)

## 7. Aggregate the 470 data on 60-min interval

### Create a df with full timestamp

In [7]:
from datetime import datetime, timedelta

def get_time_string_list(start, end, interval):
    """Build equally spaced timestamps as ``YYYY-MM-DD HH:MM:SS`` strings.

    The list begins one step AFTER ``start`` and ends one step AFTER the last
    grid point that is not later than ``end``. This is why the notebook calls
    it with a start one step before the first wanted timestamp.

    Parameters
    ----------
    start, end : str
        Bounds in ``%Y-%m-%d %H:%M:%S`` format.
    interval : int or float
        Step between consecutive timestamps, in minutes. It was previously
        ignored in favour of a hard-coded 15.

    Returns
    -------
    list of str

    Raises
    ------
    ValueError
        If ``interval`` is not positive (the loop would never terminate).
    """
    if interval <= 0:
        raise ValueError("interval must be a positive number of minutes")

    start = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    end = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    step = timedelta(minutes=interval)

    now = start
    string_list = []
    while now <= end:
        # advance first, then record: see the docstring for the resulting bounds
        now += step
        string_list.append(now.strftime("%Y-%m-%d %H:%M:%S"))
    return string_list

# Full grid of 15-minute timestamps for the study period. The bounds are
# shifted because get_time_string_list starts one step after its start argument.
time_list = get_time_string_list("2019-09-30 23:45:00", "2023-09-30 23:30:00", 15)
allTime = pd.DataFrame({'timeStamp': time_list})

In [8]:
# Display the timestamp grid built in the previous cell.
allTime

Unnamed: 0,timeStamp
0,2019-10-01 00:00:00
1,2019-10-01 00:15:00
2,2019-10-01 00:30:00
3,2019-10-01 00:45:00
4,2019-10-01 01:00:00
...,...
140251,2023-09-30 22:45:00
140252,2023-09-30 23:00:00
140253,2023-09-30 23:15:00
140254,2023-09-30 23:30:00


### Merge the traffic flow data to the fullTime

In [8]:
# snap each record's timestamp down to the start of its 15-minute slot
def recover_lastupdate(data):
    """Floor every ``lastUpdate`` to the start of its quarter hour.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain ``lastUpdate``. The code assumes the form
        ``YYYY-MM-DDTHH:MM:SS.<fraction>``: everything from the first ``'.'``
        on is discarded and ``'T'`` becomes a space.

    Returns
    -------
    pandas.DataFrame
        The same object as ``data`` (modified in place) with two columns
        added: ``timeStamp`` (``YYYY-MM-DD HH:MM:00`` with MM in 00/15/30/45)
        and ``time`` (the ``MM:00`` part on its own).

    Notes
    -----
    Fully vectorised: the result does not depend on the frame's index, and the
    cost no longer grows quadratically with the number of records.
    """
    stamp = data['lastUpdate'].str.split('.').str[0].str.replace('T', ' ', regex=False)
    data['timeStamp'] = stamp

    # minutes are the two characters in front of ':SS'
    minute = pd.to_numeric(stamp.str[-5:-3])
    slot_start = minute // 15 * 15

    # 'MM:00'; rows without a usable lastUpdate stay missing
    data['time'] = slot_start.map(lambda m: f"{int(m):02d}:00", na_action='ignore')
    data['timeStamp'] = stamp.str[:-5] + data['time']

    return data

In [12]:
def merge_allTime(data, allTime):
    """Average the flow per timestamp and lay it onto the full timestamp grid.

    Parameters
    ----------
    data : pandas.DataFrame
        Site records carrying ``timeStamp``, ``flow`` and the six descriptive
        columns listed below.
    allTime : pandas.DataFrame
        Frame with a ``timeStamp`` column holding every expected timestamp.

    Returns
    -------
    pandas.DataFrame
        One row per row of ``allTime``. ``flow`` is the mean of the records
        sharing that timestamp (missing where there are none); the descriptive
        columns are filled with the value of the first aggregated row.
    """
    site_cols = ['siteSiteid', 'originDescription', 'originLat', 'originLong',
                 'originEasting', 'originNorthing']

    # mean flow per timestamp, first value for everything that describes the site
    agg_spec = {'flow': 'mean'}
    agg_spec.update({col: 'first' for col in site_cols})
    per_stamp = data.groupby("timeStamp", as_index=False).agg(agg_spec)

    # left join keeps every timestamp of the grid
    fullData = allTime.merge(per_stamp, how="left", on="timeStamp")

    # timestamps without records would leave the site columns empty: fill them
    for col in site_cols:
        fullData[col] = per_stamp[col][0]
    return fullData

### Aggregate to a specific time interval
`aggregate_len` is the number of consecutive 15-min records combined into one bucket: 2 = 30-min interval, 3 = 45-min interval, 4 = 60-min interval

In [18]:
def aggregate_data(data, aggregate_len, date):
    """Combine every ``aggregate_len`` consecutive rows into one record.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame with ``timeStamp`` (``YYYY-MM-DD HH:MM:SS`` strings) and
        ``flow``, one row per timestamp of the grid.
    aggregate_len : int
        Number of consecutive rows per bucket.
    date : str
        Day (``YYYY-MM-DD``) at which the result starts; earlier rows are cut.

    Returns
    -------
    pandas.DataFrame
        Columns ``date``, ``time`` (hour as int) and ``flow`` (int). A
        bucket's flow is the mean of its available values times
        ``aggregate_len``, rounded; buckets without any value are dropped.
    """
    bucket_ids = np.arange(len(data)) // aggregate_len
    buckets = data.groupby(bucket_ids).agg(
        {'timeStamp': 'first', 'flow': lambda x: x.mean()*aggregate_len}
    )

    buckets = buckets.dropna()
    buckets = buckets.sort_values(by=['timeStamp'])
    buckets = buckets.reset_index(drop=True)
    buckets['flow'] = buckets['flow'].round(0).astype(int)

    # split 'YYYY-MM-DD HH:MM:SS' into the day and the hour
    stamp_parts = buckets['timeStamp'].str.split(' ', n = 1, expand = True)
    buckets['date'] = stamp_parts[0]
    buckets['time'] = stamp_parts[1].str.split(':', n = 1).str[0].astype(int)
    buckets = buckets[['date', 'time', 'flow']]

    # clip the data so that it starts at the first row of the requested day
    first_row = buckets.index[buckets['date'] == date].min()
    return buckets.iloc[first_row:].reset_index(drop=True)

### Implement on 470 data

In [139]:
# The output folder has to exist before to_csv can write into it.
os.makedirs('data_470_hourly', exist_ok=True)

site_locations = []
for detector in detector_470.siteId.tolist():

    data = pd.read_csv('data/' + detector + '&2019_10_01-2023_09_30.csv')
    data = recover_lastupdate(data)
    fullData = merge_allTime(data, allTime)
    agr_hour = aggregate_data(fullData,4,'2019-10-01')

    agr_hour.to_csv('data_470_hourly/' + detector + '.csv', index = False)

    # merge_allTime fills originLat / originLong with one value per site
    site_locations.append({'id': detector,
                           'latitude': fullData['originLat'].iloc[0],
                           'longitude': fullData['originLong'].iloc[0]})

# detector_470 as built in this notebook only has siteId, start_date, end_date,
# full_dates and practical_dates, so selecting 'lat'/'lon' raised KeyError.
# Use those columns when they exist; otherwise fall back to the coordinates
# found in each site's raw file.
# NOTE(review): assumes originLat/originLong are the sensor coordinates that
# 'lat'/'lon' were meant to hold - confirm.
if {'lat', 'lon'}.issubset(detector_470.columns):
    data_all = detector_470[['siteId','lat','lon']]
    data_all = data_all.rename(columns={'siteId': 'id', 'lat': 'latitude', 'lon': 'longitude'})
else:
    data_all = pd.DataFrame(site_locations, columns=['id', 'latitude', 'longitude'])
data_all.to_csv('data_470_hourly/locations.csv', index = False)