# Deep Data Layer â€” Copernicus Terrain Enrichment (v3)

This notebook adds **terrain information** to Airbnb listings using the **Copernicus 30 m DEM** dataset.

For each listing, it:
- Finds the matching Copernicus terrain tile  
- Analyzes elevation within a **200 m radius**  
- Computes basic elevation and slope statistics  
- Classifies the surrounding terrain into clear, rule-based categories  

The output is a **deterministic v3 terrain table** that is used later in the final accessibility scoring.  
This notebook contains **no exploration or analysis** and exists only to build the terrain data layer.


### Environment setup: install pinned geospatial dependencies for Copernicus DEM processing


In [0]:
%pip install --upgrade --force-reinstall "numpy<2" "rasterio==1.4.4" "shapely<2.1" "pyproj<3.8" requests


### Restart Python to activate newly installed geospatial dependencies


In [0]:
dbutils.library.restartPython()

### Prepare listing point geometry (property_id, latitude, longitude) for terrain enrichment


In [0]:
df_pts = (
  spark.table("default.access4all_airbnb_v1_9cities")
  .selectExpr(
      "property_id",
      "CAST(lat AS DOUBLE)  AS lat",
      "CAST(`long` AS DOUBLE) AS lon"
  )
  .dropna(subset=["lat","lon"])
)


### Fetch and cache the official Copernicus DEM tile index used to resolve required terrain tiles


In [0]:
import requests

tilelist_url = "https://copernicus-dem-30m.s3.amazonaws.com/tileList.txt"

r = requests.get(tilelist_url, timeout=60)
r.raise_for_status()

tile_list = set(line.strip() for line in r.text.splitlines() if line.strip())

print("tile list size:", len(tile_list))
print("sample:", list(sorted(tile_list))[:5])


### Copernicus DEM terrain enrichment: fetch tiles, extract 200m-buffer elevation stats, and derive slope-based context labels


In [0]:
import math
import numpy as np
import pandas as pd
import rasterio
from rasterio.mask import mask
from shapely.geometry import Point, mapping
from shapely.ops import transform as shp_transform
from pyproj import Transformer
from datetime import datetime

# ------------------------
# Config
# ------------------------
RADIUS_M = 200
PIPELINE_VERSION = "v3.3-final-pilot"
DEM_SOURCE = "copernicus_dem_glo30_public"
DEM_RESOLUTION_M = 30.0

# WGS84 -> WebMercator (meters)
to3857 = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True)

# ------------------------
# Tile naming
# ------------------------
def fmt_ns(lat_deg: int) -> str:
    return f"N{lat_deg:02d}_00" if lat_deg >= 0 else f"S{abs(lat_deg):02d}_00"

def fmt_ew(lon_deg: int) -> str:
    return f"E{lon_deg:03d}_00" if lon_deg >= 0 else f"W{abs(lon_deg):03d}_00"

def tile_folder(lat: float, lon: float) -> str:
    return f"Copernicus_DSM_COG_10_{fmt_ns(math.floor(lat))}_{fmt_ew(math.floor(lon))}_DEM"

# ------------------------
# Open DEM (tries common key patterns)
# ------------------------
def open_dem(folder: str):
    base = "https://copernicus-dem-30m.s3.amazonaws.com"
    candidates = [
        f"/vsicurl/{base}/{folder}/{folder}.tif",
        f"/vsicurl/{base}/{folder}/{folder}.tiff",
        f"/vsicurl/{base}/{folder}/DEM.tif",
        f"/vsicurl/{base}/{folder}/dem.tif",
    ]
    last_err = None
    for url in candidates:
        try:
            return rasterio.open(url), url
        except Exception as e:
            last_err = e
    raise last_err

# ------------------------
# Build 200m buffer in meters (3857), then reproject to src.crs for raster mask
# ------------------------
def buffer_geom_in_src_crs(lon, lat, src_crs, radius_m=200):
    x, y = to3857.transform(lon, lat)
    poly_3857 = Point(x, y).buffer(radius_m)

    tf = Transformer.from_crs("EPSG:3857", src_crs, always_xy=True)
    poly_src = shp_transform(lambda xx, yy, zz=None: tf.transform(xx, yy), poly_3857)
    return mapping(poly_src)

# ------------------------
# Validity mask: finite, plausible, not nodata
# (0/neg treated as invalid to avoid void/ocean encodings causing spikes)
# ------------------------
def valid_mask(elev: np.ndarray, nodata):
    finite = np.isfinite(elev)
    plausible = (elev > 0) & (elev < 9000)  # wide bound
    if nodata is None:
        return finite & plausible
    return finite & plausible & (elev != nodata)

# ------------------------
# Slope (% grade), CRS-aware pixel spacing
# IMPORTANT: expects elev_nan has NaNs where invalid so gradients near void become NaN
# ------------------------
def slope_pct(elev_nan: np.ndarray, transform, src_crs, lat_center: float):
    if src_crs is not None and getattr(src_crs, "is_projected", False):
        dx = transform.a
        dy = -transform.e
    else:
        dx_deg = transform.a
        dy_deg = -transform.e
        m_per_deg_lat = 111320.0
        m_per_deg_lon = 111320.0 * np.cos(np.deg2rad(lat_center))
        dx = dx_deg * m_per_deg_lon
        dy = dy_deg * m_per_deg_lat

    gy, gx = np.gradient(elev_nan.astype("float64"), dy, dx)
    return np.sqrt(gx*gx + gy*gy) * 100.0

# ------------------------
# Coverage + label rules (stable for 200m + 30m DEM)
# ------------------------
def coverage_label(r):
    if r >= 0.90: return "good"
    if r >= 0.70: return "partial"
    return "poor"

def terrain_label(cov, slope_p50, share_gt8):
    if cov == "poor" or slope_p50 is None or share_gt8 is None:
        return "unknown_coverage"
    if slope_p50 <= 5.0 and share_gt8 <= 0.20:
        return "flat_supportive"
    if slope_p50 <= 8.0 and share_gt8 <= 0.35:
        return "moderate"
    return "steep_challenging"

# ------------------------
# Compute one listing
# ------------------------
def compute_one(pid, lat, lon):
    base_out = {
        "property_id": pid,
        "lat": float(lat),
        "lon": float(lon),
        "radius_m": RADIUS_M,
        "dem_source": DEM_SOURCE,
        "dem_resolution_m": float(DEM_RESOLUTION_M),
        "pipeline_version": PIPELINE_VERSION,
        "computed_at_utc": datetime.utcnow().isoformat(),
    }

    folder = tile_folder(lat, lon)

    # deterministic unknown if tile missing
    if folder not in tile_list:
        return {**base_out,
            "dem_valid_cell_ratio_200m": 0.0,
            "dem_void_ratio_200m": 1.0,
            "dem_coverage_label_200m": "poor",
            "elev_min_200m": None, "elev_max_200m": None, "elev_range_200m": None,
            "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
            "share_area_slope_gt_8pct_200m": None,
            "terrain_context_label_200m": "unknown_coverage",
        }

    try:
        src, _ = open_dem(folder)
        with src:
            geom = buffer_geom_in_src_crs(lon, lat, src.crs, RADIUS_M)
            out, out_transform = mask(src, [geom], crop=True, all_touched=True)
            elev = out[0].astype("float64")
            nodata = src.nodata
            src_crs = src.crs
    except Exception:
        return {**base_out,
            "dem_valid_cell_ratio_200m": 0.0,
            "dem_void_ratio_200m": 1.0,
            "dem_coverage_label_200m": "poor",
            "elev_min_200m": None, "elev_max_200m": None, "elev_range_200m": None,
            "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
            "share_area_slope_gt_8pct_200m": None,
            "terrain_context_label_200m": "unknown_coverage",
        }

    valid = valid_mask(elev, nodata)
    total = elev.size
    valid_count = int(valid.sum())
    ratio = float(valid_count / total) if total > 0 else 0.0
    cov = coverage_label(ratio)

    if cov == "poor":
        return {**base_out,
            "dem_valid_cell_ratio_200m": ratio,
            "dem_void_ratio_200m": 1.0 - ratio,
            "dem_coverage_label_200m": cov,
            "elev_min_200m": None, "elev_max_200m": None, "elev_range_200m": None,
            "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
            "share_area_slope_gt_8pct_200m": None,
            "terrain_context_label_200m": "unknown_coverage",
        }

    elev_v = elev[valid]
    elev_min = float(elev_v.min())
    elev_max = float(elev_v.max())
    elev_range = float(elev_max - elev_min)

    # void-safe slope: invalid -> NaN
    elev_nan = elev.copy()
    elev_nan[~valid] = np.nan

    s = slope_pct(elev_nan, out_transform, src_crs, lat)

    # only slopes that are finite and from valid elev
    slope_ok = valid & np.isfinite(s)
    if slope_ok.sum() == 0:
        return {**base_out,
            "dem_valid_cell_ratio_200m": ratio,
            "dem_void_ratio_200m": 1.0 - ratio,
            "dem_coverage_label_200m": cov,
            "elev_min_200m": elev_min, "elev_max_200m": elev_max, "elev_range_200m": elev_range,
            "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
            "share_area_slope_gt_8pct_200m": None,
            "terrain_context_label_200m": "unknown_coverage",
        }

    s_v = s[slope_ok]
    slope_p50 = float(np.percentile(s_v, 50))
    slope_p90 = float(np.percentile(s_v, 90))
    slope_max = float(s_v.max())
    share_gt8 = float(np.mean(s_v > 8.0))

    label = terrain_label(cov, slope_p50, share_gt8)

    return {**base_out,
        "dem_valid_cell_ratio_200m": ratio,
        "dem_void_ratio_200m": 1.0 - ratio,
        "dem_coverage_label_200m": cov,
        "elev_min_200m": elev_min, "elev_max_200m": elev_max, "elev_range_200m": elev_range,
        "slope_p50_pct_200m": slope_p50,
        "slope_p90_pct_200m": slope_p90,
        "slope_max_pct_200m": slope_max,
        "share_area_slope_gt_8pct_200m": share_gt8,
        "terrain_context_label_200m": label,
    }






### Resolve each listing to its Copernicus DEM tile and flag whether the tile exists in the published index


In [0]:
from pyspark.sql import functions as F, types as T

# anchor points from v1
df_pts = (
  spark.table("default.access4all_airbnb_v1_9cities")
  .selectExpr(
      "CAST(property_id AS STRING) AS property_id",
      "CAST(lat AS DOUBLE) AS lat",
      "CAST(`long` AS DOUBLE) AS lon"
  )
  .dropna(subset=["lat","lon"])
)

# broadcast tile_list (Python set)
tile_list_b = spark.sparkContext.broadcast(tile_list)

@F.udf("string")
def tile_folder_udf(lat, lon):
    import math
    def fmt_ns(lat_deg: int) -> str:
        return f"N{lat_deg:02d}_00" if lat_deg >= 0 else f"S{abs(lat_deg):02d}_00"
    def fmt_ew(lon_deg: int) -> str:
        return f"E{lon_deg:03d}_00" if lon_deg >= 0 else f"W{abs(lon_deg):03d}_00"
    return f"Copernicus_DSM_COG_10_{fmt_ns(math.floor(lat))}_{fmt_ew(math.floor(lon))}_DEM"

@F.udf("boolean")
def tile_exists_udf(tile):
    return tile in tile_list_b.value

df_run = (df_pts
  .withColumn("tile_folder", tile_folder_udf("lat","lon"))
  .withColumn("tile_exists", tile_exists_udf("tile_folder"))
)


### Explicit output schema for deterministic terrain enrichment results (v3 layer)


In [0]:
out_schema = T.StructType([
  T.StructField("property_id", T.StringType(), False),
  T.StructField("lat", T.DoubleType(), False),
  T.StructField("lon", T.DoubleType(), False),
  T.StructField("radius_m", T.IntegerType(), False),
  T.StructField("dem_source", T.StringType(), False),
  T.StructField("dem_resolution_m", T.DoubleType(), False),
  T.StructField("pipeline_version", T.StringType(), False),
  T.StructField("computed_at_utc", T.StringType(), False),

  T.StructField("dem_valid_cell_ratio_200m", T.DoubleType(), True),
  T.StructField("dem_void_ratio_200m", T.DoubleType(), True),
  T.StructField("dem_coverage_label_200m", T.StringType(), True),

  T.StructField("elev_min_200m", T.DoubleType(), True),
  T.StructField("elev_max_200m", T.DoubleType(), True),
  T.StructField("elev_range_200m", T.DoubleType(), True),

  T.StructField("slope_p50_pct_200m", T.DoubleType(), True),
  T.StructField("slope_p90_pct_200m", T.DoubleType(), True),
  T.StructField("slope_max_pct_200m", T.DoubleType(), True),
  T.StructField("share_area_slope_gt_8pct_200m", T.DoubleType(), True),

  T.StructField("terrain_context_label_200m", T.StringType(), True),
])


### Partition-level Spark computation: batch listings by DEM tile, extract 200m terrain metrics, and emit rows matching the v3 schema


In [0]:
PIPELINE_VERSION = "v3.3-full-spark"
RADIUS_M = 200
DEM_SOURCE = "copernicus_dem_glo30_public"
DEM_RESOLUTION_M = 30.0

def v3_partition_compute(pdf_iter):
    import numpy as np
    import pandas as pd
    import rasterio
    from rasterio.mask import mask
    from shapely.geometry import Point, mapping
    from shapely.ops import transform as shp_transform
    from pyproj import Transformer
    from datetime import datetime

    to3857 = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True)

    def open_dem(folder: str):
        base = "https://copernicus-dem-30m.s3.amazonaws.com"
        candidates = [
            f"/vsicurl/{base}/{folder}/{folder}.tif",
            f"/vsicurl/{base}/{folder}/{folder}.tiff",
            f"/vsicurl/{base}/{folder}/DEM.tif",
            f"/vsicurl/{base}/{folder}/dem.tif",
        ]
        last = None
        for url in candidates:
            try:
                return rasterio.open(url), url
            except Exception as e:
                last = e
        raise last

    def buffer_geom_in_src_crs(lon, lat, src_crs, radius_m=200):
        x, y = to3857.transform(lon, lat)
        poly_3857 = Point(x, y).buffer(radius_m)
        tf = Transformer.from_crs("EPSG:3857", src_crs, always_xy=True)
        poly_src = shp_transform(lambda xx, yy, zz=None: tf.transform(xx, yy), poly_3857)
        return mapping(poly_src)

    def valid_mask(elev, nodata):
        finite = np.isfinite(elev)
        plausible = (elev > 0) & (elev < 9000)
        if nodata is None:
            return finite & plausible
        return finite & plausible & (elev != nodata)

    def slope_pct(elev_nan, transform, src_crs, lat_center):
        if src_crs is not None and getattr(src_crs, "is_projected", False):
            dx = transform.a
            dy = -transform.e
        else:
            dx_deg = transform.a
            dy_deg = -transform.e
            m_per_deg_lat = 111320.0
            m_per_deg_lon = 111320.0 * np.cos(np.deg2rad(lat_center))
            dx = dx_deg * m_per_deg_lon
            dy = dy_deg * m_per_deg_lat
        gy, gx = np.gradient(elev_nan.astype("float64"), dy, dx)
        return np.sqrt(gx*gx + gy*gy) * 100.0

    def coverage_label(r):
        if r >= 0.90: return "good"
        if r >= 0.70: return "partial"
        return "poor"

    def terrain_label(cov, slope_p50, share_gt8):
        if cov == "poor" or slope_p50 is None or share_gt8 is None:
            return "unknown_coverage"
        if slope_p50 <= 5.0 and share_gt8 <= 0.20:
            return "flat_supportive"
        if slope_p50 <= 8.0 and share_gt8 <= 0.35:
            return "moderate"
        return "steep_challenging"

    for pdf in pdf_iter:
        out_rows = []

        # group in this partition by tile to avoid re-opening raster per row
        for tile, g in pdf.groupby("tile_folder"):
            tile_exists = bool(g["tile_exists"].iloc[0])
            now = datetime.utcnow().isoformat()

            if not tile_exists:
                for r in g.itertuples(index=False):
                    out_rows.append({
                        "property_id": str(r.property_id),
                        "lat": float(r.lat), "lon": float(r.lon),
                        "radius_m": RADIUS_M,
                        "dem_source": DEM_SOURCE,
                        "dem_resolution_m": float(DEM_RESOLUTION_M),
                        "pipeline_version": PIPELINE_VERSION,
                        "computed_at_utc": now,
                        "dem_valid_cell_ratio_200m": 0.0,
                        "dem_void_ratio_200m": 1.0,
                        "dem_coverage_label_200m": "poor",
                        "elev_min_200m": None, "elev_max_200m": None, "elev_range_200m": None,
                        "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
                        "share_area_slope_gt_8pct_200m": None,
                        "terrain_context_label_200m": "unknown_coverage",
                    })
                continue

            try:
                src, _ = open_dem(tile)
            except Exception:
                for r in g.itertuples(index=False):
                    out_rows.append({
                        "property_id": str(r.property_id),
                        "lat": float(r.lat), "lon": float(r.lon),
                        "radius_m": RADIUS_M,
                        "dem_source": DEM_SOURCE,
                        "dem_resolution_m": float(DEM_RESOLUTION_M),
                        "pipeline_version": PIPELINE_VERSION,
                        "computed_at_utc": now,
                        "dem_valid_cell_ratio_200m": 0.0,
                        "dem_void_ratio_200m": 1.0,
                        "dem_coverage_label_200m": "poor",
                        "elev_min_200m": None, "elev_max_200m": None, "elev_range_200m": None,
                        "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
                        "share_area_slope_gt_8pct_200m": None,
                        "terrain_context_label_200m": "unknown_coverage",
                    })
                continue

            with src:
                nodata = src.nodata
                src_crs = src.crs

                for r in g.itertuples(index=False):
                    now2 = datetime.utcnow().isoformat()
                    try:
                        geom = buffer_geom_in_src_crs(float(r.lon), float(r.lat), src_crs, RADIUS_M)
                        out, out_transform = mask(src, [geom], crop=True, all_touched=True)
                        elev = out[0].astype("float64")

                        valid = valid_mask(elev, nodata)
                        total = elev.size
                        ratio = float(valid.sum() / total) if total > 0 else 0.0
                        cov = coverage_label(ratio)

                        if cov == "poor":
                            out_rows.append({
                                "property_id": str(r.property_id),
                                "lat": float(r.lat), "lon": float(r.lon),
                                "radius_m": RADIUS_M,
                                "dem_source": DEM_SOURCE,
                                "dem_resolution_m": float(DEM_RESOLUTION_M),
                                "pipeline_version": PIPELINE_VERSION,
                                "computed_at_utc": now2,
                                "dem_valid_cell_ratio_200m": ratio,
                                "dem_void_ratio_200m": 1.0 - ratio,
                                "dem_coverage_label_200m": cov,
                                "elev_min_200m": None, "elev_max_200m": None, "elev_range_200m": None,
                                "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
                                "share_area_slope_gt_8pct_200m": None,
                                "terrain_context_label_200m": "unknown_coverage",
                            })
                            continue

                        elev_v = elev[valid]
                        elev_min = float(elev_v.min())
                        elev_max = float(elev_v.max())
                        elev_range = float(elev_max - elev_min)

                        elev_nan = elev.copy()
                        elev_nan[~valid] = np.nan

                        s = slope_pct(elev_nan, out_transform, src_crs, float(r.lat))
                        slope_ok = valid & np.isfinite(s)

                        if slope_ok.sum() == 0:
                            out_rows.append({
                                "property_id": str(r.property_id),
                                "lat": float(r.lat), "lon": float(r.lon),
                                "radius_m": RADIUS_M,
                                "dem_source": DEM_SOURCE,
                                "dem_resolution_m": float(DEM_RESOLUTION_M),
                                "pipeline_version": PIPELINE_VERSION,
                                "computed_at_utc": now2,
                                "dem_valid_cell_ratio_200m": ratio,
                                "dem_void_ratio_200m": 1.0 - ratio,
                                "dem_coverage_label_200m": cov,
                                "elev_min_200m": elev_min, "elev_max_200m": elev_max, "elev_range_200m": elev_range,
                                "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
                                "share_area_slope_gt_8pct_200m": None,
                                "terrain_context_label_200m": "unknown_coverage",
                            })
                            continue

                        s_v = s[slope_ok]
                        slope_p50 = float(np.percentile(s_v, 50))
                        slope_p90 = float(np.percentile(s_v, 90))
                        slope_max = float(s_v.max())
                        share_gt8 = float(np.mean(s_v > 8.0))

                        label = terrain_label(cov, slope_p50, share_gt8)

                        out_rows.append({
                            "property_id": str(r.property_id),
                            "lat": float(r.lat), "lon": float(r.lon),
                            "radius_m": RADIUS_M,
                            "dem_source": DEM_SOURCE,
                            "dem_resolution_m": float(DEM_RESOLUTION_M),
                            "pipeline_version": PIPELINE_VERSION,
                            "computed_at_utc": now2,
                            "dem_valid_cell_ratio_200m": ratio,
                            "dem_void_ratio_200m": 1.0 - ratio,
                            "dem_coverage_label_200m": cov,
                            "elev_min_200m": elev_min, "elev_max_200m": elev_max, "elev_range_200m": elev_range,
                            "slope_p50_pct_200m": slope_p50,
                            "slope_p90_pct_200m": slope_p90,
                            "slope_max_pct_200m": slope_max,
                            "share_area_slope_gt_8pct_200m": share_gt8,
                            "terrain_context_label_200m": label,
                        })

                    except Exception:
                        out_rows.append({
                            "property_id": str(r.property_id),
                            "lat": float(r.lat), "lon": float(r.lon),
                            "radius_m": RADIUS_M,
                            "dem_source": DEM_SOURCE,
                            "dem_resolution_m": float(DEM_RESOLUTION_M),
                            "pipeline_version": PIPELINE_VERSION,
                            "computed_at_utc": now2,
                            "dem_valid_cell_ratio_200m": 0.0,
                            "dem_void_ratio_200m": 1.0,
                            "dem_coverage_label_200m": "poor",
                            "elev_min_200m": None, "elev_max_200m": None, "elev_range_200m": None,
                            "slope_p50_pct_200m": None, "slope_p90_pct_200m": None, "slope_max_pct_200m": None,
                            "share_area_slope_gt_8pct_200m": None,
                            "terrain_context_label_200m": "unknown_coverage",
                        })

        yield pd.DataFrame(out_rows)


### Execute distributed terrain enrichment and write the finalized v3 Delta table (overwrite)


In [0]:
df_exec = df_run.repartition(256, "tile_folder")

df_v3 = df_exec.mapInPandas(v3_partition_compute, schema=out_schema)

(df_v3.write.format("delta")
     .mode("overwrite")
     .option("overwriteSchema", "true")
     .saveAsTable("default.access4all_airbnb_v3_terrain_9cities"))


### Deduplicate v3 results to the latest computation per property and persist the canonical terrain table


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

v3 = spark.table("default.access4all_airbnb_v3_terrain_9cities")

w = Window.partitionBy("property_id").orderBy(F.col("computed_at_utc").desc())

v3_dedup = (
    v3.withColumn("rn", F.row_number().over(w))
      .filter(F.col("rn") == 1)
      .drop("rn")
)

(v3_dedup.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("default.access4all_airbnb_v3_terrain_9cities"))
