In [4]:
%%configure -f
{
    "jars": [
        "<ADD ADFS PATH>/geotools-wrapper-1.8.0-33.1.jar",
        "<ADD ADFS PATH>/sedona-spark-shaded-3.5_2.12-1.8.0.jar" 
        ]
}

StatementMeta(, 83c40b2b-b419-4d14-baf9-37efeb6f1d82, -1, Finished, Available, Finished)

In [5]:
# Test cell, to check for correct installation Sedona

from sedona.spark import *

sedona = SedonaContext.create(spark)

sedona.sql("SELECT ST_GeomFromEWKT('SRID=4269;POINT(40.7128 -74.0060)')").show()

StatementMeta(, 83c40b2b-b419-4d14-baf9-37efeb6f1d82, 5, Finished, Available, Finished)

+--------------------------------------------------+
|st_geomfromewkt(SRID=4269;POINT(40.7128 -74.0060))|
+--------------------------------------------------+
|                              POINT (40.7128 -7...|
+--------------------------------------------------+



In [6]:
# check the directories within default lakehouse

import os

root = "/lakehouse/default/Files"
items = os.listdir(root)

for name in items:
    full = os.path.join(root, name)
    print(f"{'DIR ' if os.path.isdir(full) else 'FILE'}  {full}")



StatementMeta(, 83c40b2b-b419-4d14-baf9-37efeb6f1d82, 6, Finished, Available, Finished)

DIR   /lakehouse/default/Files/Apache_Sedona_jars


In [7]:
# Download BAG data (building footprints Netherlands)

# Import required modules
import os
import requests
from datetime import datetime

# Create folder name with current month and year as postfix
current_time = datetime.now()
folder_name = f"BAG_download_{current_time.strftime('%Y%m')}"
lh_folder = f"/lakehouse/default/Files/{folder_name}"

# Ensure the target folder exists
os.makedirs(lh_folder, exist_ok=True)

# File download GeoPackage
# For further information see https://service.pdok.nl/lv/bag/atom/bag.xml
gpkg_url = "https://service.pdok.nl/lv/bag/atom/downloads/bag-light.gpkg"
gpkg_path = os.path.join(lh_folder, "bag-light.gpkg")

# Download the GeoPackage file from the given URL
response = requests.get(gpkg_url, stream=True)
with open(gpkg_path, "wb") as f:
    for chunk in response.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)


StatementMeta(, 83c40b2b-b419-4d14-baf9-37efeb6f1d82, 7, Finished, Available, Finished)

In [8]:
# Download BRT data (bestuurlijke gebieden / administrative areas Netherlands)

# Import required modules
import os
import requests
from datetime import datetime

# Create folder name with current month and year as postfix
current_time = datetime.now()
folder_name = f"BRK_download_{current_time.strftime('%Y')}"
lh_folder = f"/lakehouse/default/Files/{folder_name}"

# Ensure the target folder exists
os.makedirs(lh_folder, exist_ok=True)

# File download GeoPackage
# For further information see https://nationaalgeoregister.nl/geonetwork/srv/dut/catalog.search#/metadata/208bc283-7c66-4ce7-8ad3-1cf3e8933fb5
gpkg_url = "https://service.pdok.nl/kadaster/brk-bestuurlijke-gebieden/atom/downloads/BestuurlijkeGebieden_2026.gpkg"
gpkg_path = os.path.join(lh_folder, "BestuurlijkeGebieden_2026.gpkg")

# Download the GeoPackage file from the given URL
response = requests.get(gpkg_url, stream=True)
with open(gpkg_path, "wb") as f:
    for chunk in response.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)

StatementMeta(, 83c40b2b-b419-4d14-baf9-37efeb6f1d82, 8, Finished, Available, Finished)

In [9]:

# ===========================
# BAG + Provinces GeoPackage → GeoJSON (WGS84 / EPSG:4326), split to < 1 GB per file (with 4 KB buffer)
# Sedona 1.8 native GeoPackage reader; auto-discover SQLite table names & per-layer CRS
# Parallel province processing: ONE broadcast spatial join for PAND and ONE for VBO
# Bundle PAND and VERBLIJFSOBJECT per Province (attribute 'naam'); export provinces layer as BRK_provincies
# Unique temp directories per chunk; versioned output folder (_v01)
# ===========================

from sedona.spark import *
sedona = SedonaContext.create(spark)

from pyspark.sql import functions as F, Window, types as T
from pyspark import StorageLevel
from notebookutils import mssparkutils
from datetime import datetime
from uuid import uuid4
import os, re, sqlite3, shutil, time

# --- Default CRS (used if GPKG doesn't provide SRS metadata) ---
DEFAULT_SOURCE_CRS = "EPSG:28992"   # RD New (typical for BAG & PDOK datasets)
TARGET_CRS = "EPSG:4326"            # WGS84

# Friendly layer selectors for BAG (we will resolve real table names from gpkg_contents)
BAG_FRIENDLY_LAYERS = {
    "pand":            ["pand"],
    "verblijfsobject": ["verblijfsobject", "vbo"],
    "standplaats":     ["standplaats"],
    "ligplaats":       ["ligplaats"],
}

# Provinces layer selector (resolve actual table; typical name: 'provinciegebied')
PROV_FRIENDLY_LAYERS = {
    "provincie": ["provincie", "provinciegebied"]
}
PROV_NAME_COL = "naam"  # Provinces attribute for the province name

# Sedona / join tuning (adjust to your cluster)
spark.conf.set("sedona.join.gridtype", "kdbtree")
spark.conf.set("sedona.join.index", "true")
spark.conf.set("sedona.join.buildOnSpatialPartitionedRDD", "true")  # accelerate indexed joins
spark.conf.set("spark.sql.shuffle.partitions", "200")               # for wide shuffles

# Parallelism target: ~4x core count is a reasonable start
TARGET_PARTITIONS = 32

# --- Input dirs (BAG per month; Provinces per year) and versioned output ---
current_time    = datetime.now()
version_postfix = "v01"  # bump to v02/v03 for isolated runs
bag_folder_name_in   = f"BAG_download_{current_time.strftime('%Y%m')}"
prov_folder_name_in  = f"BRK_download_{current_time.strftime('%Y')}"   # from your download cell
out_folder_name      = f"BAG_geojson_province_WGS84_4326_{current_time.strftime('%Y%m')}_{version_postfix}"

# Lakehouse mounted paths (use file: URIs for Spark/Sedona)
BAG_IN_DIR   = f"file:/lakehouse/default/Files/{bag_folder_name_in}"
PROV_IN_DIR  = f"file:/lakehouse/default/Files/{prov_folder_name_in}"
OUTPUT_DIR   = f"file:/lakehouse/default/Files/{out_folder_name}"
mssparkutils.fs.mkdirs(OUTPUT_DIR)

# ---- helpers ----
def sanitize_filename(s: str) -> str:
    s = (s or "").strip()
    s = re.sub(r"[^\w\-]+", "_", s)
    s = s.strip("_")
    return s or "unknown"

def rm_any(path_uri: str, recursive: bool = True):
    """Remove paths via mssparkutils, Hadoop FS, and local OS for robustness."""
    # 1) mssparkutils
    try:
        if mssparkutils.fs.exists(path_uri):
            mssparkutils.fs.rm(path_uri, recursive)
    except Exception:
        pass
    # 2) Hadoop FS
    try:
        jpath = spark._jvm.org.apache.hadoop.fs.Path(path_uri)
        fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
        fs.delete(jpath, recursive)
    except Exception:
        pass
    # 3) Local OS
    try:
        local_path = path_uri.replace("file:", "")
        if os.path.isdir(local_path):
            shutil.rmtree(local_path, ignore_errors=True)
        elif os.path.isfile(local_path):
            os.remove(local_path)
    except Exception:
        pass

def detect_geometry_column(df):
    for cand in ["geometry", "geom", "the_geom", "wkb", "wkt"]:
        if cand in df.columns:
            return cand
    raise RuntimeError(f"No geometry-like column found. Columns: {df.columns}")

def to_wgs84(df, geom_col, src_crs: str):
    """Transform geometries to WGS84 from per-layer source CRS."""
    source = src_crs or DEFAULT_SOURCE_CRS
    return df.selectExpr("*", f"ST_Transform({geom_col}, '{source}', '{TARGET_CRS}') AS geometry_wgs84")

def to_feature_df(gdf_wgs84, include_geom_col="geometry_wgs84", drop_cols=None):
    if drop_cols is None: drop_cols = []
    prop_cols = [c for c in gdf_wgs84.columns if c not in {include_geom_col} | set(drop_cols)]
    return (
        gdf_wgs84
        .withColumn("geom_json", F.expr(f"ST_AsGeoJSON({include_geom_col})"))
        .withColumn("props_json", F.to_json(F.struct(*[F.col(c) for c in prop_cols])))
        .select(
            F.concat(
                F.lit('{"type":"Feature","geometry":'),
                F.col("geom_json"),
                F.lit(',"properties":'),
                F.col("props_json"),
                F.lit("}")
            ).alias("feature_json")
        )
    )

def write_featurecollection_chunks(df_features, final_base_name):
    """Write FeatureCollection GeoJSON under OUTPUT_DIR, split to ≤ 1 GB with 4 KB buffer."""
    MAX_BYTES = 1 * 1024 * 1024 * 1024 - 4096

    sized = (
        df_features
        .withColumn("feature_bytes", F.length("feature_json").cast("long") + F.lit(2))  # +2 for comma/newline
        .withColumn("row_id", F.monotonically_increasing_id())
    )
    w = Window.orderBy("row_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    sized = sized.withColumn("cum_bytes", F.sum("feature_bytes").over(w))
    chunked = sized.withColumn("chunk_id", F.floor((F.col("cum_bytes") - F.lit(1)) / F.lit(MAX_BYTES))).cache()

    chunk_ids = [r.chunk_id for r in chunked.select("chunk_id").distinct().collect()]
    print(f"Writing {final_base_name}: {len(chunk_ids)} chunk(s)")

    for idx, cid in enumerate(sorted(chunk_ids), start=1):
        out_file = (
            f"{OUTPUT_DIR}/{final_base_name}.geojson" if len(chunk_ids) == 1
            else f"{OUTPUT_DIR}/{final_base_name}_{idx:02d}.geojson"
        )
        # Unique temp dir to avoid any rerun collisions
        tmp_out_dir = f"{OUTPUT_DIR}/__tmp_{final_base_name}_{idx:02d}_{uuid4().hex}"

        # Build FeatureCollection stream (header, features, footer)
        rdd = (
            chunked.filter(F.col("chunk_id") == cid)
            .select("feature_json")
            .rdd
            .map(lambda r: r[0])
        )

        def add_header_footer(_, it):
            yield '{"type":"FeatureCollection","features":['
            first = True
            for f in it:
                if first: yield f; first = False
                else:     yield "," + f
            yield ']}'

        rdd_fc = rdd.coalesce(1).mapPartitionsWithIndex(add_header_footer)

        df_text = spark.createDataFrame(rdd_fc.map(lambda s: (s,)), ["value"])
        df_text.coalesce(1).write.text(tmp_out_dir)  # temp path is unique; no overwrite required

        # Move single part to final file name
        parts = mssparkutils.fs.ls(tmp_out_dir)
        part_files = [p.path for p in parts if p.path.endswith("/part-00000")]
        if not part_files:
            part_files = [p.path for p in parts if "/part-" in p.path]
        if not part_files:
            rm_any(tmp_out_dir, recursive=True)
            raise RuntimeError(f"No output part file produced for {final_base_name}, chunk {idx}")

        # Overwrite final if exists
        rm_any(out_file, recursive=True)
        mssparkutils.fs.mv(part_files[0], out_file, True, True)
        rm_any(tmp_out_dir, recursive=True)

        meta = [f for f in mssparkutils.fs.ls(OUTPUT_DIR) if f.path == out_file]
        if meta:
            print(f"  -> {out_file}  size={meta[0].size} bytes")

# ---- GeoPackage utilities (discover tables and per-layer CRS) ----
def list_feature_tables(local_gpkg_path: str):
    conn = sqlite3.connect(local_gpkg_path)
    cur = conn.cursor()
    cur.execute("SELECT table_name FROM gpkg_contents WHERE data_type='features'")
    rows = [r[0] for r in cur.fetchall()]
    conn.close()
    return rows

def pick_table(tables, candidates):
    """Return first table whose name contains any candidate substring (case-insensitive)."""
    low = [t.lower() for t in tables]
    for cand in candidates:
        c = cand.lower()
        for i, t in enumerate(low):
            if c in t:
                return tables[i]
    return None

def get_layer_epsg(local_gpkg_path: str, table_name: str) -> str:
    """
    Return 'EPSG:<code>' for the layer if available from gpkg_geometry_columns + gpkg_spatial_ref_sys.
    Fallback to DEFAULT_SOURCE_CRS if not resolvable.
    """
    try:
        conn = sqlite3.connect(local_gpkg_path)
        cur = conn.cursor()
        cur.execute("""
            SELECT srs.organization, srs.organization_coordsys_id
            FROM gpkg_geometry_columns gc
            JOIN gpkg_spatial_ref_sys srs
              ON gc.srs_id = srs.srs_id
            WHERE gc.table_name = ?
        """, (table_name,))
        row = cur.fetchone()
        conn.close()
        if row and row[0] and row[1]:
            org, org_id = row[0], row[1]
            if str(org).upper() == "EPSG":
                return f"EPSG:{org_id}"
    except Exception:
        pass
    return DEFAULT_SOURCE_CRS

def read_layer_geopackage(gpkg_uri_file: str, table_name: str):
    """Sedona GeoPackage reader (point to specific .gpkg file via file: URI)."""
    return (
        sedona.read
        .format("geopackage")
        .option("tableName", table_name)
        .load(gpkg_uri_file)
    )

# ---------------------------
# Locate the two GeoPackages
# ---------------------------

# BAG
bag_entries = mssparkutils.fs.ls(BAG_IN_DIR)
bag_gpkg_files = [e.name for e in bag_entries if e.isFile and e.name.lower().endswith(".gpkg")]
if not bag_gpkg_files:
    raise FileNotFoundError(f"No BAG .gpkg found in {BAG_IN_DIR}")
if len(bag_gpkg_files) > 1:
    print(f"[Info] Multiple BAG .gpkg found; using first: {bag_gpkg_files[0]}")
BAG_GPKG_URI  = f"{BAG_IN_DIR}/{bag_gpkg_files[0]}"
BAG_GPKG_LOCAL = BAG_GPKG_URI.replace("file:", "")
print(f"Using BAG GeoPackage: {BAG_GPKG_URI}")

# Provinces (BestuurlijkeGebieden)
prov_entries = mssparkutils.fs.ls(PROV_IN_DIR)
prov_gpkg_files = [e.name for e in prov_entries if e.isFile and e.name.lower().endswith(".gpkg")]
if not prov_gpkg_files:
    raise FileNotFoundError(f"No provinces .gpkg found in {PROV_IN_DIR}")
if len(prov_gpkg_files) > 1:
    print(f"[Info] Multiple province .gpkg found; using first: {prov_gpkg_files[0]}")
PROV_GPKG_URI  = f"{PROV_IN_DIR}/{prov_gpkg_files[0]}"
PROV_GPKG_LOCAL = PROV_GPKG_URI.replace("file:", "")
print(f"Using Provinces GeoPackage: {PROV_GPKG_URI}")

# ---------------------------
# Resolve actual table names
# ---------------------------

bag_tables  = list_feature_tables(BAG_GPKG_LOCAL)
prov_tables = list_feature_tables(PROV_GPKG_LOCAL)
print("BAG feature tables:", bag_tables)
print("Province feature tables:", prov_tables)

tbl_pand          = pick_table(bag_tables,  BAG_FRIENDLY_LAYERS["pand"])
tbl_vbo           = pick_table(bag_tables,  BAG_FRIENDLY_LAYERS["verblijfsobject"])
tbl_standplaats   = pick_table(bag_tables,  BAG_FRIENDLY_LAYERS["standplaats"])
tbl_ligplaats     = pick_table(bag_tables,  BAG_FRIENDLY_LAYERS["ligplaats"])
tbl_prov          = pick_table(prov_tables, PROV_FRIENDLY_LAYERS["provincie"])

missing = []
if not tbl_pand:        missing.append("pand")
if not tbl_vbo:         missing.append("verblijfsobject")
if not tbl_prov:        missing.append("provinciegebied")
# standplaats/ligplaats are optional

if missing:
    raise RuntimeError(f"Could not find expected tables: {missing}\n"
                       f"BAG tables: {bag_tables}\nProvince tables: {prov_tables}")

print("Resolved table names:",
      {"pand": tbl_pand, "verblijfsobject": tbl_vbo,
       "standplaats": tbl_standplaats, "ligplaats": tbl_ligplaats,
       "provincie": tbl_prov})

# ---------------------------
# Load layers and transform to WGS84 using per-layer CRS
# ---------------------------

# Determine per-layer EPSG (if available), fallback to RD New
epsg_pand = get_layer_epsg(BAG_GPKG_LOCAL,  tbl_pand)
epsg_vbo  = get_layer_epsg(BAG_GPKG_LOCAL,  tbl_vbo)
epsg_prov = get_layer_epsg(PROV_GPKG_LOCAL, tbl_prov)

gdf_pand_src = read_layer_geopackage(BAG_GPKG_URI,  tbl_pand)
gdf_vbo_src  = read_layer_geopackage(BAG_GPKG_URI,  tbl_vbo)
gdf_prov_src = read_layer_geopackage(PROV_GPKG_URI, tbl_prov)

geom_pand = detect_geometry_column(gdf_pand_src)
geom_vbo  = detect_geometry_column(gdf_vbo_src)
geom_prov = detect_geometry_column(gdf_prov_src)

# Repartition early to unlock parallelism
gdf_pand_wgs = to_wgs84(gdf_pand_src, geom_pand, epsg_pand).repartition(TARGET_PARTITIONS).persist(StorageLevel.MEMORY_AND_DISK)
gdf_vbo_wgs  = to_wgs84(gdf_vbo_src,  geom_vbo,  epsg_vbo ).repartition(TARGET_PARTITIONS).persist(StorageLevel.MEMORY_AND_DISK)
gdf_prov_wgs = to_wgs84(gdf_prov_src, geom_prov, epsg_prov).repartition(4).cache()

# Materialize
_ = gdf_pand_wgs.count(); _ = gdf_vbo_wgs.count(); _ = gdf_prov_wgs.count()

# ---------------------------
# Whole-layer exports (provinces + optional standplaats/ligplaats)
# ---------------------------

print("\nExporting provinces and optional whole layers with size-aware chunking...")

# Provinces (rename to BRK_provincies as requested)
write_featurecollection_chunks(
    to_feature_df(gdf_prov_wgs, include_geom_col="geometry_wgs84"),
    final_base_name="BRK_provincies"
)

# Optional whole-layer exports if present
if tbl_standplaats:
    epsg_stand = get_layer_epsg(BAG_GPKG_LOCAL, tbl_standplaats)
    gdf_stand_src = read_layer_geopackage(BAG_GPKG_URI, tbl_standplaats)
    geom_stand = detect_geometry_column(gdf_stand_src)
    gdf_stand_wgs = to_wgs84(gdf_stand_src, geom_stand, epsg_stand).repartition(8).cache()
    write_featurecollection_chunks(
        to_feature_df(gdf_stand_wgs, include_geom_col="geometry_wgs84"),
        final_base_name="BAG_standplaats"
    )

if tbl_ligplaats:
    epsg_lig = get_layer_epsg(BAG_GPKG_LOCAL, tbl_ligplaats)
    gdf_lig_src = read_layer_geopackage(BAG_GPKG_URI, tbl_ligplaats)
    geom_lig = detect_geometry_column(gdf_lig_src)
    gdf_lig_wgs = to_wgs84(gdf_lig_src, geom_lig, epsg_lig).repartition(8).cache()
    write_featurecollection_chunks(
        to_feature_df(gdf_lig_wgs, include_geom_col="geometry_wgs84"),
        final_base_name="BAG_ligplaats"
    )

# ---------------------------
# Parallel per-Province processing (ONE spatial join per layer)
# ---------------------------

print("\nExporting per Province (PAND and VERBLIJFSOBJECT) with parallel join …")

# Build one unioned geometry per province (12 rows)
t0 = time.time()
prov_unioned = (
    gdf_prov_wgs
    .groupBy(PROV_NAME_COL)
    .agg(F.expr("ST_Union_Aggr(geometry_wgs84) AS province_geom"))
    .repartition(12)
    .cache()
)
_ = prov_unioned.count()
print(f"Built province unions in {time.time() - t0:.1f}s")

# ONE distributed join for PAND
t1 = time.time()
pand_with_province = (
    gdf_pand_wgs.alias("p")
    .join(
        F.broadcast(prov_unioned).alias("r"),
        F.expr("ST_Intersects(p.geometry_wgs84, r.province_geom)")
    )
    .select(
        F.col(f"r.{PROV_NAME_COL}").alias(PROV_NAME_COL),
        *[F.col(f"p.{c}").alias(c) for c in gdf_pand_wgs.columns]
    )
    .repartition(TARGET_PARTITIONS, F.col(PROV_NAME_COL))     # cluster by province for fast filtering
    .persist(StorageLevel.MEMORY_AND_DISK)
)
pand_count = pand_with_province.count()
print(f"PAND join produced {pand_count} rows in {time.time() - t1:.1f}s")

# ONE distributed join for VBO
t2 = time.time()
vbo_with_province = (
    gdf_vbo_wgs.alias("v")
    .join(
        F.broadcast(prov_unioned).alias("r"),
        F.expr("ST_Intersects(v.geometry_wgs84, r.province_geom)")
    )
    .select(
        F.col(f"r.{PROV_NAME_COL}").alias(PROV_NAME_COL),
        *[F.col(f"v.{c}").alias(c) for c in gdf_vbo_wgs.columns]
    )
    .repartition(TARGET_PARTITIONS, F.col(PROV_NAME_COL))
    .persist(StorageLevel.MEMORY_AND_DISK)
)
vbo_count = vbo_with_province.count()
print(f"VBO join produced {vbo_count} rows in {time.time() - t2:.1f}s")

# Province list (distinct once)
prov_names = [r[PROV_NAME_COL] for r in prov_unioned.select(PROV_NAME_COL).collect()]
print(f"Writing per-province GeoJSON for {len(prov_names)} provinces …")

for prov_name in prov_names:
    safe_prov = sanitize_filename(prov_name)
    print(f"  -> Writing province: {prov_name}")

    # Filter (cheap – data already partitioned by province)
    df_pand_prov = pand_with_province.filter(F.col(PROV_NAME_COL) == prov_name)
    write_featurecollection_chunks(
        to_feature_df(df_pand_prov, include_geom_col="geometry_wgs84"),
        final_base_name=f"BAG_{safe_prov}"
    )

    df_vbo_prov = vbo_with_province.filter(F.col(PROV_NAME_COL) == prov_name)
    write_featurecollection_chunks(
        to_feature_df(df_vbo_prov, include_geom_col="geometry_wgs84"),
        final_base_name=f"BAG_verblijfsobject_{safe_prov}"
    )

print("\nAll files processed.")



StatementMeta(, 83c40b2b-b419-4d14-baf9-37efeb6f1d82, 9, Finished, Available, Finished)

Using BAG GeoPackage: file:/lakehouse/default/Files/BAG_download_202602/bag-light.gpkg
Using Provinces GeoPackage: file:/lakehouse/default/Files/BRK_download_2026/BestuurlijkeGebieden_2026.gpkg
BAG feature tables: ['pand', 'verblijfsobject', 'ligplaats', 'standplaats', 'woonplaats']
Province feature tables: ['gemeentegebied', 'landgebied', 'provinciegebied']
Resolved table names: {'pand': 'pand', 'verblijfsobject': 'verblijfsobject', 'standplaats': 'standplaats', 'ligplaats': 'ligplaats', 'provincie': 'provinciegebied'}

Exporting provinces and optional whole layers with size-aware chunking...
Writing BRK_provincies: 1 chunk(s)
  -> file:/lakehouse/default/Files/BAG_geojson_province_WGS84_4326_202602_v02/BRK_provincies.geojson  size=8518916 bytes
Writing BAG_standplaats: 1 chunk(s)
  -> file:/lakehouse/default/Files/BAG_geojson_province_WGS84_4326_202602_v02/BAG_standplaats.geojson  size=50213400 bytes
Writing BAG_ligplaats: 1 chunk(s)
  -> file:/lakehouse/default/Files/BAG_geojson_pro