In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import geopandas as gpd
from shapely.geometry import Polygon,Point
import geopy.distance

try: #spelling depends on enviroment version 
    import urllib2 as urllib #URL handling module
except ImportError:
    import urllib.request as urllib

from urllib.error import HTTPError
import json
import time

from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

### Public EVSE Density

- Number of public EVSE ports within 15 driving minutes:

- Number of public EVSE ports within 15 walking minutes

**Idea**

Using <u>TomTom Routing API</u> to calculate reachable range with population centroid of each census tract: 

(Ref: https://developer.tomtom.com/routing-api/documentation/routing/routing-service )

1. within 15 driving minutes

    Calculate reachable range with a restriction of 15 minutes to generate polygons for each census tract, then spatial join with public EVSE
    
    (Ref: https://developer.tomtom.com/routing-api/documentation/routing/calculate-reachable-range)

2. within 15 walking minutes

    Calculate time difference between every public EVSE locations and population centroid, and count the time usage equal or less than 15 minutes

    In order to facilitate the efficiency on requesting data, filter public EVSE data before using Routing API: 

    Only select the points that the shortest distance with the population centroid is less than 0.7 miles (= ***(15/60)hour * 2.8 mph*** )
    <br>As Google estimates that an average human walks at a speed of exactly 4.5 km/h OR 2.8 miles per hour

    (Ref: https://developer.tomtom.com/routing-api/documentation/routing/calculate-route)

    (Notice: Reachable range Calculate is not applicable when the ***TravelModel*** is pedestrian)


In [3]:
## Get the public EVSE

public_ny = pd.read_csv("public_station.csv")
public_ny = gpd.GeoDataFrame(
        geometry=gpd.points_from_xy(public_ny.longitude, public_ny.latitude, crs="EPSG:4326"), data=public_ny
    )

In [7]:
## Get the population centroid of the census tract
# (005=Bronx, 047=Kings, 061=New York, 081=Queens, 085=Richmond)
CenPop = pd.read_csv("CenPop2020_Mean_TR36.txt")
CenPop = CenPop[CenPop['COUNTYFP'].isin([5,47,61,81,85])]

CenPop_gpd = gpd.GeoDataFrame(
        geometry=gpd.points_from_xy(CenPop.LONGITUDE, CenPop.LATITUDE, crs="EPSG:4326"), data=CenPop
    )
CenPop_gpd['COUNTYFP'] = CenPop_gpd['COUNTYFP'].astype('str').apply(lambda s: s.zfill(3))
CenPop_gpd['GEOID'] = CenPop_gpd['STATEFP'].astype('str')+CenPop_gpd['COUNTYFP']+CenPop_gpd['TRACTCE'].astype('str').apply(lambda s: s.zfill(6))
CenPop_gpd = CenPop_gpd.loc[:,~CenPop_gpd.columns.isin(['STATEFP','TRACTCE'])]

In [8]:
### As Walking calculation requires more API request, might not finish in one day with daily 2500 free request
## use Flag to mark calculated census tract in case seperated calculation
CenPop_gpd['Walk_Flag'] = False
CenPop_gpd['Walk15_EVSE'] = 0

## only one column can be the active geometry at a time
## Reachable Range with polygon output is not compatiable with Population centroid point
## Create a new GeoDataframe with connection GEOID
Drive_Range = gpd.GeoDataFrame(columns=['GEOID', 'geometry'], geometry='geometry')

In [9]:
def urlGenerate(type, API, OG_lat,OG_lon,D_lat,D_lon):
    if type=='drive':
        url = 'https://api.tomtom.com/routing/1/calculateRoute/{},{}:{},{}/json?&sectionType=traffic&report=effectiveSettings&routeType=eco&traffic=true&travelMode=car&vehicleEngineType=electric&key={}'.\
                            format(OG_lat,OG_lon,D_lat,D_lon,API)
    else:
        url = 'https://api.tomtom.com/routing/1/calculateRoute/{},{}:{},{}/json?&travelMode=pedestrian&key={}'.format(OG_lat,OG_lon,D_lat,D_lon,API)
    return url

def minimizeCal(type, min, county, Point):
    '''
    Point: (lat, lon)
    type: walk or drive
    min: reachable public EVSE within x min
    estimate distance before calcuate can reduce API usage efficiency
    '''
    if type=='drive':
        if county == '061':
            maxdist = (min/60)*20  #For census tract in Manhattan, 20mph
        else:
            maxdist = (min/60)*25  #outside Manhattan, 25mph
    else:
        #Google estimates that an average human walks at a speed of exactly 4.5 km/h OR 2.8 miles per hour
        maxdist = (min/60)*2.8  

    #calculate non-tesla ports
    nonTesla_public = public_ny[public_ny['ev_network'].apply(lambda s: 'Tesla' not in s)]
    
    idxlist = []
    for i in range(len(nonTesla_public)):
        rec = nonTesla_public.iloc[i]
        if geopy.distance.geodesic(Point,(rec.latitude,rec.longitude)).miles<=maxdist:
            idxlist.append(rec.name)
    cal_df = nonTesla_public[nonTesla_public.index.isin(idxlist)]
    return cal_df

def resolve_redirects(url):
    try:
        data = urllib.urlopen(url).read().decode('utf-8')
        data = json.loads(data)
        return data
    except HTTPError as err:
        if err.code == 429:
             time.sleep(10)
             return resolve_redirects(url)
        else:
            return err.code

def RoutingAPI(walk_drive, min, county, Point, API):

    cal_df = minimizeCal(walk_drive, min, county, Point)
    
    total_port = 0
    valid_cnt = 0
    valid15_cnt = 0

    if len(cal_df)==0:
        print(county, Point)

    for i in range(len(cal_df)):
        cal_rec = cal_df.iloc[i]
        url = urlGenerate(walk_drive, API, str(Point[0]), str(Point[1]), str(cal_rec['latitude']), str(cal_rec['longitude']))
        #Request data
        data = resolve_redirects(url)
        # print(data, type(data))
        if type(data)==dict:
            valid_cnt += 1
            difference = pd.to_datetime([ data['routes'][0]['summary']['arrivalTime'] ])[0] - \
                                pd.to_datetime([ data['routes'][0]['summary']['departureTime'] ])[0]
            
            seconds_in_day = 24 * 60 * 60
            if (divmod(difference.days * seconds_in_day + difference.seconds, 60)[0] < min):
                port_num = cal_rec['ev_level2_evse_num'] + cal_rec['ev_dc_fast_num']
                total_port += port_num
                valid15_cnt += 1

        if data == 403:
            print('Reach request limit')
            return  # implicitly, this is the same as saying `return None` 
    
    return total_port


### Calculate Reachable Range 
def DriveReachableRangeAPI(min, Point, API):
    '''
    In calculateReachableRange requests, the values bicycle and pedestrian must not be used.
    '''
    url = 'https://api.tomtom.com/routing/1/calculateReachableRange/{},{}/json?&report=effectiveSettings&routeType=eco&timeBudgetInSec={}&traffic=true&avoid=unpavedRoads&travelMode=car&vehicleCommercial=false&vehicleEngineType=electric&key={}'.\
            format(Point[0],Point[1],str(min*60),API)
    
    data = resolve_redirects(url)
    # print(data)
    if type(data)==dict:
        Polyg = Polygon([ [d['longitude'], d['latitude']] for d in data['reachableRange']['boundary'] ])
        return Polyg
    else:
        return data
    
## Spatial Join with public EVSE port
def ReachablePort(public_df, range_df, Tesla=False):
    range_df['Count'] = 0
    if not Tesla:
        public_port = public_df[public_df['ev_network'].apply(lambda s: 'Tesla' not in s)]

    for i in range(len(range_df)):
        geoid = range_df.iloc[i]['GEOID']
        evse_total = gpd.sjoin(public_port, range_df[range_df.GEOID==geoid], how="inner", predicate='within')\
                        [['ev_dc_fast_num','ev_level2_evse_num']].sum().sum()
        range_df.loc[range_df.GEOID==geoid, 'Count'] = evse_total

In [None]:
## Driving reachable range
## TomTom Routing API

API_KEY = "{YOUR_API_KEY}"

for idx in range(len(CenPop_gpd)):
    record = CenPop_gpd.iloc[idx]
    geoid = record['GEOID']
    Polyg = DriveReachableRangeAPI(15, (record['LATITUDE'],record['LONGITUDE']), API_KEY)
    if type(Polyg) != Point:
        Drive_Range = Drive_Range.append({'GEOID':geoid,'geometry':Polyg}, ignore_index=True)

Drive_Range = Drive_Range.set_crs('epsg:4326')
ReachablePort(public_ny, Drive_Range)

In [None]:
## Walking Route calculation

for idx in range(len(CenPop_gpd)):
    record = CenPop_gpd.iloc[idx]
    if not record['Walk_Flag']:
        geoid = record['GEOID']
        walk_port = RoutingAPI('walk', 15, record['COUNTYFP'], (record['LATITUDE'],record['LONGITUDE']), API_KEY)
        if walk_port!=None:
            #as limited request from TomTom, use Flag to mark the update status
            CenPop_gpd.loc[CenPop_gpd.GEOID == geoid,'Walk_Flag'] = True
            CenPop_gpd.loc[CenPop_gpd.GEOID == geoid,'Walk15_EVSE'] = walk_port
        else:
            break