# Data Join

Run segments.ipynb first to generate the segments. This notebook then joins our various data sources as discussed in Section 3 of the paper.

In [None]:
#import gtfs_functions as gtfs
#import geopandas as gpd
import pandas as pd
import os
import zipfile
import shutil
import datetime as dt
import time
from copy import deepcopy
import datetime
from shapely.geometry import Point, LineString, Polygon, asShape, mapping
import requests
from plotly import graph_objs as go
import numpy as np
from shapely.ops import cascaded_union, transform
from functools import partial
import pyproj
#import folium
import math
import requests
import concurrent.futures
import json
#import pymongo
import pytz 
#import dask.dataframe as dd
#from dask.distributed import Client
#from haversine import haversine, Unit
#import swifter

# Show every column and every row when a DataFrame is displayed (no truncation).
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
# Move one directory up so that the os.getcwd()-based paths built in later
# cells resolve against the parent directory.
# NOTE: re-running this cell moves up one more level each time.
os.chdir("../")
print(f"Current working directory: {os.getcwd()}")

In [None]:
# Global Settings

# Default `tolerance` handed to pd.merge_asof when pairing fuel/power readings
# with GPS readings on `time` (presumably milliseconds, like the timestamps
# main_run_day works with -- confirm).
JOIN_TOLERANCE = 2005
# Default number of consecutive segments, starting at the current one, that
# mapmatch() examines for every GPS point.
SEG_LOOKAHEAD = 2
# Default maximum distance between a projected GPS point and a projected
# segment for mapmatch() to accept the match (presumably metres, since the
# projected geometries use EPSG:32616 -- confirm).
BUFFER = 25
# Not referenced anywhere in the visible part of this notebook.
TRAFFIC_BUFFER = 200

## 1. Data Merge

Joins the raw ViriCiti, Clever, and weather data. The GTFS segments must be generated with segments.ipynb first.

Reads from: Mongo

Writes to: output_r/data-joins

In [None]:
# JSON file holding the MongoDB credentials read by get_mongo_connection().
CONFIG_PATH = os.path.join(os.getcwd(), "config", "config.json")
# MongoDB database and the collections queried by the query_* helpers.
DB = 'data-class'
VIRICITI_COLLECTION = 'viriciti.one.sec'
CLEVER_COLLECTION = 'bustime.chattanooga'
WEATHER_COLLECTION = 'darksky.chattanooga'
# Time zone used for every epoch <-> datetime conversion in this notebook.
# NOTE(review): 'EST' presumably has no daylight-saving rules, while a later
# cell defines TIMEZONE_STR = "America/New_York" -- confirm which is intended.
TZ = pytz.timezone('EST')
# Seconds per hour; query_darksky() pads its query window by this amount.
HOUR = 3600

# Parameter names returned by fleet_to_parameters() for the 'Electric' fleet.
BYD_PARAMS = ["soc",
              "odo",
              "speed",
              "current",
              "voltage",
              "power",
              "gps"]


# Parameter names returned by fleet_to_parameters() for 'Diesel' and 'Hybrid'.
GILLIG_PARAMS = ["fuel",
                 "odo",
                 "speed",
                 "gps"]

# Fleet name -> vehicle ids (as strings); looked up by get_fleet().
VEHICLE_FLEETS = {"Diesel": [str(x) for x in range(101, 151)],
                  "Hybrid": [str(x) for x in range(501, 507)],
                  "Electric": [str(x) for x in range(751, 754)]}

# Vehicle model -> vehicle ids (as strings); looked up by get_vehicle_type().
VEHICLE_TYPES = {"Gillig LF 35' 2014": [str(x) for x in range(147, 151)],
                 "Gillig HF 2006": [str(x) for x in range(135, 145)],
                 "Gillig HF 35' 2002": [str(x) for x in range(111, 135)],
                 "Gillig HF 30' 1998": [str(x) for x in range(101, 111)],
                 "Gillig 35' Hybrids 2014": [str(x) for x in range(503, 507)],
                 "Gillig 37' Hybrids 2009": [str(x) for x in range(501, 503)],
                 "Gillig LF 37' 2009": [str(x) for x in range(145, 147)],
                 "BYD BEV": [str(x) for x in range(751, 754)]}


# main_run_day() keeps only trips driven by these vehicle models.
VALID_VEHICLE_TYPES = ["Gillig LF 35' 2014", "Gillig 35' Hybrids 2014", 'BYD BEV']
# Route codes excluded ($nin) by query_trips_from_clever().
INVALID_ROUTES = ['DH', 'PI', 'PO', 'U']
# Per-fleet directories that main_run_day() writes its daily CSV files into.
DIESEL_PATH = os.path.join(os.getcwd(), 'output_r', 'data-joins', 'diesel')
HYBRID_PATH = os.path.join(os.getcwd(), 'output_r', 'data-joins', 'hybrid')
ELECTRIC_PATH = os.path.join(os.getcwd(), 'output_r', 'data-joins', 'electric')

In [None]:
def get_mongo_connection():
    """Open a MongoDB client from the credentials stored at CONFIG_PATH.

    The JSON config must provide the keys ``user``, ``password``, ``host``,
    ``port`` and ``authenticationDatabase``.

    NOTE(review): ``import pymongo`` is commented out in this notebook's
    import cell, so this function raises NameError until it is restored.
    NOTE(review): the credentials are interpolated into the URI verbatim;
    values containing reserved characters (e.g. ``@`` or ``:``) presumably
    need percent-escaping first -- confirm against the stored config.

    Returns:
        The ``pymongo.MongoClient`` instance; the caller closes it.
    """
    with open(CONFIG_PATH) as config_file:
        settings = json.load(config_file)

    uri = (
        f"mongodb://{settings['user']}:{settings['password']}"
        f"@{settings['host']}:{settings['port']}"
        f"/?authSource={settings['authenticationDatabase']}"
    )
    return pymongo.MongoClient(uri)


def timestamp_to_datetime(timestamp, millis=False):
    """Convert an epoch timestamp to a timezone-aware datetime in TZ.

    Args:
        timestamp: Epoch time, in seconds or (when ``millis`` is True) in
            milliseconds.
        millis: Pass exactly ``True`` when ``timestamp`` is in milliseconds.

    Returns:
        ``datetime.datetime`` localized to the module-level ``TZ``.
    """
    seconds = timestamp / 1000 if millis is True else timestamp
    return datetime.datetime.fromtimestamp(seconds, tz=TZ)


def datestr_to_timestamp(datestr="2020/11/15", millis=False):
    """Convert a ``year/month/day`` date string to the epoch time of midnight in TZ.

    Args:
        datestr: Date formatted as ``YYYY/MM/DD`` (e.g. ``"2020/11/15"``).
        millis: When exactly ``False`` the result is in whole seconds;
            otherwise it is in whole milliseconds.

    Returns:
        int: Epoch timestamp of 00:00 on that date in the module-level ``TZ``.
    """
    year, month, day = [int(x) for x in datestr.split("/")]
    # pytz zones have to be attached with localize(); passing one as tzinfo=
    # to the datetime constructor only yields the right offset for
    # fixed-offset zones.
    midnight = TZ.localize(datetime.datetime(year=year, month=month, day=day, hour=0))
    ts = midnight.timestamp()
    if millis is False:
        return int(ts)
    return int(ts * 1000)
    
    
def datetimestr_to_timestamp(datestr="11/11/2020-08:00", millis=False):
    """Convert a ``month/day/year-hour:minute`` string to an epoch timestamp in TZ.

    Args:
        datestr: Date and time formatted as ``MM/DD/YYYY-HH:MM``
            (e.g. ``"11/11/2020-08:00"``).
        millis: When exactly ``False`` the result is in whole seconds;
            otherwise it is in whole milliseconds.

    Returns:
        int: Epoch timestamp of that wall-clock time in the module-level ``TZ``.
    """
    d, t = datestr.split("-")
    month, day, year = [int(x) for x in d.split("/")]
    hour, minute = [int(x) for x in t.split(":")]
    # pytz zones have to be attached with localize(); passing one as tzinfo=
    # to the datetime constructor only yields the right offset for
    # fixed-offset zones.
    moment = TZ.localize(
        datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute)
    )
    ts = moment.timestamp()
    if millis is False:
        return int(ts)
    return int(ts * 1000)
    
    
def query_trips_from_clever(client, start_timestamp, end_timestamp):
    """Aggregate Clever documents into one row per trip and calendar day.

    Args:
        client: MongoDB client; ``client[DB][CLEVER_COLLECTION]`` is queried.
        start_timestamp: Window start in milliseconds (divided by 1000 before
            being compared with the collection's ``timestamp`` field).
        end_timestamp: Window end in milliseconds (same conversion).

    Returns:
        DataFrame with one de-duplicated row per (trip, day) carrying the
        vehicle id, document count, start/end timestamps (multiplied back to
        milliseconds), derived date/hour columns, fleet and vehicle type; or
        ``None`` when the aggregation returns no documents.

    NOTE(review): the ``$sort`` stage orders by ``clever.timestamp`` although
    ``$match`` filters on ``timestamp``; if ``clever.timestamp`` is not a real
    field, the ``$first``/``$last`` values are not ordered in time -- confirm.
    """
    window = {"$gte": int(start_timestamp / 1000), "$lte": int(end_timestamp / 1000)}
    # The same date-parsing expression feeds the month, day and year keys.
    parsed_tmstmp = {"$dateFromString": {'dateString': '$tmstmp',
                                         'format': "%Y%m%d %H:%M"}}
    group_key = {"month": {'$month': parsed_tmstmp},
                 "day": {'$dayOfMonth': parsed_tmstmp},
                 "year": {'$year': parsed_tmstmp},
                 "tripID": '$tatripid'}
    group_stage = {'$group': {'_id': group_key,
                              'tID': {'$first': '$tatripid'},
                              'vID': {'$first': '$vid'},
                              'dCT': {'$sum': 1},
                              'sEP': {'$first': '$timestamp'},
                              'eEP': {'$last': '$timestamp'},
                              'sTM': {'$first': '$tmstmp'},
                              'sRT': {'$first': '$rt'},
                              'eTM': {'$last': '$tmstmp'}}}
    rename_stage = {'$replaceWith': {'clever_vid': '$vID',
                                     'trip_id': '$tID',
                                     'documents': '$dCT',
                                     'start_timestamp': '$sEP',
                                     'end_timestamp': '$eEP',
                                     'start_time': '$sTM',
                                     'end_time': '$eTM',
                                     'rt': '$sRT',
                                     'test': '$_id'}}
    pipeline = [{"$match": {"timestamp": window, 'rt': {"$nin": INVALID_ROUTES}}},
                {'$sort': {'clever.timestamp': 1}},
                group_stage,
                rename_stage]

    records = list(client[DB][CLEVER_COLLECTION].aggregate(pipeline, allowDiskUse=True))
    trips = pd.DataFrame(records)
    if len(trips) == 0:
        return None

    def to_local(seconds):
        return timestamp_to_datetime(seconds, millis=False)

    # Column creation order is kept on purpose: it fixes the output column order.
    trips['start_datetime'] = trips['start_timestamp'].apply(to_local)
    trips['datetime'] = trips['start_timestamp'].apply(to_local)
    trips['end_datetime'] = trips['end_timestamp'].apply(to_local)
    trips['date'] = trips['start_datetime'].apply(lambda moment: moment.strftime("%Y/%m/%d"))
    trips['hour'] = trips['start_datetime'].apply(lambda moment: int(moment.strftime("%H")))
    trips['date-hour'] = trips.apply(lambda row: f"{row['date']}-{int(row['hour'])}", axis=1)
    trips['fleet'] = trips['clever_vid'].apply(get_fleet)
    trips['vehicle_type'] = trips['clever_vid'].apply(get_vehicle_type)
    # Back to milliseconds so the values compare directly with ViriCiti `time`.
    for column in ('start_timestamp', 'end_timestamp'):
        trips[column] = trips[column].apply(lambda seconds: seconds * 1000)
    trips = trips.drop(['test', 'start_time', 'end_time'], axis=1)
    return trips.drop_duplicates()


def get_fleet(x):
    """Return the first fleet name in VEHICLE_FLEETS whose id list contains ``x``, else None."""
    return next((fleet for fleet, vids in VEHICLE_FLEETS.items() if x in vids), None)


def get_vehicle_type(x):
    """Return the first model name in VEHICLE_TYPES whose id list contains ``x``, else None."""
    return next((model for model, vids in VEHICLE_TYPES.items() if x in vids), None)


def fleet_to_parameters(fleet):
    """Return the parameter list for a fleet name.

    ``'Electric'`` maps to BYD_PARAMS, ``'Diesel'`` and ``'Hybrid'`` map to
    GILLIG_PARAMS; any other value yields ``None``.
    """
    if fleet == 'Electric':
        return BYD_PARAMS
    if fleet == 'Diesel' or fleet == 'Hybrid':
        return GILLIG_PARAMS
    return None


def original_query_viriciti(client, lower, upper, vid):
    """Fetch every ViriCiti document of one vehicle with ``lower <= time <= upper``.

    Returns:
        DataFrame built from the matching documents (empty if none match).
    """
    time_window = {'$gte': lower, '$lte': upper}
    cursor = client[DB][VIRICITI_COLLECTION].find({"clever_vid": vid, 'time': time_window})
    return pd.DataFrame(cursor)


def query_viriciti(client, lower, upper, vid_list):
    """Fetch fuel, GPS and power documents for several vehicles in a time window.

    Only documents whose ``label`` is ``analyses.fuel_used``, ``gps`` or
    ``power``, whose ``clever_vid`` is in ``vid_list`` and whose ``time`` lies
    in ``[lower, upper]`` are returned.

    Returns:
        DataFrame built from the matching documents (empty if none match).
    """
    wanted_labels = ['analyses.fuel_used', 'gps', 'power']
    criteria = {
        "clever_vid": {"$in": vid_list},
        "label": {"$in": wanted_labels},
        'time': {'$lte': upper, '$gte': lower},
    }
    cursor = client[DB][VIRICITI_COLLECTION].find(criteria)
    return pd.DataFrame(cursor)


def query_darksky(client, lower, upper):
    """Fetch weather documents for ``[lower - HOUR, upper + HOUR]``.

    The bounds are compared with the collection's ``time`` field as given
    (join_weather passes seconds); the returned ``time`` column is multiplied
    by 1000.

    NOTE: raises KeyError when no document matches, because the resulting
    empty frame has no ``time`` column.
    """
    padded_window = {"$gte": lower - HOUR, "$lte": upper + HOUR}
    documents = list(client[DB][WEATHER_COLLECTION].aggregate([{"$match": {"time": padded_window}}]))
    weather = pd.DataFrame(documents)
    weather['time'] = weather['time'].apply(lambda seconds: seconds * 1000)
    return weather


def join_weather(df, st_timestamp, et_timestamp):
    """Attach the nearest-in-time weather record to every row of ``df``.

    Args:
        df: Frame with a ``time`` column in milliseconds.
        st_timestamp: Window start in milliseconds.
        et_timestamp: Window end in milliseconds.

    Returns:
        ``df`` sorted by ``time`` with the weather columns merged in; weather
        columns whose names clash with ``df`` get the ``_darksky`` suffix.

    Uses the module-level ``client`` for the weather query.
    """
    df_weather = query_darksky(client, st_timestamp/1000, et_timestamp/1000)
    df_weather = df_weather.sort_values(by=['time'])
    df_sorted = df.sort_values(by=['time'])
    # The original followed this merge with a DataFrame.drop(...) whose result
    # was discarded. That call removed nothing and could only raise KeyError
    # when a listed column was absent, so it is gone. The weather metadata
    # columns stay in place because generate_training_samples() drops
    # '_id', 'timezone', 'agency' and 'icon' itself.
    return pd.merge_asof(df_sorted, df_weather, on='time', suffixes=('', '_darksky'), direction='nearest')


def get_label(df, label, base):
    """Extract one label's readings and rename ``value`` to the label name.

    Args:
        df: Long-format frame with ``label``, ``value`` and ``time`` columns.
        label: Label to keep; it also becomes the name of the value column.
        base: When falsy, the trip/vehicle descriptor columns are dropped so
            the result can be merged onto a base frame without clashes.

    Returns:
        The selected rows sorted by ``time``, without the ``label`` column.
    """
    selected = df[df.label == label]
    selected = selected.sort_values(by=['time'])
    pivoted = selected.drop(columns=['label']).rename(columns={"value": label})
    if base:
        return pivoted
    # Non-base labels only contribute their value, time and vehicle id.
    return pivoted.drop(columns=['vehicle_type', 'trip_id', 'rt', 'vehicle_fleet', 'time_string'])


def format_df_diesel_hybrid(df, tolerance=None, remove_nan_targets=False):
    """Pair each fuel reading with the nearest GPS reading of the same vehicle.

    Fuel readings (``analyses.fuel_used``) form the base side of the as-of
    join; GPS readings are matched by ``clever_vid`` on ``time``.

    Args:
        df: Long-format readings frame accepted by get_label().
        tolerance: Forwarded to ``pd.merge_asof`` as the maximum time gap.
        remove_nan_targets: When truthy, rows lacking fuel or GPS are dropped
            and the global counters NUM_SAMPLES / NUM_SAMPLES_MAPPED are
            increased by the row counts before / after the drop.

    Returns:
        The joined frame.
    """
    global NUM_SAMPLES, NUM_SAMPLES_MAPPED

    fuel = get_label(df, 'analyses.fuel_used', True)
    gps = get_label(df, 'gps', False)
    joined = pd.merge_asof(fuel, gps, on='time', by='clever_vid', suffixes=('', '_gps'),
                           direction='nearest', tolerance=tolerance)
    if not remove_nan_targets:
        return joined

    NUM_SAMPLES += len(joined)
    joined = joined.dropna(subset=['analyses.fuel_used', 'gps'])
    NUM_SAMPLES_MAPPED += len(joined)
    return joined


def format_df_electric(df, tolerance=None, remove_nan_targets=False):
    """Pair each power reading with the nearest GPS reading of the same vehicle.

    Power readings form the base side of the as-of join; GPS readings are
    matched by ``clever_vid`` on ``time``.

    Args:
        df: Long-format readings frame accepted by get_label().
        tolerance: Forwarded to ``pd.merge_asof`` as the maximum time gap.
        remove_nan_targets: When truthy, rows lacking power or GPS are dropped
            and the global counters NUM_SAMPLES / NUM_SAMPLES_MAPPED are
            increased by the row counts before / after the drop.

    Returns:
        The joined frame.
    """
    global NUM_SAMPLES, NUM_SAMPLES_MAPPED

    power = get_label(df, 'power', True)
    gps = get_label(df, 'gps', False)
    joined = pd.merge_asof(power, gps, on='time', by='clever_vid', suffixes=('', '_gps'),
                           direction='nearest', tolerance=tolerance)
    if not remove_nan_targets:
        return joined

    NUM_SAMPLES += len(joined)
    joined = joined.dropna(subset=['power', 'gps'])
    NUM_SAMPLES_MAPPED += len(joined)
    return joined

In [None]:
def _write_fleet_day(df_fleet, formatter, out_dir, fleet_name, st_datestr,
                     st_timestamp, et_timestamp, tolerance):
    """Format one fleet's readings, join weather and write the day's CSV (best effort)."""
    if len(df_fleet) == 0:
        return
    try:
        df = formatter(df_fleet, tolerance=tolerance, remove_nan_targets=True)
        # Check the *formatted* frame: the NaN filtering may have emptied it.
        if len(df) == 0:
            return
        df = join_weather(df, st_timestamp, et_timestamp)
        day_name = st_datestr.replace('/', '-')
        # A missing output directory would otherwise make to_csv fail.
        os.makedirs(out_dir, exist_ok=True)
        df.to_csv(os.path.join(out_dir, f"{day_name}.csv"), index=False)
    except Exception as exc:
        # Still best effort (one fleet failing must not abort the day), but
        # the failure is reported instead of being silently swallowed.
        print(f"failed to write {fleet_name} data for {st_datestr}: {exc!r}")


def main_run_day(st_datestr, et_datestr, tolerance=JOIN_TOLERANCE):
    """Join one day of Clever trips, ViriCiti readings and weather, per fleet.

    Readings are assigned to the trip of the same vehicle whose start/end
    timestamps enclose the reading's ``time``. The result is split by fleet,
    formatted, joined with weather and written to ``<fleet dir>/<date>.csv``.

    Args:
        st_datestr: Start date as ``YYYY/MM/DD``; also names the output file.
        et_datestr: End date as ``YYYY/MM/DD``.
        tolerance: Maximum time gap forwarded to the as-of joins.

    Returns:
        Always ``None``. Nothing is written when no trips or readings exist.

    Uses the module-level ``client`` for all database access.
    """
    print(f"starting {st_datestr} to {et_datestr}")
    st_timestamp = datestr_to_timestamp(st_datestr, millis=True)
    et_timestamp = datestr_to_timestamp(et_datestr, millis=True)

    # get trips from clever and unique vid in time window
    df_clever = query_trips_from_clever(client, st_timestamp, et_timestamp)
    if df_clever is None:
        return None
    df_clever = df_clever[df_clever['vehicle_type'].isin(VALID_VEHICLE_TYPES)]
    vid_uniq = list(df_clever['clever_vid'].unique())

    df_viriciti = query_viriciti(client, st_timestamp, et_timestamp, vid_uniq)
    if len(df_viriciti) == 0:
        return None
    df_viriciti = df_viriciti[df_viriciti['label'].isin(['analyses.fuel_used', 'gps', 'power'])]
    col_keep = list(df_viriciti.columns) + ['trip_id', 'rt']

    # process keys for cross join
    df_clever = df_clever[['clever_vid', 'trip_id', 'start_timestamp', 'end_timestamp', 'rt']]
    df_clever = df_clever.assign(key=1)
    df_viriciti = df_viriciti.assign(key=1)

    # join every reading with every trip of the same vehicle, then keep the
    # pairs whose reading time falls inside the trip
    df_merge = pd.merge(df_viriciti, df_clever, on=['key', 'clever_vid'], suffixes=('', '_clever'))
    df_merge = df_merge.drop('key', axis=1).query('time >= start_timestamp and time <= end_timestamp')

    # drop unessential columns
    col_drop = [c for c in list(df_merge.columns) if c not in col_keep]
    df_merge = df_merge.drop(columns=col_drop)

    # add vehicle_fleet and vehicle_type
    df_merge['vehicle_fleet'] = df_merge['clever_vid'].apply(get_fleet)
    df_merge['vehicle_type'] = df_merge['clever_vid'].apply(get_vehicle_type)
    df_merge = df_merge[['time', 'value', 'label', 'vehicle_type', 'vehicle_fleet', 'time_string', 'clever_vid', 'trip_id', 'rt']]

    # break into diesel, hybrid and electric and write each one
    fleet_jobs = (('Diesel', format_df_diesel_hybrid, DIESEL_PATH),
                  ('Hybrid', format_df_diesel_hybrid, HYBRID_PATH),
                  ('Electric', format_df_electric, ELECTRIC_PATH))
    for fleet_name, formatter, out_dir in fleet_jobs:
        df_fleet = df_merge[df_merge['vehicle_fleet'] == fleet_name]
        _write_fleet_day(df_fleet, formatter, out_dir, fleet_name, st_datestr,
                         st_timestamp, et_timestamp, tolerance)
    return None

In [None]:
# Shared MongoDB client; join_weather() and main_run_day() read this global.
client = get_mongo_connection()

In [None]:
# First day to process and the (exclusive) end of the range, as YYYY/MM/DD.
ST_DATESTR = "2020/01/24"
ET_DATESTR = "2020/07/01"
# Global counters updated by format_df_diesel_hybrid() / format_df_electric():
# rows produced by the as-of joins, and rows that kept both target and GPS.
NUM_SAMPLES = 0
NUM_SAMPLES_MAPPED = 0

# Process one [day, next day] window at a time.
date_range = pd.date_range(start=ST_DATESTR, end=ET_DATESTR)
for day_start, day_end in zip(date_range[:-1], date_range[1:]):
    st_datestr = day_start.strftime("%Y/%m/%d")
    et_datestr = day_end.strftime("%Y/%m/%d")
    main_run_day(st_datestr, et_datestr)
print(NUM_SAMPLES, NUM_SAMPLES_MAPPED)
# Guard the ratio: NUM_SAMPLES stays 0 when no day produced any readings.
if NUM_SAMPLES > 0:
    print(NUM_SAMPLES_MAPPED / NUM_SAMPLES)
else:
    print("No samples were processed; the mapped ratio is undefined.")

In [None]:
# Release the MongoDB connection; the cells below do not use `client`.
client.close()

## 2. Generate Trajectories and Training Samples

Reads from: output_r/data-joins, output_r/segments

Writes to: output_r/samples, output_r/traj

In [None]:
# Per-fleet directories holding the daily CSV files written by stage 1
# (same values as defined in the data-merge section).
DIESEL_PATH = os.path.join(os.getcwd(), 'output_r', 'data-joins', 'diesel')
HYBRID_PATH = os.path.join(os.getcwd(), 'output_r', 'data-joins', 'hybrid')
ELECTRIC_PATH = os.path.join(os.getcwd(), 'output_r', 'data-joins', 'electric')
# Not referenced in the visible part of this notebook.
IMAGES_DIR = os.path.join(os.getcwd(), 'output_r', 'images')
# Pickled segments table loaded into DF_SEG.
SEGMENTS_PATH = os.path.join(os.getcwd(), 'output_r', 'segments', 'segments.pkl')

# Output roots; process_file() writes into a sub-directory named after the
# vehicle type.
SAMPLES_DIR = os.path.join(os.getcwd(), 'output_r', 'samples')
TRAJ_DIR = os.path.join(os.getcwd(), 'output_r', 'traj')

# Time zone used by timestamp_to_datetime().
# NOTE(review): 'EST' and TIMEZONE_STR below name different zones definitions
# (presumably fixed-offset vs. daylight-saving aware) -- confirm the intent.
TZ = pytz.timezone('EST')
# Not referenced in the visible part of this notebook.
TIMEZONE_STR = "America/New_York"

In [None]:
def get_point(row, pref):
    """Build a Point(lon, lat) from the row's ``<pref>_stop_lon`` / ``<pref>_stop_lat`` values."""
    lon = row[f"{pref}_stop_lon"]
    lat = row[f"{pref}_stop_lat"]
    return Point(lon, lat)


def format_segments(df, add_stop_points=True, trip_id_format=False):
    """Normalise the column types of the segments table.

    Args:
        df: Segments frame with a ``geometry`` column.
        add_stop_points: When truthy, replace the stop lon/lat columns with
            ``start_stop_geometry`` / ``end_stop_geometry`` points; otherwise
            cast the lon/lat columns to float.
        trip_id_format: When truthy, strip the last three digits from
            ``trip_id``.

    Returns:
        The frame with ``geometry`` set as the active geometry, scalar
        columns cast, date columns parsed from ISO strings and list-like
        string columns parsed with apply_format_list().
    """
    df = df.set_geometry('geometry')

    scalar_types = {
        'trip_id': int,
        'segment_seq': int,
        'start_stop_id': int,
        'end_stop_id': int,
        'direction_id': int,
        'route_id': str,
        'distance_btw_stops': float,
        'start_stop_name': str,
        'end_stop_name': str,
        'distance_m': float,
        'segment_id': str,
    }
    for column, dtype in scalar_types.items():
        df[column] = df[column].astype(dtype)

    for column in ('gtfs_start_date', 'gtfs_end_date'):
        df[column] = df[column].apply(datetime.date.fromisoformat)

    # Columns stored as stringified lists, with the element type of each.
    list_types = {
        'XDSegID': 'int',
        'osm_ways': 'int',
        'tmc_id': 'string',
        'osm_way_fclasses': 'string',
        'elevation_list': 'float',
    }
    for column, element_type in list_types.items():
        df[column] = df[column].apply(lambda raw, kind=element_type: apply_format_list(raw, col_type=kind))

    if add_stop_points:
        df['start_stop_geometry'] = df.apply(lambda row: get_point(row, 'start'), axis=1)
        df['end_stop_geometry'] = df.apply(lambda row: get_point(row, 'end'), axis=1)
        df = df.drop(columns=['start_stop_lon', 'start_stop_lat', 'end_stop_lat', 'end_stop_lon'])
    else:
        for column in ('start_stop_lon', 'end_stop_lon', 'start_stop_lat', 'end_stop_lat'):
            df[column] = df[column].astype(float)

    if trip_id_format:
        df['trip_id'] = df['trip_id'].apply(lambda raw: int(str(int(raw))[0:-3]))
    return df


def timestamp_to_datetime(timestamp, millis=False):
    """Convert an epoch timestamp to a timezone-aware datetime in TZ.

    Args:
        timestamp: Epoch time, in seconds or (when ``millis`` is True) in
            milliseconds.
        millis: Pass exactly ``True`` when ``timestamp`` is in milliseconds.

    Returns:
        ``datetime.datetime`` localized to the module-level ``TZ``.
    """
    seconds = timestamp / 1000 if millis is True else timestamp
    return datetime.datetime.fromtimestamp(seconds, tz=TZ)


def format_gps(x):
    """Parse a ``"lat|lon"`` string into a Point(lon, lat)."""
    lat_text, lon_text = x.split("|")
    longitude = float(lon_text)
    latitude = float(lat_text)
    return Point(longitude, latitude)


def load_data_gdf(dir_path, trip_id_format=False):
    """Load every ``.csv`` file in a directory into one GeoDataFrame.

    Files are read in sorted name order and concatenated. ``time`` becomes
    ``timestamp_ms`` plus derived ``datetime`` / ``date`` columns, ``gps``
    becomes the point ``geometry`` (CRS EPSG:4326), and the original ``time``
    and ``gps`` columns are dropped.

    Args:
        dir_path: Directory containing the CSV files.
        trip_id_format: When truthy, ``trip_id`` becomes a string with its
            last three digits removed.

    NOTE(review): ``import geopandas as gpd`` is commented out in this
    notebook's import cell, so this function raises NameError until it is
    restored.
    """
    csv_names = sorted(name for name in os.listdir(dir_path) if name.endswith(".csv"))
    frames = [pd.read_csv(os.path.join(dir_path, name), low_memory=False) for name in csv_names]

    gdf = gpd.GeoDataFrame(pd.concat(frames, ignore_index=True))
    gdf['datetime'] = gdf['time'].apply(lambda ms: timestamp_to_datetime(ms, millis=True))
    gdf['timestamp_ms'] = gdf['time']
    gdf['date'] = gdf['datetime'].apply(lambda moment: moment.date())
    gdf['geometry'] = gdf['gps'].apply(format_gps)
    if trip_id_format:
        gdf['trip_id'] = gdf['trip_id'].apply(lambda raw: str(int(raw))[0:-3])
    gdf = gdf.drop(columns=['time', 'gps'])
    return gdf.set_geometry('geometry').set_crs('EPSG:4326')


def remove_trips_with_duplicates(df):
    """Drop every trip in which some ``start_stop_id`` occurs more than once.

    Args:
        df: Segments frame with ``trip_id`` and ``start_stop_id`` columns.

    Returns:
        The rows of ``df`` belonging to trips whose start stops are all
        distinct. The trip counts before and after filtering are printed.
    """
    print(f"Number of unique trips: {len(df['trip_id'].unique())}")
    # One vectorised pass flags repeated (trip, stop) pairs; this replaces a
    # full-frame scan per trip.
    repeated = df.duplicated(subset=['trip_id', 'start_stop_id'], keep=False)
    trips_to_drop = df.loc[repeated, 'trip_id'].unique()
    result = df[~df['trip_id'].isin(trips_to_drop)]
    print(f"Number of trips without duplicate stops: {len(result['trip_id'].unique())}")
    return result
    
    
def get_mongo_connection():
    """Open a MongoDB client from the credentials stored at CONFIG_PATH.

    The JSON config must provide the keys ``user``, ``password``, ``host``,
    ``port`` and ``authenticationDatabase``.

    NOTE(review): ``import pymongo`` is commented out in this notebook's
    import cell, so this function raises NameError until it is restored.
    NOTE(review): the credentials are interpolated into the URI verbatim;
    values containing reserved characters (e.g. ``@`` or ``:``) presumably
    need percent-escaping first -- confirm against the stored config.

    Returns:
        The ``pymongo.MongoClient`` instance; the caller closes it.
    """
    with open(CONFIG_PATH) as config_file:
        settings = json.load(config_file)

    uri = (
        f"mongodb://{settings['user']}:{settings['password']}"
        f"@{settings['host']}:{settings['port']}"
        f"/?authSource={settings['authenticationDatabase']}"
    )
    return pymongo.MongoClient(uri)

# Coordinate transform from EPSG:4326 to EPSG:32616, pre-bound so it can be
# handed to shapely.ops.transform. mapmatch() only uses it when a frame has
# no 'geometry_proj' column yet.
# NOTE(review): pyproj.transform is, as far as I know, deprecated in favour of
# pyproj.Transformer -- confirm against the installed pyproj version before
# changing it, because project_linestring()/project_point() swap the
# coordinate order to suit this call.
project = partial(
    pyproj.transform,
    pyproj.Proj('EPSG:4326'),
    pyproj.Proj('EPSG:32616'))
    
def project_linestring(line1, project):
    """Swap each coordinate pair of ``line1`` and apply ``project`` to the result.

    Args:
        line1: LineString whose coordinates are swapped to (second, first)
            order before projecting.
        project: Callable accepted by ``shapely.ops.transform``.

    Returns:
        The transformed geometry.
    """
    swapped_coords = [(pair[1], pair[0]) for pair in line1.coords]
    return transform(project, LineString(swapped_coords))


def project_point(p, project):
    """Swap the point's x and y and apply ``project`` to the result.

    Args:
        p: Point whose coordinates are swapped to (y, x) before projecting.
        project: Callable accepted by ``shapely.ops.transform``.

    Returns:
        The transformed geometry.
    """
    swapped = Point((p.y, p.x))
    projected = transform(project, swapped)
    return projected



def get_all_trips(df):
    """List the distinct (trip_id, clever_vid, date) combinations, ordered by date.

    Returns:
        DataFrame with exactly the columns ``trip_id``, ``clever_vid`` and
        ``date``, one row per combination present in ``df``.
    """
    key_columns = ['trip_id', 'clever_vid', 'date']
    counts = df.groupby(key_columns).size().reset_index(name='count')
    ordered = counts.sort_values(by=['date'])
    return ordered[key_columns]


def get_data_for_trip(df, trip_id, clever_vid, date):
    """Return the rows of one trip run (trip, vehicle, date), ordered by ``timestamp_ms``."""
    same_trip = df['trip_id'] == trip_id
    same_vehicle = df['clever_vid'] == clever_vid
    same_day = df['date'] == date
    return df[same_trip & same_vehicle & same_day].sort_values(by=['timestamp_ms'])


def get_segments_for_trip(df_seg, trip_id, date_obj):
    """Return a trip's segments from the GTFS feed in force on ``date_obj``.

    A feed applies when ``gtfs_start_date <= date_obj < gtfs_end_date``.
    The rows are ordered by ``segment_seq``.
    """
    feed_in_force = (df_seg['gtfs_start_date'] <= date_obj) & (df_seg['gtfs_end_date'] > date_obj)
    selected = df_seg[(df_seg['trip_id'] == trip_id) & feed_in_force]
    return selected.sort_values('segment_seq')


def check_sequences(df_trip_seg):
    """Return True when ``segment_seq`` reads 0, 1, 2, ... in row order (True for an empty frame)."""
    sequence = df_trip_seg['segment_seq'].tolist()
    return not any(value != position for position, value in enumerate(sequence))


def mapmatch(df_trip_seg, df_trip_traj, seg_lookahead=SEG_LOOKAHEAD, buffer=BUFFER):
    """Assign each GPS point of a trip to one of the trip's segments.

    The points are walked in time order. For every point only the current
    segment and the following ``seg_lookahead - 1`` segments are candidates;
    the closest candidate wins if it lies within ``buffer`` (distance between
    the projected geometries), and becomes the new current segment. Points
    with no candidate inside the buffer are discarded.

    Args:
        df_trip_seg: Segments of one trip; ``segment_seq`` must read
            0, 1, 2, ... once sorted.
        df_trip_traj: GPS readings of the same trip with ``timestamp_ms``.
        seg_lookahead: Number of consecutive candidate segments per point.
        buffer: Maximum accepted point-to-segment distance.

    Returns:
        The matched readings left-joined with their segment's columns; the
        GPS geometry is exposed as ``gps_geometry`` / ``gps_geometry_proj``.
        ``None`` (after printing ``'issue'``) when the segment sequence is
        not contiguous from 0.
    """
    df_trip_traj = df_trip_traj.sort_values(by=['timestamp_ms'])
    df_trip_seg = df_trip_seg.sort_values(by=['segment_seq'])

    if check_sequences(df_trip_seg) is False:
        print('issue')
        return None

    if 'geometry_proj' not in df_trip_seg.columns:
        df_trip_seg['geometry_proj'] = df_trip_seg['geometry'].apply(lambda x: project_linestring(x, project))
    if 'geometry_proj' not in df_trip_traj.columns:
        df_trip_traj['geometry_proj'] = df_trip_traj['geometry'].apply(lambda x: project_point(x, project))

    # Pull the geometries out once; slicing the list also bounds the
    # lookahead window at the last segment without relying on an exception.
    seg_geoms = df_trip_seg['geometry_proj'].tolist()
    result_list = []
    cur_seg_seq = 0
    for point in df_trip_traj['geometry_proj'].tolist():
        best_dist = 100000000
        best_offset = None
        candidates = seg_geoms[cur_seg_seq:cur_seg_seq + seg_lookahead]
        for offset, seg_geom in enumerate(candidates):
            try:
                dist = seg_geom.distance(point)
            except Exception:
                # Best effort: a geometry that cannot be measured is skipped.
                # Its offset is still counted, so later candidates keep their
                # true position relative to the current segment.
                continue
            if dist < best_dist:
                best_dist = dist
                best_offset = offset
        if best_offset is not None and best_dist <= buffer:
            cur_seg_seq += best_offset
            result_list.append(cur_seg_seq)
        else:
            result_list.append(None)

    df_trip_traj['segment_seq'] = result_list
    df_trip_traj = df_trip_traj.dropna(subset=['segment_seq'])
    df_trip_traj['segment_seq'] = df_trip_traj['segment_seq'].astype(int)
    df_trip_seg['segment_seq'] = df_trip_seg['segment_seq'].astype(int)

    df_result_final = pd.merge(left=df_trip_traj, right=df_trip_seg, how='left', on='segment_seq', validate='many_to_one')
    # Columns present in both frames come back suffixed _x (GPS side) and
    # _y (segment side); keep the GPS versions under explicit names.
    df_result_final['gps_geometry'] = df_result_final['geometry_x']
    df_result_final['gps_geometry_proj'] = df_result_final['geometry_proj_x']
    df_result_final['trip_id'] = df_result_final['trip_id_x']
    df_result_final = df_result_final.drop(columns=['trip_id_x', 'trip_id_y', 'geometry_proj_x', 'geometry_proj_y', 'geometry_x', 'geometry_y'])
    return df_result_final


def generate_training_samples(df, vehicle_type='diesel'):
    """Collapse map-matched readings into one sample per segment.

    For each ``segment_id`` with more than one reading, the sample starts as
    the earliest reading and gains: elevations requested for the first and
    last GPS positions, elapsed milliseconds, the GPS trajectory (raw and
    projected), the projected trajectory length, the reading count, and the
    target -- ``energy_consumed_kwh`` (power multiplied by elapsed seconds,
    summed, divided by 3600) for ``'electric'``, otherwise ``fuel_diff_l``
    (last minus first ``analyses.fuel_used``).

    Args:
        df: Output of mapmatch() for one trip.
        vehicle_type: ``'electric'`` selects the energy target; any other
            value selects the fuel target.

    Returns:
        DataFrame of samples without the raw target and weather metadata
        columns, or ``None`` when no segment has at least two readings.
    """
    is_electric = vehicle_type == 'electric'
    samples = []
    for segment_id in df['segment_id'].unique():
        readings = df[df['segment_id'] == segment_id].sort_values(by=['timestamp_ms'])
        if len(readings) <= 1:
            continue

        first = readings.iloc[0]
        last = readings.iloc[-1]
        sample = first.to_dict()
        sample['actual_start_elevation'] = request_elevation(first['gps_geometry'].y, first['gps_geometry'].x)
        sample['actual_end_elevation'] = request_elevation(last['gps_geometry'].y, last['gps_geometry'].x)
        sample['time_diff_ms'] = last['timestamp_ms'] - sample['timestamp_ms']
        sample['gps_traj_proj'] = LineString(readings['gps_geometry_proj'].tolist())
        sample['gps_traj'] = LineString(readings['gps_geometry'].tolist())
        sample['true_distance_travelled_m'] = sample['gps_traj_proj'].length
        sample['number_of_trajectories'] = len(readings)

        if is_electric:
            stamps = readings['timestamp_ms'].tolist()
            powers = readings['power'].tolist()
            energy = 0
            # Each reading's power is weighted by the seconds elapsed since
            # the previous reading.
            for prev_ms, cur_ms, cur_power in zip(stamps, stamps[1:], powers[1:]):
                energy += cur_power * ((cur_ms / 1000) - (prev_ms / 1000))
            sample['energy_consumed_kwh'] = energy / 3600.0
        else:
            sample['fuel_diff_l'] = last['analyses.fuel_used'] - sample['analyses.fuel_used']

        samples.append(sample)

    df_result = pd.DataFrame(samples)
    if len(df_result) == 0:
        return None
    raw_target = 'power' if is_electric else 'analyses.fuel_used'
    return df_result.drop(columns=['_id', raw_target, 'timezone', 'agency', 'summary', 'icon'])


def sample_to_traj(df_mapping, trip_id, date_obj, clever_vid, segment_seq):
    """Return the readings of one segment of one trip run, ordered by ``timestamp_ms``."""
    selection = (
        (df_mapping['trip_id'] == trip_id)
        & (df_mapping['date'] == date_obj)
        & (df_mapping['clever_vid'] == clever_vid)
        & (df_mapping['segment_seq'] == segment_seq)
    )
    return df_mapping[selection].sort_values(by=['timestamp_ms'])
    
    
def trip_to_samples(df_samples, trip_id, date_obj, clever_vid):
    """Return the samples of one trip run (trip, date, vehicle), ordered by ``segment_seq``."""
    selection = (
        (df_samples['trip_id'] == trip_id)
        & (df_samples['date'] == date_obj)
        & (df_samples['clever_vid'] == clever_vid)
    )
    return df_samples[selection].sort_values(by=['segment_seq'])


def trip_to_traj(df_mapping, trip_id, date_obj, clever_vid):
    """Return all readings of one trip run (trip, date, vehicle), ordered by ``timestamp_ms``."""
    selection = (
        (df_mapping['trip_id'] == trip_id)
        & (df_mapping['date'] == date_obj)
        & (df_mapping['clever_vid'] == clever_vid)
    )
    return df_mapping[selection].sort_values(by=['timestamp_ms'])


def request_elevation(lat,
                      lon,
                      units='Meters',
                      max_tries=10,
                      sec_btw_tries=1,
                      timeout=30):
    """Query the USGS point query service for the elevation at a coordinate.

    Args:
        lat: Latitude, sent as the ``y`` query parameter.
        lon: Longitude, sent as the ``x`` query parameter.
        units: Value of the ``units`` query parameter.
        max_tries: Maximum number of attempts.
        sec_btw_tries: Seconds to wait between failed attempts.
        timeout: Seconds before a single HTTP request is abandoned, so that a
            stalled connection counts as a failed attempt instead of hanging.

    Returns:
        float elevation, or ``None`` when every attempt failed (or
        ``max_tries`` is 0). Each failure is printed.
    """
    usgs_url = r'https://nationalmap.gov/epqs/pqs.php?'
    usgs_params = {'output': 'json', 'x': lon, 'y': lat, 'units': units}
    # Defined up front so the function also returns cleanly for max_tries=0.
    elevation = None
    for attempt in range(max_tries):
        try:
            usgs_request = requests.get(url=usgs_url,
                                        params=usgs_params,
                                        timeout=timeout)
            elevation = float(usgs_request.json()['USGS_Elevation_Point_Query_Service']['Elevation_Query']['Elevation'])
            break
        except Exception as e:
            print(e)
            elevation = None
            # No point in waiting after the final attempt.
            if attempt < max_tries - 1:
                time.sleep(sec_btw_tries)
    return elevation


def apply_format_list(x, col_type='int'):
    """Parse a stringified list such as ``"[1, 2, 3]"`` into a Python list.

    Brackets, spaces and single quotes are removed from the whole string
    before it is split on commas, so spaces inside string elements are lost
    as well.

    Args:
        x: The stringified list; any non-string value is returned unchanged.
        col_type: ``'int'`` or ``'float'`` converts every element; any other
            value (including ``'string'``) leaves the elements as strings.

    Returns:
        list of parsed elements (``[]`` for an empty list such as ``"[]"``),
        or ``x`` itself when it is not a string.
    """
    if not isinstance(x, str):
        return x

    cleaned = x.translate(str.maketrans('', '', "[] '"))
    # "[]" would otherwise split into [''] and crash int('') / float('').
    if not cleaned:
        return []

    items = cleaned.split(",")
    if col_type == 'int':
        return [int(y) for y in items]
    if col_type == 'float':
        return [float(y) for y in items]
    return items

In [None]:
def load_data_joined(file_path, trip_id_format=False):
    """Load one joined-data CSV file into a GeoDataFrame.

    ``time`` becomes ``timestamp_ms`` plus derived ``datetime`` / ``date``
    columns, ``gps`` becomes the point ``geometry`` (CRS EPSG:4326), and the
    original ``time`` and ``gps`` columns are dropped.

    Args:
        file_path: Path of the CSV file.
        trip_id_format: When truthy, ``trip_id`` becomes a string with its
            last three digits removed.

    NOTE(review): ``import geopandas as gpd`` is commented out in this
    notebook's import cell, so this function raises NameError until it is
    restored.
    """
    gdf = gpd.GeoDataFrame(pd.read_csv(file_path, low_memory=False))
    gdf['datetime'] = gdf['time'].apply(lambda ms: timestamp_to_datetime(ms, millis=True))
    gdf['timestamp_ms'] = gdf['time']
    gdf['date'] = gdf['datetime'].apply(lambda moment: moment.date())
    gdf['geometry'] = gdf['gps'].apply(format_gps)
    if trip_id_format:
        gdf['trip_id'] = gdf['trip_id'].apply(lambda raw: str(int(raw))[0:-3])
    gdf = gdf.drop(columns=['time', 'gps'])
    return gdf.set_geometry('geometry').set_crs('EPSG:4326')


def process_all_trips(df, df_seg, vehicle_type="diesel"):
    """Map-match every trip in ``df`` and build its per-segment samples.

    Trips with three or fewer readings or three or fewer segments are
    skipped, as are trips that mapmatch() rejects or leaves empty.

    Args:
        df: Joined readings for one day (see load_data_joined()).
        df_seg: Formatted segments table.
        vehicle_type: Forwarded to generate_training_samples().

    Returns:
        ``(df_samples, df_traj)``: all samples (with
        ``speed_meters_per_second`` added and helper columns dropped) and all
        map-matched readings; ``(None, None)`` when no samples were produced
        or assembling them failed.
    """
    trips = get_all_trips(df)

    results_mapmatch = []
    results_training_samples = []
    for _, trip in trips.iterrows():
        df_trip_traj = get_data_for_trip(df, trip['trip_id'], trip['clever_vid'], trip['date'])
        df_trip_seg = get_segments_for_trip(df_seg, trip['trip_id'], trip['date'])
        if len(df_trip_traj) <= 3 or len(df_trip_seg) <= 3:
            continue
        df_matched = mapmatch(df_trip_seg, df_trip_traj)
        # mapmatch() returns None for a non-contiguous segment sequence;
        # calling len() on that would abort the whole file.
        if df_matched is None or len(df_matched) == 0:
            continue
        results_mapmatch.append(df_matched)
        df_result = generate_training_samples(df_matched, vehicle_type=vehicle_type)
        if df_result is not None and len(df_result) > 0:
            results_training_samples.append(df_result)

    if not results_training_samples:
        return None, None

    try:
        df_samples = pd.concat(results_training_samples, ignore_index=True)
        df_samples = df_samples.drop(columns=['time_string', 'rt', 'nearest_storm_distance', 'dew_point', 'wind_bearing', 'cloud_cover', 'uv_index', 'ozone', 'distance_m', 'gtfs_start_date', 'gtfs_end_date', 'elevation_list', 'gps_geometry', 'gps_geometry_proj'])
        df_samples['speed_meters_per_second'] = df_samples.apply(lambda row: row['true_distance_travelled_m'] / (row['time_diff_ms']/1000), axis=1)
        df_traj = pd.concat(results_mapmatch, ignore_index=True)
    except Exception as exc:
        # Still best effort, but report why the day's samples were discarded.
        print(f"could not assemble samples: {exc!r}")
        return None, None
    return df_samples, df_traj


def process_file(file):
    """Map-match one data-join file and pickle its samples and trajectories.

    Reads ``file`` from the module-level ``dir_path``, adds a
    ``geometry_proj`` column (EPSG:32616), and runs ``process_all_trips``
    against a deep copy of ``DF_SEG``. When both results are present they are
    written under ``SAMPLES_DIR`` and ``TRAJ_DIR`` in the module-level
    ``vehicle_type`` sub-directory, named after the text before the first "."
    of ``file`` plus ".pkl".

    Returns:
        ``file`` unchanged, whether or not anything was written.
    """
    source_path = os.path.join(dir_path, file)
    df = load_data_joined(source_path)
    df['geometry_proj'] = df.to_crs('EPSG:32616')['geometry']

    df_samples, df_traj = process_all_trips(df, DF_SEG.copy(deep=True), vehicle_type=vehicle_type)
    if df_samples is None or df_traj is None:
        # Nothing usable came out of this file; report it as handled.
        return file

    pickle_name = file.split(".")[0] + ".pkl"
    df_samples.to_pickle(os.path.join(SAMPLES_DIR, vehicle_type, pickle_name))
    df_traj.to_pickle(os.path.join(TRAJ_DIR, vehicle_type, pickle_name))
    return file

In [None]:
# Build the module-level segment table that process_file deep-copies for
# every input file.
DF_SEG = pd.read_pickle(SEGMENTS_PATH)
# format_segments / remove_trips_with_duplicates are defined earlier in the notebook.
DF_SEG = format_segments(DF_SEG, add_stop_points=True, trip_id_format=True)
DF_SEG = remove_trips_with_duplicates(DF_SEG)
# Projected copy of the geometry in EPSG:32616, the same CRS process_file
# uses for the trajectory 'geometry_proj' column.
DF_SEG['geometry_proj'] = DF_SEG.to_crs('EPSG:32616')['geometry']
# Display the first row as a quick sanity check.
DF_SEG.head(1)

In [None]:
# Map-match every hybrid data-join CSV using a pool of worker processes.
# process_file reads `vehicle_type` and `dir_path` from module globals
# rather than taking them as arguments.
vehicle_type = "hybrid"
dir_path = HYBRID_PATH
files = sorted(os.listdir(dir_path))

with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
    futures = [executor.submit(process_file, file) for file in files if file.endswith(".csv")]
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises any exception raised inside the worker.
        r = future.result()

print("DONE!!!")

In [None]:
# Map-match every diesel data-join CSV using a pool of worker processes.
# process_file reads `vehicle_type` and `dir_path` from module globals
# rather than taking them as arguments.
vehicle_type = "diesel"
dir_path = DIESEL_PATH
files = sorted(os.listdir(dir_path))

with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
    futures = [executor.submit(process_file, file) for file in files if file.endswith(".csv")]
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises any exception raised inside the worker.
        r = future.result()

print("DONE!!!")

In [None]:
# Map-match every electric data-join CSV using a pool of worker processes.
# process_file reads `vehicle_type` and `dir_path` from module globals
# rather than taking them as arguments.
vehicle_type = "electric"
dir_path = ELECTRIC_PATH
files = sorted(os.listdir(dir_path))

with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
    futures = [executor.submit(process_file, file) for file in files if file.endswith(".csv")]
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises any exception raised inside the worker.
        r = future.result()

print("DONE!!!")

## 3. Add Traffic

Reads from: output_r/samples

Writes to: output_r/samples_with_traffic

In [None]:
# Input and output directories for the traffic join, both under <cwd>/output_r.
SAMPLES_DIR, SAMPLES_WITH_TRAFFIC_DIR = [
    os.path.join(os.getcwd(), 'output_r', leaf)
    for leaf in ('samples', 'samples_with_traffic')
]

# NOTE(review): TZ uses the 'EST' zone while TIMEZONE_STR names
# "America/New_York"; confirm that the two are meant to differ.
TIMEZONE_STR = "America/New_York"
TZ = pytz.timezone('EST')

In [None]:
def get_mongo_connection():
    """Create a ``pymongo.MongoClient`` from the JSON file at ``CONFIG_PATH``.

    The config must provide ``user``, ``password``, ``host``, ``port`` and
    ``authenticationDatabase``; they are substituted, in that order, into the
    connection URL. The caller is responsible for closing the client.
    """
    with open(CONFIG_PATH) as file:
        config = json.load(file)

    url_fields = ("user", "password", "host", "port", "authenticationDatabase")
    mongo_url = "mongodb://{}:{}@{}:{}/?authSource={}".format(*(config[field] for field in url_fields))
    return pymongo.MongoClient(mongo_url)


def read_samples(file_path):
    """Load and return the pickled object stored at ``file_path``."""
    samples = pd.read_pickle(file_path)
    return samples


def speed_ratio(row):
    """Return ``row['SU'] / row['FF']``, or NaN if that expression raises.

    Any exception (missing key, division by zero, non-numeric value) is
    treated as "no ratio available".
    """
    try:
        return row['SU'] / row['FF']
    except Exception:
        return np.nan


def query_ave_here_traffic(client, tmc_ids, timestamp_start, timestamp_end, db="data-class", collection="here.chattanooga"):
    """Average the speed ratio and 'JF' over matching traffic records.

    Finds documents in ``client[db][collection]`` whose ``tmc_id`` is in
    ``tmc_ids`` and whose ``time`` lies in
    ``[timestamp_start, timestamp_end]`` (both ends inclusive).

    Returns:
        ``(mean SR, mean JF)`` where SR is ``speed_ratio`` applied per record,
        or ``(nan, nan)`` when no record matches.
    """
    time_window = {"$lte": timestamp_end, "$gte": timestamp_start}
    query = {'tmc_id': {"$in": tmc_ids}, "time": time_window}
    records = list(client[db][collection].find(query))
    traffic = pd.DataFrame(records)

    if len(traffic) < 1:
        return np.nan, np.nan

    traffic['SR'] = traffic.apply(speed_ratio, axis=1)
    return traffic['SR'].mean(), traffic['JF'].mean()
    
def process_file_traffic(file):
    """Attach averaged traffic measures to one samples pickle and save it.

    Reads ``file`` from ``SAMPLES_DIR/<vehicle_type>`` (``vehicle_type`` is a
    module-level global), and for every sample queries the traffic collection
    in a window of ``TRAFFIC_BUFFER`` seconds either side of the sample's
    timestamp, twice:

    * with the sample's own ``tmc_id`` list, giving ``sr_ave_segment`` and
      ``jf_ave_segment``;
    * with the TMC ids of every sample sharing its ``trip_id`` and
      ``clever_vid``, giving ``sr_ave_trip`` and ``jf_ave_trip``.

    Values are NaN where no query was made or no record matched. The result is
    written to ``SAMPLES_WITH_TRAFFIC_DIR/<vehicle_type>/<file>``.

    Returns:
        ``file`` unchanged.
    """
    client = get_mongo_connection()
    try:
        file_path = os.path.join(SAMPLES_DIR, vehicle_type, file)
        df = pd.read_pickle(file_path)

        # Collect each trip's TMC ids once, in row order, instead of
        # re-filtering the whole frame for every sample.
        trip_tmc_ids = {}
        for trip_id, clever_vid, tmc_id in zip(df['trip_id'], df['clever_vid'], df['tmc_id']):
            if not isinstance(tmc_id, list):
                continue
            # Rows with a missing key never match any trip, including themselves.
            if pd.isna(trip_id) or pd.isna(clever_vid):
                continue
            trip_tmc_ids.setdefault((trip_id, clever_vid), []).extend(tmc_id)

        sr_nearest_list = []
        jf_nearest_list = []
        sr_trip_list = []
        jf_trip_list = []

        for _, v in df.iterrows():
            # timestamp_ms is in milliseconds; the query window is in seconds.
            timestamp_start = int(v['timestamp_ms'] / 1000) - TRAFFIC_BUFFER
            timestamp_end = timestamp_start + 2 * TRAFFIC_BUFFER

            if isinstance(v['tmc_id'], list):
                sr_nearest, jf_nearest = query_ave_here_traffic(client, v['tmc_id'], timestamp_start, timestamp_end)
            else:
                sr_nearest, jf_nearest = np.nan, np.nan

            tmc_ids = trip_tmc_ids.get((v['trip_id'], v['clever_vid']), [])
            # NOTE(review): a trip with exactly one TMC id is skipped here
            # ("> 1"), unlike the per-sample query above; confirm intended.
            if len(tmc_ids) > 1:
                sr_trip, jf_trip = query_ave_here_traffic(client, tmc_ids, timestamp_start, timestamp_end)
            else:
                sr_trip, jf_trip = np.nan, np.nan

            sr_nearest_list.append(sr_nearest)
            jf_nearest_list.append(jf_nearest)
            sr_trip_list.append(sr_trip)
            jf_trip_list.append(jf_trip)

        df['sr_ave_segment'] = sr_nearest_list
        df['jf_ave_segment'] = jf_nearest_list
        df['sr_ave_trip'] = sr_trip_list
        df['jf_ave_trip'] = jf_trip_list

        out_path = os.path.join(SAMPLES_WITH_TRAFFIC_DIR, vehicle_type, file)
        df.to_pickle(out_path)
        return file
    finally:
        # Always release the connection, even when a read, query or write fails.
        client.close()

In [None]:
# Open a notebook-level Mongo connection. process_file_traffic opens and
# closes its own connection on every call, and the traffic cells in this
# section do not reference this handle.
client = get_mongo_connection()

In [None]:
# Add traffic measures to every hybrid samples pickle in parallel.
# process_file_traffic reads `vehicle_type` from module globals.
vehicle_type = 'hybrid'

files = sorted(os.listdir(os.path.join(SAMPLES_DIR, vehicle_type)))

with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
    futures = [executor.submit(process_file_traffic, file) for file in files if file.endswith(".pkl")]
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises any exception raised inside the worker.
        r = future.result()

print("DONE!!!")

In [None]:
# Add traffic measures to every diesel samples pickle in parallel.
# process_file_traffic reads `vehicle_type` from module globals.
vehicle_type = 'diesel'

files = sorted(os.listdir(os.path.join(SAMPLES_DIR, vehicle_type)))

with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
    futures = [executor.submit(process_file_traffic, file) for file in files if file.endswith(".pkl")]
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises any exception raised inside the worker.
        r = future.result()

print("DONE!!!")

In [None]:
# Add traffic measures to every electric samples pickle in parallel
# (this run uses 10 workers rather than 30).
# process_file_traffic reads `vehicle_type` from module globals.
vehicle_type = 'electric'

files = sorted(os.listdir(os.path.join(SAMPLES_DIR, vehicle_type)))

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(process_file_traffic, file) for file in files if file.endswith(".pkl")]
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises any exception raised inside the worker.
        r = future.result()

print("DONE!!!")

In [None]:
# Release the notebook-level Mongo connection opened before the traffic runs.
client.close()

## 4. Generate Training Set - reformat

In [None]:
# Input and output directories for the training-set build, both under <cwd>/output_r.
SAMPLES_WITH_TRAFFIC_DIR, TRAINING_DIR = [
    os.path.join(os.getcwd(), 'output_r', leaf)
    for leaf in ('samples_with_traffic', 'training')
]

# NOTE(review): TZ uses the 'EST' zone while TIMEZONE_STR names
# "America/New_York"; confirm that the two are meant to differ.
TIMEZONE_STR = "America/New_York"
TZ = pytz.timezone('EST')

In [None]:
def get_training_set(vehicle_type, add_date=False):
    """Concatenate all traffic-augmented sample pickles for one vehicle class.

    Args:
        vehicle_type: Sub-directory of ``SAMPLES_WITH_TRAFFIC_DIR`` to read.
        add_date: When true, add a ``date`` column holding the text before the
            first "." of each source file name.

    Returns:
        One DataFrame with a fixed set of columns dropped and three derived
        columns added: ``time_diff_s`` and ``timestamp_s`` (milliseconds
        divided by 1000 and truncated to int) and ``actual_elevation_change``
        (end elevation minus start elevation). Also prints the row count
        before dropping columns.
    """
    source_dir = os.path.join(SAMPLES_WITH_TRAFFIC_DIR, vehicle_type)

    frames = []
    for name in sorted(os.listdir(source_dir)):
        if not name.endswith(".pkl"):
            continue
        frame = read_samples(os.path.join(source_dir, name))
        if add_date:
            frame['date'] = name.split(".")[0]
        frames.append(frame)

    df = pd.concat(frames, ignore_index=True)
    print(len(df))

    unused_columns = ['precipitation_probability', 'apparent_temperature', 'start_stop_id', 'end_stop_id', 'shape_id', 'direction_id', 'start_stop_name', 'end_stop_name', 'start_stop_geometry', 'end_stop_geometry', 'gps_traj_proj', 'gps_traj']
    df = df.drop(unused_columns, axis=1)

    for seconds_col, millis_col in (('time_diff_s', 'time_diff_ms'), ('timestamp_s', 'timestamp_ms')):
        df[seconds_col] = df[millis_col].apply(lambda x: int(x/1000))
    df['actual_elevation_change'] = df.apply(lambda row: row['actual_end_elevation'] - row['actual_start_elevation'], axis=1)
    return df


def one_hot_encode_roadway(df):
    """One-hot encode ``osm_way_fclasses`` and rename the distance columns.

    Every distinct non-empty value found in the ``osm_way_fclasses`` lists
    becomes a 0/1 column. Rows whose ``osm_way_fclasses`` is not a list get 0
    in every such column. The new columns are added in sorted order so the
    output schema (and the printed list) is the same on every run instead of
    following set iteration order.

    ``true_distance_travelled_m`` and ``distance_btw_stops`` are copied to
    ``distance_travelled_m`` and ``distance_btw_stops_m``; the originals,
    ``time_diff_ms``, ``timestamp_ms`` and ``osm_way_fclasses`` are dropped.

    Note:
        The new columns are added to the passed-in ``df`` in place; the
        returned frame is the copy produced by ``drop``.

    Args:
        df: Samples frame containing the columns named above.

    Returns:
        The encoded DataFrame.
    """
    # Normalise once: anything that is not a list counts as "no classes".
    fclass_lists = [v if isinstance(v, list) else [] for v in df['osm_way_fclasses']]

    # key=str keeps the sort well-defined even if a value is not a string.
    unique_fclasses = sorted(
        {x for fclasses in fclass_lists for x in fclasses if x != ''},
        key=str,
    )
    print(unique_fclasses)

    for fclass in unique_fclasses:
        df[fclass] = [1 if fclass in fclasses else 0 for fclasses in fclass_lists]

    df['distance_travelled_m'] = df['true_distance_travelled_m']
    df['distance_btw_stops_m'] = df['distance_btw_stops']
    df = df.drop(['time_diff_ms', 'timestamp_ms', 'osm_way_fclasses', 'true_distance_travelled_m', 'distance_btw_stops'], axis=1)
    return df


def trim_by_percentile(df, target, cutoff):
    """Keep rows whose ``target`` value lies between two symmetric quantiles.

    Rows are kept when ``df[target]`` is within the ``cutoff`` and
    ``1 - cutoff`` quantiles of that column, bounds included.
    """
    column = df[target]
    lower = column.quantile(cutoff)
    upper = column.quantile(1 - cutoff)
    return df[column.between(lower, upper)]


def apply_fun(row, lower_bound, upper_bound):
    """Return 1 if the travelled distance is plausible for the stop spacing, else 0.

    "Plausible" means ``row['distance_travelled_m']`` lies between
    ``lower_bound`` and ``upper_bound`` times ``row['distance_btw_stops_m']``,
    bounds included.
    """
    travelled = row['distance_travelled_m']
    between_stops = row['distance_btw_stops_m']
    if lower_bound * between_stops <= travelled <= upper_bound * between_stops:
        return 1
    return 0
    

def filt_by_traj_dist(df, lower_bound, upper_bound, min_dist):
    """Drop samples whose travelled distance is implausible or too short.

    Adds a ``filt`` column to ``df`` (in place) holding ``apply_fun``'s 0/1
    flag for each row, then returns only rows flagged 1 whose
    ``distance_travelled_m`` is strictly greater than ``min_dist``. The
    ``filt`` column is kept in the result.
    """
    def _flag(row):
        return apply_fun(row, lower_bound, upper_bound)

    df['filt'] = df.apply(_flag, axis=1)
    within_bounds = df[df['filt'] == 1]
    return within_bounds[within_bounds['distance_travelled_m'] > min_dist]


def add_emmisions(df, vehicle_type):
    """Add ``target_kg`` and ``target_kg_per_km`` columns to ``df`` in place.

    For ``vehicle_type == "electric"`` the target is ``energy_consumed_kwh``
    times 0.707; for any other type it is ``fuel_diff_l`` times 10.18 divided
    by 3.78541. ``target_kg_per_km`` is ``target_kg * 1000`` divided by
    ``distance_travelled_m``.

    Returns:
        The same ``df`` object.
    """
    if vehicle_type == "electric":
        target_kg = df['energy_consumed_kwh'].apply(lambda energy: energy * 0.707)
    else:
        target_kg = df['fuel_diff_l'].apply(lambda fuel: fuel * 10.18 / 3.78541)
    df['target_kg'] = target_kg

    # Both vehicle classes normalise by distance the same way.
    df['target_kg_per_km'] = df.apply(lambda row: (row['target_kg'] * 1000) / row['distance_travelled_m'], axis=1)
    return df

In [None]:
# Assemble the combined training frame: load each vehicle class, encode and
# filter it, compute the emissions target, trim outliers, then concatenate.
vehicle_types = ['electric', 'hybrid', 'diesel']
# This initial value is overwritten by the loop variable below.
vehicle_type = 'electric'
# Keep samples whose travelled distance is within [LOWER_BOUND, UPPER_BOUND]
# times the distance between stops and longer than MIN_DIST.
LOWER_BOUND = 0.5
UPPER_BOUND = 1.5
MIN_DIST = 10
# Quantile trimmed from each tail of TARGET.
CUTOFF = 0.05
TARGET = 'target_kg_per_km' # alternatively could be target_kg

df_list = []

for vehicle_type in vehicle_types:
    print("........")
    print(vehicle_type)
    df = get_training_set(vehicle_type, add_date=False)
    print(f"Total number of samples: {len(df)}")
    df = one_hot_encode_roadway(df)
    df = filt_by_traj_dist(df, LOWER_BOUND, UPPER_BOUND, MIN_DIST)
    print(f"Total after distance filtering {len(df)}")
    df = add_emmisions(df, vehicle_type)
    # Non-positive targets are dropped for the fuel-based classes only.
    if vehicle_type != 'electric':
        df = df[df['target_kg'] > 0]
    df = trim_by_percentile(df, TARGET, CUTOFF)
    print(f"Total after trimming percentile: {len(df)}")
    # The per-sample 'vehicle_type' column becomes 'vehicle_model'; the class
    # being processed ('electric' / 'hybrid' / 'diesel') becomes 'vehicle_class'.
    df['vehicle_model'] = df['vehicle_type']
    df['vehicle_class'] = vehicle_type
    df = df.drop(columns=['vehicle_fleet', 'vehicle_type'])
    df_list.append(df.copy(deep=True))
    print(f"Final total: {len(df)}")

df = pd.concat(df_list, ignore_index=True)
df = df.drop(columns='fuel_diff_l')
# Saving is currently disabled; the combined frame only lives in memory.
#file_path = os.path.join('output_r', 'training', 'data2.pkl')
#df.to_pickle(file_path)
df.head(2)

In [None]:
# Count missing values per column of the current `df`.
df.isna().sum()

In [None]:
# Reload a pickled frame and show its row count.
# NOTE(review): the assignment of `file_path` in the training-set cell is
# commented out, so this reads whatever `file_path` was last set to
# elsewhere in the notebook; confirm the intended path.
df = pd.read_pickle(file_path)
len(df)

## Metrics

In [None]:
# Vehicle classes iterated by the metric cells below; each is used as a
# sub-directory name.
vehicle_types = ['electric', 'diesel', 'hybrid']

In [None]:
# data-joins: total row count over every joined CSV, all vehicle classes.

result = 0
dir_path = os.path.join(os.getcwd(), 'output_r', 'data-joins')
for vehicle_type in vehicle_types:
    dir_path_vehicle = os.path.join(dir_path, vehicle_type)
    for file in os.listdir(dir_path_vehicle):
        if not file.endswith(".csv"):
            continue
        file_path = os.path.join(dir_path_vehicle, file)
        df = load_data_joined(file_path)
        result += len(df)
print(f"Total number of GPS readings joined: {result}")

In [None]:
# mapmatch results: total row count over every trajectory pickle, all vehicle classes.

result = 0
dir_path = os.path.join(os.getcwd(), 'output_r', 'traj')
for vehicle_type in vehicle_types:
    dir_path_vehicle = os.path.join(dir_path, vehicle_type)
    for file in os.listdir(dir_path_vehicle):
        if not file.endswith(".pkl"):
            continue
        file_path = os.path.join(dir_path_vehicle, file)
        df = pd.read_pickle(file_path)
        result += len(df)
print(f"Total number of GPS readings mapped to segments: {result}")

In [None]:
# samples: total row count over every samples pickle, all vehicle classes.

result = 0
dir_path = os.path.join(os.getcwd(), 'output_r', 'samples')
for vehicle_type in vehicle_types:
    dir_path_vehicle = os.path.join(dir_path, vehicle_type)
    for file in os.listdir(dir_path_vehicle):
        if not file.endswith(".pkl"):
            continue
        file_path = os.path.join(dir_path_vehicle, file)
        df = pd.read_pickle(file_path)
        result += len(df)
print(f"Total number of samples available for training: {result}")

In [None]:
# samples with traffic: total row count over every traffic-augmented pickle,
# all vehicle classes.

result = 0
dir_path = os.path.join(os.getcwd(), 'output_r', 'samples_with_traffic')
for vehicle_type in vehicle_types:
    dir_path_vehicle = os.path.join(dir_path, vehicle_type)
    for file in os.listdir(dir_path_vehicle):
        if not file.endswith(".pkl"):
            continue
        file_path = os.path.join(dir_path_vehicle, file)
        df = pd.read_pickle(file_path)
        result += len(df)
print(result)