In [None]:
#!/usr/bin/env python3
# map_merged_voa_epc.py — geocode merged VOA+EPC data and build interactive Leaflet map

import csv, json, os, re, time, urllib.parse, urllib.request
from html import escape

# ---------------- CONFIG ----------------
INPUT_CSV = r""
OUTPUT_CSV = r""
OUTPUT_HTML = r""

USER_AGENT = ""
NOMINATIM_BASE = "https://nominatim.openstreetmap.org/search"
MAPBOX_TOKEN = ""
MAPBOX_BASE = "https://api.mapbox.com/geocoding/v5/mapbox.places/"
DELAY_SECONDS = 1.0
COUNTRYCODE = "gb"

# colours
RADIUS = 4
BLUE = "#2A7AE2"   # VOA-only data
RED = "#E02424"    # EPC-only data
ORANGE = "#FF8C00" # both

POSTCODE_RE = re.compile(r'([A-Z]{1,2}\d{1,2}[A-Z]?\s*\d[A-Z]{2})', re.IGNORECASE)

import ssl
ssl._create_default_https_context = ssl._create_unverified_context

# ---------------- HELPERS ----------------
def normalize_postcode(pc):
    if not pc: return ""
    s = re.sub(r"[^A-Z0-9]", "", pc.upper())
    return s[:-3].strip() + " " + s[-3:] if len(s) > 3 else s

def extract_postcode_from_text(s):
    if not s: return ""
    m = POSTCODE_RE.search(s)
    return m.group(1).upper().strip() if m else ""

def do_request(url, headers=None, timeout=30):
    headers = headers or {"User-Agent": USER_AGENT}
    req = urllib.request.Request(url, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read().decode("utf-8"), None
    except Exception as e:
        return None, str(e)

def nominatim_search(params):
    params = {**params, "format":"json","limit":1}
    url = NOMINATIM_BASE + "?" + urllib.parse.urlencode(params, safe=",")
    body, err = do_request(url)
    if err: return None, url, err
    try:
        data = json.loads(body)
        return (data[0], url, None) if data else (None, url, "no_results")
    except Exception as e:
        return None, url, str(e)

def mapbox_search(query):
    q = urllib.parse.quote(query)
    url = f"{MAPBOX_BASE}{q}.json?access_token={MAPBOX_TOKEN}&limit=1&country=GB"
    body, err = do_request(url)
    if err: return None, url, err
    try:
        data = json.loads(body)
        f = data.get("features", [])
        return (f[0], url, None) if f else (None, url, "no_results")
    except Exception as e:
        return None, url, str(e)

def geocode_row(address_text, postcode_text):
    # embedded coords
    m = re.search(r'(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)', address_text or "")
    if m:
        return float(m[1]), float(m[2]), "embedded_coords"

    params = {"countrycodes": COUNTRYCODE}
    if postcode_text:
        params["postalcode"] = postcode_text
    parts = [p.strip() for p in re.split(r'[,|]', address_text or "") if p.strip()]
    if parts:
        params["street"] = parts[0]
        if len(parts) > 1:
            params["city"] = parts[-1]

    res, _, _ = nominatim_search(params)
    if res: return float(res["lat"]), float(res["lon"]), "nominatim_structured"

    time.sleep(DELAY_SECONDS)
    q = ", ".join(parts + [postcode_text]) if postcode_text else ", ".join(parts)
    if q:
        res, _, _ = nominatim_search({"q": q, "countrycodes": COUNTRYCODE})
        if res: return float(res["lat"]), float(res["lon"]), "nominatim_freeform"

    time.sleep(DELAY_SECONDS)
    if postcode_text:
        feat, _, _ = mapbox_search(postcode_text)
        if feat and "center" in feat:
            lon, lat = feat["center"]
            return lat, lon, "mapbox_postcode"
    return None, None, "no_result"

def read_merged_csv(path):
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    with open(path, newline="", encoding="utf-8", errors="replace") as fh:
        return list(csv.DictReader(fh))

def format_epc_dict(epc_str):
    """Convert EPC dict(s) into clean HTML tables showing only selected fields with non-empty values."""
    if not epc_str or "{" not in epc_str:
        return "<p><i>No EPC data</i></p>"

    # Desired fields, in display order
    allowed_fields = ["ADDRESS","UPRN","FLOOR_AREA","STANDARD_EMISSIONS","TARGET_EMISSIONS","TYPICAL_EMISSIONS","BUILDING_EMISSIONS","ASSET_RATING_BAND","ASSET_RATING","MAIN_HEATING_FUEL","OTHER_FUEL_DESC","SPECIAL_ENERGY_USES","RENEWABLE_SOURCES","PRIMARY_ENERGY_VALUE","AIRCON_PRESENT","AIRCON_KW_RATING","ESTIMATED_AIRCON_KW_RATING","BUILDING_ENVIRONMENT","PROPERTY_TYPE","TRANSACTION_TYPE","INSPECTION_DATE",]
    units = {
        "FLOOR_AREA": "m²",
        "STANDARD_EMISSIONS": "kg CO₂/m²/year",
        "TARGET_EMISSIONS": "kg CO₂/m²/year",
        "TYPICAL_EMISSIONS": "kg CO₂/m²/year",
        "BUILDING_EMISSIONS": "kg CO₂/m²",
        "ASSET_RATING": "kg CO₂/m²",
    }

    parts = []
    try:
        dicts = re.findall(r"\{[^\}]+\}", epc_str)
        for d in dicts:
            epc = json.loads(d.replace("'", '"'))
            rows = []
            for key in allowed_fields:
                if key in epc and epc[key] not in ("", None, " "):
                    label = key.replace("_", " ").title()
                    # Add unit if applicable
                    unit = units.get(key, "")
                    value = f"{epc[key]} {unit}".strip()
                    rows.append(
                        f"<tr>"
                        f"<td style='border:1px solid #ccc;padding:3px;width:45%;'><b>{escape(label)}</b></td>"
                        f"<td style='border:1px solid #ccc;padding:3px;'>{escape(value)}</td>"
                        f"</tr>"
                    )
            if rows:
                table_html = (
                    "<table style='width:100%;border-collapse:collapse;font-size:11px;margin-top:4px;'>"
                    + "".join(rows) +
                    "</table><br>"
                )
                parts.append(table_html)
    except Exception as e:
        return f"<pre>Error parsing EPC data: {escape(str(e))}</pre>"

    return "".join(parts) if parts else "<p><i>No EPC data</i></p>"

def build_popup(record):
    """Create nicely formatted popup with essential VOA info in a clean table."""
    voa_info = record.get("voa_raw", "")
    full_address = None

    # If VOA data, extract full_address early
    if voa_info:
        fields = voa_info.strip("*").split("*") # Parse the *-separated VOA fields
        if len(fields) > 7:
            full_address = fields[7]

    # Prefer full_address from VOA if present, otherwise fallback
    heading = (
        full_address
        or record.get("voa_address")
        or record.get("epc_address")
        or "Data Centre"
    )
    heading_html = f"<h3 style='margin:0;font-size:14px;'>{escape(heading)}</h3>"

    postcode = record.get("postcode", "")
    sources = record.get("sources", "")
    voa_info = record.get("voa_raw", "")
    epc_raw = record.get("epc_raw_repr", "")
    epc_lm = record.get("epc_lmkey", "")

    parts = [
        "<div style='max-height:300px;overflow:auto;font-family:sans-serif;font-size:12px;'>",
        heading_html,
    ]
    if postcode:
        parts.append(f"<p style='margin:0 0 4px 0;'><br><b>Postcode:</b> {escape(postcode)}</p>")
    if sources:
        parts.append(f"<p style='margin:0 0 4px 0;'><b>Sources:</b> {escape(sources)}</p>")

    if voa_info:
        # Map known VOA positions
        try:
            property_code = fields[4] if len(fields) > 4 else ""
            description = fields[5] if len(fields) > 5 else ""
            primary_desc_code = fields[6] if len(fields) > 6 else ""
            full_address = fields[7] if len(fields) > 7 else ""
            effective_date = fields[15] if len(fields) > 15 and fields[15] else (
                fields[16] if len(fields) > 16 else ""
            )
            # Rateable Value (can appear at field 18)
            rv = f'£{fields[17]}' if len(fields) > 17 and fields[17] else "Unknown"
            special_cat = fields[21] if len(fields) > 21 and fields[21] else (
                fields[22] if len(fields) > 22 else ""
            )
        except Exception:
            property_code = description = primary_desc_code = full_address = ""
            effective_date = special_cat = rv = "Unknown"

        # Build table
        table_rows = [
            ("Property Code", property_code),
            ("Description", description),
            ("Primary Description Code", primary_desc_code),
            ("Full Address", full_address),
            ("Effective Date", effective_date),
            ("Special Category Code", special_cat),
            ("Rateable Value", rv),
        ]

        table_html = [
            "<br><b>VOA Summary:</b>",
            "<table style='border-collapse:collapse;width:100%;font-size:11px;margin-top:4px;'>"
        ]
        for label, val in table_rows:
            table_html.append(
                f"<tr>"
                f"<td style='border:1px solid #ccc;padding:3px;width:45%;'><b>{escape(label)}</b></td>"
                f"<td style='border:1px solid #ccc;padding:3px;'>{escape(str(val))}</td>"
                f"</tr>"
            )
        table_html.append("</table>")
        parts.extend(table_html)

    # EPC info
    if "EPC" in str(sources).upper():
        parts.append("<hr><h4 style='margin-top:4px;'>EPC potential location matches</h4>")
        if epc_lm:
            parts.append(f"<b>EPC LMK_KEY:</b> {escape(str(epc_lm))}<br>")
        parts.append(format_epc_dict(epc_raw))

    parts.append("</div>")
    return "".join(parts)

# ---------------- MAIN ----------------
def main():
    print("Reading merged CSV:", INPUT_CSV)
    rows = read_merged_csv(INPUT_CSV)
    print("Loaded", len(rows), "rows")

    results, cache = [], {}
    for idx, row in enumerate(rows, start=1):
        #if idx >= 10: break   # FOR TESTING
        lat = lon = method = None
        for k in ("lat","latitude","Lat","LAT","LATITUDE"):
            try:
                lat = float(row[k]) if row.get(k) else None
            except: pass
        for k in ("lon","lng","longitude","Longitude","LON"):
            try:
                lon = float(row[k]) if row.get(k) else None
            except: pass

        addr = row.get("voa_address") or row.get("epc_address") or row.get("match_key") or ""
        pc = row.get("postcode") or extract_postcode_from_text(addr)
        if lat is None or lon is None:
            key = (addr.lower().strip(), normalize_postcode(pc))
            if key in cache:
                lat, lon, method = cache[key]
            else:
                print(f"[{idx}/{len(rows)}] Geocoding: {addr} / {pc}")
                lat, lon, method = geocode_row(addr, pc)
                cache[key] = (lat, lon, method)
                time.sleep(DELAY_SECONDS)
        if lat and lon:
            row.update({"lat": lat, "lon": lon, "geocode_method": method})
            results.append((lat, lon, row))
        else:
            print(f"ERROR: No coords for: {addr} / {pc}")

    if not results:
        print("No geocoded results found.")
        return

    # Write geocoded CSV
    fieldnames = sorted({k for r in rows if isinstance(r, dict) for k in r.keys()})
    for extra in ("lat","lon","geocode_method"):
        if extra not in fieldnames: fieldnames.append(extra)
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as out:
        w = csv.DictWriter(out, fieldnames=fieldnames)
        w.writeheader()
        for _,_,r in results: w.writerow(r)
    print("Wrote geocoded CSV:", OUTPUT_CSV)

    # Map center
    lats, lons = [r[0] for r in results], [r[1] for r in results]
    center_lat, center_lon = sum(lats)/len(lats), sum(lons)/len(lons)

    # HTML head with styles, legend, and search
    html_head = f"""<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>VOA+EPC Data Centres Map</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
<style>
#map{{position:absolute;top:0;bottom:0;left:0;right:0}}
.legend {{position:absolute;bottom:20px;right:10px;background:white;padding:10px;border-radius:8px;box-shadow:0 0 5px #888;cursor:pointer;font-family:sans-serif;font-size:12px;z-index:2000;}}
.legend h4{{margin:0 0 4px 0;cursor:pointer;}}
.legend-content{{display:block;}}
.search-container{{position:absolute;top:10px;left:15%;transform:translateX(-50%);z-index:1000;}}
.search-container input{{padding:6px 10px;font-size:14px;width:240px;border:1px solid #aaa;border-radius:4px;}}
</style>
</head>
<body>
<div id="map"></div>
<div class="legend" id="legend">
  <h4>Legend</h4>
  <div class="legend-content">
    <div><span style="display:inline-block;width:12px;height:12px;background:{BLUE};margin-right:6px;"></span>VOA only</div>
    <div><span style="display:inline-block;width:12px;height:12px;background:{RED};margin-right:6px;"></span>EPC only</div>
    <div><span style="display:inline-block;width:12px;height:12px;background:{ORANGE};margin-right:6px;"></span>Both sources</div>
  </div>
</div>
<div class="search-container"><input id="searchBox" type="text" placeholder="Search postcode or name..."></div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
var map = L.map('map').setView([{center_lat}, {center_lon}], 7);
L.tileLayer('https://tile.openstreetmap.org/{{z}}/{{x}}/{{y}}.png', {{maxZoom:19, attribution: '&copy; OpenStreetMap contributors'}}).addTo(map);
""".replace("{center_lat}", str(center_lat)).replace("{center_lon}", str(center_lon))

    # markers
    marker_lines = []
    for lat, lon, rec in results:
        sources = rec.get("sources", "")
        if isinstance(sources, (list, tuple)): sources = ";".join(sources)
        srcu = sources.upper()
        color = ORANGE if "VOA" in srcu and "EPC" in srcu else (BLUE if "VOA" in srcu else RED)
        popup_html = build_popup(rec).replace("\n", " ").replace('"', '&quot;')
        marker_lines.append(f"L.circleMarker([{lat}, {lon}], {{radius:{RADIUS}, color:'{color}', fillColor:'{color}', fillOpacity:0.9}}).addTo(map).bindPopup(\"{popup_html}\");")

    # enhanced search + collapsible legend JS
    html_tail = """

"""

    with open(OUTPUT_HTML, "w", encoding="utf-8") as fh:
        fh.write(html_head)
        for line in marker_lines:
            fh.write(line + "\n")
        fh.write(html_tail)

    print("Wrote HTML map:", OUTPUT_HTML)
    print("Done.")

if __name__ == "__main__":
    main()
