### Jupyter Notebook for partially joining domain and distance data
- This Jupyter notebook retrieves the distance from each housing listing to the closest amenity of each type in the surrounding area.
- It uses the Open Route Service (ORS) API to calculate both walking and driving distances.
- Note that the ORS API enforces a request quota; once the quota is used up, calls fail with `403 Quota exceeded`.

In [2]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import json
import re
import pandas as pd
import numpy as np
from geopy.distance import great_circle
import requests
import overpass
import time

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Open Route Services")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.port", "6066")
    .getOrCreate()
)



### Set Up your Open Route Service Key
- Place a JSON file named `openrouteservicekey.json` in the `data/landing` folder, with your ORS (Open Route Service) key stored under the `api_key` field.

In [4]:
# Load the config file
with open('../../data/landing/openrouteservicekey.json') as config_file:
    config = json.load(config_file)

# Access the API key
api_key = config.get('api_key')

if not api_key:
    raise ValueError("API Key not found in openrouteservicekey.json.")
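As a minimal sketch of the key file the cell above expects, the snippet below writes a throwaway JSON file with a single `api_key` field and reads it back. The helper name `load_api_key` and the placeholder value `YOUR_ORS_KEY` are assumptions made for illustration; only the `api_key` field name and the file name come from the notebook.

```python
import json
import tempfile
from pathlib import Path


def load_api_key(path):
    """Read the `api_key` field from a JSON config file (hypothetical helper)."""
    with open(path) as config_file:
        config = json.load(config_file)
    api_key = config.get("api_key")
    if not api_key:
        raise ValueError(f"API key not found in {path}.")
    return api_key


# Demonstrate with a temporary file; the value is a placeholder, not a real key.
with tempfile.TemporaryDirectory() as tmp_dir:
    key_path = Path(tmp_dir) / "openrouteservicekey.json"
    key_path.write_text(json.dumps({"api_key": "YOUR_ORS_KEY"}))
    loaded_key = load_api_key(key_path)

print(loaded_key)
```

Keeping the key in a separate file under `data/landing` means it can be excluded from version control while the notebook itself is shared.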

**Earlier code for one-to-one API calls**
- No longer used
- Kept here for reference only

In [3]:
# # Profile: foot-walking, driving-car
# def openRouteDistance(start_coords, end_coords, profile):
#     # Define the OpenRouteService API endpoint for routing
#     url = 'https://api.openrouteservice.org/v2/directions/{profile}/geojson'

#     # Set up headers with your API key
#     headers = {
#         'Authorization': api_key,
#         'Content-Type': 'application/json'
#     }

#     # Create the payload with the start and end coordinates
#     payload = {
#         "coordinates": [
#             list(start_coords),
#             list(end_coords)
#         ]
#     }

#     # Make the request
#     response = requests.post(url.format(profile=profile), json=payload, headers=headers)

#     # Check if the request was successful
#     if response.status_code == 200:
#         # Parse the response JSON
#         data = response.json()
        
#         # Extract distance (in meters) from the response
#         distance = data['features'][0]['properties']['segments'][0]['distance']
#         distance_km = distance / 1000  # Convert to kilometers
        
#         return (distance_km)
#     else:
#         return None

**Current function for one-to-many matrix distances**

In [5]:
# Function to get matrix distances
def get_matrix_distances(client, source, destinations, profile='foot-walking'):
    """Return route distances (in metres) from the source to each destination."""
    coords = [source] + destinations  # Combine source and destination coordinates
    matrix = client.distance_matrix(
        locations=coords,
        profile=profile,
        sources=[0],  # Index of the house (source)
        destinations=list(range(1, len(coords))),  # Indexes of the amenities (destinations)
        metrics=['distance'],  # Request distances; the default response holds durations only
    )

    # `snapped_distance` is only the offset from an input point to the routable network,
    # so the route distances are read from the `distances` matrix instead.
    # Destinations that cannot be routed may come back as None.
    return matrix['distances'][0]  # Row for the first (and only) source, the house
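To make the shape of a matrix result concrete, here is a small offline sketch that parses a hand-written response. It assumes the ORS matrix layout of a `distances` list with one row per source plus a `destinations` list carrying `snapped_distance`; every number in `sample_matrix` is invented, and the helper names are hypothetical.

```python
# Illustrative response for one source and three destinations (invented values).
sample_matrix = {
    "distances": [[500.0, None, 2250.0]],  # metres, one row per source
    "destinations": [
        {"location": [144.9631, -37.8136], "snapped_distance": 3.1},
        {"location": [144.9700, -37.8200], "snapped_distance": 12.4},
        {"location": [144.9800, -37.8000], "snapped_distance": 0.8},
    ],
}


def route_distances_km(matrix):
    """Convert the first source's distances from metres to kilometres."""
    return [None if d is None else d / 1000 for d in matrix["distances"][0]]


def snap_offsets_m(matrix):
    """Offset (metres) between each input point and the point it was snapped to."""
    return [dest.get("snapped_distance") for dest in matrix["destinations"]]


print(route_distances_km(sample_matrix))
print(snap_offsets_m(sample_matrix))
```

The two lists answer different questions: the first is how far the route is, the second is how far each input coordinate sits from the routable network.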

In [7]:
# Reading data
cleaned_domain_data = spark.read.parquet("../../data/raw/domain/cleaned_domain_current_listings.parquet")
VIC_amenities = spark.read.parquet("../../data/raw/VIC_amenities")

                                                                                

In [8]:
# Find all unique amenity types and split them into individual PySpark DataFrames
amenity_dfs = {}
unique_types = [row['amenity'] for row in VIC_amenities.select('amenity').distinct().collect()]

for type_value in unique_types:
    filtered_df = VIC_amenities.filter(VIC_amenities.amenity == type_value)
    amenity_dfs[type_value] = filtered_df

                                                                                

**Total amount of data**

In [9]:
cleaned_domain_data.count()

13328

In [10]:
cleaned_domain_data.count()/5

2665.6
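The per-person row ranges follow from rounding 13328 / 5 up to 2666 rows each. A short sketch of that arithmetic, with `split_ranges` as a hypothetical helper name:

```python
import math


def split_ranges(total_rows, n_people):
    """Split `total_rows` into contiguous [start, end) ranges, one per person."""
    chunk = math.ceil(total_rows / n_people)
    return [
        [start, min(start + chunk, total_rows)]
        for start in range(0, total_rows, chunk)
    ]


ranges = split_ranges(13328, 5)
for person, row_range in zip(["Chin", "Nigel", "Cinque", "Malachy", "Jun"], ranges):
    print(person, row_range)
```

The last range is slightly shorter than the others because 5 × 2666 exceeds the 13328 available rows.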

# The only thing that needs to be changed

In [22]:
# There are only 13328 listings, so each person should process about 2666 of them
# Copy and paste the row range assigned to you into `data_range`
# Chin : [0,2666]
# Nigel : [2666, 5332]
# Cinque : [5332, 7998]
# Malachy : [7998, 10664]
# Jun : [10664, 13328]

data_range = [5828,7998]


**Getting distance from Open Route Services API**
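Before calling ORS, the cell that follows picks the closest amenity of each type by great-circle distance, so that only one candidate per type is sent to the API. The sketch below shows that selection step without any dependencies, using the haversine formula in place of `geopy`'s `great_circle`. The coordinates and amenity names are invented, and `haversine_km` and `closest_amenity` are hypothetical helper names.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius (spherical approximation)


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, lon) points."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlambda = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def closest_amenity(house, amenities):
    """Return the amenity dict with the smallest straight-line distance to the house."""
    return min(
        amenities,
        key=lambda a: haversine_km(
            house["latitude"], house["longitude"], a["latitude"], a["longitude"]
        ),
    )


# Invented sample data for illustration only.
house = {"latitude": -37.8136, "longitude": 144.9631}
amenities = [
    {"name": "far school", "latitude": -37.9000, "longitude": 145.0000},
    {"name": "near school", "latitude": -37.8140, "longitude": 144.9640},
]

nearest = closest_amenity(house, amenities)
print(nearest["name"])
```

Straight-line distance is only a pre-filter here: the nearest amenity as the crow flies is not always the nearest by road or footpath, which is why the route distance is still requested from ORS afterwards.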

In [23]:
import openrouteservice
import pandas as pd

# Convert to Pandas DataFrame
domain_pd_df = cleaned_domain_data.toPandas()[data_range[0]:data_range[1]]
client = openrouteservice.Client(key=api_key)

# Function to calculate distance
def calculate_distance(row, amenities_df):
    house_lat, house_lon = row['latitude'], row['longitude']
    amenities_df['Distance'] = amenities_df.apply(
        lambda x: great_circle((house_lat, house_lon), (x['latitude'], x['longitude'])).km,
        axis=1
    )
    return amenities_df.loc[amenities_df['Distance'].idxmin()]

# Convert each amenity DataFrame to pandas once, instead of once per house
amenity_pd_dfs = {key: value.toPandas() for key, value in amenity_dfs.items()}

# Find closest amenities for each house
results = []

for index, house_row in domain_pd_df.iterrows():
    start_coords = (house_row['longitude'], house_row['latitude'])  # ORS expects (lon, lat)
    results.append({
        'House Name': house_row['name']
    })

    # Collecting the coordinates of each amenity
    amenity_coords = []
    amenity_names = []
    for key, amenities_pd_df in amenity_pd_dfs.items():
        closest_amenity = calculate_distance(house_row, amenities_pd_df)
        end_coords = (float(closest_amenity['longitude']), float(closest_amenity['latitude']))
        amenity_coords.append(end_coords)
        amenity_names.append(closest_amenity['name'])

    start_time = time.time()
    walking_distances = get_matrix_distances(client, start_coords, amenity_coords, profile='foot-walking')
    end_time = time.time()
    if (end_time - start_time) < 1.5:
        time.sleep(1.5-(end_time - start_time))

    start_time = time.time()
    driving_distances = get_matrix_distances(client, start_coords, amenity_coords, profile= 'driving-car')
    end_time = time.time()
    if (end_time - start_time) < 1.5:
        time.sleep(1.5-(end_time - start_time))
    
    # Inserting data into the entry appended for this house
    for j, key in enumerate(amenity_dfs):
        results[-1][f'{key}_name'] = amenity_names[j]
        results[-1][f'{key}_walking'] = walking_distances[j]
        results[-1][f'{key}_driving'] = driving_distances[j]
        
    print(f"Done {index}.")

# Save to a CSV file in the same data/raw directory the inputs are read from
results_df = pd.DataFrame(results)
results_df.to_csv(f"../../data/raw/domain_and_distance_{data_range[0]}_{data_range[1]}.csv", index=False)

ApiError: 403 ({'error': 'Quota exceeded'})
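The loop above spaces its requests by sleeping until 1.5 seconds have elapsed since each call started, and that logic is written out twice. A small reusable version might look like the sketch below. The `Throttle` class is a hypothetical helper, not part of `openrouteservice`, and the 1.5 second interval is simply the value used in the notebook. The clock and sleep functions are injectable so the demonstration runs instantly.

```python
import time


class Throttle:
    """Ensure consecutive calls to `wait()` return at least `min_interval` seconds apart."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last_start = None

    def wait(self):
        now = self._clock()
        if self._last_start is not None:
            remaining = self.min_interval - (now - self._last_start)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last_start = now


# Demonstration with a fake clock so nothing actually sleeps.
fake_now = [0.0]
slept = []


def fake_sleep(seconds):
    slept.append(seconds)
    fake_now[0] += seconds


throttle = Throttle(1.5, clock=lambda: fake_now[0], sleep=fake_sleep)
throttle.wait()      # first request: no wait needed
fake_now[0] += 0.5   # pretend the request took 0.5 s
throttle.wait()      # sleeps the remaining 1.0 s
fake_now[0] += 2.0   # pretend the next request took 2.0 s
throttle.wait()      # already past the interval: no sleep

print(slept)
```

In the notebook, a single `Throttle(1.5)` instance with the default clock could be called before each `get_matrix_distances` request. Spacing requests only addresses per-minute limits; it does not prevent the overall quota from being exhausted, as the `403` above shows.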


In [15]:
results2 = results.copy()

In [21]:

with open("data.json", "w") as json_file:
    json.dump(results, json_file, indent=4) 
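Since a quota error can stop the run part-way, dumping `results` to JSON as above preserves the rows already processed. A minimal sketch of turning that into a resumable checkpoint follows. The helpers `save_checkpoint` and `load_checkpoint` and the sample rows are assumptions for illustration, and the file is written to a temporary directory rather than the notebook's working directory.

```python
import json
import tempfile
from pathlib import Path


def save_checkpoint(results, path):
    """Write results to a temporary file, then rename it over the checkpoint."""
    path = Path(path)
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w") as json_file:
        json.dump(results, json_file, indent=4)
    tmp_path.replace(path)  # avoids leaving a half-written checkpoint behind


def load_checkpoint(path):
    """Return previously saved results, or an empty list if none exist yet."""
    path = Path(path)
    if not path.exists():
        return []
    with open(path) as json_file:
        return json.load(json_file)


with tempfile.TemporaryDirectory() as tmp_dir:
    checkpoint_path = Path(tmp_dir) / "data.json"
    empty_start = load_checkpoint(checkpoint_path)

    # Invented partial results standing in for the rows processed so far.
    partial_results = [{"House Name": "house A"}, {"House Name": "house B"}]
    save_checkpoint(partial_results, checkpoint_path)
    restored = load_checkpoint(checkpoint_path)

# Number of rows already done, i.e. the offset to add to data_range[0] when resuming.
resume_offset = len(restored)
print(resume_offset)
```

Calling `save_checkpoint` every few houses inside the main loop would bound how much work is lost when the quota runs out.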
