In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, split
from pyspark.sql.types import DoubleType
from geopy.distance import great_circle

# Start (or reuse) the Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("DistanceCalculation")
    .getOrCreate()
)

# Property records with coordinates; cached because later cells keep deriving new columns from them.
df_properties = spark.read.csv(
    'Cleaned/Vancouver_Property_Value_Data_With_Cords.csv',
    header=True,
    inferSchema=True,
).cache()

24/04/01 20:48:22 WARN Utils: Your hostname, joshua-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/04/01 20:48:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/01 20:48:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/01 20:48:27 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [3]:
# Load the library locations and turn the "lat,lon" text column into two numeric columns.
df_libraries = (
    spark.read.csv('Cleaned/libraries-cleaned.csv', header=True, inferSchema=True)
    .withColumn("SplitCoords", split(col("Coordinates"), ","))
    .withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
    .withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType()))
    .drop("SplitCoords")
    .drop("Coordinates")
)

df_libraries.show()

+--------+---------+
|Latitude|Longitude|
+--------+---------+
| 49.2362|-123.0426|
| 49.2197|-123.0669|
| 49.2899|-123.1361|
| 49.2332|-123.1571|
| 49.2104|  -123.14|
| 49.2797|-123.1156|
| 49.2191|-123.0402|
| 49.2629|-123.1376|
| 49.2647|-123.1687|
| 49.2643|-123.1002|
| 49.2295|-123.0903|
| 49.2635|-123.2086|
| 49.2524| -123.043|
| 49.2756|-123.0738|
| 49.2811|-123.1001|
| 49.2797|-123.1156|
|  49.246|-123.1856|
| 49.2809|  -123.05|
| 49.2491|-123.0753|
| 49.2327|-123.1172|
+--------+---------+
only showing top 20 rows



In [4]:
# Collect the library rows on the driver and broadcast them for use inside the distance UDF
broadcast_libraries = spark.sparkContext.broadcast(
    df_libraries.collect()
)

# Define a UDF to calculate the minimum distance to the nearest library for a given property
def min_distance_to_library(lat, lon):
    """Return the great-circle distance in meters from a property to its nearest library.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        The smallest distance in meters to any row of ``broadcast_libraries``,
        or None when the property has no coordinates or no library row has
        usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null instead.
    if lat is None or lon is None:
        return None
    property_coord = (lat, lon)
    nearest_km = min(
        (
            great_circle(property_coord, (library['Latitude'], library['Longitude'])).kilometers
            for library in broadcast_libraries.value
            # Skip rows whose coordinates failed to parse
            if library['Latitude'] is not None and library['Longitude'] is not None
        ),
        default=None,
    )
    # Without any candidate the original would have returned infinity; return null instead.
    return None if nearest_km is None else nearest_km * 1000

# Wrap the Python function as a Spark UDF that yields a double
min_distance_udf = udf(min_distance_to_library, returnType=DoubleType())

# Attach the distance (meters) to the nearest library to every property row
df_properties = df_properties.withColumn(
    "MIN_DISTANCE_LIBRARY_METERS",
    min_distance_udf(col("LATITUDE"), col("LONGITUDE")),
)

df_properties.show()

                                                                                

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

In [5]:
# Load and preprocess the community centers data
df_community_centers = spark.read.csv('Cleaned/community-centre-cleaned.csv', header=True, inferSchema=True)
df_community_centers = df_community_centers.withColumnRenamed("Community_Centre_Locations", "Coordinates")
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_community_centers = df_community_centers.withColumn("SplitCoords", split("Coordinates", r",\s*"))
df_community_centers = df_community_centers.withColumn("Latitude", df_community_centers["SplitCoords"].getItem(0).cast(DoubleType()))
df_community_centers = df_community_centers.withColumn("Longitude", df_community_centers["SplitCoords"].getItem(1).cast(DoubleType()))
df_community_centers = df_community_centers.drop("Coordinates", "SplitCoords")

# Broadcast the collected community center rows for use inside the distance UDF
broadcast_community_centers = spark.sparkContext.broadcast(df_community_centers.collect())

# show() prints the table itself and returns None, so wrapping it in print() only adds a stray "None"
df_community_centers.show()

+--------+---------+
|Latitude|Longitude|
+--------+---------+
| 49.2428|-123.1883|
| 49.2809|-123.0393|
|  49.278|-123.1235|
| 49.2636|-123.0321|
| 49.2332|-123.1571|
| 49.2274|-123.0444|
| 49.2718|-123.1056|
| 49.2756|-123.0738|
| 49.2145|-123.1275|
| 49.2811|-123.1001|
| 49.2438|-123.1079|
| 49.2229|-123.1006|
| 49.2553|-123.0655|
| 49.2144|-123.0321|
| 49.2807|-123.0841|
| 49.2529|-123.1213|
| 49.2385|-123.0755|
| 49.2621|-123.1601|
| 49.2839|-123.0971|
| 49.2643|-123.1002|
+--------+---------+
only showing top 20 rows

None


In [6]:
# Define a UDF to calculate the minimum distance to the nearest community center for a given property
def min_distance_to_community_center(lat, lon):
    """Return the great-circle distance in meters from a property to its nearest community center.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        The smallest distance in meters to any row of
        ``broadcast_community_centers``, or None when the property has no
        coordinates or no center row has usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null instead.
    if lat is None or lon is None:
        return None
    property_coord = (lat, lon)
    nearest_km = min(
        (
            great_circle(property_coord, (center['Latitude'], center['Longitude'])).kilometers
            for center in broadcast_community_centers.value
            # Skip rows whose coordinates failed to parse
            if center['Latitude'] is not None and center['Longitude'] is not None
        ),
        default=None,
    )
    # Without any candidate the original would have returned infinity; return null instead.
    return None if nearest_km is None else nearest_km * 1000

# Wrap the Python function as a Spark UDF that yields a double
min_distance_to_cc_udf = udf(min_distance_to_community_center, returnType=DoubleType())

# Attach the distance (meters) to the nearest community center to every property row
df_properties = df_properties.withColumn(
    "COMMUNITY_CENTRE_DISTANCE_METERS",
    min_distance_to_cc_udf(col("LATITUDE"), col("LONGITUDE")),
)

df_properties.show()

broadcast_community_centers.unpersist()

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

In [7]:
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import when

# Load the cultural spaces and derive numeric coordinates from the "lat, lon" text column
df_cultural_spaces = (
    spark.read.csv('Cleaned/cultural_spaces_cleaned.csv', header=True, inferSchema=True)
    .withColumn("Latitude", split(col("Cordinates"), ", ").getItem(0).cast(DoubleType()))
    .withColumn("Longitude", split(col("Cordinates"), ", ").getItem(1).cast(DoubleType()))
    .drop("Cordinates")
)

# Collapse the verbose venue categories into short labels; unlisted types pass through unchanged
df_cultural_spaces = df_cultural_spaces.withColumn(
    "SimplifiedType",
    when(col("Type") == "Museum/Gallery", "Art")
    .when(col("Type") == "Theatre/Performance", "Theatre")
    .when(col("Type") == "Studio/Rehearsal", "Studio")
    .when(col("Type").isin("Cafe/Restaurant/Bar", "Café/Restaurant/Bar"), "Food")
    .otherwise(col("Type")),
).drop("Type")

# Bring the rows to the driver as a plain list for the UDF to iterate over
cultural_spaces_data = df_cultural_spaces.collect()

# Struct returned by the cultural-space UDF: (year, simplified type, distance)
schema = StructType(
    [
        StructField("Year", StringType(), True),
        StructField("SimplifiedType", StringType(), True),
        StructField("MinDistance", DoubleType(), True),
    ]
)

In [8]:
# Define the UDF
def calculate_cultural_space_details(lat, lon):
    """Describe the cultural space nearest to a property.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        A tuple ``(year, simplified_type, distance_in_meters)`` taken from the
        closest row of ``cultural_spaces_data``, or None when the property has
        no coordinates or no row has usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null struct instead.
    if lat is None or lon is None:
        return None
    property_coord = (lat, lon)
    closest_distance = float("inf")
    closest_space = None
    for space in cultural_spaces_data:
        # Skip rows whose coordinates failed to parse
        if space['Latitude'] is None or space['Longitude'] is None:
            continue
        space_coord = (space['Latitude'], space['Longitude'])
        distance = great_circle(property_coord, space_coord).kilometers * 1000
        # Strict comparison keeps the first of several equally close spaces
        if distance < closest_distance:
            closest_distance = distance
            closest_space = space
    if closest_space is None:
        return None
    # Return a tuple of (year, type, minimum distance)
    return (closest_space['YEAR'], closest_space['SimplifiedType'], closest_distance)

# Wrap the Python function as a Spark UDF that yields the (Year, SimplifiedType, MinDistance) struct
calculate_cultural_space_details_udf = udf(calculate_cultural_space_details, returnType=schema)

# Attach the nearest cultural space's details to every property row
df_properties = df_properties.withColumn(
    "CULTURAL_SPACES_DETAILS", calculate_cultural_space_details_udf("LATITUDE", "LONGITUDE")
)

df_properties.show(truncate=False)

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

In [9]:
# Load the dog parks dataset
df_dog_parks = spark.read.csv('Cleaned/dog-park-cleaned.csv', header=True, inferSchema=True)
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_dog_parks = df_dog_parks.withColumn("SplitCoords", split(col("Coordinates"), r",\s*"))
df_dog_parks = df_dog_parks.withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
df_dog_parks = df_dog_parks.withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType())).drop("SplitCoords").drop("Coordinates")

# Broadcast the collected dog park rows for use inside the distance UDF
broadcast_dog_parks = spark.sparkContext.broadcast(df_dog_parks.collect())

# Define the UDF to calculate distances
def calculate_distance(lat, lon):
    """Return the great-circle distance in meters from a property to its nearest dog park.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        The smallest distance in meters to any row of ``broadcast_dog_parks``,
        or None when the property has no coordinates or no dog park row has
        usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    closest_distance = None
    # Access the broadcasted list directly inside the UDF
    for point in broadcast_dog_parks.value:
        park_lat, park_lon = point['Latitude'], point['Longitude']
        # float(None) would raise, so rows with missing coordinates are skipped
        if park_lat is None or park_lon is None:
            continue
        distance = great_circle(property_coords, (float(park_lat), float(park_lon))).kilometers
        if closest_distance is None or distance < closest_distance:
            closest_distance = distance
    # Without any candidate the original would have returned infinity; return null instead.
    return None if closest_distance is None else closest_distance * 1000

# Register the UDF with Spark
calculate_distance_udf = udf(calculate_distance, DoubleType())

# show() prints the table itself and returns None, so wrapping it in print() only adds a stray "None"
df_dog_parks.show()

+------------------+-------------------+
|          Latitude|          Longitude|
+------------------+-------------------+
| 49.24470507020434|-123.17507131392392|
| 49.29496162924832|-123.13434601446576|
|  49.2556804415666|-123.02750074280294|
| 49.25313116149829|-123.02649510491612|
| 49.27118798617408|-123.11098369199883|
|49.235945148740775|-123.06691167074283|
| 49.22711615721119|-123.04660137247627|
| 49.24754513634842|-123.07392010305213|
|49.231980422957584|-123.19315344144853|
| 49.24309791365466|-123.14769992982717|
|49.278746328517386|-123.23234034857063|
| 49.26683490730779|-123.03496507364125|
| 49.22308531255336|-123.09719199790725|
| 49.25207382387085|-123.04371705538496|
| 49.27818635315254| -123.1047318420208|
| 49.26719997068881|-123.12448776465106|
| 49.27353645619341|-123.11314457767041|
| 49.29210780606712|-123.04764287218421|
| 49.27711482880213|-123.12371511269338|
| 49.21632474891052|-123.04764770721968|
+------------------+-------------------+
only showing top

In [10]:
# Attach the distance (meters) to the nearest dog park to every property row
df_properties = df_properties.withColumn(
    "CLOSEST_DOG_PARK_METERS", calculate_distance_udf("LATITUDE", "LONGITUDE")
)

df_properties.show(truncate=False)

broadcast_dog_parks.unpersist()

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

In [11]:
from pyspark.sql.types import DoubleType, IntegerType


# Load the graffiti dataset and split the "lat, lon" text column into numeric columns
df_graffiti = spark.read.csv('Cleaned/graffiti-cleaned.csv', header=True, inferSchema=True)
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_graffiti = df_graffiti.withColumn("SplitCoords", split(col("Coordinates"), r",\s*"))
df_graffiti = df_graffiti.withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
df_graffiti = df_graffiti.withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType()))
df_graffiti = df_graffiti.drop("SplitCoords", "Coordinates")

# Broadcast the collected graffiti rows for use inside the UDF
broadcast_graffiti = spark.sparkContext.broadcast(df_graffiti.collect())

# Define the UDF to calculate distances and get the count of the nearest graffiti spot
def calculate_graffiti_details(lat, lon):
    """Find the graffiti record nearest to a property.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        A tuple ``(distance_in_meters, count)`` where ``count`` is the
        ``Count`` value of the closest row of ``broadcast_graffiti``, or None
        when the property has no coordinates or no row has usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null struct instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    closest_distance = float("inf")
    closest_graffiti = None
    for graffiti in broadcast_graffiti.value:
        # Skip rows whose coordinates failed to parse
        if graffiti['Latitude'] is None or graffiti['Longitude'] is None:
            continue
        graffiti_coords = (graffiti['Latitude'], graffiti['Longitude'])
        distance = great_circle(property_coords, graffiti_coords).kilometers
        # Strict comparison keeps the first of several equally close records
        if distance < closest_distance:
            closest_distance = distance
            closest_graffiti = graffiti
    if closest_graffiti is None:
        return None
    return (closest_distance * 1000, closest_graffiti['Count'])

# Struct returned by the graffiti UDF: (distance, count)
graffiti_schema = StructType(
    [
        StructField("distance", DoubleType(), True),
        StructField("count", IntegerType(), True),
    ]
)

# Wrap the Python function as a Spark UDF that yields that struct
calculate_graffiti_details_udf = udf(calculate_graffiti_details, returnType=graffiti_schema)

# Attach the nearest graffiti record's distance and count to every property row
df_properties = df_properties.withColumn(
    "CLOSEST_GRAFFITI_METERS", calculate_graffiti_details_udf("LATITUDE", "LONGITUDE")
)

In [12]:
# Display the first 20 rows without truncating cell contents
df_properties.show(n=20, truncate=False)

# Drop the executor-side copies of the graffiti broadcast
broadcast_graffiti.unpersist(blocking=False)

[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

                                                                                

In [13]:
# Load the homeless dataset and split the "lat, lon" text column into numeric columns
df_homeless = spark.read.csv('Cleaned/homeless-cleaned.csv', header=True, inferSchema=True)
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_homeless = df_homeless.withColumn("SplitCoords", split(col("Coordinates"), r",\s*"))
df_homeless = df_homeless.withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
df_homeless = df_homeless.withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType())).drop("SplitCoords")

# Broadcast the collected rows for use inside the distance UDF
broadcast_homeless = spark.sparkContext.broadcast(df_homeless.collect())

# Define the UDF to calculate the minimum distance to a homeless coordinate
def calculate_homeless_distance(lat, lon, homeless_points):
    """Return the great-circle distance in meters from a property to the nearest given point.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.
        homeless_points: Iterable of row-like objects indexable by
            ``'Latitude'`` and ``'Longitude'``.

    Returns:
        The smallest distance in meters to any point, or None when the
        property has no coordinates or no point has usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    nearest_km = min(
        (
            great_circle(property_coords, (point['Latitude'], point['Longitude'])).kilometers
            for point in homeless_points
            # Skip rows whose coordinates failed to parse
            if point['Latitude'] is not None and point['Longitude'] is not None
        ),
        default=None,
    )
    # Without any candidate the original would have returned infinity; return null instead.
    return None if nearest_km is None else nearest_km * 1000

# Wrap the function as a Spark UDF, supplying the broadcast rows as its third argument
calculate_homeless_distance_udf = udf(
    lambda lat, lon: calculate_homeless_distance(lat, lon, broadcast_homeless.value),
    returnType=DoubleType(),
)

In [14]:
# Attach the distance (meters) to the nearest homeless coordinate to every property row
df_properties = df_properties.withColumn(
    "CLOSEST_HOMELESS_SHELTER", calculate_homeless_distance_udf("LATITUDE", "LONGITUDE")
)

df_properties.show(truncate=False)

broadcast_homeless.unpersist()

[Stage 29:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

                                                                                

In [15]:

# Load the parks dataset, split the "lat, lon" text column and make the area numeric
df_parks = spark.read.csv('Cleaned/parks-cleaned.csv', header=True, inferSchema=True)
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_parks = df_parks.withColumn("SplitCoords", split(col("Coordinates"), r",\s*"))
df_parks = df_parks.withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
df_parks = df_parks.withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType()))
df_parks = df_parks.withColumn("Area", col("Area").cast(DoubleType())).drop("SplitCoords", "Coordinates")

# Broadcast the collected park rows for use inside the UDF
broadcast_parks = spark.sparkContext.broadcast(df_parks.collect())

# Struct returned by the park UDF: (distance, area of the nearest park)
park_info_schema = StructType([
    StructField("MinDistance", DoubleType(), True),
    StructField("ParkArea", DoubleType(), True)
])

# Define the UDF to calculate the minimum distance to a park and include the park's area
def calculate_park_distance_and_area(lat, lon):
    """Find the park nearest to a property and report its distance and area.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        A tuple ``(distance_in_meters, area)`` where ``area`` is the ``Area``
        value of the closest row of ``broadcast_parks``, or None when the
        property has no coordinates or no park row has usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null struct instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    closest_distance = float("inf")
    closest_park = None
    for park in broadcast_parks.value:
        # Skip rows whose coordinates failed to parse
        if park['Latitude'] is None or park['Longitude'] is None:
            continue
        park_coords = (park['Latitude'], park['Longitude'])
        distance = great_circle(property_coords, park_coords).kilometers
        # Strict comparison keeps the first of several equally close parks
        if distance < closest_distance:
            closest_distance = distance
            closest_park = park
    if closest_park is None:
        return None
    return (closest_distance * 1000, closest_park['Area'])

# Wrap the Python function as a Spark UDF that yields the (MinDistance, ParkArea) struct
calculate_park_distance_and_area_udf = udf(
    calculate_park_distance_and_area,
    returnType=park_info_schema,
)


In [16]:
# Attach the nearest park's distance (meters) and area to every property row
df_properties = df_properties.withColumn(
    "CLOSEST_PARK_METERS",
    calculate_park_distance_and_area_udf("LATITUDE", "LONGITUDE"),
)

df_properties.show(truncate=False)

broadcast_parks.unpersist()

[Stage 33:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

                                                                                

In [17]:

df_public_art = spark.read.csv('Cleaned/public-art-cleaned.csv', header=True, inferSchema=True)

# Filter out any rows where Coordinates are null or empty
df_public_art = df_public_art.filter(df_public_art.Coordinates.isNotNull() & (df_public_art.Coordinates != ""))

# Split the 'Coordinates' column into 'Latitude' and 'Longitude'
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_public_art = df_public_art.withColumn("SplitCoords", split(col("Coordinates"), r",\s*"))
df_public_art = df_public_art.withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
df_public_art = df_public_art.withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType())).drop("SplitCoords", "Coordinates")

# Broadcast the collected public art rows for use inside the distance UDF
broadcast_public_art = spark.sparkContext.broadcast(df_public_art.collect())

# Define the UDF to calculate the minimum distance to a public art location
def calculate_min_distance_to_public_art(lat, lon):
    """Return the great-circle distance in meters from a property to its nearest public art location.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        The smallest distance in meters to any row of ``broadcast_public_art``,
        or None when the property has no coordinates or no row has usable
        coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    nearest_km = min(
        (
            great_circle(property_coords, (art_location['Latitude'], art_location['Longitude'])).kilometers
            for art_location in broadcast_public_art.value
            # Skip rows whose coordinates failed to parse
            if art_location['Latitude'] is not None and art_location['Longitude'] is not None
        ),
        default=None,
    )
    # Without any candidate the original would have returned infinity; return null instead.
    return None if nearest_km is None else nearest_km * 1000

# Wrap the Python function as a Spark UDF that yields a double
calculate_min_distance_to_public_art_udf = udf(
    calculate_min_distance_to_public_art,
    returnType=DoubleType(),
)

In [18]:
# Attach the distance (meters) to the nearest public art location to every property row
df_properties = df_properties.withColumn(
    "CLOSEST_PUBLIC_ART", calculate_min_distance_to_public_art_udf("LATITUDE", "LONGITUDE")
)

df_properties.show(truncate=False)

broadcast_public_art.unpersist()

[Stage 37:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

                                                                                

In [19]:

df_rapid_transit = spark.read.csv('Cleaned/rapid-transit-cleaned.csv', header=True, inferSchema=True)

# Split the 'Coordinates' column into 'Latitude' and 'Longitude'
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_rapid_transit = df_rapid_transit.withColumn("SplitCoords", split(col("Coordinates"), r",\s*"))
df_rapid_transit = df_rapid_transit.withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
df_rapid_transit = df_rapid_transit.withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType())).drop("SplitCoords", "Coordinates")

# Broadcast the collected rapid transit rows for use inside the distance UDF
broadcast_rapid_transit = spark.sparkContext.broadcast(df_rapid_transit.collect())

# Define the UDF to calculate the minimum distance to a rapid transit location
def calculate_min_distance_to_rapid_transit(lat, lon):
    """Return the great-circle distance in meters from a property to its nearest rapid transit location.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        The smallest distance in meters to any row of
        ``broadcast_rapid_transit``, or None when the property has no
        coordinates or no row has usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    nearest_km = min(
        (
            great_circle(
                property_coords,
                (transit_location['Latitude'], transit_location['Longitude']),
            ).kilometers
            for transit_location in broadcast_rapid_transit.value
            # Skip rows whose coordinates failed to parse
            if transit_location['Latitude'] is not None and transit_location['Longitude'] is not None
        ),
        default=None,
    )
    # Without any candidate the original would have returned infinity; return null instead.
    return None if nearest_km is None else nearest_km * 1000  # Convert to meters

# Wrap the Python function as a Spark UDF that yields a double
calculate_min_distance_to_rapid_transit_udf = udf(
    calculate_min_distance_to_rapid_transit,
    returnType=DoubleType(),
)

In [20]:
# Attach the distance (meters) to the nearest rapid transit location to every property row
# NOTE(review): the column name is spelled "TRANSIST"; kept as-is because it is written to the output file
df_properties = df_properties.withColumn(
    "CLOSEST_RAPID_TRANSIST", calculate_min_distance_to_rapid_transit_udf("LATITUDE", "LONGITUDE")
)

df_properties.show(truncate=False)

broadcast_rapid_transit.unpersist()

[Stage 41:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

                                                                                

In [21]:

# Struct returned by the school UDF: (distance, type of the nearest school)
school_distance_schema = StructType([
    StructField("MinDistance", DoubleType(), True),
    StructField("SchoolType", StringType(), True)
])

# Load and preprocess the schools CSV file
df_schools = spark.read.csv('Cleaned/school-cleaned.csv', header=True, inferSchema=True)
# Raw string: "\s" is a regex class, not a Python escape (a plain literal triggers an invalid-escape warning)
df_schools = df_schools.withColumn("SplitCoords", split(col("Coordinates"), r",\s*"))
df_schools = df_schools.withColumn("Latitude", col("SplitCoords").getItem(0).cast(DoubleType()))
df_schools = df_schools.withColumn("Longitude", col("SplitCoords").getItem(1).cast(DoubleType())).drop("SplitCoords")

# Broadcast the collected school rows for use inside the UDF
broadcast_schools = spark.sparkContext.broadcast(df_schools.collect())

# Define the UDF to calculate the minimum distance to a school and include the school's type
def calculate_school_distance_and_type(lat, lon):
    """Find the school nearest to a property and report its distance and type.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        A tuple ``(distance_in_meters, school_type)`` where ``school_type`` is
        the ``School Type`` value of the closest row of ``broadcast_schools``,
        or None when the property has no coordinates or no school row has
        usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null struct instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    closest_distance = float("inf")
    closest_school = None
    for school in broadcast_schools.value:
        # Skip rows whose coordinates failed to parse
        if school['Latitude'] is None or school['Longitude'] is None:
            continue
        school_coords = (school['Latitude'], school['Longitude'])
        distance = great_circle(property_coords, school_coords).kilometers
        # Strict comparison keeps the first of several equally close schools
        if distance < closest_distance:
            closest_distance = distance
            closest_school = school
    if closest_school is None:
        return None
    return (closest_distance * 1000, closest_school['School Type'])  # Convert to meters and get the school type

# Wrap the Python function as a Spark UDF that yields the (MinDistance, SchoolType) struct
calculate_school_distance_and_type_udf = udf(
    calculate_school_distance_and_type,
    returnType=school_distance_schema,
)

In [22]:
# Attach the nearest school's distance (meters) and type to every property row
df_properties = df_properties.withColumn(
    "CLOSEST_SCHOOL_METERS", calculate_school_distance_and_type_udf("LATITUDE", "LONGITUDE")
)

df_properties.show(truncate=False)

broadcast_schools.unpersist()

[Stage 45:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

                                                                                

In [23]:

# Load the bus stops; the UDF below reads 'Latitude' and 'Longitude' directly from these rows
df_bus_stops = spark.read.csv(
    'Cleaned/bus-stops-cleaned.csv',
    header=True,
    inferSchema=True,
)

# Collect the rows on the driver and broadcast them for use inside the distance UDF
broadcast_bus_stops = spark.sparkContext.broadcast(
    df_bus_stops.collect()
)

# Define the UDF to calculate the minimum distance to a bus stop
def calculate_bus_stop_distance(lat, lon):
    """Return the great-circle distance in meters from a property to its nearest bus stop.

    Args:
        lat: Property latitude, or None when unknown.
        lon: Property longitude, or None when unknown.

    Returns:
        The smallest distance in meters to any row of ``broadcast_bus_stops``,
        or None when the property has no coordinates or no bus stop row has
        usable coordinates.
    """
    # A null coordinate cannot produce a meaningful distance; emit a null instead.
    if lat is None or lon is None:
        return None
    property_coords = (lat, lon)
    nearest_km = min(
        (
            great_circle(property_coords, (bus_stop['Latitude'], bus_stop['Longitude'])).kilometers
            for bus_stop in broadcast_bus_stops.value
            # Skip rows with missing coordinates
            if bus_stop['Latitude'] is not None and bus_stop['Longitude'] is not None
        ),
        default=None,
    )
    # Without any candidate the original would have returned infinity; return null instead.
    return None if nearest_km is None else nearest_km * 1000  # Convert to meters

# Wrap the Python function as a Spark UDF that yields a double
calculate_bus_stop_distance_udf = udf(
    calculate_bus_stop_distance,
    returnType=DoubleType(),
)

In [24]:
# Attach the distance (meters) to the nearest bus stop to every property row
df_properties = df_properties.withColumn(
    "CLOSEST_BUS_STOPS", calculate_bus_stop_distance_udf("LATITUDE", "LONGITUDE")
)

df_properties.show(truncate=False)

broadcast_bus_stops.unpersist()

[Stage 49:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------

                                                                                

In [25]:
# Struct-valued columns produced by the tuple-returning UDFs
tuple_columns = [
    "CLOSEST_SCHOOL_METERS",
    "CLOSEST_PARK_METERS",
    "CLOSEST_GRAFFITI_METERS",
    "CULTURAL_SPACES_DETAILS",
]

# CSV cannot hold struct values, so each of these columns is rendered as text first
for column in tuple_columns:
    df_properties = df_properties.withColumn(column, df_properties[column].cast("string"))

# Write everything through a single partition, replacing any previous output
(
    df_properties
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("Cleaned/ML-Data.csv")
)

# Shut the Spark session down now that the output has been written
spark.stop()

                                                                                