In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, current_timestamp, concat, lit
import os

In [2]:
# Reuse the active SparkSession if there is one; otherwise build a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [3]:
# Bronze-layer listings. mergeSchema asks Spark to merge the schemas of all
# Parquet part-files found under the path.
listings_path = "hdfs://namenode:9000/user/hive/warehouse/airbnb.db/bilal/bronze/listings"

df_listings = spark.read.options(mergeSchema="true").parquet(listings_path)

df_listings.printSchema()

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: long (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: do

In [4]:
# Total number of rows read from the bronze listings path (all Parquet files, all extraction dates).
df_listings.count()

104652

In [5]:
# Which extraction dates are present in the bronze data. distinct() alone gives
# no ordering guarantee, so sort to keep the displayed output stable across runs.
df_listings.select("extraction_date").distinct().orderBy("extraction_date").show()

+---------------+
|extraction_date|
+---------------+
|     2025-09-14|
|     2025-09-21|
|     2025-09-26|
+---------------+



In [28]:
# neigh_path = "hdfs://namenode:9000/user/hive/warehouse/airbnb.db/bilal/bronze/neighbourhoods"

# df_neigh = (
#     spark.read
#     .option("mergeSchema", "true")
#     .parquet(neigh_path)
# )

# df_neigh.printSchema()

root
 |-- neighbourhood: string (nullable = true)
 |-- neighbourhood_group: integer (nullable = true)
 |-- geometry: binary (nullable = true)
 |-- updated_at_utc_0: timestamp (nullable = true)
 |-- city: string (nullable = true)



In [29]:
# df_neigh.count()

178

In [30]:
# df_neigh.show(5)

+--------------------+-------------------+--------------------+--------------------+------+
|       neighbourhood|neighbourhood_group|            geometry|    updated_at_utc_0|  city|
+--------------------+-------------------+--------------------+--------------------+------+
|Kingston upon Thames|               NULL|[01 06 00 00 00 0...|2026-01-06 08:06:...|london|
|             Croydon|               NULL|[01 06 00 00 00 0...|2026-01-06 08:06:...|london|
|             Bromley|               NULL|[01 06 00 00 00 0...|2026-01-06 08:06:...|london|
|            Hounslow|               NULL|[01 06 00 00 00 0...|2026-01-06 08:06:...|london|
|              Ealing|               NULL|[01 06 00 00 00 0...|2026-01-06 08:06:...|london|
+--------------------+-------------------+--------------------+--------------------+------+
only showing top 5 rows



In [10]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType
# from shapely import wkb
# import json

# def wkb_to_geojson(wkb_bytes):
#     if wkb_bytes is None:
#         return None
#     geom = wkb.loads(wkb_bytes)
#     return json.dumps({
#         "type": geom.geom_type,
#         "coordinates": geom.__geo_interface__["coordinates"]
#     })

# wkb_to_geojson_udf = udf(wkb_to_geojson, StringType())

In [11]:
# neigh_geo = (
#     df_neigh
#     .withColumn("geojson", wkb_to_geojson_udf("geometry"))
#     .drop("geometry")
# )

In [12]:
# neigh_geo = neigh_geo.withColumnRenamed("city", "neigh_city")

In [13]:
# neigh_geo.printSchema()

root
 |-- neighbourhood: string (nullable = true)
 |-- neighbourhood_group: integer (nullable = true)
 |-- updated_at_utc_0: timestamp (nullable = true)
 |-- neigh_city: string (nullable = true)
 |-- geojson: string (nullable = true)



In [6]:
from pyspark.sql.functions import regexp_replace, col

# `price` is text; drop every "$" and "," so the remainder can be cast to a
# double (values that still fail the cast become NULL). Presumably formatted
# like "$1,234.00" — confirm against the bronze data.
price_as_number = regexp_replace(col("price"), "[$,]", "").cast("double")
df_listings_clean = df_listings.withColumn("price_num", price_as_number)

In [7]:
# Check that the numeric `price_num` column (cast to double) was appended.
df_listings_clean.printSchema()

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: long (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: do

In [8]:
# pyspark's `sum` is imported under an alias: a bare `import ... sum` would
# shadow Python's builtin `sum` for every later cell in this kernel.
from pyspark.sql.functions import avg, count, sum as spark_sum

# One row per (city, neighbourhood_cleansed) with listing counts, coordinate
# centroids and price / rating / occupancy / revenue summaries.
# NOTE(review): df_listings_clean spans several extraction_date values. If the
# same listing appears in more than one snapshot, it is counted (and its revenue
# summed) more than once. Confirm the snapshots do not overlap, or filter to a
# single extraction_date before aggregating.
df_listings_agg = (
    df_listings_clean
    .groupBy("city", "neighbourhood_cleansed")
    .agg(
        count("*").alias("listing_count"),
        # Plain mean of member coordinates, used as a simple neighbourhood centroid.
        avg("latitude").alias("lat_centroid"),
        avg("longitude").alias("lon_centroid"),
        avg("price_num").alias("avg_price"),
        avg("review_scores_rating").alias("avg_rating"),
        avg("estimated_occupancy_l365d").alias("avg_occupancy"),
        spark_sum("estimated_revenue_l365d").alias("total_revenue"),
    )
)

In [9]:
# Inspect the schema of the per-(city, neighbourhood_cleansed) aggregate.
df_listings_agg.printSchema()

root
 |-- city: string (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- listing_count: long (nullable = false)
 |-- lat_centroid: double (nullable = true)
 |-- lon_centroid: double (nullable = true)
 |-- avg_price: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- avg_occupancy: double (nullable = true)
 |-- total_revenue: double (nullable = true)



In [10]:
# Reference city-centre coordinates as (latitude, longitude) in decimal degrees,
# keyed by the lowercase `city` values that appear in the listings data.
city_centers = dict(
    london=(51.5072, -0.1276),
    bristol=(51.4538, -2.5918),
    edinburgh=(55.9533, -3.1883),
)

In [11]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType
import math

def haversine(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle (haversine) distance between two points on a sphere.

    Args:
        lat1, lon1: first point, in decimal degrees.
        lat2, lon2: second point, in decimal degrees.
        radius_km: sphere radius; defaults to 6371 km (Earth radius).

    Returns:
        Surface distance between the points, in the same unit as ``radius_km``.
    """
    # Trig functions work in radians; the inputs are decimal degrees.
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make asin(sqrt(a)) raise ValueError; clamp it.
    a = min(1.0, max(0.0, a))
    c = 2 * math.asin(math.sqrt(a))
    return radius_km * c

# Register UDF: distance (km) from a point to the centre of its city.
def _dist_to_city_center_km(lat, lon, city):
    """Haversine distance in km from (lat, lon) to ``city_centers[city]``.

    Returns None (SQL NULL) when any input is null or the city has no entry in
    ``city_centers``. A single unmatched group therefore yields a NULL distance
    instead of failing the whole Spark job with TypeError / KeyError.
    """
    if lat is None or lon is None or city is None:
        return None
    center = city_centers.get(city)
    if center is None:
        return None
    return haversine(lat, lon, *center)


haversine_udf = udf(_dist_to_city_center_km, DoubleType())

In [12]:
# Add each neighbourhood centroid's distance (km) to its city centre.
# The UDF accepts column names directly, so no col(...) wrapping is needed.
df_listings_agg = df_listings_agg.withColumn(
    "dist_to_city_center_km",
    haversine_udf("lat_centroid", "lon_centroid", "city"),
)

In [13]:
# Check that `dist_to_city_center_km` was added to the aggregate.
df_listings_agg.printSchema()

root
 |-- city: string (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- listing_count: long (nullable = false)
 |-- lat_centroid: double (nullable = true)
 |-- lon_centroid: double (nullable = true)
 |-- avg_price: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- avg_occupancy: double (nullable = true)
 |-- total_revenue: double (nullable = true)
 |-- dist_to_city_center_km: double (nullable = true)



In [14]:
# Preview 5 aggregate rows (no orderBy, so which rows appear is not guaranteed).
df_listings_agg.show(5)

+------+----------------------+-------------+------------------+--------------------+------------------+-----------------+------------------+-------------+----------------------+
|  city|neighbourhood_cleansed|listing_count|      lat_centroid|        lon_centroid|         avg_price|       avg_rating|     avg_occupancy|total_revenue|dist_to_city_center_km|
+------+----------------------+-------------+------------------+--------------------+------------------+-----------------+------------------+-------------+----------------------+
|london|             Southwark|         5475| 51.48616450648947|-0.07890336527554333|174.27688004972032|4.699992850333639|48.295159817351596|  3.4171553E7|    4.1030575226598405|
|london|        Waltham Forest|         1955| 51.57853798442461|-0.01275952574545...|117.31223980016652|4.756645569620255| 38.01943734015345|    6406467.0|     11.22479191844626|
|london|               Croydon|         1717| 51.38006414588728|-0.09538084124085859|102.36256323777403|4

In [15]:
# Number of (city, neighbourhood_cleansed) groups in the aggregate.
df_listings_agg.count()

178

In [16]:
# Write the neighbourhood aggregate to ClickHouse over JDBC.
# Connection settings come from the environment so no credentials are stored in
# the notebook. CLICKHOUSE_PASSWORD is required; host, port and user fall back to
# the values previously hard-coded here.
# NOTE(review): the password that used to be embedded in this URL is still in
# version history and should be rotated.
ch_host = os.environ.get("CLICKHOUSE_HOST", "analytics-clickhouse")
ch_port = os.environ.get("CLICKHOUSE_PORT", "8123")
ch_url = f"jdbc:ch://{ch_host}:{ch_port}/default"

ch_properties = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    # Standard JDBC connection properties instead of query-string credentials.
    "user": os.environ.get("CLICKHOUSE_USER", "spark_admin"),
    "password": os.environ["CLICKHOUSE_PASSWORD"],
    # Used only when Spark (re)creates the table, which mode="overwrite" does.
    "createTableOptions": "ENGINE = MergeTree() ORDER BY (city, neighbourhood_cleansed)",
}

df_listings_agg.write.jdbc(
    url=ch_url,
    table="neighbourhood_listings_agg",
    mode="overwrite",
    properties=ch_properties,
)