In [1]:
import time
import pandas as pd
import requests
import schedule

def query_api():
    response = requests.get("https://data.traffic.hereapi.com/v7/flow?in=circle:1.279683,103.809628;r=20830&locationReferencing=shape&apiKey=uAF0ydU9hJ_THCHkwvtSQR5FhTKEWjtX8XynT5rHoPk")
    data = response.json()["results"]
    return data

def save_data_to_csv(df, filename):
    with open(filename, 'a') as f:
        df.to_csv(f, header=f.tell() == 0, index=False)

**Pre-processing immediately when collecting because there are duplicates in the data. If we store the data without handling the duplicate data first, the size of the data will be around 3 times more.**

In [2]:
def preprocess_data(data, timestamp):
    raw_df = pd.DataFrame(data)

    #sorted_df["timestamp"] = timestamp
    
    raw_df = pd.concat([raw_df['location'].apply(pd.Series), raw_df['currentFlow'].apply(pd.Series)], axis=1)

    raw_df['start_lat'] = raw_df['shape'][0]['links'][0]['points'][0]['lat']
    raw_df['start_lon'] = raw_df['shape'][0]['links'][0]['points'][0]['lng']
    raw_df['end_lat'] = raw_df['shape'][0]['links'][0]['points'][-1]['lat']
    raw_df['end_lon'] = raw_df['shape'][0]['links'][0]['points'][-1]['lng']


    raw_df = raw_df.drop("shape", axis=1)


    columns_to_drop = ['subSegments', 'junctionTraversability', 'jamTendency']

    columns_to_drop = [col for col in columns_to_drop if col in raw_df.columns]

    raw_df = raw_df.drop(columns_to_drop, axis=1)


    open_df = raw_df.loc[raw_df['traversability'] == "open"].copy()

    open_df = open_df.drop("traversability", axis=1)

    open_df.dropna(inplace=True)

    df = open_df.copy()

    duplicates = df.duplicated(subset='description', keep=False)
    duplicate_df = df[duplicates]

    duplicate_df

    unique_df = df[~df.duplicated(subset='description', keep=False)]

    road_aggregated_data = {}

    for _, row in duplicate_df.iterrows():
        road_name = row['description']
        start_lat = row['start_lat']
        start_lon = row['start_lon']
        end_lat = row['end_lat']
        end_lon = row['end_lon']
        length = row['length']
        speed = row['speed']
        speed_uncapped = row['speedUncapped']
        free_flow = row['freeFlow']
        jam_factor = row['jamFactor']
        confidence = row['confidence']

        if road_name in road_aggregated_data:
            (
                cumulative_length,
                cumulative_weighted_speed,
                cumulative_weighted_speed_uncapped,
                cumulative_weighted_free_flow,
                cumulative_weighted_jam_factor
            ) = road_aggregated_data[road_name][4:9]

            cumulative_length += length
            cumulative_weighted_speed += length * speed
            cumulative_weighted_speed_uncapped += length * speed_uncapped
            cumulative_weighted_free_flow += length * free_flow
            cumulative_weighted_jam_factor += length * jam_factor

            road_aggregated_data[road_name] = (
                start_lat,
                start_lon,
                end_lat,
                end_lon,
                cumulative_length,
                cumulative_weighted_speed,
                cumulative_weighted_speed_uncapped,
                cumulative_weighted_free_flow,
                cumulative_weighted_jam_factor,
                confidence
            )
        else:
            road_aggregated_data[road_name] = (
                start_lat,
                start_lon,
                end_lat,
                end_lon,
                length,
                length * speed,
                length * speed_uncapped,
                length * free_flow,
                length * jam_factor,
                confidence
            )

    road_average_speeds = {}

    for road_name, (
        start_lat,
        start_lon,
        end_lat,
        end_lon,
        cumulative_length,
        cumulative_weighted_speed,
        cumulative_weighted_speed_uncapped,
        cumulative_weighted_free_flow,
        cumulative_weighted_jam_factor,
        confidence
    ) in road_aggregated_data.items():
        average_speed = cumulative_weighted_speed / cumulative_length
        average_speed_uncapped = cumulative_weighted_speed_uncapped / cumulative_length
        average_free_flow = cumulative_weighted_free_flow / cumulative_length
        average_weighted_jam_factor = cumulative_weighted_jam_factor / cumulative_length

        road_average_speeds[road_name] = (
            start_lat,
            start_lon,
            end_lat,
            end_lon,
            cumulative_length,
            average_speed,
            average_speed_uncapped,
            average_free_flow,
            average_weighted_jam_factor,
            confidence
        )

    result_df = pd.DataFrame(list(road_average_speeds.values()), columns=['start_lat', 'start_lon', 'end_lat', 'end_lon', 'length', 'speed', 'speedUncapped', 'freeFlow', 'jamFactor', 'confidence'])
    result_df.insert(0, 'description', list(road_average_speeds.keys()))
    clean_df = pd.concat([unique_df, result_df])
    clean_df = clean_df.drop("confidence", axis=1)
    sorted_df = clean_df.sort_values("description")
    sorted_df['road_id'] = pd.factorize(sorted_df['description'])[0] + 1
    
    sorted_df["timestamp"] = timestamp
    
    #result_df = sorted_df[["road_id", "speed", ]]
    
    return sorted_df

**Calculating the delay before the next 5 minute interval. We are doing this because the pre-processing takes some time, if we just simply add a delay of 300 seconds, the delay will be 300 + (pre-processing time) seconds, which if so the timestamp will not be in 5 minutes interval**

In [3]:
import schedule
import time

def run_pipeline():
    csv_filename = 'traffic_data.csv'
    duration = 60 * 60  # 1 hour in seconds, Change this to 24 * 60 * 60 for one day
    start_time = time.time()

    def job():
        # Query the API
        data = query_api()

        # Obtain the timestamp
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        
        print("Current time: ", timestamp)
        
        # Preprocess the data
        result_df = preprocess_data(data, timestamp)
        
        # Save the data to CSV
        save_data_to_csv(result_df, csv_filename)

        # Check if the duration limit has been reached
        elapsed_time = time.time() - start_time
        if elapsed_time >= duration:
            # Exit the pipeline
            schedule.clear()

    while True:
        # Get the current time
        current_time = time.localtime()
        next_interval = (current_time.tm_min + 5) // 5 * 5
        if next_interval >= 60:
            next_interval = 0
            next_hour = (current_time.tm_hour + 1) % 24  # Increase the hour by 1 and wrap around to 0 if it exceeds 23
        else:
            next_hour = current_time.tm_hour

        next_interval_time = time.struct_time((
            current_time.tm_year, current_time.tm_mon, current_time.tm_mday,
            next_hour, next_interval, 0, current_time.tm_wday,
            current_time.tm_yday, current_time.tm_isdst
        ))

        print("Next Interval Time:", time.strftime("%H:%M:%S", next_interval_time))

        # Calculate the delay until the next interval
        delay = time.mktime(next_interval_time) - time.mktime(current_time)
        #print(time.mktime(next_interval_time))
        #print(time.mktime(current_time))
        #print(delay)
        if delay < 0:
            # Calculate the remaining time until the next day's 5-minute interval
            remaining_minutes = 60 - current_time.tm_min
            remaining_seconds = 60 - current_time.tm_sec
            remaining_delay = ((remaining_minutes-1) * 60) + remaining_seconds

            # Calculate the delay until the next interval of the next day
            delay = remaining_delay + (24 * 60 * 60) - (60 * 60 * 24) + (next_interval * 60)

        print("Delay until Next Interval (in seconds):", delay)
        
        # Wait until the next interval
        time.sleep(delay)
        print("Delay completed. Moving to next interval.")
        
        # Run the job
        job()

        # Check if the duration limit has been reached
        elapsed_time = time.time() - start_time
        if elapsed_time >= duration:
            break

In [4]:
run_pipeline()

Next Interval Time: 13:55:00
Delay until Next Interval (in seconds): 69.0
Delay completed. Moving to next interval.
Current time:  2023-06-17 13:55:02
Next Interval Time: 14:00:00
Delay until Next Interval (in seconds): 295.0
Delay completed. Moving to next interval.
Current time:  2023-06-17 14:00:03
Next Interval Time: 14:05:00
Delay until Next Interval (in seconds): 294.0
Delay completed. Moving to next interval.
Current time:  2023-06-17 14:05:02
Next Interval Time: 14:10:00
Delay until Next Interval (in seconds): 295.0
Delay completed. Moving to next interval.
Current time:  2023-06-17 14:10:02
Next Interval Time: 14:15:00
Delay until Next Interval (in seconds): 295.0
Delay completed. Moving to next interval.
Current time:  2023-06-17 14:15:03
Next Interval Time: 14:20:00
Delay until Next Interval (in seconds): 295.0
Delay completed. Moving to next interval.
Current time:  2023-06-17 14:20:04
Next Interval Time: 14:25:00
Delay until Next Interval (in seconds): 293.0
Delay complete