
 Postal‑Code Route Mapper — Stage 1 (Postal Codes Only)


- Works as a normal script (CLI with argparse) OR directly from Jupyter/Lab without CLI flags.
- When run inside Jupyter with no flags, it DOES NOT error. It just defines helper functions you can call:

  route_from_postals(postal_list, depot, out_html="route_postal.html", out_csv="route_postal_order.csv")
  run_from_csv(csv_path, depot, out_html="route_postal.html", out_csv="route_postal_order.csv")

What this does (no API keys required):
  • Takes Canadian postal codes (Winnipeg or anywhere in CA)
  • Geocodes each postal code to a centroid (lat/lon) using pgeocode
  • Builds a quick route using a Nearest‑Neighbor heuristic (distance saving)
  • Estimates travel time from straight‑line distance and a city speed
  • Exports an interactive Leaflet map (HTML) + an ordered CSV

Notes:
  • This is a *postal‑code level* planner. Stage 2 will upgrade to address‑level road routing.
  • Leaflet tiles from OpenStreetMap (no keys).

In [28]:

from __future__ import annotations
import argparse
import csv
import math
import os
import sys
from dataclasses import dataclass
from typing import List, Tuple

# ---------- Config ----------
ASSUMED_SPEED_KMH = 35  # conservative inner‑city speed; tune per season/traffic
OUTPUT_CSV_DEFAULT = "route_postal_order.csv"
OUTPUT_HTML_DEFAULT = "route_postal_map.html"

# ---------- Optional import (pgeocode) ----------
try:
    import pgeocode  # Canadian postal centroids
except Exception:
    pgeocode = None

# ---------- Data classes ----------
@dataclass
class Point:
    label: str  # e.g., "R3C 4T3" or depot name
    lat: float
    lon: float

# ---------- Helpers ----------
def clean_postal(pc: str) -> str:
    return pc.strip().upper().replace(" ", "")  # canonical form for lookup


def haversine_km(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    R = 6371.0
    lat1, lon1 = math.radians(a[0]), math.radians(a[1])
    lat2, lon2 = math.radians(b[0]), math.radians(b[1])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    h = math.sin(dlat/2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2) ** 2
    return 2 * R * math.asin(math.sqrt(h))


def minutes_from_km(km: float, speed_kmh: float = ASSUMED_SPEED_KMH) -> int:
    return int(round((km / speed_kmh) * 60))


# ---------- Geocoding (postal code -> centroid) ----------

def geocode_postal_codes(postals: List[str]) -> List[Point]:
    """Robust geocoder for Canadian postal codes.
    Strategy:
      1) Try full postal code (e.g., R2R1V5)
      2) If not found / NaN, fallback to FSA centroid (first 3 chars, e.g., R2R)
      3) Skip if both fail
    Returns a list of Point; prints a warning for each FSA fallback or failure.
    """
    if pgeocode is None:
        raise RuntimeError("pgeocode not installed. Install with: pip install pgeocode")

    def _extract_latlon(series) -> Tuple[float, float]:
        try:
            lat, lon = float(series.latitude), float(series.longitude)
            if math.isnan(lat) or math.isnan(lon):
                raise ValueError
            return lat, lon
        except Exception:
            raise ValueError

    nomi = pgeocode.Nominatim("CA")
    points: List[Point] = []

    for raw in postals:
        if not raw or not raw.strip():
            continue
        pc_full = clean_postal(raw)
        # 1) Try full code
        try:
            lat, lon = _extract_latlon(nomi.query_postal_code(pc_full))
            points.append(Point(label=raw.strip(), lat=lat, lon=lon))
            continue
        except ValueError:
            pass
        # 2) Fallback to FSA (first 3 chars)
        fsa = pc_full[:3]
        try:
            lat, lon = _extract_latlon(nomi.query_postal_code(fsa))
            print(f"[warn] '{raw}' not found at full precision; using FSA centroid '{fsa}'.")
            points.append(Point(label=raw.strip(), lat=lat, lon=lon))
            continue
        except ValueError:
            print(f"[warn] Unable to geocode '{raw}' (full or FSA). Skipping.")
            continue

    return points

# ---------- Route builder (Nearest‑Neighbor + optional 2‑opt polish) ----------

def _path_length_km(route: List[Point]) -> float:
    total = 0.0
    for i in range(1, len(route)):
        a, b = route[i - 1], route[i]
        total += haversine_km((a.lat, a.lon), (b.lat, b.lon))
    return total


def nearest_neighbor_route(depot: Point, stops: List[Point]) -> List[Point]:
    remaining = stops[:]
    route = [depot]
    cur = depot
    while remaining:
        nxt = min(remaining, key=lambda p: haversine_km((cur.lat, cur.lon), (p.lat, p.lon)))
        route.append(nxt)
        remaining.remove(nxt)
        cur = nxt
    route.append(depot)  # return to depot
    # Light local improvement (2‑opt)
    route = two_opt(route)
    return route


def two_opt(route: List[Point]) -> List[Point]:
    """2‑opt local search keeping first/last node fixed (depot round‑trip)."""
    best = route[:]
    improved = True
    while improved:
        improved = False
        for i in range(1, len(best) - 2):
            for k in range(i + 1, len(best) - 1):
                new = best[:i] + list(reversed(best[i:k + 1])) + best[k + 1:]
                if _path_length_km(new) + 1e-9 < _path_length_km(best):
                    best = new
                    improved = True
                    break
            if improved:
                break
    return best

# ---------- Leaflet HTML export ---------- (no external Python deps) ----------
LEAFLET_HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
  <meta charset=\"utf-8\" />
  <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\" />
  <title>Postal‑Code Route Map</title>
  <link rel=\"stylesheet\" href=\"https://unpkg.com/leaflet@1.9.4/dist/leaflet.css\" integrity=\"sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY=\" crossorigin=\"\" />
  <style>
    html, body, #map { height: 100%; margin: 0; }
    .label { font: 12px/1.2 sans-serif; background: white; padding: 2px 4px; border-radius: 3px; border: 1px solid #ccc; }
  </style>
</head>
<body>
  <div id=\"map\"></div>
  <script src=\"https://unpkg.com/leaflet@1.9.4/dist/leaflet.js\" integrity=\"sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo=\" crossorigin=\"\"></script>
  <script>
    const points = __POINTS_JSON__;
    const map = L.map('map');
    L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', { maxZoom: 19, attribution: '&copy; OpenStreetMap' }).addTo(map);

    const latlngs = points.map(p => [p.lat, p.lon]);
    const bounds = L.latLngBounds(latlngs);
    map.fitBounds(bounds.pad(0.2));

    // Markers with numbers
    points.forEach((p, idx) => {
      const m = L.marker([p.lat, p.lon]).addTo(map);
      const label = idx === 0 ? 'Depot (Start)' : (idx === points.length - 1 ? 'Depot (End)' : `#${idx} — ${p.label}`);
      m.bindPopup(`<b>${label}</b><br/>${p.lat.toFixed(5)}, ${p.lon.toFixed(5)}`);
    });

    // Draw route polyline
    const route = L.polyline(latlngs, { weight: 4 }).addTo(map);
  </script>
</body>
</html>
"""


def export_leaflet_html(route: List[Point], out_html: str) -> None:
    import json
    pts_json = [dict(label=p.label, lat=p.lat, lon=p.lon) for p in route]
    html = LEAFLET_HTML_TEMPLATE.replace("__POINTS_JSON__", json.dumps(pts_json))
    with open(out_html, "w", encoding="utf-8") as f:
        f.write(html)


# ---------- CSV export ----------

def export_order_csv(route: List[Point], out_csv: str) -> None:
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["sequence", "label", "lat", "lon", "km_from_prev", "est_travel_min"])
        total_km = 0.0
        for i, p in enumerate(route):
            if i == 0:
                km = 0.0
            else:
                prev = route[i - 1]
                km = haversine_km((prev.lat, prev.lon), (p.lat, p.lon))
                total_km += km
            w.writerow([i, p.label, f"{p.lat:.6f}", f"{p.lon:.6f}", f"{km:.3f}", minutes_from_km(km)])


# ---------- IO ----------

def read_postals_from_csv(path: str) -> List[str]:
    postals: List[str] = []
    with open(path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            pc = row.get("postal_code") or row.get("postal") or row.get("code")
            if pc:
                postals.append(pc.strip())
    return postals


# ---------- Notebook/CLI utilities ----------

def route_from_postals(postal_list: List[str], depot: str, out_html: str = OUTPUT_HTML_DEFAULT, out_csv: str = OUTPUT_CSV_DEFAULT):
    """Notebook‑friendly: build & export a route from an in‑memory list of postals.
    Now with robust depot fallback: if depot geocoding fails, use the centroid of the geocoded stops.
    """
    pts_stops = geocode_postal_codes(postal_list)
    if not pts_stops:
        raise ValueError("No valid stops after geocoding. Check postal codes.")

    depot_pt_list = geocode_postal_codes([depot])
    if not depot_pt_list:
        # Fallback: use centroid of stops as depot to keep the workflow going
        lat = sum(p.lat for p in pts_stops) / len(pts_stops)
        lon = sum(p.lon for p in pts_stops) / len(pts_stops)
        print(f"[warn] Failed to geocode depot '{depot}'. Using stops centroid as depot.")
        depot_pt = Point(label=f"{depot} (centroid)", lat=lat, lon=lon)
    else:
        depot_pt = depot_pt_list[0]

    route = nearest_neighbor_route(depot_pt, pts_stops)
    export_leaflet_html(route, out_html)
    export_order_csv(route, out_csv)
    return route


def run_from_csv(csv_path: str, depot: str, out_html: str = OUTPUT_HTML_DEFAULT, out_csv: str = OUTPUT_CSV_DEFAULT):
    stops_pc = read_postals_from_csv(csv_path)
    return route_from_postals(stops_pc, depot, out_html=out_html, out_csv=out_csv)


def _running_in_ipynb() -> bool:
    try:
        from IPython import get_ipython
        return get_ipython() is not None
    except Exception:
        return False


# ---------- CLI entrypoint ----------

def main():
    ap = argparse.ArgumentParser(description="Postal‑code route mapper (Leaflet map output)")
    ap.add_argument("--csv", help="CSV file with header 'postal_code' (non‑depot stops)")
    ap.add_argument("--depot", required=True, help="Depot/postal code to start/end the route (e.g., 'R3C 4T3')")
    ap.add_argument("--out", default=OUTPUT_HTML_DEFAULT, help="Output HTML map filename")
    ap.add_argument("--ordercsv", default=OUTPUT_CSV_DEFAULT, help="Output CSV for ordered route")
    ap.add_argument("--use-sample", action="store_true", help="Use a small Winnipeg sample set (ignores --csv)")
    args = ap.parse_args()

    # Build input postal list
    if args.use_sample:
        postals = [
            "R3C 4T3",  # (Depot) – near downtown
            "R3T 2N2",  # Fort Garry / U of M area
            "R2C 3A2",  # Transcona
            "R2X 1Z2",  # Garden City
            "R3E 0W2",  # HSC area
            "R3J 0S4",  # St James / Polo Park
            "R2N 1L7",  # St Vital
            "R3K 0Y2",  # Charleswood
        ]
        depot_pc = args.depot
        stops_pc = [pc for pc in postals if clean_postal(pc) != clean_postal(depot_pc)]
    else:
        if not args.csv or not os.path.exists(args.csv):
            raise SystemExit("Provide --csv path or use --use-sample")
        stops_pc = read_postals_from_csv(args.csv)
        depot_pc = args.depot

    # Geocode
    pts_stops = geocode_postal_codes(stops_pc)
    depot_pt_list = geocode_postal_codes([depot_pc])
    if not depot_pt_list:
        raise SystemExit(f"Failed to geocode depot postal code: {args.depot}")
    depot_pt = depot_pt_list[0]

    if not pts_stops:
        raise SystemExit("No valid stops after geocoding. Check postal codes.")

    # Build route (nearest‑neighbor)
    route = nearest_neighbor_route(depot_pt, pts_stops)

    # Output artifacts
    export_leaflet_html(route, args.out)
    export_order_csv(route, args.ordercsv)

    # Console summary
    total_km = 0.0
    print("Optimized order (postal codes):")
    for i, p in enumerate(route):
        if i == 0:
            seg_km = 0.0
        else:
            prev = route[i - 1]
            seg_km = haversine_km((prev.lat, prev.lon), (p.lat, p.lon))
            total_km += seg_km
        print(f"{i:2d}. {p.label:>8}  (+{seg_km:5.1f} km)")
    print(f"Estimated path length ≈ {total_km:.1f} km @ {ASSUMED_SPEED_KMH} km/h → ~{minutes_from_km(total_km)} min")
    print(f"HTML map: {args.out}")
    print(f"Ordered CSV: {args.ordercsv}")

if __name__ == "__main__":
    # If running inside Jupyter/Lab WITHOUT CLI flags, don't parse argparse.
    if _running_in_ipynb() and not any(a.startswith("--") for a in sys.argv[1:]):
        print(
            "Notebook detected. Functions are loaded. Call one of:\n"
            "  route_from_postals([...], depot=\"R3C 4T3\")\n"
            "  run_from_csv(\"postal_codes.csv\", depot=\"R3C 4T3\")\n"
            f"Outputs default to: {OUTPUT_HTML_DEFAULT}, {OUTPUT_CSV_DEFAULT}"
        )
    else:
        main()


Notebook detected. Functions are loaded. Call one of:
  route_from_postals([...], depot="R3C 4T3")
  run_from_csv("postal_codes.csv", depot="R3C 4T3")
Outputs default to: route_postal_map.html, route_postal_order.csv


In [30]:
postal_list = ["R3T 2N2","R2X 1Z2","R3K 0Y2","R3T 3K5","R2M 0R4","R3B 2E9","R2P 2J1","R3H 1C2","R2C 3A2","R2V 2B9"," R3G 0W4"]
route = route_from_postals(
    postal_list,
    depot="R2R",                      # or your full depot code
    out_html="route_postal_map.html",
    out_csv="route_postal_order.csv"
)


[warn] 'R3T 2N2' not found at full precision; using FSA centroid 'R3T'.
[warn] 'R2X 1Z2' not found at full precision; using FSA centroid 'R2X'.
[warn] 'R3K 0Y2' not found at full precision; using FSA centroid 'R3K'.
[warn] 'R3T 3K5' not found at full precision; using FSA centroid 'R3T'.
[warn] 'R2M 0R4' not found at full precision; using FSA centroid 'R2M'.
[warn] 'R3B 2E9' not found at full precision; using FSA centroid 'R3B'.
[warn] 'R2P 2J1' not found at full precision; using FSA centroid 'R2P'.
[warn] 'R3H 1C2' not found at full precision; using FSA centroid 'R3H'.
[warn] 'R2C 3A2' not found at full precision; using FSA centroid 'R2C'.
[warn] 'R2V 2B9' not found at full precision; using FSA centroid 'R2V'.
[warn] ' R3G 0W4' not found at full precision; using FSA centroid 'R3G'.


In [32]:
from IPython.display import IFrame
IFrame("route_postal_map.html", width="100%", height=600)

In [26]:
import inspect
print(inspect.getsource(route_from_postals))

def route_from_postals(postal_list: List[str], depot: str, out_html: str = OUTPUT_HTML_DEFAULT, out_csv: str = OUTPUT_CSV_DEFAULT):
    """Notebook‑friendly: build & export a route from an in‑memory list of postals.
    Now with robust depot fallback: if depot geocoding fails, use the centroid of the geocoded stops.
    """
    pts_stops = geocode_postal_codes(postal_list)
    if not pts_stops:
        raise ValueError("No valid stops after geocoding. Check postal codes.")

    depot_pt_list = geocode_postal_codes([depot])
    if not depot_pt_list:
        # Fallback: use centroid of stops as depot to keep the workflow going
        lat = sum(p.lat for p in pts_stops) / len(pts_stops)
        lon = sum(p.lon for p in pts_stops) / len(pts_stops)
        print(f"[warn] Failed to geocode depot '{depot}'. Using stops centroid as depot.")
        depot_pt = Point(label=f"{depot} (centroid)", lat=lat, lon=lon)
    else:
        depot_pt = depot_pt_list[0]

    route = nearest_neighbor_route(