In [3]:
import os
import csv
import openrouteservice
from pathlib import Path
import time
import math
from dotenv import load_dotenv

In [None]:
# Load the variables declared in .env into the process environment; without
# this call os.getenv() only sees variables that were already exported.
load_dotenv()

# os.getenv() returns ONE string (or None when the variable is unset).
# Split it so that iterating over api_key_list yields whole keys instead of
# single characters.
# NOTE(review): assumes the keys are separated by commas and/or whitespace —
# confirm against the actual .env format.
_raw_api_keys = os.getenv('API_keys') #Get them in .env
api_key_list = (_raw_api_keys.replace(',', ' ').split() if _raw_api_keys else None) or None

In [None]:
# Driving distance from one starting point to several end points; every point is a (longitude, latitude) pair.
# Result unit: meters
def get_min_distance_helper(client, origin:tuple[float, float], destinations:list[tuple[float, float]]):
    """Return the shortest driving distance from ``origin`` to any destination.

    Parameters:
    client       - object exposing ``distance_matrix(...)`` (an openrouteservice client)
    origin       - (longitude, latitude) of the starting point
    destinations - (longitude, latitude) pairs of the candidate end points

    Return value:
    The smallest distance found in the matrix row for the origin (meters,
    according to the comments that accompany this code), or ``float('inf')``
    when there are no destinations or every entry is ``None``.
    """
    destinations = list(destinations)
    if not destinations:
        # Nothing to route to: skip the API call instead of sending a request
        # with an empty destination index list.
        return float('inf')

    # Combine the start and ending points into one list of coordinates;
    # index 0 is the origin, 1..n are the destinations.
    coords = [origin] + destinations

    # Request distance matrix (using driving-car mode)
    matrix = client.distance_matrix(
        locations=coords,
        profile='driving-car',
        metrics=['distance'],  # only need distance matrix
        sources=[0],  # Index of the starting point
        destinations=list(range(1, len(coords)))  # An index list of destination
    )

    # Only one source was requested, so the first row holds every distance.
    distances = matrix['distances'][0]

    # None entries are skipped, exactly as the original loop did.
    return min(
        (distance for distance in distances if distance is not None),
        default=float('inf'),
    )

In [None]:
def haversine(origin, destination):
    """
    Calculate the spherical (great-circle) distance between two points in kilometers.

    Parameters:
    origin      - (longitude, latitude) of the first point, in degrees
    destination - (longitude, latitude) of the second point, in degrees

    Return value:
    The distance (in kilometers) between the two points, using an Earth
    radius of 6371.0 km.
    """
    lon1, lat1 = origin
    lon2, lat2 = destination
    # Convert longitude and latitude from degrees to radians
    lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])

    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError; clamp it.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    # Earth radius (km)
    R = 6371.0
    return R * c

# Because a single API call accepts only a limited number of points, keep only the `num` destinations with the smallest straight-line distance
def nearest_destinations(origin:tuple[int, int], destinations:list[tuple[int, int]], num):
    """Return the ``num`` destinations closest to ``origin`` in a straight line.

    Each destination is paired with its haversine distance to ``origin``; the
    pairs are sorted (distance first, the coordinate tuple breaks ties) and the
    coordinates of the first ``num`` pairs are returned in that order.
    """
    ranked = sorted(
        (haversine(origin, candidate), candidate) for candidate in destinations
    )
    return [candidate for _, candidate in ranked[:num]]

def get_min_distance(client, origin:tuple[float, float], destinations:list[tuple[float, float]], max_destinations:int=2000):
    """Return the shortest driving distance from ``origin`` to any destination.

    Parameters:
    client           - routing client forwarded to ``get_min_distance_helper``
    origin           - (longitude, latitude) of the starting point
    destinations     - (longitude, latitude) pairs of the candidate end points
    max_destinations - largest number of destinations sent in one request;
                       defaults to 2000, the cap that was previously hard-coded

    Return value:
    Whatever ``get_min_distance_helper`` returns for the (possibly reduced)
    destination list.
    """
    # Because a single API call is limited in size, keep only the
    # `max_destinations` points that are closest in a straight line.
    if len(destinations) > max_destinations:
        destinations = nearest_destinations(origin, destinations, max_destinations)
    return get_min_distance_helper(client, origin, destinations)

In [None]:
# Get destination list from the file
def get_destinations(filename):
    """Read facility coordinates from a CSV file.

    The first row is treated as a header and skipped. For every other
    non-blank row the last two columns are read as longitude and latitude.

    Parameters:
    filename - path of the CSV file

    Return value:
    A list of (longitude, latitude) float tuples, in file order.

    Raises:
    IndexError if a data row has fewer than two columns, ValueError if a
    coordinate is not a number.
    """
    destinations = []
    # The csv module (rather than str.split) handles quoted fields, and
    # newline='' lets it deal with line endings inside quoted fields.
    with open(filename, 'r', encoding='utf-8', newline='') as file:
        reader = csv.reader(file)
        next(reader, None)  # skip the header row; tolerate an empty file
        for row in reader:
            # Blank lines would otherwise crash on row[-2].
            if not any(cell.strip() for cell in row):
                continue
            longitude, latitude = row[-2], row[-1]
            destinations.append( (float(longitude), float(latitude)) )
    return destinations

# Get listings of property from the file as the starting point list
def get_origins(filename):
    """Read a listings CSV and extract each listing's coordinates.

    Parameters:
    filename - path of a CSV file whose header contains the columns
               'addressLng' and 'addressLat'

    Return value:
    (matrix, origins) where ``matrix`` is the list of rows (header first, each
    row a list of strings) and ``origins`` holds one (longitude, latitude)
    float tuple per data row, so ``origins[i - 1]`` belongs to ``matrix[i]``.
    Completely blank lines are dropped from both. An empty file yields
    ``([], [])``.

    Raises:
    ValueError if the header lacks a coordinate column, or if a data row is
    too short or holds a non-numeric coordinate.
    """
    matrix = []
    origins = []
    with open(filename, mode='r', encoding='utf-8', newline='') as file:
        csv_reader = csv.reader(file)

        header = next(csv_reader, None)
        if header is None:
            return matrix, origins
        matrix.append(header)

        # Locate the coordinate columns once, from the header titles.
        try:
            longitude_idx = header.index('addressLng')
            latitude_idx = header.index('addressLat')
        except ValueError:
            raise ValueError(
                f"{filename}: header must contain 'addressLng' and 'addressLat' columns"
            ) from None

        for row in csv_reader:
            if not row:
                # Blank line: skip it so matrix and origins stay aligned.
                continue
            try:
                origin = (float(row[longitude_idx]), float(row[latitude_idx]))
            except (IndexError, ValueError) as exc:
                # Fail loudly instead of silently reusing the previous row's
                # coordinates for a short or malformed row.
                raise ValueError(
                    f"{filename}, line {csv_reader.line_num}: cannot read coordinates ({exc})"
                ) from exc
            matrix.append(row)
            origins.append(origin)

    return matrix, origins



def get_min_distance_by_type(domain_filename, type_filename, new_filename):
    """Add a "minimum driving distance to <facility>" column to a listings CSV.

    Parameters:
    domain_filename - listings CSV (read with ``get_origins``)
    type_filename   - facility CSV (read with ``get_destinations``); its file
                      stem becomes the suffix of the new column name
    new_filename    - prefix prepended to the listings file's name; the result
                      is written next to the listings file. For a bare file
                      name this is the same path as the previous plain
                      ``new_filename + domain_filename`` concatenation.

    Return value:
    The path of the CSV that was written, as a string.

    The new column holds the distance in kilometers with two decimals. It is
    left empty for a listing whose request failed ten times in a row.

    Raises:
    ValueError if no API key is available or the listings file is empty.
    """
    # api_key_list may still be the raw string returned by os.getenv; indexing
    # a string would hand single characters to the client as keys.
    # NOTE(review): assumes keys are separated by commas and/or whitespace.
    keys = api_key_list
    if isinstance(keys, str):
        keys = keys.replace(',', ' ').split()
    if not keys:
        raise ValueError("no openrouteservice API keys found in api_key_list")

    # Client authentication with API key (one client per key)
    clients = [openrouteservice.Client(key=key) for key in keys]

    matrix, origins = get_origins(domain_filename)
    if not matrix:
        raise ValueError(f"{domain_filename} is empty")
    destinations = get_destinations(type_filename)

    # Use the file stem so a path such as '../raw/park.csv' yields 'park'
    # rather than leaking the directory part into the column title.
    name = Path(type_filename).stem
    # create new column - min distance
    matrix[0].append('min_distance_to_' + name)

    for i in range(1, len(matrix)):
        # Rotate through the clients so the requests are spread over all keys.
        client = clients[i % len(clients)]
        for try_again_times in range(10):
            try:
                min_dist = get_min_distance(client, origins[i - 1], destinations)
                matrix[i].append(f'{min_dist/1000:.2f}') # km
                break
            except Exception as e:
                print(e)
                print("try again " + str(try_again_times + 1))
                time.sleep(1)
        else:
            # Every attempt failed: keep the row aligned with the header.
            matrix[i].append('')
        print(name + '-' + str(i))

    domain_path = Path(domain_filename)
    output_path = domain_path.parent / (new_filename + domain_path.name)

    # Rows are written positionally, so duplicate column titles cannot
    # overwrite one another the way a dict keyed by title would.
    with open(output_path, mode='w', newline='', encoding='utf-8') as file:
        csv.writer(file).writerows(matrix)

    return str(output_path)


if __name__ == "__main__":
    # (listings file, facility file) pairs, in the order the columns are added.
    jobs = [
        ('../data/curated/Nonelongi_lati_remove_domain.csv', '../data/raw/interested_facilities/university.csv'),
        ('../data/curated/Uni.csv', '../data/raw/interested_facilities/shopping_mall.csv'),
        ('../data/curated/shopping_mall.csv', '../data/raw/interested_facilities/secondary_school.csv'),
        ('../data/curated/sec.csv', '../data/raw/interested_facilities/primary_school.csv'),
        ('../data/curated/pri.csv', '../data/raw/interested_facilities/park.csv'),
    ]
    # The calls used to omit the required third argument (TypeError). Each
    # output is now prefixed with the facility name, e.g. 'university_<input>'.
    # NOTE(review): the input names above look like renamed outputs of earlier
    # steps; confirm the intended prefixes / file names before a full run.
    for listings_file, facility_file in jobs:
        get_min_distance_by_type(listings_file, facility_file, Path(facility_file).stem + '_')
     
