In [57]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
import matplotlib.pyplot as plt
import seaborn as sns
import os
import sys
from pyspark.sql import functions as F  #filtering
import geopandas as gpd
import folium
import pandas as pd
import matplotlib.pyplot as plt

In [58]:
# starting a Spark session
spark = (
    SparkSession.builder.appName('Parkres Further Analysis')
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Read the dataset from a CSV file using PySpark
parkres = spark.read.csv('../data/curated/parkres/parkres.csv', header=True, inferSchema=True)

# Drop the extra index column (_c0) if it exists
parkres = parkres.drop('_c0')

# Show the first few rows of the dataset to confirm
parkres.show(5)

+--------------------+--------------------+--------+--------------------+
|                name|            sa2_name|postcode|            geometry|
+--------------------+--------------------+--------+--------------------+
|Lilydale-Warburto...|        Yarra Valley|    3139|POLYGON ((1034153...|
|Nangana Bushland ...|        Yarra Valley|    3139|POLYGON ((1022203...|
|Nillumbik G139 Bu...|Wattle Glen - Dia...|    3089|POLYGON ((989912....|
|Lilydale-Warburto...|Lilydale - Coldst...|    3140|POLYGON ((1005216...|
|Plenty Gorge Park...|  Plenty - Yarrambat|    3088|POLYGON ((983018....|
+--------------------+--------------------+--------+--------------------+


In [59]:
# read the domain parquet dataset
domain = spark.read.parquet('../data/curated/domain_data')
domain.limit(5)

url,price,address,property_type,latitude,longitude,Beds,Baths,Parking,bond,extracted_price,geometry,sa2_code,sa2_name,chg_flag,chg_lbl,sa3_code,sa3_name,sa4_code,sa4_name,gcc_code,gcc_name,ste_code,ste_name,aus_code,aus_name,areasqkm,loci_uri,__index_level_0__
https://www.domai...,"$1,400.00","10 Allara Court, ...",Townhouse,-37.77427300000001,145.1811258,4.0,3.0,2.0,9125.0,1400.0,[01 01 00 00 00 C...,211021261,Donvale - Park Or...,0,No change,21102,Manningham - East,211,Melbourne - Outer...,2GMEL,Greater Melbourne,2,Victoria,AUS,Australia,20.8028,http://linked.dat...,0
https://www.domai...,$750 per week,"7 Pine Ridge, Don...",House,-37.7912513,145.1756489,4.0,2.0,0.0,3259.0,750.0,[01 01 00 00 00 8...,211021261,Donvale - Park Or...,0,No change,21102,Manningham - East,211,Melbourne - Outer...,2GMEL,Greater Melbourne,2,Victoria,AUS,Australia,20.8028,http://linked.dat...,1
https://www.domai...,$1300 per week,"20 Mulsanne Way, ...",House,-37.7972323,145.1812636,5.0,2.0,2.0,5649.0,1300.0,[01 01 00 00 00 9...,211021261,Donvale - Park Or...,0,No change,21102,Manningham - East,211,Melbourne - Outer...,2GMEL,Greater Melbourne,2,Victoria,AUS,Australia,20.8028,http://linked.dat...,2
https://www.domai...,$825pw / $3585pcm,3 Monterey Cresce...,House,-37.792402,145.1743233,3.0,1.0,5.0,3585.0,825.0,[01 01 00 00 00 C...,211021261,Donvale - Park Or...,0,No change,21102,Manningham - East,211,Melbourne - Outer...,2GMEL,Greater Melbourne,2,Victoria,AUS,Australia,20.8028,http://linked.dat...,3
https://www.domai...,$680.00,3/49 Leslie Stree...,Townhouse,-37.7810117,145.180705,3.0,2.0,2.0,2955.0,680.0,[01 01 00 00 00 2...,211021261,Donvale - Park Or...,0,No change,21102,Manningham - East,211,Melbourne - Outer...,2GMEL,Greater Melbourne,2,Victoria,AUS,Australia,20.8028,http://linked.dat...,4


In [60]:
from pyspark.sql import functions as F

# Check for null values in each column
domain.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in domain.columns]).show()

+---+-----+-------+-------------+--------+---------+----+-----+-------+----+---------------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------------+
|url|price|address|property_type|latitude|longitude|Beds|Baths|Parking|bond|extracted_price|geometry|sa2_code|sa2_name|chg_flag|chg_lbl|sa3_code|sa3_name|sa4_code|sa4_name|gcc_code|gcc_name|ste_code|ste_name|aus_code|aus_name|areasqkm|loci_uri|__index_level_0__|
+---+-----+-------+-------------+--------+---------+----+-----+-------+----+---------------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------------+
|  0|    0|      0|            0|       0|        0|   0|    0|      0|1199|              0|       0|       0|       0|       0|      0|       0|       0|       0|       0|       0|       0|       0|       0|   

In [61]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, StringType
from shapely import wkt
from pyspark.sql.functions import udf
from math import radians, sin, cos, sqrt, atan2

# Function to calculate distance between two points (Haversine formula)
def calculate_distance(lat1, lon1, lat2, lon2):
    R = 6371  # Earth's radius in km
    
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

# UDF to convert WKT geometry to centroid coordinates
def get_centroid(geometry):
    try:
        shape = wkt.loads(geometry)
        centroid = shape.centroid
        return f"{centroid.y},{centroid.x}"
    except:
        return None

# Register UDFs
distance_udf = udf(calculate_distance, DoubleType())
centroid_udf = udf(get_centroid, StringType())

# Prepare park data
parkres = parkres.withColumnRenamed("geometry", "park_geometry")
parkres = parkres.withColumn("park_centroid", centroid_udf(F.col("park_geometry")))
parkres = parkres.withColumn("park_centroid_lat", F.split(F.col("park_centroid"), ",")[0].cast(DoubleType()))
parkres = parkres.withColumn("park_centroid_lon", F.split(F.col("park_centroid"), ",")[1].cast(DoubleType()))

# Prepare domain data
domain = domain.withColumnRenamed("latitude", "prop_lat")
domain = domain.withColumnRenamed("longitude", "prop_lon")
domain = domain.withColumnRenamed("sa2_name", "property_sa2_name")

# Cross join to calculate the distance between each property and every park
result = domain.crossJoin(F.broadcast(parkres))

# Calculate distances between properties and park centroids
result = result.withColumn("distance", 
    distance_udf(F.col("prop_lat"), F.col("prop_lon"), 
                 F.col("park_centroid_lat"), F.col("park_centroid_lon")))
result.orderBy("distance").show(5)

[Stage 89:>                                                         (0 + 1) / 1]

+--------------------+----------------+--------------------+--------------------+-----------+-----------+----+-----+-------+------+---------------+--------------------+---------+-----------------+--------+---------+--------+------------+--------+--------------------+--------+-----------------+--------+--------+--------+---------+--------+--------------------+-----------------+--------------------+--------------------+--------+--------------------+--------------------+------------------+-----------------+------------------+
|                 url|           price|             address|       property_type|   prop_lat|   prop_lon|Beds|Baths|Parking|  bond|extracted_price|            geometry| sa2_code|property_sa2_name|chg_flag|  chg_lbl|sa3_code|    sa3_name|sa4_code|            sa4_name|gcc_code|         gcc_name|ste_code|ste_name|aus_code| aus_name|areasqkm|            loci_uri|__index_level_0__|                name|            sa2_name|postcode|       park_geometry|       park_centroid

                                                                                

In [62]:
# Find the nearest park for each property
window_spec = Window.partitionBy("__index_level_0__").orderBy("distance")
nearest_park = result.withColumn("row", F.row_number().over(window_spec)) \
    .filter(F.col("row") == 1) \
    .select(
        "__index_level_0__",
        F.col("distance").alias("nearest_park_distance"),
        F.col("name").alias("nearest_park_name"),
        F.col("sa2_name").alias("park_sa2_name")
    )

# Join back to the original domain data
final_result = domain.join(nearest_park, on="__index_level_0__")

# Show the result for validation
final_result.select(
    "address", 
    "extracted_price", 
    "nearest_park_distance", 
    "nearest_park_name", 
    "property_sa2_name", 
    "park_sa2_name"
) \
    .orderBy("nearest_park_distance") \
    .show(10, truncate=False)

[Stage 92:>                                                         (0 + 1) / 1]

+---------------------------------------------------+---------------+---------------------+----------------------+---------------------------------------------+-----------------------+
|address                                            |extracted_price|nearest_park_distance|nearest_park_name     |property_sa2_name                            |park_sa2_name          |
+---------------------------------------------------+---------------+---------------------+----------------------+---------------------------------------------+-----------------------+
|90 Wickhams Road, Launching Place VIC 3139         |690.0          |259.9745567502766    |Yarra Valley Parklands|Yarra Valley                                 |Warrandyte - Wonga Park|
|37 MONBULK-SEVILLE ROAD, Seville VIC 3139          |738.0          |268.42900145854645   |Yarra Valley Parklands|Wandin - Seville                             |Warrandyte - Wonga Park|
|5 Morris Court, Wandin North VIC 3139              |730.0          |272.07

                                                                                

In [63]:
'''from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from math import radians, sin, cos, sqrt, atan2

# Helper function to parse geometry
def parse_geometry(geom):
    coords = F.split(F.regexp_extract(geom, r"POINT\((.*?)\)", 1), " ")
    return F.struct(
        coords[0].cast(DoubleType()).alias("longitude"),
        coords[1].cast(DoubleType()).alias("latitude")
    )

# Haversine distance function with null handling
@F.udf(returnType=DoubleType())
def haversine_distance(lat1, lon1, lat2, lon2):
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    
    R = 6371  # Earth's radius in kilometers
    
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    return R * c

# Load data
parkres = spark.read.csv('../data/curated/parkres/parkres.csv', header=True, inferSchema=True)
domain = spark.read.parquet('../data/curated/domain_data')

# Display schema and sample data
print("parkres schema:")
parkres.printSchema()
print("\nSample parkres data:")
parkres.show(5, truncate=False)

print("\ndomain schema:")
domain.printSchema()
print("\nSample domain data:")
domain.show(5, truncate=False)

# Parse geometry for parkres
parkres = parkres.withColumn("park_coords", parse_geometry(F.col("geometry")))

# Prepare domain data
domain = domain.select(
    "url", "extracted_price", 
    F.col("latitude").cast(DoubleType()),
    F.col("longitude").cast(DoubleType())
).withColumn("property_coords", F.struct("latitude", "longitude"))

# Calculate distances using cross join
crossed = domain.crossJoin(F.broadcast(parkres))
with_distances = crossed.withColumn("distance_to_park",
    haversine_distance(
        F.col("property_coords.latitude"), F.col("property_coords.longitude"),
        F.col("park_coords.latitude"), F.col("park_coords.longitude")
    )
)

# Find nearest park for each property
nearest_park = with_distances.groupBy("url").agg(
    F.min("distance_to_park").alias("nearest_park_distance")
)

# Join back to get full property data with nearest park distance
result = domain.join(nearest_park, on="url")

# Analyze by distance ranges
analysis = result.groupBy(F.round(F.col("nearest_park_distance")).alias("distance_km")) \
    .agg(
        F.avg("extracted_price").alias("avg_price"),
        F.count("url").alias("property_count")
    ) \
    .orderBy("distance_km")

# Show results
print("\nFinal analysis:")
analysis.show()

# Optional: Count null distances
null_distances = result.filter(F.col("nearest_park_distance").isNull()).count()
print(f"\nNumber of properties with null distances: {null_distances}")'''

  '''from pyspark.sql import functions as F


'from pyspark.sql import functions as F\nfrom pyspark.sql.types import DoubleType\nfrom math import radians, sin, cos, sqrt, atan2\n\n# Helper function to parse geometry\ndef parse_geometry(geom):\n    coords = F.split(F.regexp_extract(geom, r"POINT\\((.*?)\\)", 1), " ")\n    return F.struct(\n        coords[0].cast(DoubleType()).alias("longitude"),\n        coords[1].cast(DoubleType()).alias("latitude")\n    )\n\n# Haversine distance function with null handling\n@F.udf(returnType=DoubleType())\ndef haversine_distance(lat1, lon1, lat2, lon2):\n    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:\n        return None\n    \n    R = 6371  # Earth\'s radius in kilometers\n    \n    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])\n    dlat = lat2 - lat1\n    dlon = lon2 - lon1\n    \n    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2\n    c = 2 * atan2(sqrt(a), sqrt(1-a))\n    return R * c\n\n# Load data\nparkres = spark.read.csv(\'../data/curat

In [64]:
'''import geopandas as gpd
from shapely.geometry import Point
from shapely import wkt
import pandas as pd

# Step 1: Inspect the domain dataset
print(domain.show(5))  # Show a few rows to ensure data exists in 'latitude' and 'longitude'

# Convert Spark DataFrame to Pandas
domain_pd = domain.select('url', 'latitude', 'longitude').toPandas()

# Step 2: Check for null values in domain's latitude and longitude
print("Checking for null values in domain dataset:")
print(domain_pd.isnull().sum())  # This will show if there are any null latitudes or longitudes

# Step 3: Check if lat/lon columns are valid in the domain
domain_pd = domain_pd.dropna(subset=['latitude', 'longitude'])
print(f"Number of valid rows in domain dataset after dropping nulls: {len(domain_pd)}")

# Step 4: Inspect the parkres dataset
print(parkres.show(5))  # Show a few rows to ensure data exists in 'geometry'

# Convert Spark DataFrame to Pandas
parkres_pd = parkres.select('sa2_name', 'geometry').toPandas()

# Step 5: Check for null values in parkres dataset
print("Checking for null values in parkres dataset:")
print(parkres_pd.isnull().sum())  # This will show if there are any null geometries

# Step 6: Check if geometry column is valid in parkres
parkres_pd = parkres_pd.dropna(subset=['geometry'])
print(f"Number of valid rows in parkres dataset after dropping null geometries: {len(parkres_pd)}")

# Step 7: Try converting both datasets to GeoDataFrames
if not domain_pd.empty and not parkres_pd.empty:
    # Create GeoDataFrame for domain
    domain_gdf = gpd.GeoDataFrame(domain_pd, geometry=gpd.points_from_xy(domain_pd.longitude, domain_pd.latitude), crs="EPSG:4326")

    # Create GeoDataFrame for parkres
    parkres_gdf = gpd.GeoDataFrame(parkres_pd, geometry=parkres_pd['geometry'].apply(wkt.loads), crs="EPSG:4326")

    print("Successfully created GeoDataFrames for both datasets.")
else:
    print("One of the datasets is still empty after handling missing values.")'''

'import geopandas as gpd\nfrom shapely.geometry import Point\nfrom shapely import wkt\nimport pandas as pd\n\n# Step 1: Inspect the domain dataset\nprint(domain.show(5))  # Show a few rows to ensure data exists in \'latitude\' and \'longitude\'\n\n# Convert Spark DataFrame to Pandas\ndomain_pd = domain.select(\'url\', \'latitude\', \'longitude\').toPandas()\n\n# Step 2: Check for null values in domain\'s latitude and longitude\nprint("Checking for null values in domain dataset:")\nprint(domain_pd.isnull().sum())  # This will show if there are any null latitudes or longitudes\n\n# Step 3: Check if lat/lon columns are valid in the domain\ndomain_pd = domain_pd.dropna(subset=[\'latitude\', \'longitude\'])\nprint(f"Number of valid rows in domain dataset after dropping nulls: {len(domain_pd)}")\n\n# Step 4: Inspect the parkres dataset\nprint(parkres.show(5))  # Show a few rows to ensure data exists in \'geometry\'\n\n# Convert Spark DataFrame to Pandas\nparkres_pd = parkres.select(\'sa2_nam

In [65]:
'''from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from shapely import wkb, wkt
from shapely.ops import nearest_points

# Define UDF for geospatial operations
@F.udf(returnType=DoubleType())
def calculate_distance(lat, lon, park_geom_wkt):
    point = wkt.loads(f"POINT({lon} {lat})")
    park = wkt.loads(park_geom_wkt)
    nearest_point = nearest_points(point, park.exterior)[1]
    return point.distance(nearest_point) * 111000  # Approximate conversion to meters

# Convert binary WKB to WKT
def wkb_to_wkt(wkb_value):
    if wkb_value is None:
        return None
    return wkb.loads(wkb_value).wkt

wkb_to_wkt_udf = F.udf(wkb_to_wkt, returnType=F.StringType())

# Prepare domain DataFrame
domain_prepared = domain.withColumn("domain_geometry_wkt", wkb_to_wkt_udf(F.col("geometry")))

# Rename parkres geometry column to avoid conflict
parkres_prepared = parkres.withColumnRenamed("geometry", "park_geometry")

# Cross join domain with parkres
joined = domain_prepared.crossJoin(parkres_prepared)

# Calculate distances
joined = joined.withColumn("distance", 
                           calculate_distance(F.col("latitude"), 
                                              F.col("longitude"), 
                                              F.col("park_geometry")))

# Find the minimum distance for each domain entry
result = joined.groupBy("url", "address", "__index_level_0__") \
    .agg(F.min("distance").alias("min_distance_to_park_border"), 
         F.first("name").alias("nearest_park_name"))

# Show the results
result.show(truncate=False)

# If you want to save the results
# result.write.parquet("path/to/save/results.parquet")'''

'from pyspark.sql import functions as F\nfrom pyspark.sql.types import DoubleType\nfrom shapely import wkb, wkt\nfrom shapely.ops import nearest_points\n\n# Define UDF for geospatial operations\n@F.udf(returnType=DoubleType())\ndef calculate_distance(lat, lon, park_geom_wkt):\n    point = wkt.loads(f"POINT({lon} {lat})")\n    park = wkt.loads(park_geom_wkt)\n    nearest_point = nearest_points(point, park.exterior)[1]\n    return point.distance(nearest_point) * 111000  # Approximate conversion to meters\n\n# Convert binary WKB to WKT\ndef wkb_to_wkt(wkb_value):\n    if wkb_value is None:\n        return None\n    return wkb.loads(wkb_value).wkt\n\nwkb_to_wkt_udf = F.udf(wkb_to_wkt, returnType=F.StringType())\n\n# Prepare domain DataFrame\ndomain_prepared = domain.withColumn("domain_geometry_wkt", wkb_to_wkt_udf(F.col("geometry")))\n\n# Rename parkres geometry column to avoid conflict\nparkres_prepared = parkres.withColumnRenamed("geometry", "park_geometry")\n\n# Cross join domain with p

In [66]:
'''from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from shapely import wkb, wkt
from shapely.ops import nearest_points
import math

# Define the Haversine formula to calculate distance in meters between two lat/lon points
def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371000  # Radius of Earth in meters
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    a = math.sin(delta_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return R * c  # Distance in meters

# Define UDF for geospatial operations using Haversine formula
@F.udf(returnType=DoubleType())
def calculate_distance(lat, lon, park_geom_wkt):
    point = wkt.loads(f"POINT({lon} {lat})")
    park = wkt.loads(park_geom_wkt)
    
    # Find the nearest point on the park boundary
    nearest_point = nearest_points(point, park.exterior)[1]
    
    # Calculate the Haversine distance between the property point and nearest park point
    return haversine_distance(lat, lon, nearest_point.y, nearest_point.x)

# Convert binary WKB to WKT
def wkb_to_wkt(wkb_value):
    if wkb_value is None:
        return None
    return wkb.loads(wkb_value).wkt

wkb_to_wkt_udf = F.udf(wkb_to_wkt, returnType=F.StringType())

# Prepare domain DataFrame
domain_prepared = domain.withColumn("domain_geometry_wkt", wkb_to_wkt_udf(F.col("geometry")))

# Rename parkres geometry column to avoid conflict
parkres_prepared = parkres.withColumnRenamed("geometry", "park_geometry")

# Cross join domain with parkres
joined = domain_prepared.crossJoin(parkres_prepared)

# Calculate distances using the improved method (Haversine formula)
joined = joined.withColumn("distance", 
                           calculate_distance(F.col("latitude"), 
                                              F.col("longitude"), 
                                              F.col("park_geometry")))

# Find the minimum distance for each domain entry
result = joined.groupBy("url", "address", "__index_level_0__") \
    .agg(F.min("distance").alias("min_distance_to_park_border"), 
         F.first("name").alias("nearest_park_name"))

# Show the results
result.show(truncate=False)

# Optional: Save the results if needed
# result.write.parquet("path/to/save/results.parquet")'''

'from pyspark.sql import functions as F\nfrom pyspark.sql.types import DoubleType\nfrom shapely import wkb, wkt\nfrom shapely.ops import nearest_points\nimport math\n\n# Define the Haversine formula to calculate distance in meters between two lat/lon points\ndef haversine_distance(lat1, lon1, lat2, lon2):\n    R = 6371000  # Radius of Earth in meters\n    phi1 = math.radians(lat1)\n    phi2 = math.radians(lat2)\n    delta_phi = math.radians(lat2 - lat1)\n    delta_lambda = math.radians(lon2 - lon1)\n\n    a = math.sin(delta_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2\n    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))\n\n    return R * c  # Distance in meters\n\n# Define UDF for geospatial operations using Haversine formula\n@F.udf(returnType=DoubleType())\ndef calculate_distance(lat, lon, park_geom_wkt):\n    point = wkt.loads(f"POINT({lon} {lat})")\n    park = wkt.loads(park_geom_wkt)\n    \n    # Find the nearest point on the park boundary\n

In [67]:
'''from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType
from shapely import wkb, wkt
import math

def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371  # Radius of Earth in kilometers
    
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    
    return R * c * 1000  # Distance in meters

@udf(returnType=DoubleType())
def calculate_distance(lat, lon, park_geom_wkt):
    if lat is None or lon is None or park_geom_wkt is None:
        return None
    
    point = wkt.loads(f"POINT({lon} {lat})")
    park = wkt.loads(park_geom_wkt)
    
    # Find the nearest point on the park boundary
    nearest_point = park.exterior.interpolate(park.exterior.project(point))
    
    return haversine_distance(lat, lon, nearest_point.y, nearest_point.x)

@udf(returnType=StringType())
def wkb_to_wkt(wkb_value):
    if wkb_value is None:
        return None
    return wkb.loads(wkb_value).wkt

# Prepare domain DataFrame
domain_prepared = domain.withColumn("domain_geometry_wkt", wkb_to_wkt(domain.geometry))

# Rename parkres geometry column to avoid conflict
parkres_prepared = parkres.withColumnRenamed("geometry", "park_geometry")

# Join domain with parkres
joined = domain_prepared.crossJoin(parkres_prepared)

# Calculate distances
joined = joined.withColumn("distance",
                           calculate_distance(joined.latitude,
                                              joined.longitude,
                                              joined.park_geometry))

# Find the minimum distance for each domain entry
result = joined.groupBy("url", "address", "__index_level_0__") \
    .agg({"distance": "min", "name": "first"}) \
    .withColumnRenamed("min(distance)", "min_distance_to_park_border") \
    .withColumnRenamed("first(name)", "nearest_park_name")

# Show the results
result.show(truncate=False)

# Optional: Save the results if needed
# result.write.parquet("path/to/save/results.parquet")'''

'from pyspark.sql.functions import udf\nfrom pyspark.sql.types import DoubleType, StringType\nfrom shapely import wkb, wkt\nimport math\n\ndef haversine_distance(lat1, lon1, lat2, lon2):\n    R = 6371  # Radius of Earth in kilometers\n    \n    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])\n    \n    dlat = lat2 - lat1\n    dlon = lon2 - lon1\n    \n    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2\n    c = 2 * math.asin(math.sqrt(a))\n    \n    return R * c * 1000  # Distance in meters\n\n@udf(returnType=DoubleType())\ndef calculate_distance(lat, lon, park_geom_wkt):\n    if lat is None or lon is None or park_geom_wkt is None:\n        return None\n    \n    point = wkt.loads(f"POINT({lon} {lat})")\n    park = wkt.loads(park_geom_wkt)\n    \n    # Find the nearest point on the park boundary\n    nearest_point = park.exterior.interpolate(park.exterior.project(point))\n    \n    return haversine_distance(lat, lon, nearest_point.y,

In [68]:
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import DoubleType, StringType, StructType, StructField
import shapely
from shapely import wkb, wkt
from pyproj import Geod
import sys

# Use WGS84 ellipsoid for distance calculations
geod = Geod(ellps="WGS84")

# Define a struct to return both distance and error message
result_schema = StructType([
    StructField("distance", DoubleType(), True),
    StructField("error", StringType(), True)
])

@udf(returnType=result_schema)
def calculate_distance(lat, lon, park_geom_wkt):
    try:
        if lat is None or lon is None or park_geom_wkt is None:
            return (None, "One or more input values are None")
        
        lat, lon = float(lat), float(lon)
        point = wkt.loads(f"POINT({lon} {lat})")
        park = wkt.loads(park_geom_wkt)
        
        # Find the nearest point on the park boundary
        nearest_point = park.exterior.interpolate(park.exterior.project(point))
        
        # Calculate the geodesic distance
        _, _, distance = geod.inv(lon, lat, nearest_point.x, nearest_point.y)
        
        return (float(distance), None)
    except Exception as e:
        return (None, str(e))

@udf(returnType=StringType())
def wkb_to_wkt(wkb_value):
    try:
        if wkb_value is None:
            return None
        return wkb.loads(wkb_value).wkt
    except Exception as e:
        return f"Error: {str(e)}"

# Prepare domain DataFrame
domain_prepared = domain.withColumn("domain_geometry_wkt", wkb_to_wkt(col("geometry")))

# Rename parkres geometry column to avoid conflict
parkres_prepared = parkres.withColumnRenamed("geometry", "park_geometry")

# Join domain with parkres
joined = domain_prepared.crossJoin(parkres_prepared)

# Calculate distances
distance_calc = calculate_distance(col("latitude"), col("longitude"), col("park_geometry"))
joined = joined.withColumn("distance_result", distance_calc)
joined = joined.withColumn("distance", col("distance_result.distance"))
joined = joined.withColumn("distance_error", col("distance_result.error"))

# Find the minimum distance for each domain entry
result = joined.groupBy("url", "address", "__index_level_0__") \
    .agg({"distance": "min", "name": "first", "distance_error": "first"}) \
    .withColumnRenamed("min(distance)", "min_distance_to_park_border") \
    .withColumnRenamed("first(name)", "nearest_park_name") \
    .withColumnRenamed("first(distance_error)", "error_message")

# Show the results
result.show(truncate=False)

# Optional: Save the results if needed
# result.write.parquet("path/to/save/results.parquet")

# Print some debug information
print("Sample of domain_geometry_wkt:")
domain_prepared.select("domain_geometry_wkt").show(5, truncate=False)

print("\nSample of park_geometry:")
parkres_prepared.select("park_geometry").show(5, truncate=False)

print("\nSample of latitude and longitude:")
domain_prepared.select("latitude", "longitude").show(5, truncate=False)

print("\nUnique error messages:")
result.select("error_message").distinct().show(truncate=False)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `latitude` cannot be resolved. Did you mean one of the following? [`Baths`, `aus_code`, `bond`, `loci_uri`, `postcode`].;
'Project [url#6064, price#6065, address#6066, property_type#6067, prop_lat#6704, prop_lon#6734, Beds#6070, Baths#6071, Parking#6072, bond#6073, extracted_price#6074, geometry#6075, sa2_code#6076, property_sa2_name#6764, chg_flag#6078, chg_lbl#6079, sa3_code#6080, sa3_name#6081, sa4_code#6082, sa4_name#6083, gcc_code#6084, gcc_name#6085, ste_code#6086, ste_name#6087, ... 14 more fields]
+- Join Cross
   :- Project [url#6064, price#6065, address#6066, property_type#6067, prop_lat#6704, prop_lon#6734, Beds#6070, Baths#6071, Parking#6072, bond#6073, extracted_price#6074, geometry#6075, sa2_code#6076, property_sa2_name#6764, chg_flag#6078, chg_lbl#6079, sa3_code#6080, sa3_name#6081, sa4_code#6082, sa4_name#6083, gcc_code#6084, gcc_name#6085, ste_code#6086, ste_name#6087, ... 6 more fields]
   :  +- Project [url#6064, price#6065, address#6066, property_type#6067, prop_lat#6704, prop_lon#6734, Beds#6070, Baths#6071, Parking#6072, bond#6073, extracted_price#6074, geometry#6075, sa2_code#6076, sa2_name#6077 AS property_sa2_name#6764, chg_flag#6078, chg_lbl#6079, sa3_code#6080, sa3_name#6081, sa4_code#6082, sa4_name#6083, gcc_code#6084, gcc_name#6085, ste_code#6086, ste_name#6087, ... 5 more fields]
   :     +- Project [url#6064, price#6065, address#6066, property_type#6067, prop_lat#6704, longitude#6069 AS prop_lon#6734, Beds#6070, Baths#6071, Parking#6072, bond#6073, extracted_price#6074, geometry#6075, sa2_code#6076, sa2_name#6077, chg_flag#6078, chg_lbl#6079, sa3_code#6080, sa3_name#6081, sa4_code#6082, sa4_name#6083, gcc_code#6084, gcc_name#6085, ste_code#6086, ste_name#6087, ... 5 more fields]
   :        +- Project [url#6064, price#6065, address#6066, property_type#6067, latitude#6068 AS prop_lat#6704, longitude#6069, Beds#6070, Baths#6071, Parking#6072, bond#6073, extracted_price#6074, geometry#6075, sa2_code#6076, sa2_name#6077, chg_flag#6078, chg_lbl#6079, sa3_code#6080, sa3_name#6081, sa4_code#6082, sa4_name#6083, gcc_code#6084, gcc_name#6085, ste_code#6086, ste_name#6087, ... 5 more fields]
   :           +- Relation [url#6064,price#6065,address#6066,property_type#6067,latitude#6068,longitude#6069,Beds#6070,Baths#6071,Parking#6072,bond#6073,extracted_price#6074,geometry#6075,sa2_code#6076,sa2_name#6077,chg_flag#6078,chg_lbl#6079,sa3_code#6080,sa3_name#6081,sa4_code#6082,sa4_name#6083,gcc_code#6084,gcc_name#6085,ste_code#6086,ste_name#6087,... 5 more fields] parquet
   +- Project [name#6029, sa2_name#6030, postcode#6031, park_geometry#6677, park_centroid#6683, park_centroid_lat#6689, cast(split(park_centroid#6683, ,, -1)[1] as double) AS park_centroid_lon#6696]
      +- Project [name#6029, sa2_name#6030, postcode#6031, park_geometry#6677, park_centroid#6683, cast(split(park_centroid#6683, ,, -1)[0] as double) AS park_centroid_lat#6689]
         +- Project [name#6029, sa2_name#6030, postcode#6031, park_geometry#6677, get_centroid(park_geometry#6677)#6682 AS park_centroid#6683]
            +- Project [name#6029, sa2_name#6030, postcode#6031, geometry#6032 AS park_geometry#6677]
               +- Project [name#6029, sa2_name#6030, postcode#6031, geometry#6032]
                  +- Relation [_c0#6028,name#6029,sa2_name#6030,postcode#6031,geometry#6032] csv


In [None]:
from pyspark.sql.functions import col, sqrt, pow, min as spark_min, first
from pyspark.sql.types import DoubleType
from math import radians

# Approximate radius of earth in km
R = 6371.0

def haversine_distance(lat1, lon1, lat2, lon2):
    # Convert decimal degrees to radians
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    
    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = pow(sin(dlat/2), 2) + cos(lat1) * cos(lat2) * pow(sin(dlon/2), 2)
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    distance = R * c
    
    return distance

# Register the UDF
haversine_udf = F.udf(haversine_distance, DoubleType())

# Assuming 'domain' is your DataFrame with property data
# and 'parkres' is your DataFrame with park data

# Calculate the distance for each property-park pair
result = domain.crossJoin(parkres) \
    .withColumn("distance", 
                haversine_udf(
                    col("latitude"), col("longitude"), 
                    col("park_latitude"), col("park_longitude")
                )) \
    .groupBy("url", "address", "__index_level_0__") \
    .agg(
        spark_min("distance").alias("min_distance_to_park_border"),
        first("park_name").alias("nearest_park_name")
    )

# Show the results
result.show(truncate=False)

# Optional: Save the results if needed
# result.write.parquet("path/to/save/results.parquet")