# Car VS Publis transport. How faster is the car?

In [280]:
import numpy as np
import requests
import matplotlib.pyplot as plt
import csv

from numpy import random
from bs4 import BeautifulSoup
from datetime import datetime, timedelta

Some useful variables.

In [281]:
class request_error(Exception):
    def __init__ (self, message):
        self.message = message

In [282]:
tomtom_key = '4BOQAR16NyODy7R3Gy9MB8QlFZY6L15L'
tomtom_url = 'https://api.tomtom.com/'

here_key = 'bay27AqD18KKFO75ZbPj4yyJW2umCg1TLvnn9IlNmOg'
here_url = 'https://route.ls.hereapi.com/'

datacloud_url = 'https://api.bigdatacloud.net/data/reverse-geocode-client'

NW = {'lat': 45.52147, 'long': 9.10991}
SE = {'lat': 45.42156, 'long': 9.25822}

Some utility functions.

In [292]:
# ---- Reset the dataset with new header line ----
def reset_dataset():
    with open('milan_journeys.csv', 'w') as f:
        writer = csv.DictWriter(f, fieldnames=attributes)
        writer.writeheader()
    f.close()
    
    return

# ---- Returns a random datetime in year X ----
def random_datetime(X):
    return datetime.strptime('{}-01-01'.format(X), '%Y-%m-%d') + timedelta(minutes=random.randint(365*24*60))   

# ---- Returns a point in the rectangle of opposite corners X, Y ----
def point_in_box(X, Y):
    scale = np.array([
        abs(X['lat'] - Y['lat']),
        abs(X['long'] - Y['long'])
    ])

    base = np.array([
        min(X['lat'], Y['lat']),
        min(X['long'], Y['long'])
    ])

    coord = scale * random.rand(1,2) + base
    
    A = dict(zip(['lat', 'long'], coord[0]))
    
    return A

# ---- Returns a proper string to reverse geocode coordinates X ----
def datacloud_reversegeocode(X):
    return datacloud_url + '?latitude={}&longitude={}&localityLanguage=en'.format(X['lat'], X['long'])

def in_milan(response):
    
    rj = response.json()
    
    return rj['countryCode'] == 'IT' \
            and rj['principalSubdivision'] == 'Lombardy' \
            and rj['city'] == 'Milan'

# ---- Returns the locality of X according to datacloud ----
def locality(XJ):
    return XJ.json()['locality']

# ---- Returns a pair of points in Milan, starting from a rectangle SE, NW ----
def pair_milan(SE, NW):
    
    A = point_in_box(SE, NW)
    B = point_in_box(SE, NW)
    attemptsA = 1
    attemptsB = 1
    
    AJ = requests.get(datacloud_reversegeocode(A))   
    if not AJ.ok:
        raise request_error('AJ was not inititialized')
    
    while not in_milan(AJ):
        A = point_in_box(SE, NW)
        AJ = requests.get(datacloud_reversegeocode(A))
        if not AJ.ok: 
            raise request_error('AJ was not inititialized')

        attemptsA += 1

    BJ = requests.get(datacloud_reversegeocode(B))         
    if not BJ.ok: 
        raise request_error('BJ was not inititialized')

    while not in_milan(BJ):
        B = point_in_box(SE, NW)
        BJ = requests.get(datacloud_reversegeocode(B))
        if not BJ.ok: 
            raise request_error('BJ was not inititialized')

        attemptsB += 1
    
    # retrieve information about A and B, save it in the dictionary
    
    A['locality'] = locality(AJ)
    B['locality'] = locality(BJ)
    
    return A, B

# ---- Returns a .json response object to the query: drive from A to B, leave at datetime 'departure' ---- 
def car_path(A, B, departure):
    d = departure
    dep_string = '{}-{:02d}-{:02d}T{:02d}%3A{:02d}%3A{:02d}'.format(d.year, d.month, d.day, d.hour, d.minute, d.second)
    
    tomtom_query = 'routing/1/calculateRoute/{},{}:{},{}/json?departAt={}&avoid=unpavedRoads&key={}'\
                    .format(A['lat'], A['long'], B['lat'], B['long'], dep_string, tomtom_key)

    car = requests.get(tomtom_url + tomtom_query)
    if not car.ok:
        print(car)
        raise request_error('Connection to TomTom failed')
        
    return car

# ---- Returns a .json response object to the query: go from A to B via public transport, leave at datetime 'departure' ---- 
def pt_path(A, B, departure):
    d = departure
    dep_string = '{}-{:02d}-{:02d}T{:02d}%3A{:02d}%3A{:02d}'.format(d.year, d.month, d.day, d.hour, d.minute, d.second)
    
    here_query = 'routing/7.2/calculateroute.json?apiKey={}&waypoint0=geo!{},{}&waypoint1=geo!{},{}&departure={}&mode=fastest;publicTransport&combineChange=true'\
                  .format(here_key, A['lat'], A['long'], B['lat'], B['long'], dep_string)

    pt = requests.get(here_url + here_query)
    if not pt.ok:
        print(A)
        print(B)
        print(A is None)
        print(pt)
        raise request_error('Connection to HERE failed')
        
    return pt

# ---- Extracts information from .json response object 
def extract_info(JJ, flag):
    
    if flag == 'car':
        summary = JJ.json()['routes'][0]['summary']
        departure_time = datetime.strptime(summary['departureTime'], '%Y-%m-%dT%H:%M:%S%z')
        
        month = departure_time.strftime('%B')
        weekday = departure_time.strftime('%A')
        hour = departure_time.hour
        distance = summary['lengthInMeters']
        time = summary['travelTimeInSeconds']
        
    elif flag == 'pt':
        summary = JJ.json()['response']['route'][0]['summary']
        departure_time = datetime.strptime(summary['departure'], '%Y-%m-%dT%H:%M:%S%z')
        
        month = departure_time.strftime('%B')
        weekday = departure_time.strftime('%A')
        hour = departure_time.hour
        distance = summary['distance']
        time = summary['baseTime']
    
        
    keys = ['month', 'weekday', 'hour', 'distance', 'time']    
    values = [month, weekday, hour, distance, time]
    
    return dict(zip(keys, values))

Now we can make the dataset bigger!

In [None]:
N = 1000
car_times = np.array([])
pt_times = np.array([])

for i in range(N):
    A, B = pair_milan(SE, NW)
    
    departure = random_datetime(2021)
    
    car_response = car_path(A, B, departure)
    pt_response = pt_path(A, B, departure) 
    
    car_journey = extract_info(car_response, 'car')
    pt_journey = extract_info(pt_response, 'pt')
    
    # add journey to dataset 
    
    attributes = ['A_lat', 'A_long', 'A_locality', \
                  'B_lat', 'B_long', 'B_locality', \
                  'month', 'weekday', 'hour', \
                  'car_distance', 'pt_distance', \
                  'car_time', 'pt_time']

    journey = {
        'A_lat' : A['lat'],
        'A_long' : A['long'],
        'A_locality' : A['locality'],
        'B_lat' : B['lat'],
        'B_long' : B['long'],
        'B_locality' : B['locality'],
        'month' : car_journey['month'],
        'weekday' : car_journey['weekday'],
        'hour' : car_journey['hour'], 
        'car_distance' : car_journey['distance'],
        'pt_distance' : pt_journey['distance'],
        'car_time' : car_journey['time'],
        'pt_time' : pt_journey['time']
    }

    with open('C:/Users/Io/Desktop/GitHub Upload/Milan-journeys/milan_journeys.csv', 'a') as f:
        journey_writer = csv.DictWriter(f, fieldnames=attributes)
        journey_writer.writerow(journey)
    
    if i%10 == 0:
        print(i)
    
f.close()
print('Done!')