In [4]:
import simplejson, urllib
import geopandas
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
import googlemaps

from time import sleep as sleep
import datetime as dt
from datetime import timedelta as td
from random import randrange
import json
import signal
import sys
import os

def add_delta_to_time(t,delta):
    a= (dt.datetime.combine(dt.date(1,1,1),t) + delta)
    a.time()
    return a.time()

def sub_delta_to_time(t,delta):
    a= (dt.datetime.combine(dt.date(1,1,1),t) - delta)
    a.time()
    return a.time()

def time_difference(time,time2):
    a= (dt.datetime.combine(dt.date(1,1,1),time) - dt.datetime.combine(dt.date(1,1,1),time2))
    return a

def check_file_modified(file, seconds):
    if os.path.isfile(file):
        if (dt.datetime.now() - dt.datetime.fromtimestamp(os.path.getmtime(file))).total_seconds() < seconds:
            return True
        else:
            return False
    else:
        return False

In [6]:


class DriveTimeSearcher:
    # constructor for this class
    def __init__(self, api_key_filename, apartment_list_filename, destination_list_filename, output_file):
        # load api key from file
        f_api=open(api_key_filename,)
        api_key = f_api.readline()
        f_api.close()

        self.init_locator()

        # start googlemaps client
        self.gmaps = googlemaps.Client(key=api_key)

        self.apartment_list_filename = apartment_list_filename
        self.destination_list_filename = destination_list_filename
        self.output_file = output_file

        # load apartment list from file
        #self.apartment_list = self.load_aparments(verbose=True)
        #self.apartment_list_lat_long = {a:self.get_lat_long(a) for a in self.apartment_list}

        # parse destination list from file
        #self.destination_list = self.load_destinations() 

        self.results=None

        # initialize the results dictionary
        self.on_reset()
        
        self.start_time = dt.datetime.now()
        try:
            self.spin(td(seconds = 30))
            self.save_results(0,_)
        except KeyboardInterrupt:
            self.save_results(0,_)

    # will come back to this later
    def spin(self,spintime):
        # signal handler to call save_results if interrupted
        signal.signal(signal.SIGINT, self.save_results)
        
        # get current day of the week
        current_day_of_week = dt.datetime.now().weekday()
        print("homebound query times: ")
        for start_address in self.apartment_list:
            for destination in [dest for dest in self.destination_list if current_day_of_week in dest['weekdays']]:
                dest_name= destination['name']
                print(start_address + " to " + dest_name + ": " + str(self.results[start_address][dest_name]['homebound_query_times']))
        print("outbound query times: ")
        for start_address in self.apartment_list:
            for destination in [dest for dest in self.destination_list if current_day_of_week in dest['weekdays']]:
                dest_name= destination['name']
                print(start_address + " to " + dest_name + ": " + str(self.results[start_address][dest_name]['outbound_query_times']))
        # spin until spintime is reached
        while dt.datetime.now() < self.start_time + spintime:
            # check if destinations or apartments files have been modified
            if check_file_modified(self.apartment_list_filename,30) or check_file_modified(self.destination_list_filename,30):
                # reload destinations and apartments
                print("reloading destinations and apartments")
                self.apartment_list = self.load_aparments(verbose=True)
                self.destination_list = self.load_destinations()
                # reset the results dictionary
                self.on_reset()

            # check if the time is right to query the next trip
            for start_address in self.apartment_list:
                for destination in [dest for dest in self.destination_list if current_day_of_week in dest['weekdays']]:
                    dest_name= destination['name']
                    # get the next outbound query time
                    outbound_query_times = self.results[start_address][dest_name]['outbound_query_times']
                    if len(outbound_query_times) > 0:
                        print(" running next outbound query time: " + start_address + " to " + dest_name + "at " + str(outbound_query_times[0]))
                        next_outbound_query_time = outbound_query_times[0]
                        if dt.datetime.now() > dt.datetime.combine(dt.datetime.today(),next_outbound_query_time):
                            # query the next trip
                            self.live_query_trip(start_address, destination['address'])
                            # remove the query time from the list
                            self.results[start_address][dest_name]['outbound_query_times'].pop(0)
                        else:
                            print("waiting for next outbound query time for " + start_address + " to " + dest_name + "at " + str(next_outbound_query_time) + "")
                    else:
                        print("no outpound query times for " + start_address + " to " + dest_name + "")
                    # get the next homebound query time
                    homebound_query_times = self.results[start_address][dest_name]['homebound_query_times']
                    if len(homebound_query_times) > 0:
                        print(" running next homebound query time: " + start_address + " to " + dest_name + "at " + str(homebound_query_times[0]))
                        next_homebound_query_time = homebound_query_times[0]
                        if dt.datetime.now() > dt.datetime.combine(dt.datetime.today(),next_homebound_query_time):
                            # query the next trip
                            self.live_query_trip(destination['address'],start_address)
                            # remove the query time from the list
                            self.results[start_address][dest_name]['homebound_query_times'].pop(0)
                        else:
                            print("waiting for next homebound query time for " + start_address + " to " + dest_name + "at " + str(next_homebound_query_time) + "")
                    else:
                        print("no homebound query times for " + start_address + " to " + dest_name + "")

            # sleep for 1 second
            sleep(30)


    # a function to save the results to a file
    def save_results(self,sig, frame):

        print(sig)
        print("interrupting, saving results to file")
        if self.results != None:
            # save raw results to file
            print("outfile: " + self.output_file)

            # append current date and time to output file name
            output_file_name = self.output_file + "_" + dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".json"
            with open(output_file_name, 'w') as outfile:
                json.dump(self.results, outfile,default=str)
            
        else:
            print("no results to save")
        if sig != 0:
            sys.exit(0)

    # a function to initialize theoretical departure times, and generate random query times.
    # will be called at script startup, at the beginning of each day, and if the config files change  
    def on_reset(self):
    
        self.save_results(0,_)
        self.destination_list = self.load_destinations()
        self.apartment_list=self.load_aparments()

        # initialize the results dictionary
        self.results = {}
        for start_address in self.apartment_list:
            self.results[start_address] = {}
            for dest_dict in self.destination_list:
                destination = dest_dict['name']
                self.results[start_address][destination] = {}

        # get the theoretical departure times from each apartment to each destination
        for start_address in self.apartment_list:
            for dest_dict in self.destination_list:
                destination = dest_dict['name']
                destination_adr= dest_dict['address']
                # parse arrival times from the destination list
                if 'arrival_times' in dest_dict:
                    arrival_times = dest_dict['arrival_times']
                    for arrival_time in arrival_times:
                        # get the theoretical departure time
                        theoretical_departure_time = self.get_theoretical_departure_time(start_address, destination_adr, arrival_time)
                        self.results[start_address][destination]['theoretical_departure_time'] = theoretical_departure_time
                else:
                    # get the theoretical departure time
                    theoretical_departure_time = self.get_theoretical_departure_time(start_address, destination_adr, dt.datetime.now().time())
                    self.results[start_address][destination]['theoretical_departure_time'] = theoretical_departure_time

        # generate randomized departure times for each trip, for both apartment to dest and dest to apartment
        for start_address in self.apartment_list:
            for dest_dict in self.destination_list:
                destination = dest_dict['name']
                # get the theoretical departure time
                self.results[start_address][destination]['outbound_query_times']=[]
                
                theoretical_departure_time = self.results[start_address][destination]['theoretical_departure_time']
                if theoretical_departure_time != None:
                    samples_per_day = dest_dict['samples_per_day']
                    for i in range(samples_per_day):
                        # generate a random departure time within 
                        random_departure_time = add_delta_to_time(theoretical_departure_time, td(seconds=randrange(-30*60,60*30)))
                        self.results[start_address][destination]['outbound_query_times'].append(random_departure_time)
                    # also query the theoretical departure time
                    self.results[start_address][destination]['outbound_query_times'].append(theoretical_departure_time)

                    # sort the list of outbound query times
                    self.results[start_address][destination]['outbound_query_times'].sort()
                else:
                    # use the given time range
                    outbound_departure_time_range = dest_dict['outbound_departure_time_range']
                    samples_per_day = dest_dict['samples_per_day']
                    for i in range(samples_per_day):
                        # generate a random departure time within range
                        random_departure_time = add_delta_to_time(outbound_departure_time_range[0], td(seconds=randrange(0,(time_difference(outbound_departure_time_range[1],outbound_departure_time_range[0])).seconds)))
                        self.results[start_address][destination]['outbound_query_times'].append(random_departure_time)

                #do the same for the homebound trips
                self.results[start_address][destination]['homebound_query_times']=[]
                homebound_departure_time_range = dest_dict['homebound_departure_time_range']
                samples_per_day = dest_dict['samples_per_day']
                for i in range(samples_per_day):
                    # generate a random departure time within range
                    random_departure_time = add_delta_to_time(homebound_departure_time_range[0],td(seconds=randrange(0,(time_difference(homebound_departure_time_range[1],homebound_departure_time_range[0])).seconds)))
                    self.results[start_address][destination]['homebound_query_times'].append(random_departure_time)

                # sort the list of outbound query times
                self.results[start_address][destination]['homebound_query_times'].sort()

    # initialize locator
    def init_locator(self):
        self.locator = Nominatim(user_agent="myGeocoder")
        geocode = RateLimiter(self.locator.geocode, min_delay_seconds=1)

    
    # a function to turn the address into a lat/long pair
    def get_lat_long(self, address):
        location = self.locator.geocode(address)
        if location != None:
            return "%s, %s" % (location.latitude, location.longitude)
        else:
            return None
    

    # load apartment list from file
    # returns a list of apartment addresses (strings)
    def load_aparments(self, verbose =False):
        f_ap=open(self.apartment_list_filename,)
        start_addresses = json.load(f_ap)
        f_ap.close()
        if verbose:
            print("loaded start addresses:")
            for s in start_addresses:
                print('\t',s) 

        return start_addresses
    
    # load a json file containing description of each aparment
    # returns a list of dictionaries
    def load_destinations(self, verbose =False):
        f_dest=open(self.destination_list_filename,)
        destinations = json.load(f_dest)
        f_dest.close()
        print("det: ",str(destinations))
        # make lat long for all destinations
        for destination in destinations:
            destination['lat_long'] = self.get_lat_long(destination['address'])

        # for every destination, if it has an arrival time, convert it to a datetime object
        for destination in destinations:
            if 'arrival_times' in destination:
                for i in range(len(destination['arrival_times'])):
                    destination['arrival_times'][i] = dt.datetime.strptime(destination['arrival_times'][i],'%H:%M:%S').time()
                print("atr ",str(destination['arrival_times']))
        # for every destination, if it has an outbound departure time range, convert it to a datetime object
        for destination in destinations:
            if 'outbound_departure_time_range' in destination:
                for i in range(len(destination['outbound_departure_time_range'])):
                    destination['outbound_departure_time_range'][i] = dt.datetime.strptime(destination['outbound_departure_time_range'][i],'%H:%M:%S').time()
                print("out_tr"+str(destination['outbound_departure_time_range']))
        # for every destination, if it has an homebound departure time range, convert it to a datetime object
        for destination in destinations:
            if 'homebound_departure_time_range' in destination:
                for i in range(len(destination['homebound_departure_time_range'])):
                    destination['homebound_departure_time_range'][i] = dt.datetime.strptime(destination['homebound_departure_time_range'][i],'%H:%M:%S').time()
                print("home_tr"+str(destination['homebound_departure_time_range']))
        if verbose:
            print("loaded destinations:")
            for d in destinations:
                # print name of destination
                print('\t',d['name'])

                # print every other attribute in the dictionary
                for key in d:
                    if key != 'name':
                        print('\t\t',key,':',d[key])  

        return destinations


    # a function to query the google maps api for the expected time to leave for a destination
    def get_theoretical_departure_time(self, start_address, destination, arrival_time):
        
        print("calculating theoretical departure time for",start_address,"to",destination,"at",arrival_time.strftime("%H:%M:%S"))
        
        departure_time = dt.datetime.now() 
        res = self.gmaps.directions(
            start_address, 
            destination, 
            mode="driving",
            avoid='tolls',
            arrival_time=arrival_time)

        # save the directions result to a map which we can use later
        if start_address not in self.results:
            self.results[start_address] = {}
        if destination not in self.results[start_address]:
            self.results[start_address][destination] = {}
        
        duration = res[0]['legs'][0]['duration']['value']

        print("duration:",duration)

        # calculate departure respecting traffic
        dep_time = sub_delta_to_time(arrival_time, td(seconds=duration))

        # return the trip info
        return dep_time

    # get the duration of a trip from a start address to a destination
    # returns a dictionary containing the trip duration, mileage, and time of arrival, average velocity
    def live_query_trip(self, start_address, destination):
        # get the duration of the trip
        departure_time = dt.datetime.now() 
        directions_result = self.gmaps.directions(
            start_address, 
            destination, 
            mode="driving",
            avoid='tolls',
            departure_time=departure_time)

        # save the directions result to a map which we can use later
        if start_address not in self.results:
            self.results[start_address] = {}
        if destination not in self.results[start_address]:
            self.results[start_address][destination] = {}
        
        self.results[start_address][destination][departure_time] = directions_result

        # get the duration of the trip
        duration = directions_result[0]['legs'][0]['duration']['value']
        #duration = td(seconds=duration)

        # get the duration in traffic
        duration_in_traffic = directions_result[0]['legs'][0]['duration_in_traffic']['value']

        # get the distance of the trip
        distance = directions_result[0]['legs'][0]['distance']['value']
        distance = distance/1609.34  # ! convert to miles apparently, not sure about this copilot...

        # calculate arrival time as duration + departure_time
        arrival_time = departure_time + td(seconds=duration)

        # calculate arrival respecting traffic
        arrival_time_in_traffic = departure_time + td(seconds=duration_in_traffic)

        # calculate average mph from duration and distance
        avg_velocity = distance/duration*3600
        
        # return the trip info
        ret = {
            'duration':duration,
            'duration_in_traffic':duration_in_traffic,
            'distance':distance, 
            'arrival_time':arrival_time, 
            'avg_velocity':avg_velocity, 
            'avg_velocity_in_traffic':distance/duration_in_traffic*3600,
            'arrival_time_in_traffic':arrival_time_in_traffic
        }
        print("checking ",start_address,"to",destination,"at",departure_time.strftime("%H:%M:%S")+"...")
        print("duration_in_traffic:",duration_in_traffic)
        return ret
    
    def test(self):
        # test the get_trip_duration function
        test_start_address = '1600 Amphitheatre Parkway, Mountain View, CA'
        test_destination = 'San Francisco, CA'
        test_result = self.live_query_trip(self.get_lat_long(test_start_address), self.get_lat_long(test_destination))
        return test_result



In [7]:
dts = DriveTimeSearcher('key.txt','apartments.json','destinations.json','output.json')
dts.test()

0
interrupting, saving results to file
no results to save
det:  [{'name': 'work', 'address': '443 E St. Elmo Rd, Austin, TX 78745', 'expected_times_per_week': 5, 'arrival_times': ['20:35:00'], 'homebound_departure_time_range': ['20:45:00', '20:50:00'], 'samples_per_day': 2, 'weekdays': [0, 1, 2, 3, 4, 5, 6]}]
atr  [datetime.time(20, 35)]
home_tr[datetime.time(20, 45), datetime.time(20, 50)]
calculating theoretical departure time for 2901 Barton Skyway, Austin, TX 78746 to 443 E St. Elmo Rd, Austin, TX 78745 at 20:35:00
duration: 450
homebound query times: 
2901 Barton Skyway, Austin, TX 78746 to work: [datetime.time(20, 47, 59), datetime.time(20, 49, 49)]
outbound query times: 
2901 Barton Skyway, Austin, TX 78746 to work: [datetime.time(20, 26, 36), datetime.time(20, 27, 30), datetime.time(20, 52, 14)]
 running next outbound query time: 2901 Barton Skyway, Austin, TX 78746 to workat 20:26:36
waiting for next outbound query time for 2901 Barton Skyway, Austin, TX 78746 to workat 20:26:

{'duration': 2499,
 'duration_in_traffic': 2379,
 'distance': 35.40022617967614,
 'arrival_time': datetime.datetime(2023, 2, 12, 14, 16, 35, 483828),
 'avg_velocity': 50.996724388489035,
 'avg_velocity_in_traffic': 53.56906861993867,
 'arrival_time_in_traffic': datetime.datetime(2023, 2, 12, 14, 14, 35, 483828)}

In [9]:
dts

<__main__.DriveTimeSearcher at 0x230bd8e3100>

In [None]:
a_fname="apartments.json"
dest_f_name='destinations.json'
key_file = 'key.txt'


13:11:08.636802


In [83]:
def add_delta_to_time(t,delta):
    a= (dt.datetime.combine(dt.date(1,1,1),t) - delta)
    a.time()
    return a.time()
print(add_delta_to_time(dt.datetime.now().time(),td(hours = 0)))


18:21:58.169347


In [37]:
d = {}
d['hi'] = {}
d['hi']['there'] = {}
d['hi']['there']['bob'] = 5

print(d)


{'hi': {'there': {'bob': 5}}}
