In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, array, explode, struct, udf, count
from pyspark.sql.types import StringType
import os
import json
from functools import reduce
from datetime import datetime, date
from pathlib import Path
from sedona.spark import SedonaContext
from dotenv import load_dotenv
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, MapType
from sedona.sql.types import GeometryType
from sedona.register import SedonaRegistrator
from pyspark.sql.functions import input_file_name
load_dotenv()

True

In [20]:
CACHE_DIR = Path('../_cache')
LOGS_DIR = Path('../_logs')
DATASET_DIR = Path('../datasets')
OUTPUT_DIR = Path('../outputs')

os.makedirs(CACHE_DIR, exist_ok=True)
os.makedirs(LOGS_DIR, exist_ok=True)
os.makedirs(DATASET_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [21]:
sedona = SedonaContext.builder() \
    .appName("BDCCFinalExam") \
    .master("local[*]") \
    .config("spark.driver.memory", "7g") \
    .config("spark.executor.memory", "7g") \
    .config("spark.storage.memoryFraction","0.4") \
    .config("spark.memory.fraction","0.6") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY")) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true") \
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true") \
    .config("spark.jars.packages",
            'org.apache.sedona:sedona-spark-shaded-3.3_2.12:1.7.1,'
            "org.apache.hadoop:hadoop-aws:3.2.0,"\
            'org.datasyslab:geotools-wrapper:1.7.1-28.5,'\
            "com.amazonaws:aws-java-sdk-bundle:1.11.375") \
    .getOrCreate()

In [22]:
# Register Sedona UDTs and functions
SedonaRegistrator.registerAll(sedona)

  SedonaRegistrator.registerAll(sedona)
25/06/08 21:58:56 WARN UDTRegistration: Cannot register UDT for org.geotools.coverage.grid.GridCoverage2D, which is already registered.
25/06/08 21:58:56 WARN SimpleFunctionRegistry: The function rs_union_aggr replaced a previously registered function.
25/06/08 21:58:56 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.geom.Geometry, which is already registered.
25/06/08 21:58:56 WARN UDTRegistration: Cannot register UDT for org.apache.sedona.common.geometryObjects.Geography, which is already registered.
25/06/08 21:58:56 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.index.SpatialIndex, which is already registered.
25/06/08 21:58:56 WARN SimpleFunctionRegistry: The function st_envelope_aggr replaced a previously registered function.
25/06/08 21:58:56 WARN SimpleFunctionRegistry: The function st_intersection_aggr replaced a previously registered function.
25/06/08 21:58:56 WARN SimpleFunctionRegistry: The func

True

### Step 1 — Define `process_province()` function

This function implements the full data processing pipeline for generating machine-learning-ready features from a single province's GeoJSON data.

#### Inputs:
- `df`: Spark DataFrame containing the `properties` of the province GeoJSON
- `mapping_dict`: dictionary mapping raw feature values to clean categories
- `quarter_date`: string representing the current quarter (e.g. "2018-07-01"), to be added as a column for time-based analysis

#### Processing steps:
1️⃣ Selects and handles relevant feature columns:
   - `building`, `amenity`, `leisure`, `public_transport`, `office`, `shop`, `tourism`
   - Handles flexible schemas — only uses columns present in the file.

2️⃣ Explodes multi-column categorical data into a long format:
   - Ensures multiple categories per row are properly represented.

3️⃣ Maps raw feature values to clean categories:
   - Uses a **case-insensitive mapping**.
   - Any unknown or unmapped values are assigned a safe default category `"uncat__OTHER"` to prevent pivot column collisions.

4️⃣ Aggregates feature counts:
   - Performs a groupby-pivot to create a wide feature table, with one column per category and counts as values.

5️⃣ Adds metadata columns:
   - Renames `"province"` → `"gadm"` for consistency with administrative boundaries.
   - Adds a `"date"` column with the quarter date, enabling time-based ML.

#### Output:
- Returns a Spark DataFrame with:
  - One row per province
  - Feature columns = category counts
  - `"gadm"` column = province name
  - `"date"` column = quarter date

#### Purpose:
- The resulting DataFrame is suitable for:
  - ML feature engineering
  - Time-series analysis
  - Clustering and classification
  - Longitudinal studies of urban change and amenities


In [23]:
# Define function
def process_province(df, mapping_dict, quarter_date):
    # 1️⃣ Safe default category
    DEFAULT_CATEGORY = "uncat__OTHER"

    # 2️⃣ Define type columns
    desired_type_columns = [
        "building",
        "amenity",
        "leisure",
        "public_transport",
        "office",
        "shop",
        "tourism"
    ]
    type_columns = [col for col in desired_type_columns if col in df.columns]

    # 3️⃣ Explode columns
    exploded_array = array(
        *[struct(lit(c).alias("column"), col(c).alias("value")) for c in type_columns]
    )
    df_exploded = (
        df
        .withColumn("exploded", exploded_array)
        .withColumn("exploded", explode(col("exploded")))
        .select(
            "province",  # will rename later
            "exploded.column",
            "exploded.value"
        )
    )
    df_exploded = df_exploded.filter(col("value").isNotNull())

    # 4️⃣ Define UDF — case-insensitive mapping with safe default
    def map_to_category(value):
        if value is None:
            return DEFAULT_CATEGORY
        return mapping_dict.get(value.lower(), DEFAULT_CATEGORY)

    map_to_category_udf = udf(map_to_category, StringType())

    # 5️⃣ Map to new_category
    df_mapped = df_exploded.withColumn(
        "new_category",
        map_to_category_udf(col("value"))
    )

    # 6️⃣ Pivot — aggregate counts
    df_final = (
        df_mapped
        .groupBy("province")  # will rename later
        .pivot("new_category")
        .agg(count("value"))
        .fillna(0)
    )

    # 7️⃣ Rename province → gadm
    df_final = df_final.withColumnRenamed("province", "gadm")

    # 8️⃣ Add date column
    df_final = df_final.withColumn("date", lit(quarter_date))

    # 9️⃣ Return final DataFrame
    return df_final


### Step 2 — Per-quarter processing: generate single Parquet file

This block processes **one quarter (date folder)** of GeoJSON data:

#### Inputs:
- `quarter_date`: quarter to process (ex: "2018-07-01")
- `quarter_folder`: path to `date=YYYY-MM-DD` folder

#### Processing steps:
1️⃣ Loads the category mapping:
   - Ensures all mappings are **case-insensitive**.
   - Prevents `"uncategorized"` from being used as a mapped category — uses `"uncat__OTHER"` instead.

2️⃣ Iterates through:
   - Each region folder in this quarter.
   - Each province GeoJSON file inside the region folder.

3️⃣ For each province:
   - Loads and flattens GeoJSON → Spark DataFrame.
   - Drops any conflicting `"uncategorized"` column if present in raw data.
   - Runs `process_province()` to produce an ML feature table for that province.
   - Collects each province feature table into `df_list`.

4️⃣ Combines all provinces for this quarter:
   - Uses `unionByName(allowMissingColumns=True)` to align columns across provinces (some categories may not appear in every province).
   - Result is one unified DataFrame for the entire quarter.

5️⃣ Saves result:
   - Saves one **Parquet dataset per quarter** → ready for ML pipelines.
   - Output path: `output/features_quarter=<quarter_date>/features.parquet`

#### Output:
- A single **Parquet file** per quarter, containing one row per province with:
  - `"gadm"` = province name
  - Category count columns (features)
  - `"date"` column

#### Purpose:
- Standardizes feature engineering across all quarters.
- Ensures data is ready for:
  - Longitudinal time-based ML
  - Clustering
  - Classification
  - Visualizations
  - Reporting


## Download and Transform Amenity Counts

In [24]:
schema = StructType([
    StructField("crs", StructType([
        StructField("properties", StructType([
            StructField("name", StringType(), True)
        ]), True),
        StructField("type", StringType(), True)
    ]), True),

    StructField("features", ArrayType(
        StructType([
            StructField("geometry", GeometryType(), True), 
            StructField("properties", StructType([
                StructField("amenity", StringType(), True),
                StructField("building", StringType(), True),
                StructField("element", StringType(), True),
                StructField("id", LongType(), True),
                StructField("leisure", StringType(), True),
                StructField("name", StringType(), True),
                StructField("office", StringType(), True),
                StructField("province", StringType(), True),
                StructField("public_transport", StringType(), True),
                StructField("region", StringType(), True),
                StructField("shop", StringType(), True),
                StructField("tourism", StringType(), True)
            ]), True),
            StructField("type", StringType(), True)
        ])
    ), True),

    StructField("name", StringType(), True),
    StructField("type", StringType(), True)
])

In [25]:
snapshot_dates = [
    date(year, month, 1)
    for year in range(2023, 2017, -1)
    for month in (1, 4, 7, 10)  # January, April, July, October
]

print("Snapshot dates:")
print(snapshot_dates)

Snapshot dates:
[datetime.date(2023, 1, 1), datetime.date(2023, 4, 1), datetime.date(2023, 7, 1), datetime.date(2023, 10, 1), datetime.date(2022, 1, 1), datetime.date(2022, 4, 1), datetime.date(2022, 7, 1), datetime.date(2022, 10, 1), datetime.date(2021, 1, 1), datetime.date(2021, 4, 1), datetime.date(2021, 7, 1), datetime.date(2021, 10, 1), datetime.date(2020, 1, 1), datetime.date(2020, 4, 1), datetime.date(2020, 7, 1), datetime.date(2020, 10, 1), datetime.date(2019, 1, 1), datetime.date(2019, 4, 1), datetime.date(2019, 7, 1), datetime.date(2019, 10, 1), datetime.date(2018, 1, 1), datetime.date(2018, 4, 1), datetime.date(2018, 7, 1), datetime.date(2018, 10, 1)]


In [14]:
DEFAULT_CATEGORY = "uncat__OTHER"

with open(DATASET_DIR / "feature_to_new_category.json", "r") as f:
    mapping_dict_raw = json.load(f)

mapping_dict = {
    k.lower(): (v if v.lower() != "uncategorized" else DEFAULT_CATEGORY)
    for k, v in mapping_dict_raw.items()
}

In [None]:
for date in snapshot_dates:
    date__str = date.strftime("%Y-%m-%d")

    df_raw = sedona.read.format("geojson") \
        .option("multiLine", "true") \
        .schema(schema) \
        .load(f"s3a://amenities-dataset/amenities_v2/date={date__str}")

    df_exploded = df_raw.select(explode("features").alias("feature"))
    df = df_exploded.select("feature.properties.*")

    # 🚀 Drop conflicting column if exists
    if "uncategorized" in df.columns:
        print(f"⚠️  WARNING: Dropping conflicting column 'uncategorized' in {file_path}")
        df = df.drop("uncategorized")

    df_final = process_province(df, mapping_dict, date__str)
    df_final.write.mode("overwrite").parquet(str(OUTPUT_DIR / "features" / f"features_quarter={date__str}"))