
# 🏊‍♀️ Berlin Pools — End‑to‑End Pipeline (Refactored)

**Purpose:** run this notebook top‑to‑bottom to build the unified dataset of Berlin pools from legacy and OSM sources.

---

## What this notebook does (high level)
1. **Environment & Config** — installs minimal deps (if missing) and sets file paths & knobs.
2. **Run legacy extractor** — executes `pool_data_processing.ipynb` (if needed) to produce:  
   - `berlin_pools_final_dataset.csv`  
   - `pools_data_cleaned.csv`
3. **(Optional) OSM wide export** — builds `osm_pools_wide.csv` via OSMnx (can be heavy, disabled by default).
4. **OSM preparation** — creates normalized names and the list of **new public named pools** not matched in legacy:  
   - `legacy_enrichment_list.csv` (candidate pairs for enrichment)  
   - `osm_public_named.csv` (OSM-only named pools)
5. **Reverse geocode enrich (cache-aware)** — fills missing **street** and **postal_code** in `osm_public_named.csv` using Nominatim with `reverse_geocode_cache.csv`.
6. **Master build** — combines legacy + OSM-only rows into `pools_master_minimal.csv`, ensuring:
   - stable **pool_id** for legacy rows (preserved if present)
   - robust **pool_id** for OSM rows (from OSM id or coordinate surrogate)
   - **district_id** mapped from district names (and reverse‑geocoded if needed)
   - missing `open_all_year` filled with `False`
7. **Validation & outputs** — prints row counts and missing fields.



## 1) Environment & Config

**Why:** ensure the notebook is reproducible, self‑contained, and gives you simple toggles.


In [1]:

# --- Minimal deps auto-install (safe no-ops if already installed) ---
import sys, subprocess, importlib

def ensure(pkg, pip_name=None):
    try:
        importlib.import_module(pkg)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name or pkg])

for mod, pipn in [("pandas", None),
                  ("numpy", None),
                  ("geopy", None),
                  ("nbformat", None),
                  ("nbconvert", None)]:
    ensure(mod, pipn)

# OSMnx only needed if MAKE_OSM_WIDE = True
try:
    import osmnx  # noqa
except Exception:
    pass

from pathlib import Path
import pandas as pd
import numpy as np


In [8]:

# --- Configuration knobs & paths (adjust as needed) ---
def resolve_path(p):
    return Path(str(p)).expanduser().resolve()

# Input artifacts expected next to this notebook
LEGACY_NOTEBOOK = resolve_path("pool_data_processing.ipynb")
LEGACY_XLSX     = resolve_path("baederleben_berlin.xlsx")

# Outputs produced by legacy extractor
LEGACY_MAIN_CSV = resolve_path("berlin_pools_final_dataset.csv")
LEGACY_ALT_CSV  = resolve_path("pools_data_cleaned.csv")

# OSM artifacts
MAKE_OSM_WIDE   = True  # set True to build OSM via OSMnx (heavy + may need internet)
OSM_PLACE       = "Berlin, Germany"
OSM_WIDE_CSV    = resolve_path("osm_pools_wide.csv")   # optional
OSM_PUBLIC_NAMED_CSV = resolve_path("osm_public_named.csv")

# Matching & filtering knobs
DIST_M          = 100.0
STRICT_PUBLIC   = False  # keep False unless you want to only keep strictly public pools

# Enrichment cache for reverse geocoding
CACHE_PATH      = resolve_path("reverse_geocode_cache.csv")

print("Configured paths:")
print("  LEGACY_NOTEBOOK:", LEGACY_NOTEBOOK)
print("  LEGACY_XLSX    :", LEGACY_XLSX)
print("  LEGACY_MAIN_CSV:", LEGACY_MAIN_CSV)
print("  LEGACY_ALT_CSV :", LEGACY_ALT_CSV)
print("  OSM_WIDE_CSV   :", OSM_WIDE_CSV, "(build:", MAKE_OSM_WIDE, "place:", OSM_PLACE, ")")
print("  OSM_PUBLIC_NAMED_CSV:", OSM_PUBLIC_NAMED_CSV)
print("  DIST_M:", DIST_M, "| STRICT_PUBLIC:", STRICT_PUBLIC)
print("  CACHE_PATH:", CACHE_PATH)


Configured paths:
  LEGACY_NOTEBOOK: C:\Users\micha\Projects VS\refacforing hopefuly last\pool_data_processing.ipynb
  LEGACY_XLSX    : C:\Users\micha\Projects VS\refacforing hopefuly last\baederleben_berlin.xlsx
  LEGACY_MAIN_CSV: C:\Users\micha\Projects VS\refacforing hopefuly last\berlin_pools_final_dataset.csv
  LEGACY_ALT_CSV : C:\Users\micha\Projects VS\refacforing hopefuly last\pools_data_cleaned.csv
  OSM_WIDE_CSV   : C:\Users\micha\Projects VS\refacforing hopefuly last\osm_pools_wide.csv (build: True place: Berlin, Germany )
  OSM_PUBLIC_NAMED_CSV: C:\Users\micha\Projects VS\refacforing hopefuly last\osm_public_named.csv
  DIST_M: 100.0 | STRICT_PUBLIC: False
  CACHE_PATH: C:\Users\micha\Projects VS\refacforing hopefuly last\reverse_geocode_cache.csv



## 2) Run legacy extractor (if needed)

**Why:** reproduce the legacy outputs that serve as the baseline ground truth.  
This executes `pool_data_processing.ipynb` with its own working directory to generate the CSVs.

**Outputs:**
- `berlin_pools_final_dataset.csv`
- `pools_data_cleaned.csv`


In [3]:

from nbconvert.preprocessors import ExecutePreprocessor
import nbformat, io, os

need_legacy = (not LEGACY_MAIN_CSV.exists()) or (not LEGACY_ALT_CSV.exists())

if need_legacy:
    if not LEGACY_NOTEBOOK.exists():
        raise FileNotFoundError(f"Legacy notebook missing: {LEGACY_NOTEBOOK}")
    if not LEGACY_XLSX.exists():
        raise FileNotFoundError(f"Legacy input Excel missing: {LEGACY_XLSX}")
    print("[info] Executing legacy extractor notebook…")
    with open(LEGACY_NOTEBOOK, "r", encoding="utf-8") as f:
        nb = nbformat.read(f, as_version=4)
    ep = ExecutePreprocessor(timeout=1800, kernel_name="python3")
    ep.preprocess(nb, resources={"metadata":{"path": str(LEGACY_NOTEBOOK.parent)}})

# Probe outputs / fallbacks
found = []
if LEGACY_MAIN_CSV.exists(): found.append(LEGACY_MAIN_CSV)
if LEGACY_ALT_CSV.exists():  found.append(LEGACY_ALT_CSV)
if not found:
    # search for likely candidates
    candidates = list(LEGACY_NOTEBOOK.parent.rglob("*pools*cleaned*.csv"))
    if candidates:
        print("[warn] Expected outputs not found — using best candidate:", candidates[0])
        if not LEGACY_ALT_CSV.exists():
            LEGACY_ALT_CSV = candidates[0]

print("[ok] Legacy outputs ready:",
      "MAIN=" + str(LEGACY_MAIN_CSV.name if LEGACY_MAIN_CSV.exists() else "missing"),
      "| ALT=" + str(LEGACY_ALT_CSV.name if LEGACY_ALT_CSV.exists() else "missing"))


[info] Executing legacy extractor notebook…
[ok] Legacy outputs ready: MAIN=berlin_pools_final_dataset.csv | ALT=pools_data_cleaned.csv


## (Optional) Build OSM wide export

**Why:** reproducible OSM pull to CSV (`osm_pools_wide.csv`). This step is **disabled by default** because it can be heavy and requires internet.

**Skip it** if you already have an OSM CSV to work from.


In [10]:
# 3) (Optional) Build OSM wide export (osm_pools_wide.csv) — version-safe for OSMnx

MAKE_OSM_WIDE = globals().get("MAKE_OSM_WIDE", False)
OSM_PLACE     = globals().get("OSM_PLACE", "Berlin, Germany")
OSM_WIDE_CSV  = globals().get("OSM_WIDE_CSV", Path("osm_pools_wide.csv"))

if MAKE_OSM_WIDE:
    try:
        import os
        import pandas as pd
        from pathlib import Path
        import osmnx as ox

        # be polite & reproducible
        try:
            ox.settings.use_cache = True
            ox.settings.log_console = False
        except Exception:
            pass

        print("[info] Building OSM wide export via OSMnx…")
        tags = {"leisure": ["swimming_pool", "sports_centre"]}

        # Handle OSMnx API differences
        if hasattr(ox, "geometries_from_place"):
            gdf = ox.geometries_from_place(OSM_PLACE, tags)
        elif hasattr(ox, "features_from_place"):
            gdf = ox.features_from_place(OSM_PLACE, tags)
        elif hasattr(ox, "features") and hasattr(ox.features, "features_from_place"):
            gdf = ox.features.features_from_place(OSM_PLACE, tags)
        else:
            raise AttributeError(
                "Your OSMnx version doesn’t expose geometries_from_place/features_from_place."
            )

        gdf = gdf.reset_index(drop=False)

        # Ensure WGS84 and compute representative lat/lon
        try:
            if gdf.crs is None or (getattr(gdf.crs, "to_epsg", lambda: None)() != 4326):
                gdf = gdf.to_crs(epsg=4326)
        except Exception:
            pass

        if "geometry" in gdf.columns:
            def _lat(geom):
                try:
                    return geom.y if geom.geom_type == "Point" else geom.centroid.y
                except Exception:
                    return None
            def _lon(geom):
                try:
                    return geom.x if geom.geom_type == "Point" else geom.centroid.x
                except Exception:
                    return None
            gdf["lat"] = gdf["geometry"].map(_lat)
            gdf["lon"] = gdf["geometry"].map(_lon)

        # Choose a tidy subset and rename addr columns
        keep_cols = [
            "osmid","name","leisure","sport","website","opening_hours",
            "addr:street","addr:postcode","lat","lon"
        ]
        keep_cols = [c for c in keep_cols if c in gdf.columns]
        out = gdf[keep_cols].rename(columns={"addr:street":"street","addr:postcode":"postal_code"})

        out.to_csv(OSM_WIDE_CSV, index=False)
        print("[ok] Wrote:", OSM_WIDE_CSV, "| rows:", len(out))
    except Exception as e:
        print("[warn] OSM wide build failed:", repr(e))
        print("      Tip: you can skip this step and place an existing OSM CSV in the folder instead.")
else:
    print("[skip] MAKE_OSM_WIDE=False — expecting an existing OSM CSV if needed.")



[info] Building OSM wide export via OSMnx…
[ok] Wrote: C:\Users\micha\Projects VS\refacforing hopefuly last\osm_pools_wide.csv | rows: 1764



## 3) Build additional files to cross check what is missing in legacy tool vs OSM based on osm_pools_wide.csv and berlin_pools_final_dataset.csv
- `legacy_enrichment_list.csv` — candidates to pull extra attributes from OSM
- `osm_public_named.csv` — OSM pools not matched to legacy (potential new pools)

In [25]:
from pathlib import Path
import pandas as pd, numpy as np, math, unicodedata, re

# --- inputs already defined earlier in the NB ---
LEGACY_PATH = LEGACY_MAIN_CSV if LEGACY_MAIN_CSV.exists() else LEGACY_ALT_CSV
OSM_PATH    = OSM_WIDE_CSV
DIST_M      = 250   # or your preferred matching radius
STRICT_PUBLIC = False

print(f"[info] Using legacy: {LEGACY_PATH.name}")
print(f"[info] Using OSM:    {OSM_PATH.name}")

legacy_raw = pd.read_csv(LEGACY_PATH)
osm_raw    = pd.read_csv(OSM_PATH)

# ------- helpers -------
def normalize_ascii(s: str) -> str:
    if not isinstance(s, str): return ""
    s = unicodedata.normalize("NFKD", s)
    return "".join(ch for ch in s if not unicodedata.combining(ch))

def normalize_name(s: str) -> str:
    if not isinstance(s, str): return ""
    s = normalize_ascii(s).lower().strip()
    s = re.sub(r"[^a-z0-9]+", " ", s)
    return re.sub(r"\s+", " ", s).strip()

def is_public_access(v, strict=False) -> bool:
    if v is None or (isinstance(v, float) and math.isnan(v)):
        return not strict
    v = str(v).strip().lower()
    if v in {"private","customers","residents"}: return False
    return (v in {"yes","public"} if strict else v in {"","yes","public","permissive"} or v is None)

def haversine_m(lat1, lon1, lat2, lon2) -> float:
    if any(pd.isna([lat1, lon1, lat2, lon2])): return np.nan
    R=6371000.0
    phi1=math.radians(lat1); phi2=math.radians(lat2)
    dphi=math.radians(lat2-lat1); dl=math.radians(lon2-lon1)
    a=math.sin(dphi/2)**2+math.cos(phi1)*math.cos(phi2)*math.sin(dl/2)**2
    return 2*R*math.asin(math.sqrt(a))

def colmap(df): return {c.lower(): c for c in df.columns}
def cget(cols, *choices):
    for ch in choices:
        if ch in cols: return cols[ch]
    return None

def coalesce_series(df, colnames):
    if not colnames: return pd.Series([""]*len(df), index=df.index, dtype=object)
    out = df[colnames[0]].astype(object).fillna("")
    for c in colnames[1:]:
        nxt = df[c].astype(object).fillna("")
        use_next = out.astype(str).str.strip().eq("")
        out = out.where(~use_next, nxt)
    return out.fillna("").astype(str)

def has_swimming(val):
    if val is None or (isinstance(val, float) and np.isnan(val)): return False
    s = str(val).lower()
    return ("swimming" in s) or ("schwimm" in s)

# 1) Filter OSM wide to swimming features
cols = colmap(osm_raw)
leisure_c = cget(cols, "leisure")
sport_c   = cget(cols, "sport")
if not leisure_c:
    raise KeyError("OSM-wide CSV must include a 'leisure' column.")

leisure = osm_raw[leisure_c].astype(str).str.lower()
sport   = osm_raw[sport_c].astype(str) if sport_c else pd.Series([""]*len(osm_raw), index=osm_raw.index)

is_pool = (
    leisure.eq("swimming_pool") |
    leisure.eq("swimming_area") |
    (leisure.eq("sports_centre") & sport.apply(has_swimming))
)
osm_swim = osm_raw[is_pool].copy()

# 2) Map to a working schema + keep addr fields
def map_osm_to_db(df: pd.DataFrame, strict_public: bool=False) -> pd.DataFrame:
    if df.empty: return df.copy()
    c = colmap(df)

    name_candidates = [x for x in ["name","official_name","short_name","alt_name","name:de","name:en","brand","operator"] if x in c]
    name_best = coalesce_series(df, [c[x] for x in name_candidates])

    access_col = cget(c, "access")
    lat_col    = cget(c, "lat","latitude")
    lon_col    = cget(c, "lon","longitude")
    typ_col    = cget(c, "pool_type","leisure","sport","type")
    phone_col  = cget(c, "phone","contact:phone")
    web_col    = cget(c, "website","contact:website","url")
    oh_col     = cget(c, "opening_hours")
    wh_col     = cget(c, "wheelchair")
    id_col     = cget(c, "osm_id","@id","id","element_id","osmid")
    street_col = cget(c, "addr:street","addr_street","street")
    post_col   = cget(c, "addr:postcode","addr_postcode","postal_code")

    df_f = df.copy()
    if access_col and access_col in df_f.columns:
        df_f = df_f[df_f[access_col].apply(lambda v: is_public_access(v, strict_public))].copy()

    # stable-ish source_id with surrogate fallback
    src = df_f[id_col].astype("string") if id_col else pd.Series([""]*len(df_f), index=df_f.index, dtype="string")
    needs_sur = src.isna() | src.str.strip().eq("") | (src == "<NA>")
    src = src.where(~needs_sur, "sur_" + pd.Series(df_f.index.astype(str), index=df_f.index))

    out = pd.DataFrame({
        "source":      "osm",
        "source_id":   src,
        "name":        name_best.reindex(df_f.index).fillna(""),
        "name_norm":   name_best.reindex(df_f.index).fillna("").astype(str).map(normalize_name),
        "lat":         pd.to_numeric(df_f.get(lat_col, np.nan), errors="coerce"),
        "lon":         pd.to_numeric(df_f.get(lon_col, np.nan), errors="coerce"),
        "street":      df_f.get(street_col, ""),
        "postal_code": df_f.get(post_col, ""),
        "pool_type":   df_f.get(typ_col, ""),
        "phone":       df_f.get(phone_col, ""),
        "website":     df_f.get(web_col, ""),
        "opening_hours": df_f.get(oh_col, ""),
        "wheelchair":  df_f.get(wh_col, ""),
    })
    return out

def map_legacy_to_db(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty: return df.copy()
    c = colmap(df)
    return pd.DataFrame({
        "source":"legacy",
        "source_id": df.get(cget(c,"legacy_id","pool_id","id"), pd.Series(index=df.index, dtype=object)),
        "name": df.get(cget(c,"name","pool_name"), ""),
        "name_norm": df.get(cget(c,"name","pool_name"), "").astype(str).map(normalize_name),
        "lat": pd.to_numeric(df.get(cget(c,"lat","latitude"), np.nan), errors="coerce"),
        "lon": pd.to_numeric(df.get(cget(c,"lon","longitude"), np.nan), errors="coerce"),
        "address": df.get(cget(c,"address","addr","street"), ""),
        "district": df.get(cget(c,"district","bezirk"), ""),
        "pool_type": df.get(cget(c,"pool_type","type"), ""),
        "phone": df.get(cget(c,"phone"), ""),
        "website": df.get(cget(c,"website","url"), ""),
        "opening_hours": df.get(cget(c,"opening_hours","hours"), ""),
        "wheelchair": df.get(cget(c,"wheelchair"), ""),
    })

legacy_db = map_legacy_to_db(legacy_raw)
osm_db    = map_osm_to_db(osm_swim, strict_public=STRICT_PUBLIC)

# keep only named + valid coords + light de-dup
named_mask = osm_db["name"].astype("string").fillna("").str.strip().ne("")
coord_mask = osm_db["lat"].notna() & osm_db["lon"].notna()
osm_db = osm_db[named_mask & coord_mask].copy()
osm_db["lat_r"] = osm_db["lat"].round(5)
osm_db["lon_r"] = osm_db["lon"].round(5)
osm_db = osm_db.drop_duplicates(subset=["name_norm","lat_r","lon_r"], keep="first").drop(columns=["lat_r","lon_r"])

print({
    "legacy_rows": len(legacy_db),
    "osm_wide_rows": len(osm_raw),
    "osm_swim_only": len(osm_swim),
    "osm_named_valid": len(osm_db)
})

# 3) Pair & split unmatched → osm_public_named
L = legacy_db.assign(bucket=legacy_db["name_norm"].str[:10])
O = osm_db.assign(bucket=osm_db["name_norm"].str[:10])

cand = L.merge(O, on="bucket", suffixes=("_l","_o"))
cand["name_match"] = (cand["name_norm_l"].ne("")) & cand["name_norm_l"].eq(cand["name_norm_o"])
cand["dist_m"] = cand.apply(lambda r: haversine_m(r["lat_l"], r["lon_l"], r["lat_o"], r["lon_o"]), axis=1)

legacy_enrichment_list = cand[(cand["name_match"]) | (cand["dist_m"] <= DIST_M)].copy()
keep_cols = [
    "source_id_l","name_l","lat_l","lon_l","address_l","district_l",
    "source_id_o","name_o","lat_o","lon_o","street_o","postal_code_o",
    "dist_m","name_match","website_o","opening_hours_o","wheelchair_o","phone_o"
]
legacy_enrichment_list = legacy_enrichment_list[[c for c in keep_cols if c in legacy_enrichment_list.columns]]

matched_osm_ids = set(
    legacy_enrichment_list.get("source_id_o", pd.Series(dtype=object))
    .dropna().astype(str).tolist()
)
osm_public_named = osm_db[~osm_db["source_id"].astype(str).isin(matched_osm_ids)].copy()

# standardize columns + save
cols_order = ["source_id","name","name_norm","lat","lon","street","postal_code","pool_type","phone","website","opening_hours","wheelchair"]
legacy_enrichment_list.to_csv("legacy_enrichment_list.csv", index=False)
osm_public_named[ [c for c in cols_order if c in osm_public_named.columns] ].to_csv("osm_public_named.csv", index=False)

print("[ok] Wrote: legacy_enrichment_list.csv, osm_public_named.csv")
print({
    "legacy_enrichment_list": len(legacy_enrichment_list),
    "osm_public_named": len(osm_public_named)
})





[info] Using legacy: berlin_pools_final_dataset.csv
[info] Using OSM:    osm_pools_wide.csv
{'legacy_rows': 144, 'osm_wide_rows': 1764, 'osm_swim_only': 984, 'osm_named_valid': 71}
[ok] Wrote: legacy_enrichment_list.csv, osm_public_named.csv
{'legacy_enrichment_list': 49, 'osm_public_named': 24}



## 4) Load inputs & prep candidate pairs

**Why:** we align legacy and OSM pools by approximate name + geo distance to produce:
- `legacy_enrichment_list.csv` — candidates to pull extra attributes from OSM
- `osm_public_named.csv` — OSM pools not matched to legacy (potential new pools)


In [26]:
# ---- add normalized names (robust + coalesce for OSM)
import unicodedata, re
import numpy as np

def normalize_name(s: str) -> str:
    s = "" if s is None or (isinstance(s, float) and np.isnan(s)) else str(s)
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    s = re.sub(r"[^a-z0-9]+", " ", s.lower()).strip()
    return re.sub(r"\s+", " ", s).strip()

def ensure_text_col(df, *candidates, create="name"):
    """Pick the first existing col; if none exist, create an empty text col."""
    for c in candidates:
        if c in df.columns:
            return c
    if create not in df.columns:
        df[create] = ""
    return create

def coalesce_cols(df, *cols):
    """First non-empty string across given columns (if present)."""
    present = [c for c in cols if c in df.columns]
    if not present:
        return pd.Series([""] * len(df), index=df.index, dtype=object)
    out = df[present[0]].astype(str).fillna("")
    for c in present[1:]:
        nxt = df[c].astype(str).fillna("")
        use_nxt = out.str.strip().eq("")
        out = out.where(~use_nxt, nxt)
    return out.fillna("")

# --- legacy: use existing name/pool_name
legacy_name_col = ensure_text_col(legacy_db, "name", "pool_name", create="name")
legacy_db["name"] = legacy_db[legacy_name_col].astype(str).fillna("").str.strip()
legacy_db["name_norm"] = legacy_db["name"].map(normalize_name)

# --- OSM: coalesce multiple name fields before normalizing
osm_db["name"] = coalesce_cols(
    osm_db,
    "name", "official_name", "short_name", "alt_name", "name:de", "name:en", "brand", "operator"
).str.strip()

# require string and normalize
osm_db["name"] = osm_db["name"].fillna("").astype(str)
osm_db["name_norm"] = osm_db["name"].map(normalize_name)





## 5) Reverse geocode (street & postal_code) for OSM-only named and berlin pools final

**Why:** OSM points can miss addresses. We fill `street` & `postal_code` using Nominatim, **with a local cache** to stay fast and polite.


In [39]:
# --- Enrich street / postal_code / district for BOTH OSM + LEGACY with one cache ---
from pathlib import Path
import pandas as pd, re, unicodedata
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

# --------------------------- CONFIG ---------------------------
OSM_PUBLIC_NAMED_CSV = Path("osm_public_named.csv")
CACHE_PATH = Path("reverse_geocode_cache.csv")
WRITE_LEGACY_INPLACE = True  # False -> writes berlin_pools_final_dataset_enriched.csv

# Try to reuse notebook vars, else fall back to filename
try:
    LEGACY_CSV = (LEGACY_MAIN_CSV if LEGACY_MAIN_CSV.exists() else LEGACY_ALT_CSV)
except Exception:
    LEGACY_CSV = Path("berlin_pools_final_dataset.csv")

# -------------------------- HELPERS ---------------------------
def strip_accents(s: str) -> str:
    s = "" if s is None else str(s)
    s = unicodedata.normalize("NFKD", s)
    return "".join(ch for ch in s if not unicodedata.combining(ch))

def is_blank(x) -> bool:
    if pd.isna(x): return True
    return str(x).strip().lower() in {"", "nan", "none", "<na>", "null"}

def key(lat, lon):
    try: return round(float(lat), 6), round(float(lon), 6)
    except Exception: return None

def clean_postcode(p):
    m = re.search(r"\b(1[0-4]\d{3})\b", str(p))
    return m.group(1) if m else ""

# Ortsteil (suburb) → Bezirk
ORTSTEIL_TO_BEZIRK = {
    # Mitte
    "mitte":"Mitte","tiergarten":"Mitte","wedding":"Mitte","gesundbrunnen":"Mitte",
    # Friedrichshain-Kreuzberg
    "friedrichshain":"Friedrichshain-Kreuzberg","kreuzberg":"Friedrichshain-Kreuzberg",
    # Pankow
    "pankow":"Pankow","prenzlauer berg":"Pankow","niederschönhausen":"Pankow","heinersdorf":"Pankow",
    "blankenburg":"Pankow","buch":"Pankow","karow":"Pankow","französisch buchholz":"Pankow",
    "weißensee":"Pankow","stadtrandsiedlung malchow":"Pankow","borna":"Pankow",
    # Charlottenburg-Wilmersdorf
    "charlottenburg":"Charlottenburg-Wilmersdorf","wilmersdorf":"Charlottenburg-Wilmersdorf",
    "grunewald":"Charlottenburg-Wilmersdorf","halensee":"Charlottenburg-Wilmersdorf","westend":"Charlottenburg-Wilmersdorf",
    # Spandau
    "spandau":"Spandau","haselhorst":"Spandau","siemensstadt":"Spandau","staaken":"Spandau",
    "gatow":"Spandau","kladow":"Spandau","hakenfelde":"Spandau",
    # Steglitz-Zehlendorf
    "steglitz":"Steglitz-Zehlendorf","zehlendorf":"Steglitz-Zehlendorf","dahlem":"Steglitz-Zehlendorf",
    "lichterfelde":"Steglitz-Zehlendorf","lankwitz":"Steglitz-Zehlendorf","nikolassee":"Steglitz-Zehlendorf",
    "wannsee":"Steglitz-Zehlendorf",
    # Tempelhof-Schöneberg
    "tempelhof":"Tempelhof-Schöneberg","mariendorf":"Tempelhof-Schöneberg","marienfelde":"Tempelhof-Schöneberg",
    "lichtenrade":"Tempelhof-Schöneberg","schöneberg":"Tempelhof-Schöneberg","friedrichshöhe":"Tempelhof-Schöneberg",
    # Neukölln
    "neukölln":"Neukölln","britz":"Neukölln","buckow":"Neukölln","rudow":"Neukölln","gropiusstadt":"Neukölln",
    # Treptow-Köpenick
    "alt-treptow":"Treptow-Köpenick","plänterwald":"Treptow-Köpenick","baumschulenweg":"Treptow-Köpenick",
    "niederschöneweide":"Treptow-Köpenick","oberschöneweide":"Treptow-Köpenick","köpenick":"Treptow-Köpenick",
    "friedrichshagen":"Treptow-Köpenick","rahnsdorf":"Treptow-Köpenick","grünau":"Treptow-Köpenick",
    "müggelheim":"Treptow-Köpenick","schmöckwitz":"Treptow-Köpenick","adlershof":"Treptow-Köpenick",
    "altglienicke":"Treptow-Köpenick","bohnsdorf":"Treptow-Köpenick",
    # Marzahn-Hellersdorf
    "marzahn":"Marzahn-Hellersdorf","hellersdorf":"Marzahn-Hellersdorf","biesdorf":"Marzahn-Hellersdorf",
    "kaulsdorf":"Marzahn-Hellersdorf","mahlsdorf":"Marzahn-Hellersdorf",
    # Lichtenberg
    "lichtenberg":"Lichtenberg","fennpfuhl":"Lichtenberg","friedrichsfelde":"Lichtenberg",
    "rummelsburg":"Lichtenberg","karlshorst":"Lichtenberg","malchow":"Lichtenberg",
    "wartenberg":"Lichtenberg","neu-hohenschönhausen":"Lichtenberg","alt-hohenschönhausen":"Lichtenberg",
    # Reinickendorf
    "reinickendorf":"Reinickendorf","tegel":"Reinickendorf","wittenau":"Reinickendorf","hermsdorf":"Reinickendorf",
    "frohnau":"Reinickendorf","märki­sches viertel":"Reinickendorf","heiligensee":"Reinickendorf",
    "konradshöhe":"Reinickendorf","lübars":"Reinickendorf","waidmannslust":"Reinickendorf",
}
district_mapping = {
    'Mitte': '01','Friedrichshain-Kreuzberg': '02','Pankow': '03',
    'Charlottenburg-Wilmersdorf': '04','Spandau': '05','Steglitz-Zehlendorf': '06',
    'Tempelhof-Schöneberg': '07','Neukölln': '08','Treptow-Köpenick': '09',
    'Marzahn-Hellersdorf': '10','Lichtenberg': '11','Reinickendorf': '12'
}
def canonical_district(raw: str) -> str:
    s = strip_accents(raw).strip()
    s = s.replace("Mitte (Berlin)", "Mitte") \
         .replace("Friedrichshain - Kreuzberg","Friedrichshain-Kreuzberg") \
         .replace("Charlottenburg - Wilmersdorf","Charlottenburg-Wilmersdorf") \
         .replace("Steglitz - Zehlendorf","Steglitz-Zehlendorf") \
         .replace("Tempelhof - Schoneberg", "Tempelhof-Schöneberg") \
         .replace("Tempelhof - Schöneberg","Tempelhof-Schöneberg") \
         .replace("Treptow - Kopenick","Treptow-Köpenick") \
         .replace("Marzahn - Hellersdorf","Marzahn-Hellersdorf")
    s = re.sub(r"^(Bezirk|District)\s+", "", s, flags=re.I)
    s = re.sub(r"\s*-\s*", "-", s).strip()
    return s if s in district_mapping else ""

# Very light street parser from a free-text "address" (legacy)
STREET_TOKENS = ("straße","str.","strasse","allee","damm","weg","platz","ufer","chaussee","ring","steig","promenade","gasse","pfad")
def street_from_address(addr: str) -> str:
    parts = [p.strip() for p in str(addr).split(",") if p.strip()]
    # pick the first piece that looks like a street
    for p in parts:
        low = p.lower()
        if any(t in low for t in STREET_TOKENS):
            return p
    # fallback: "Word Word 123" pattern
    m = re.search(r"([A-Za-zÄÖÜäöüß\-\s]+)\s(\d+[a-zA-Z]?)", str(addr))
    return m.group(0).strip() if m else ""

# ------------------ shared cache + geocoder ------------------
cache_cols = ["lat","lon","street","postal_code","district"]
cache = pd.read_csv(CACHE_PATH) if CACHE_PATH.exists() else pd.DataFrame(columns=cache_cols)
for c in cache_cols:
    if c not in cache.columns: cache[c] = ""
cache["key"] = cache.apply(lambda r: key(r.get("lat"), r.get("lon")), axis=1)
cache_dict = {k: (s, p, d) for k, s, p, d in zip(cache["key"], cache["street"], cache["postal_code"], cache["district"]) if pd.notna(k)}

geolocator = Nominatim(user_agent="pools_enrichment/1.0 (contact: your_email@example.com)")
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1.0, max_retries=2, error_wait_seconds=2.0, swallow_exceptions=True)

def enrich_table_inplace(df, lat_candidates, lon_candidates, street_col, post_col, dist_col, prefill_from=None, name_hint_col=None, sanity_label=""):
    """Mutates df to fill street/postal_code/district in-place."""
    # ensure cols
    for c in [street_col, post_col, dist_col]:
        if c not in df.columns: df[c] = ""
        df[c] = df[c].astype("string")
    # lat/lon
    def pickcol(cands):
        for c in cands:
            if c in df.columns: return c
        return None
    lat_c = pickcol(lat_candidates)
    lon_c = pickcol(lon_candidates)
    if not lat_c or not lon_c:
        print(f"[skip] {sanity_label}: no lat/lon")
        return

    # optional prefill
    if prefill_from and prefill_from in df.columns:
        mask = df[street_col].apply(is_blank)
        pf = df[prefill_from].astype(str).map(street_from_address)
        df.loc[mask & pf.astype(str).str.strip().ne(""), street_col] = pf[mask & pf.astype(str).str.strip().ne("")]

    # who needs?
    need_mask = (
        df[street_col].apply(is_blank) |
        df[post_col].apply(is_blank)   |
        df[dist_col].apply(is_blank)
    ) & df[lat_c].notna() & df[lon_c].notna()

    need = df[need_mask].copy()
    print(f"[info] {sanity_label}: rows needing enrichment:", int(need_mask.sum()))

    filled_rows = []
    for idx, r in need.iterrows():
        k = key(r[lat_c], r[lon_c])
        if not k: 
            continue
        s, p, d = cache_dict.get(k, ("","",""))

        if is_blank(s) or is_blank(p) or is_blank(d):
            loc = reverse((r[lat_c], r[lon_c]), exactly_one=True, addressdetails=True, language="de", zoom=18)
            ad  = (loc.raw.get("address") if loc and hasattr(loc, "raw") else {}) or {}

            # street / house no
            if is_blank(s):
                road = ad.get("road") or ad.get("pedestrian") or ad.get("footway") or ad.get("path") or ""
                hn   = ad.get("house_number") or ""
                s = " ".join([x for x in [road, hn] if x]).strip()

            # postcode
            if is_blank(p):
                p = clean_postcode(ad.get("postcode", ""))

            # district
            if is_blank(d):
                raw_d = ad.get("city_district") or ad.get("borough") or ad.get("county") or ""
                d_can = canonical_district(raw_d)
                if not d_can:
                    sub = strip_accents(ad.get("suburb") or ad.get("municipality") or ad.get("neighbourhood") or "").lower().strip()
                    d_can = ORTSTEIL_TO_BEZIRK.get(sub, "")
                d = d_can

            if k:
                filled_rows.append({"lat": k[0], "lon": k[1], "street": s, "postal_code": p, "district": d, "key": k})

        # assign back only if blank
        if not is_blank(s) and is_blank(df.at[idx, street_col]):
            df.at[idx, street_col] = str(s)
        if not is_blank(p) and is_blank(df.at[idx, post_col]):
            df.at[idx, post_col] = str(p)
        if not is_blank(d) and is_blank(df.at[idx, dist_col]):
            df.at[idx, dist_col] = str(d)

    # persist cache
    if filled_rows:
        cc = pd.concat([cache, pd.DataFrame(filled_rows)], ignore_index=True)
        cc = cc.drop_duplicates(subset=["key"], keep="last")
        cc[cache_cols].to_csv(CACHE_PATH, index=False)
        # refresh in-memory map
        cc["key"] = cc.apply(lambda r: key(r.get("lat"), r.get("lon")), axis=1)
        cache_dict.update({k: (s, p, d) for k, s, p, d in zip(cc["key"], cc["street"], cc["postal_code"], cc["district"]) if pd.notna(k)})

# ------------------ OSM FILE ------------------
osm = pd.read_csv(OSM_PUBLIC_NAMED_CSV)
# prefill from OSM-style columns first
if "addr_street" in osm.columns:
    m = osm["street"].apply(is_blank)
    osm.loc[m, "street"] = osm.loc[m, "addr_street"].astype(str)
if "addr_postcode" in osm.columns:
    m = osm["postal_code"].apply(is_blank) if "postal_code" in osm.columns else pd.Series(True, index=osm.index)
    osm.loc[m, "postal_code"] = osm.loc[m, "addr_postcode"].astype(str)

enrich_table_inplace(
    osm,
    lat_candidates=("lat","latitude"),
    lon_candidates=("lon","longitude"),
    street_col="street",
    post_col="postal_code",
    dist_col="district",
    prefill_from=None,           # OSM already handled above
    sanity_label="OSM"
)
# normalize post code as text
osm["postal_code"] = osm["postal_code"].astype(str).str.replace(".0","",regex=False)
osm.to_csv(OSM_PUBLIC_NAMED_CSV, index=False)

# ------------------ LEGACY FILE ------------------
legacy = pd.read_csv(LEGACY_CSV)
# ensure columns exist for enrichment targets
for c in ["street","postal_code","district"]:
    if c not in legacy.columns: legacy[c] = ""

# try to prefill street from any "address" field
addr_col = "address" if "address" in legacy.columns else None
enrich_table_inplace(
    legacy,
    lat_candidates=("lat","latitude"),
    lon_candidates=("lon","longitude"),
    street_col="street",
    post_col="postal_code",
    dist_col="district",
    prefill_from=addr_col,
    sanity_label="LEGACY"
)
legacy["postal_code"] = legacy["postal_code"].astype(str).str.replace(".0","",regex=False)

# write legacy back
if WRITE_LEGACY_INPLACE:
    legacy.to_csv(LEGACY_CSV, index=False)
    print("[ok] Updated legacy file →", LEGACY_CSV.name)
else:
    out_legacy = LEGACY_CSV.with_name(LEGACY_CSV.stem + "_enriched.csv")
    legacy.to_csv(out_legacy, index=False)
    print("[ok] Wrote legacy enriched →", out_legacy.name)

# ------------------ tiny checks ------------------
def peek_missing(df, label):
    miss_st = int(df["street"].astype(str).str.strip().eq("").sum()) if "street" in df.columns else 0
    miss_pc = int(df["postal_code"].astype(str).str.strip().eq("").sum()) if "postal_code" in df.columns else 0
    miss_di = int(df["district"].astype(str).str.strip().eq("").sum()) if "district" in df.columns else 0
    print(f"[report] {label} missing → street:{miss_st}  postal_code:{miss_pc}  district:{miss_di}")

peek_missing(osm, "OSM")
peek_missing(legacy, "LEGACY")



[info] OSM: rows needing enrichment: 0
[info] LEGACY: rows needing enrichment: 3
[ok] Updated legacy file → berlin_pools_final_dataset.csv
[report] OSM missing → street:0  postal_code:0  district:0
[report] LEGACY missing → street:0  postal_code:0  district:0


issue with sommerbad - no street name 

In [40]:
# Fix Sommerbad (and others in case simillar will appear in the future)
# Fill missing streets by (1) another Nominatim pass + (2) nearest named OSM road via OSMnx
from pathlib import Path
import pandas as pd, re, unicodedata

OSM_PUBLIC_NAMED_CSV = Path("osm_public_named.csv")
NEAREST_ROAD_CACHE = Path("nearest_road_cache.csv")

def is_blank(x):
    if pd.isna(x): return True
    return str(x).strip().lower() in {"", "nan", "none", "<na>", "null"}

def street_like_piece(s: str) -> str:
    # pick a display_name piece that looks like a street
    tokens = ["straße","strasse","allee","damm","weg","platz","ufer","chaussee","ring","steig","promenade","gasse","pfad","chaussee"]
    for piece in [p.strip() for p in str(s).split(",")]:
        low = piece.lower()
        if any(t in low for t in tokens):
            return piece
    return ""

def key(lat, lon):
    try: return round(float(lat), 6), round(float(lon), 6)
    except Exception: return None

df = pd.read_csv(OSM_PUBLIC_NAMED_CSV)

# ensure columns + dtypes
for c in ["name","street","postal_code","district"]:
    if c not in df.columns: df[c] = ""
df[["name","street","postal_code","district"]] = df[["name","street","postal_code","district"]].astype("string")

lat_col = "lat" if "lat" in df.columns else ("latitude" if "latitude" in df.columns else None)
lon_col = "lon" if "lon" in df.columns else ("longitude" if "longitude" in df.columns else None)
if not lat_col or not lon_col:
    raise KeyError("Need 'lat'/'lon' (or 'latitude'/'longitude') to find nearest roads.")

need_mask = df["street"].apply(is_blank) & df[lat_col].notna() & df[lon_col].notna()
need = df[need_mask].copy()
print(f"[street-fill] rows needing street: {int(need_mask.sum())}")

# 1) Tiny extra Nominatim poke (zoom up + parse display_name) — cheap and sometimes enough
try:
    from geopy.geocoders import Nominatim
    from geopy.extra.rate_limiter import RateLimiter
    geolocator = Nominatim(user_agent="pools_enrichment/1.0 (contact: your_email@example.com)")
    reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1.0, max_retries=2, error_wait_seconds=2.0, swallow_exceptions=True)
    for idx, r in need.iterrows():
        if not is_blank(df.at[idx, "street"]): 
            continue
        loc = reverse((r[lat_col], r[lon_col]), exactly_one=True, addressdetails=True, language="de", zoom=19)
        if not loc: 
            continue
        ad = (loc.raw.get("address") if hasattr(loc, "raw") else {}) or {}
        road = ad.get("road") or ad.get("pedestrian") or ad.get("footway") or ad.get("path") or ""
        hn   = ad.get("house_number") or ""
        if road:
            df.at[idx, "street"] = f"{road} {hn}".strip()
        elif "display_name" in loc.raw:
            pick = street_like_piece(loc.raw["display_name"])
            if pick:
                df.at[idx, "street"] = pick
except Exception as e:
    print("[street-fill] Nominatim extra pass skipped:", repr(e))

# 2) Nearest named road via OSMnx (Overpass) — robust fallback
still = df[df["street"].apply(is_blank) & df[lat_col].notna() & df[lon_col].notna()].copy()
print(f"[street-fill] still missing after Nominatim: {len(still)}")

# Load / init cache for nearest roads
if NEAREST_ROAD_CACHE.exists():
    road_cache = pd.read_csv(NEAREST_ROAD_CACHE)
else:
    road_cache = pd.DataFrame(columns=["lat","lon","street","key"])
road_cache["key"] = road_cache.apply(lambda r: key(r.get("lat"), r.get("lon")), axis=1)
road_cache_dict = {k: s for k, s in zip(road_cache["key"], road_cache["street"]) if pd.notna(k)}

def nearest_road_name(lat, lon):
    k = key(lat, lon)
    if k in road_cache_dict and str(road_cache_dict[k]).strip():
        return str(road_cache_dict[k])
    try:
        import osmnx as ox
        # version-agnostic graph builder
        if hasattr(ox, "graph_from_point"):
            G = ox.graph_from_point((lat, lon), dist=250, network_type="all_private", retain_all=True, simplify=True)
        else:
            # very old OSMnx not supported
            return ""
        # find nearest edge and read its name
        try:
            from osmnx import distance as oxdist
            u, v, kkey = oxdist.nearest_edges(G, X=[float(lon)], Y=[float(lat)])[0]
            data = G.get_edge_data(u, v, kkey) or {}
        except Exception:
            # fallback via nearest node and its incident edges
            n = ox.distance.nearest_nodes(G, float(lon), float(lat))
            data = next(iter(G[n][list(G[n])[0]].values()), {})
        name = data.get("name")
        if isinstance(name, list):  # sometimes multiple names
            name = name[0] if name else ""
        if name:
            road_cache_dict[k] = name
            return str(name)
    except Exception:
        return ""
    return ""

filled_rows = []
for idx, r in still.iterrows():
    nm = nearest_road_name(r[lat_col], r[lon_col])
    if nm:
        df.at[idx, "street"] = nm
        k = key(r[lat_col], r[lon_col])
        if k:
            filled_rows.append({"lat": k[0], "lon": k[1], "street": nm, "key": k})

# persist nearest-road cache
if filled_rows:
    nc = pd.DataFrame(filled_rows)
    if not road_cache.empty:
        road_cache = pd.concat([road_cache, nc], ignore_index=True)
    else:
        road_cache = nc
    road_cache.drop_duplicates(subset=["key"], keep="last").to_csv(NEAREST_ROAD_CACHE, index=False)

# Save and show Sommerbad
df.to_csv(OSM_PUBLIC_NAMED_CSV, index=False)
print(df.loc[df["name"].str.contains("Sommerbad", case=False, na=False), ["name","street","postal_code","district"]])
print("[ok] street backfilled where possible")



[street-fill] rows needing street: 0
[street-fill] still missing after Nominatim: 0
        name          street postal_code  district
5  Sommerbad  Campus Efeuweg       12351  Neukölln
[ok] street backfilled where possible



## 6) Build **pools_master_minimal.csv**

**Why:** consolidate legacy + OSM-only rows into one minimal master table with consistent IDs & districts.

**What we ensure:**
- **pool_id** present for every row (legacy preserved; OSM generated robustly).
- **district_id** filled from district name mapping; if missing, reverse‑geocoded from coordinates (cached).
- `open_all_year` filled with `False` where empty.


In [41]:
# === Step 6 — Build final master (robust) ===
import pandas as pd
import numpy as np
from pathlib import Path

# reload inputs
legacy = pd.read_csv(LEGACY_MAIN_CSV if LEGACY_MAIN_CSV.exists() else LEGACY_ALT_CSV)
enrich = pd.read_csv("legacy_enrichment_list.csv")
osm_new = pd.read_csv(OSM_PUBLIC_NAMED_CSV)

def pickcol(df, *options):
    for c in options:
        if c in df.columns:
            return c
    return None

# --- district helpers
district_mapping = {
    'Mitte': '01', 'Friedrichshain-Kreuzberg': '02', 'Pankow': '03',
    'Charlottenburg-Wilmersdorf': '04', 'Spandau': '05', 'Steglitz-Zehlendorf': '06',
    'Tempelhof-Schöneberg': '07', 'Neukölln': '08', 'Treptow-Köpenick': '09',
    'Marzahn-Hellersdorf': '10', 'Lichtenberg': '11', 'Reinickendorf': '12'
}

import unicodedata, re
def strip_accents(s: str) -> str:
    s = "" if s is None else str(s)
    s = unicodedata.normalize("NFKD", s)
    return "".join(ch for ch in s if not unicodedata.combining(ch))

def canonical_district(raw: str) -> str:
    if not isinstance(raw, str) or not raw.strip():
        return ""
    s = strip_accents(raw).strip()
    s = s.replace("Mitte (Berlin)", "Mitte")
    s = s.replace("Friedrichshain - Kreuzberg", "Friedrichshain-Kreuzberg")
    s = s.replace("Charlottenburg - Wilmersdorf", "Charlottenburg-Wilmersdorf")
    s = s.replace("Steglitz - Zehlendorf", "Steglitz-Zehlendorf")
    s = s.replace("Tempelhof - Schoneberg", "Tempelhof-Schöneberg").replace("Tempelhof - Schöneberg", "Tempelhof-Schöneberg")
    s = s.replace("Treptow - Kopenick", "Treptow-Köpenick")
    s = s.replace("Marzahn - Hellersdorf", "Marzahn-Hellersdorf")
    s = re.sub(r"^(Bezirk|District)\s+", "", s, flags=re.I)
    s = re.sub(r"\s*-\s*", "-", s).strip()
    if s in district_mapping:
        return s
    # very light fuzzy fallback
    toks = set(s.lower().replace("-", " ").split())
    best, overlap = "", 0
    for official in district_mapping:
        otoks = set(official.lower().replace("-", " ").split())
        ov = len(toks & otoks)
        if ov > overlap:
            best, overlap = official, ov
    return best if overlap else ""

# --- name normalization (for enrichment) ---
def normalize_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s)
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    s = re.sub(r"[^a-z0-9]+", " ", s.lower()).strip()
    return re.sub(r"\s+", " ", s).strip()

legacy_name_col = pickcol(legacy, "name", "pool_name") or "name"
legacy["_name_norm"] = legacy[legacy_name_col].astype(str).map(normalize_name)

enrich_name_col = pickcol(enrich, "name_l") or "name_l"
enrich["_name_norm"] = enrich[enrich_name_col].astype(str).map(normalize_name)

# take a few OSM attrs to enrich legacy (fill if legacy is blank)
osm_add_cols = [c for c in ["website_o","opening_hours_o","wheelchair_o","phone_o"] if c in enrich.columns]
enrich_small  = enrich[["_name_norm"] + osm_add_cols].drop_duplicates("_name_norm")
legacy_enriched = legacy.merge(enrich_small, on="_name_norm", how="left")

for lcol, ocol in {"website":"website_o","opening_hours":"opening_hours_o","wheelchair":"wheelchair_o","phone":"phone_o"}.items():
    if lcol in legacy_enriched.columns and ocol in legacy_enriched.columns:
        legacy_enriched[lcol] = legacy_enriched[lcol].where(
            legacy_enriched[lcol].notna() & (legacy_enriched[lcol].astype(str).str.strip() != ""),
            legacy_enriched[ocol]
        )
legacy_enriched.drop(columns=[c for c in ["_name_norm","website_o","opening_hours_o","wheelchair_o","phone_o"] if c in legacy_enriched.columns],
                     inplace=True, errors="ignore")

# --- OSM rows → final schema ---
def make_osm_pool_id(df):
    id_col = pickcol(df, "source_id","osm_id","element_id","osmid")
    if id_col:
        base = df[id_col].fillna("").astype(str).str.strip()
    else:
        base = pd.Series([""] * len(df), index=df.index)
    latc = pickcol(df, "lat","latitude")
    lonc = pickcol(df, "lon","longitude")
    if latc and lonc:
        coord_sur = df[latc].round(6).astype(str) + "_" + df[lonc].round(6).astype(str)
    else:
        coord_sur = pd.Series([f"{i:05d}" for i in range(len(df))], index=df.index)
    base = np.where(base == "", coord_sur, base)
    return pd.Series("OSM_" + pd.Series(base), index=df.index)

osm_final = pd.DataFrame({
    "pool_id":       make_osm_pool_id(osm_new),
    "district":      osm_new[pickcol(osm_new, "district")] if pickcol(osm_new, "district") else "",
    "district_id":   osm_new[pickcol(osm_new, "district_id")] if pickcol(osm_new, "district_id") else "",
    "name":          osm_new[pickcol(osm_new, "name")] if pickcol(osm_new, "name") else "",
    "pool_type":     osm_new[pickcol(osm_new, "pool_type","leisure","sport")] if pickcol(osm_new, "pool_type","leisure","sport") else "",
    "street":        osm_new[pickcol(osm_new, "street","addr_street","address")] if pickcol(osm_new, "street","addr_street","address") else "",
    "postal_code":   osm_new[pickcol(osm_new, "postal_code","addr_postcode")] if pickcol(osm_new, "postal_code","addr_postcode") else "",
    "latitude":      pd.to_numeric(osm_new[pickcol(osm_new, "lat","latitude")], errors="coerce") if pickcol(osm_new, "lat","latitude") else pd.NA,
    "longitude":     pd.to_numeric(osm_new[pickcol(osm_new, "lon","longitude")], errors="coerce") if pickcol(osm_new, "lon","longitude") else pd.NA,
    "open_all_year": pd.NA,
})

# --- Legacy rows → final schema ---
legacy_final = pd.DataFrame({
    "pool_id":     legacy_enriched[pickcol(legacy_enriched, "pool_id","id","legacy_id")] if pickcol(legacy_enriched, "pool_id","id","legacy_id") else ("L_" + legacy_enriched.index.astype(str)),
    "district":    legacy_enriched[pickcol(legacy_enriched, "district")] if pickcol(legacy_enriched, "district") else "",
    "district_id": legacy_enriched[pickcol(legacy_enriched, "district_id")] if pickcol(legacy_enriched, "district_id") else "",
    "name":        legacy_enriched[pickcol(legacy_enriched, "name","pool_name")] if pickcol(legacy_enriched, "name","pool_name") else "",
    "pool_type":   legacy_enriched[pickcol(legacy_enriched, "pool_type","type")] if pickcol(legacy_enriched, "pool_type","type") else "",
    "street":      legacy_enriched[pickcol(legacy_enriched, "street","address")] if pickcol(legacy_enriched, "street","address") else "",
    "postal_code": legacy_enriched[pickcol(legacy_enriched, "postal_code","postcode","zip")] if pickcol(legacy_enriched, "postal_code","postcode","zip") else "",
    "latitude":    pd.to_numeric(legacy_enriched[pickcol(legacy_enriched, "latitude","lat")], errors="coerce") if pickcol(legacy_enriched, "latitude","lat") else pd.NA,
    "longitude":   pd.to_numeric(legacy_enriched[pickcol(legacy_enriched, "longitude","lon")], errors="coerce") if pickcol(legacy_enriched, "longitude","lon") else pd.NA,
    "open_all_year": legacy_enriched[pickcol(legacy_enriched, "open_all_year","open_all_year_round","open_year_round")] if pickcol(legacy_enriched, "open_all_year","open_all_year_round","open_year_round") else pd.NA,
})

# --- combine
final_master = pd.concat([legacy_final, osm_final], ignore_index=True)

# --- ensure dtypes
for c in ["pool_id","district","district_id","name","pool_type","street","postal_code"]:
    if c in final_master.columns:
        final_master[c] = final_master[c].astype("string")
for c in ["latitude","longitude"]:
    if c in final_master.columns:
        final_master[c] = pd.to_numeric(final_master[c], errors="coerce")

# --- fill district via cache-aware reverse geocoding if needed
def key(lat, lon):
    try:
        return round(float(lat), 6), round(float(lon), 6)
    except Exception:
        return None

need = (final_master["district_id"].isna() | final_master["district_id"].astype(str).str.strip().eq("")) & \
       final_master["latitude"].notna() & final_master["longitude"].notna()

if need.any():
    cache_cols = ["lat","lon","street","postal_code","district"]
    cache = pd.read_csv(CACHE_PATH) if CACHE_PATH.exists() else pd.DataFrame(columns=cache_cols)
    if "district" not in cache.columns:
        cache["district"] = ""
    cache["key"] = cache.apply(lambda r: key(r.get("lat"), r.get("lon")), axis=1)
    cache_dict = {k: d for k, d in zip(cache["key"], cache["district"]) if pd.notna(k)}

    from geopy.geocoders import Nominatim
    from geopy.extra.rate_limiter import RateLimiter
    geolocator = Nominatim(user_agent="pools_enrichment/1.0 (contact: your_email@example.com)")
    reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1.0)

    filled = []
    for idx, row in final_master[need].iterrows():
        k = key(row["latitude"], row["longitude"])
        dname = cache_dict.get(k, "")
        if not dname:
            try:
                loc = reverse((row["latitude"], row["longitude"]), exactly_one=True, addressdetails=True, language="de")
                ad = (loc.raw.get("address") if loc and hasattr(loc, "raw") else {}) or {}
                raw = ad.get("city_district") or ad.get("borough") or ad.get("county") or ""
                dname = canonical_district(raw)
                if k:
                    filled.append({"lat": k[0], "lon": k[1], "street": "", "postal_code": "", "district": dname, "key": k})
            except Exception:
                dname = ""
        if dname:
            final_master.at[idx, "district"] = dname

    if filled:
        cache = pd.concat([cache, pd.DataFrame(filled)], ignore_index=True)
        cache = cache.drop_duplicates(subset=["key"], keep="last")
        cache[cache_cols].to_csv(CACHE_PATH, index=False)

# map district → id (pad to 2 chars)
need_id = final_master["district_id"].isna() | final_master["district_id"].astype(str).str.strip().eq("")
final_master.loc[need_id, "district_id"] = final_master.loc[need_id, "district"].map(district_mapping).astype("string")
final_master["district_id"] = final_master["district_id"].fillna("").str.replace(".0","",regex=False)
mask = final_master["district_id"].ne("")
final_master.loc[mask, "district_id"] = final_master.loc[mask, "district_id"].str.zfill(2)

# --- open_all_year: coerce & fill blanks → False (boolean)
if "open_all_year" in final_master.columns:
    s = final_master["open_all_year"].astype(str).str.strip().str.lower()
    trueish  = {"true","1","yes","y","ja"}
    falseish = {"false","0","no","n","nein",""}
    final_master["open_all_year"] = s.map(lambda x: True if x in trueish else (False if x in falseish else False))

# --- ensure unique pool_id (keep first if accidental dupes)
dup_ct = int(final_master["pool_id"].duplicated().sum())
if dup_ct:
    print(f"[warn] duplicate pool_id found: {dup_ct} → keeping first")
    final_master = final_master.drop_duplicates(subset=["pool_id"], keep="first")

# --- guarantee final column order & save
final_master = final_master[[
    "pool_id","district_id","name","pool_type","street",
    "postal_code","latitude","longitude","open_all_year"
]]
final_master.to_csv("pools_master_minimal.csv", index=False)
print("[ok] Wrote pools_master_minimal.csv with", len(final_master), "rows")


[ok] Wrote pools_master_minimal.csv with 168 rows


Final tweek for pool_id column - OSM does not provide it - decided to use numbers 1-400 as the first entry from legacy tool has id number : 472

In [42]:
# Replace surrogate IDs like "OSM_sur_123" with consecutive numbers "1", "2", ...
import pandas as pd, re

PATH = "pools_master_minimal.csv"

df = pd.read_csv(PATH, dtype={"pool_id": "string"})

# rows with OSM surrogate ids
mask = df["pool_id"].astype(str).str.fullmatch(r"OSM_sur_\d+")
sur_ids = df.loc[mask, "pool_id"].astype(str).unique().tolist()

# order by numeric suffix so mapping is stable/predictable
def sur_num(s): 
    m = re.search(r"(\d+)$", s)
    return int(m.group(1)) if m else 10**9

ordered = sorted(sur_ids, key=sur_num)

# provisional mapping: OSM_sur_* -> "1", "2", ...
proposed = [str(i+1) for i in range(len(ordered))]

# avoid collision with any existing non-surrogate pool_id
existing = set(df.loc[~mask, "pool_id"].astype(str))
start = 1
while any(str(start + i) in existing for i in range(len(ordered))):
    start += 1  # bump start until there are no collisions

mapping = {sid: str(start + i) for i, sid in enumerate(ordered)}

# (Optional) sanity: if you *really* want to cap at 400, uncomment next 2 lines
# if len(mapping) > 400:
#     print(f"[warn] {len(mapping)} surrogate ids > 400; numbering will continue beyond 400.")

# apply mapping
df.loc[mask, "pool_id"] = df.loc[mask, "pool_id"].map(mapping)

# keep pool_id as string/object
df["pool_id"] = df["pool_id"].astype("string")

df.to_csv(PATH, index=False)

# small summary
new_vals = list(mapping.values())
print(f"[ok] Replaced {len(mapping)} surrogate ids. New range:",
      (min(map(int, new_vals)), max(map(int, new_vals))) if new_vals else "n/a")
print("Examples:", dict(list(mapping.items())[:5]))


[ok] Replaced 24 surrogate ids. New range: (1, 24)
Examples: {'OSM_sur_12': '1', 'OSM_sur_30': '2', 'OSM_sur_122': '3', 'OSM_sur_125': '4', 'OSM_sur_127': '5'}



## 7) Validation & Outputs

**Why:** quick checks to ensure core fields are present and sensible.


In [43]:
# === Step 7 — Quick QC on pools_master_minimal.csv ===
import pandas as pd, numpy as np, re

df = pd.read_csv("pools_master_minimal.csv")

print("Rows:", len(df))
print("Missing pool_id:", int(df["pool_id"].astype(str).str.strip().eq("").sum()))
print("Missing district_id:", int(df["district_id"].astype(str).str.strip().eq("").sum()))

# --- basic uniqueness & formatting checks
dupe_ids = int(df["pool_id"].duplicated().sum())
print("Duplicate pool_id:", dupe_ids)

# district_id must be 01..12 (2 digits)
dist_pat = r"(0[1-9]|1[0-2])"
bad_dist_mask = ~df["district_id"].astype(str).str.fullmatch(dist_pat)
bad_dist_count = int(bad_dist_mask.sum())
print("Invalid district_id format (not 01–12):", bad_dist_count)

# postal code: Berlin 10xxx–14xxx (allow empty)
pc = df["postal_code"].astype(str).str.strip()
bad_pc_mask = (pc != "") & ~pc.str.fullmatch(r"1[0-4]\d{3}")
print("Suspicious postal_code (non-empty but not 10xxx–14xxx):", int(bad_pc_mask.sum()))

# coordinates sanity
lat = pd.to_numeric(df["latitude"], errors="coerce")
lon = pd.to_numeric(df["longitude"], errors="coerce")
missing_lat = int(lat.isna().sum())
missing_lon = int(lon.isna().sum())
out_lat = int((~lat.between(-90, 90)).sum())
out_lon = int((~lon.between(-180, 180)).sum())
print("Missing latitude:", missing_lat, "| Missing longitude:", missing_lon)
print("Out-of-range latitude:", out_lat, "| Out-of-range longitude:", out_lon)

# open_all_year should be boolean (from Step 6). Show distribution.
print("\nopen_all_year dtype:", df["open_all_year"].dtype)
print(df["open_all_year"].value_counts(dropna=False))

# --- Peek at problems (up to 5 rows each) ---
def peek(mask, cols, title):
    m = df[mask]
    if not m.empty:
        print(f"\n{title} (showing up to 5):")
        display(m[cols].head(5))

core_cols = ["pool_id","district_id","name","street","postal_code","latitude","longitude","open_all_year"]

peek(df["pool_id"].astype(str).str.strip().eq(""), core_cols, "Rows with missing pool_id")
peek(bad_dist_mask, core_cols, "Rows with invalid district_id")
peek(bad_pc_mask, core_cols, "Rows with suspicious postal_code")
peek(lat.isna() | lon.isna(), core_cols, "Rows with missing coordinates")
peek((~lat.between(-90, 90)) | (~lon.between(-180, 180)), core_cols, "Rows with out-of-range coordinates")

print("\nSample (top 10):")
display(df.head(10))




Rows: 168
Missing pool_id: 0
Missing district_id: 0
Duplicate pool_id: 0
Invalid district_id format (not 01–12): 138
Suspicious postal_code (non-empty but not 10xxx–14xxx): 0
Missing latitude: 0 | Missing longitude: 0
Out-of-range latitude: 0 | Out-of-range longitude: 0

open_all_year dtype: bool
open_all_year
False    89
True     79
Name: count, dtype: int64

Rows with invalid district_id (showing up to 5):


Unnamed: 0,pool_id,district_id,name,street,postal_code,latitude,longitude,open_all_year
1,473,9,Kleine Schwimmhalle Wuhlheide,An der Wuhlheide 161,12459,52.45993,13.53965,True
2,474,8,Kombibad Mariendorf,Ankogelweg 95,12107,52.41972,13.40154,True
4,476,2,Stadtbad Kreuzberg - Baerwaldbad,Baerwaldstraße 64-67,10961,52.49451,13.40432,True
5,477,3,Strandbad am Weißen See,Berliner Allee 155,13086,52.55396,13.46583,False
6,478,5,Sommerbad Staaken-West,Brunsbüttler Damm 443,13591,52.53386,13.13123,False



Sample (top 10):


Unnamed: 0,pool_id,district_id,name,pool_type,street,postal_code,latitude,longitude,open_all_year
0,472,12,Strandbad Lübars,Naturbad,Am Freibad 9,13469,52.61824,13.33519,False
1,473,9,Kleine Schwimmhalle Wuhlheide,Hallenbad,An der Wuhlheide 161,12459,52.45993,13.53965,True
2,474,8,Kombibad Mariendorf,Kombibad,Ankogelweg 95,12107,52.41972,13.40154,True
3,475,11,Schwimmhalle Anton-Saefkow-Platz,Hallenbad,Anton-Saefkow-Platz 1,10369,52.53093,13.47184,True
4,476,2,Stadtbad Kreuzberg - Baerwaldbad,Hallenbad,Baerwaldstraße 64-67,10961,52.49451,13.40432,True
5,477,3,Strandbad am Weißen See,Naturbad,Berliner Allee 155,13086,52.55396,13.46583,False
6,478,5,Sommerbad Staaken-West,Freibad,Brunsbüttler Damm 443,13591,52.53386,13.13123,False
7,479,10,Schwimmhalle Kaulsdorf,Schulbad,Clara-Zetkin-Weg 13,12619,52.5208,13.58541,True
8,480,8,Sommerbad Neukölln,Freibad,Columbiadamm 160-180,10965,52.48025,13.41595,False
9,481,6,Schwimmhalle Finckensteinallee,Hallenbad,Finckensteinallee 73,12205,52.43225,13.29791,False


# === Finalize schema & save (from CSV) ===

In [44]:

import pandas as pd

df = pd.read_csv("pools_master_minimal.csv")

need_cols = ["pool_id","name","pool_type","street","postal_code",
             "latitude","longitude","open_all_year","district_id"]
for c in need_cols:
    if c not in df.columns:
        df[c] = "" if c not in ["latitude","longitude"] else pd.NA

df["latitude"]  = pd.to_numeric(df["latitude"], errors="coerce").astype("float64")
df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce").astype("float64")

obj_cols = ["pool_id","name","pool_type","street","postal_code","open_all_year","district_id"]
for c in obj_cols:
    df[c] = df[c].astype(str).fillna("").astype("object")

df.loc[df["open_all_year"].astype(str).str.strip().eq(""), "open_all_year"] = "False"

df = df[need_cols]
df.to_csv("pools_master_minimal.csv", index=False)

print(df.dtypes)


pool_id           object
name              object
pool_type         object
street            object
postal_code       object
latitude         float64
longitude        float64
open_all_year     object
district_id       object
dtype: object
