Test


In [None]:
# 0) Setup (install if needed)
# If running locally (or Kaggle with Internet ON), uncomment as needed:
# !pip -q install osmnx geopandas shapely pyproj folium requests rtree

import warnings, math, re, os, json
warnings.filterwarnings("ignore")

In [None]:
# 1) Imports + settings

import pandas as pd
import geopandas as gpd
from shapely.geometry import LineString, MultiLineString
import osmnx as ox
import folium
from folium.plugins import MeasureControl
import requests

ox.settings.use_cache = True
ox.settings.log_console = False

print("Versions →", 
      "osmnx", ox.__version__, 
      "| geopandas", gpd.__version__)

In [None]:
# 2) Config

PLACE_NAME = "Washington, District of Columbia, USA"
CRS = "EPSG:4326"

# Crash join toggles (turn off if ArcGIS blocks)
CRASH_ENABLE = True
YEARS_BACK = 5
CRASH_BUFFER_M = 10

# Outputs
OUT_GEOJSON = "segments_lts_mvp.geojson"
OUT_HTML = "ridescoredc_map.html"

In [None]:
# 3) Area of interest

aoi_gdf = ox.geocode_to_gdf(PLACE_NAME).to_crs(CRS)
aoi = aoi_gdf.geometry.iloc[0]
aoi_gdf

In [None]:
# 4) Fetch OSM edges (robust) + explode multilines

# Highway filter includes cycleways/paths so protected facilities are retained
custom_filter = (
    '["highway"~"motorway|trunk|primary|secondary|tertiary|residential|living_street|'
    'unclassified|service|cycleway|path|pedestrian|track"]'
)

G = ox.graph_from_polygon(aoi, custom_filter=custom_filter, retain_all=False, simplify=True)
edges = ox.graph_to_gdfs(G, nodes=False, edges=True, fill_edge_geometry=True).to_crs(CRS)

# Some edges can be MultiLineString; explode to LineString to avoid style/coords issues in Folium
edges = edges.explode(index_parts=False, ignore_index=True)
edges = edges[edges.geometry.type == "LineString"].copy()

print("OSM edges (LineString only):", len(edges))
edges.head(2)

In [None]:
# 5) Helpers for tag parsing (speed/lanes/facility/parking)

def parse_maxspeed(val):
    if val is None or (isinstance(val, float) and math.isnan(val)): return None
    if isinstance(val, (list, tuple)): val = val[0]
    s = str(val).strip().lower()
    m = re.match(r"^(\d+)\s*(mph)?$", s)
    if m: return int(m.group(1))
    m = re.match(r"^(\d+)\s*km/?h$", s)
    if m: return int(round(int(m.group(1))*0.621371))
    m = re.search(r"(\d+)", s)
    return int(m.group(1)) if m else None

def parse_lanes(row):
    val = row.get("lanes")
    if isinstance(val, (list, tuple)):
        try: val = int(val[0])
        except: val = None
    lanes = int(val) if (val is not None and str(val).isdigit()) else None
    fwd = row.get("lanes:forward"); bwd = row.get("lanes:backward")
    try: fwd = int(fwd) if fwd is not None else None
    except: fwd = None
    try: bwd = int(bwd) if bwd is not None else None
    except: bwd = None
    if fwd is not None or bwd is not None:
        return (fwd or 0) + (bwd or 0) or lanes
    return lanes

def classify_facility(tags):
    c  = str(tags.get("cycleway","")).lower()
    cb = str(tags.get("cycleway:both","")).lower()
    cl = str(tags.get("cycleway:left","")).lower()
    cr = str(tags.get("cycleway:right","")).lower()
    hw = str(tags.get("highway","")).lower()
    if hw == "cycleway": return "separated_lane"
    if hw == "path" and str(tags.get("bicycle","")).lower() in {"designated","yes"}:
        return "protected_track"
    cand = "|".join([c,cb,cl,cr])
    if any(k in cand for k in ["track","separate","separated","buffered_protected"]): return "protected_track"
    if "buffered" in cand:  return "buffered_lane"
    if "lane" in cand:      return "painted_lane"
    if any(k in cand for k in ["shared_lane","sharrow","shared"]): return "shared"
    return "none"

def has_parking(tags):
    for k,v in tags.items():
        if k.startswith("parking:lane") and str(v).lower() not in {"no","none"}:
            return True
    return False

In [None]:
# 6) Normalize attributes

rows = []
for i, r in edges.reset_index(drop=True).iterrows():
    tags = r.to_dict()
    facility = classify_facility(tags)
    speed_mph = parse_maxspeed(tags.get("maxspeed"))
    lanes = parse_lanes(tags)
    highway = str(tags.get("highway","")).lower()
    rows.append({
        "segment_id": f"osm-{r.get('u','')}-{r.get('v','')}-{i}",
        "highway": highway,
        "num_lanes": lanes if lanes is not None else 1,
        "speed_limit": speed_mph if speed_mph is not None else 25,
        "bike_facility_type": facility,
        "parking_presence": has_parking(tags),
        "geometry": r.geometry
    })

gdf = gpd.GeoDataFrame(rows, geometry="geometry", crs=CRS)
print("Normalized segments:", len(gdf))
gdf.head(2)

In [None]:
# 7) LTS rules (compact + tunable)

def lts_level(facility, speed, lanes, highway=""):
    hw = (highway or "").lower()
    if facility in {"protected_track","separated_lane"} or hw in {"cycleway","path"}: return 1
    if facility in {"buffered_lane","painted_lane"}:
        if speed <= 25 and lanes <= 2: return 2
        if speed <= 30 and lanes <= 2: return 3
        return 4
    if facility in {"shared","none"}:
        if speed <= 20 and lanes <= 2 and hw in {"residential","living_street","service","unclassified"}: return 2
        if speed <= 30 and lanes <= 2: return 3
        return 4
    return 4

gdf["lts_level"] = gdf.apply(lambda r: lts_level(r.bike_facility_type, int(r.speed_limit), int(r.num_lanes), r.highway), axis=1)
gdf["lts_level"].value_counts().sort_index()

In [None]:
# 8) Optional crash fetch (safe fallback)

def try_fetch_dc_crashes(aoi_gdf, years_back=5):
    urls = [
        "https://opendata.arcgis.com/datasets/DCGIS::crashes-in-dc.geojson",
        "https://opendata.arcgis.com/api/v3/datasets/9c0b8b0673da4a6fa3b3a8bdafbbf7a2_0/downloads/data?format=geojson&spatialRefId=4326"
    ]
    for url in urls:
        try:
            r = requests.get(url, timeout=60)
            if r.ok and "json" in r.headers.get("content-type","").lower():
                gj = r.json()
                crashes = gpd.GeoDataFrame.from_features(gj["features"], crs=CRS)
                # clip to AOI
                crashes = gpd.overlay(crashes, aoi_gdf[["geometry"]], how="intersection")
                # filter by date if a date column exists
                for c in crashes.columns:
                    cl = c.lower()
                    if ("report" in cl and "date" in cl) or cl == "date":
                        crashes[c] = pd.to_datetime(crashes[c], errors="coerce", utc=True)
                        cutoff = pd.Timestamp.utcnow() - pd.Timedelta(days=365*years_back)
                        crashes = crashes[crashes[c] >= cutoff]
                        break
                # Keep only points
                crashes = crashes[crashes.geometry.notnull() & crashes.geometry.geom_type.isin(["Point","MultiPoint"])]
                return crashes
        except Exception:
            pass
    return gpd.GeoDataFrame(columns=["geometry"], geometry="geometry", crs=CRS)

if CRASH_ENABLE:
    crashes_gdf = try_fetch_dc_crashes(aoi_gdf, YEARS_BACK)
    print("Crash points:", len(crashes_gdf))
else:
    crashes_gdf = gpd.GeoDataFrame(columns=["geometry"], geometry="geometry", crs=CRS)

In [None]:
# 9) Crash counts → segments (buffered spatial join; handles no-spatial-index case)

def count_crashes_near_segments(segments_gdf, crashes_gdf, buffer_m=10):
    if crashes_gdf.empty or segments_gdf.empty:
        segments_gdf["crash_count_5yr"] = 0
        return segments_gdf

    proj = "EPSG:3857"  # meters
    seg_p = segments_gdf.to_crs(proj).copy()
    cr_p  = crashes_gdf.to_crs(proj).copy()

    seg_p["buf"] = seg_p.geometry.buffer(buffer_m)
    seg_p = seg_p.set_geometry("buf", crs=proj)

    try:
        joined = gpd.sjoin(cr_p[["geometry"]], seg_p[["buf"]], how="left", predicate="within")
    except Exception as e:
        # If spatial index missing, geopandas prints an rtree/pygeos message—fallback to brute force (slow but safe for MVP)
        from shapely.prepared import prep
        counts = []
        prepped = [prep(g) for g in seg_p["buf"]]
        for pt in cr_p.geometry:
            hits = [i for i, pg in enumerate(prepped) if pg.contains(pt)]
            counts.extend(hits)
        joined = pd.Series(counts).value_counts().rename_axis("index_right").rename("size").to_frame()

        seg_p["crash_count_5yr"] = 0
        seg_p.loc[joined.index, "crash_count_5yr"] = joined["size"].values
        return seg_p.drop(columns=["buf"]).set_geometry("geometry").to_crs(segments_gdf.crs)

    counts = joined.groupby(joined.index_right).size().rename("crash_count_5yr")
    seg_p = seg_p.drop(columns=["buf"]).set_geometry("geometry")
    seg_p["crash_count_5yr"] = counts.reindex(seg_p.index).fillna(0).astype(int)
    return seg_p.to_crs(segments_gdf.crs)

gdf = count_crashes_near_segments(gdf, crashes_gdf, buffer_m=CRASH_BUFFER_M)
gdf["serious_injury_count_5yr"] = 0
gdf["fatal_count_5yr"] = 0
gdf[["crash_count_5yr"]].describe()

In [None]:
# 10) RideScore (0–100)

def p95(values):
    s = sorted([int(v) for v in values if pd.notnull(v)])
    if not s: return 1
    k = int(round(0.95 * (len(s) - 1)))
    return max(s[k], 1)

def lts_to_score(x): return {1:100, 2:75, 3:40, 4:10}.get(int(x), 10)

facility_bonus = {"protected_track":10, "separated_lane":10, "buffered_lane":5, "painted_lane":3, "shared":0, "none":0}

P95_CRASH = p95(gdf["crash_count_5yr"])
def crash_inv(n): return 100.0 * (1.0 - min(max(float(n)/float(P95_CRASH), 0.0), 1.0))

W_LTS, W_CRASH, W_FAC = 0.6, 0.3, 0.1

gdf["s_LTS"] = gdf["lts_level"].map(lts_to_score)
gdf["s_crash"] = gdf["crash_count_5yr"].map(crash_inv)
gdf["s_facility"] = gdf["bike_facility_type"].map(facility_bonus).fillna(0)
gdf["ridescore_v1"] = (W_LTS*gdf.s_LTS + W_CRASH*gdf.s_crash + W_FAC*gdf.s_facility).round(1)

gdf[["s_LTS","s_crash","s_facility","ridescore_v1"]].describe().round(1)

In [None]:
# 10A) Build a lean render GeoDataFrame

# === knobs you can tweak ===
SIMPLIFY_TOL_M   = 4      # simplify by ~4 meters
ROUND_COORDS     = 5      # round lon/lat (~1 m)
MIN_LENGTH_M     = 8      # drop tiny segments
KEEP_FIELDS      = ["segment_id", "lts_level", "ridescore_v1", "geometry"]
FILTER_SCORE_MIN = None   # e.g., 20
FILTER_SCORE_MAX = None   # e.g., 80

# Start from the scored 'gdf' produced in cell 10
render = gdf[KEEP_FIELDS].copy()

# Optional score filters
if FILTER_SCORE_MIN is not None:
    render = render[render["ridescore_v1"] >= float(FILTER_SCORE_MIN)]
if FILTER_SCORE_MAX is not None:
    render = render[render["ridescore_v1"] <= float(FILTER_SCORE_MAX)]

# Drop duplicate geometries
render["_wkb"] = render.geometry.apply(lambda geom: geom.wkb)
render = render.drop_duplicates("_wkb").drop(columns="_wkb")

# Drop very short segments
rp = render.to_crs(3857)
render = render[rp.length >= MIN_LENGTH_M].copy()

# Simplify then return to WGS84
rp = render.to_crs(3857)
rp["geometry"] = rp.geometry.simplify(SIMPLIFY_TOL_M, preserve_topology=True)
render = rp.to_crs(4326)

# Round coordinates
from shapely.geometry import LineString
def round_linestring(ls, n=ROUND_COORDS):
    coords = [(round(x, n), round(y, n)) for x, y in ls.coords]
    return LineString(coords)

render["geometry"] = render.geometry.apply(lambda g: round_linestring(g, ROUND_COORDS))
print("Render features:", len(render))
render.head(2)

In [None]:
# 10B) Build a tiny, ride-score-only GeoJSON (in-memory)

def color_rank(v):
    stops = [(0,"#f7fbff"), (25,"#c6dbef"), (50,"#6baed6"), (75,"#2171b5"), (100,"#08306b")]
    c = "#999"
    for th, col in stops:
        if v >= th: c = col
    return c

features = []
for _, r in render.iterrows():
    coords = list(r.geometry.coords)
    features.append({
        "type":"Feature",
        "properties":{
            "segment_id": r.segment_id,
            "lts_level": int(r.lts_level),
            "ridescore_v1": float(r.ridescore_v1),
        },
        "geometry":{"type":"LineString","coordinates":[[float(x), float(y)] for x,y in coords]}
    })
geo_rank_only = {"type":"FeatureCollection","features": features}

In [None]:
# 10C) Map — only the RideScore layer

import folium
from folium.plugins import MeasureControl

m = folium.Map(location=[38.9072, -77.0369], zoom_start=12, tiles="CartoDB positron")

fg_rank = folium.FeatureGroup(name="Ranking (RideScore 0–100)", show=True)
folium.GeoJson(
    geo_rank_only,
    smooth_factor=1.0,  # client-side simplification for faster draw
    style_function=lambda f: {
        "color": color_rank(f["properties"]["ridescore_v1"]),
        "weight": 2.0,
        "opacity": 1.0
    },
    tooltip=folium.GeoJsonTooltip(
        fields=["segment_id","ridescore_v1","lts_level"],
        aliases=["id","score","lts"],
        sticky=False
    )
).add_to(fg_rank)
fg_rank.add_to(m)

folium.LayerControl(collapsed=True).add_to(m)
m.add_child(MeasureControl(primary_length_unit='meters'))

m  # display inline
display(m)

In [None]:
# 10D) Save lightweight HTML

OUT_HTML = "ridescoredc_map_ranking_only.html"
m.save(OUT_HTML)
OUT_HTML

In [None]:
# # 11) Export GeoJSON

# keep = ["segment_id","highway","num_lanes","speed_limit","bike_facility_type","parking_presence",
#         "lts_level","crash_count_5yr","serious_injury_count_5yr","fatal_count_5yr","ridescore_v1","geometry"]
# gdf[keep].to_file(OUT_GEOJSON, driver="GeoJSON")
# OUT_GEOJSON

In [None]:
# # 12) Folium map (interactive)

# def color_rank(v):
#     # 5-step blue ramp
#     stops = [(0,"#f7fbff"), (25,"#c6dbef"), (50,"#6baed6"), (75,"#2171b5"), (100,"#08306b")]
#     c = "#999"
#     for th, col in stops:
#         if v >= th: c = col
#     return c

# def color_crash(v):
#     # 4 steps green by raw counts (MVP)
#     stops = [(0,"#e5f5e0"), (1,"#a1d99b"), (3,"#31a354"), (7,"#006d2c")]
#     c = "#999"
#     for th, col in stops:
#         if v >= th: c = col
#     return c

# lane_colors = {"1":"#08519c","2":"#3182bd","3":"#6baed6","4+":"#bdd7e7"}

# # Prepare GeoJSON-like dict (safer for Folium)
# features = []
# for _, r in gdf.iterrows():
#     coords = list(r.geometry.coords)
#     features.append({
#         "type":"Feature",
#         "properties":{
#             "segment_id": r.segment_id,
#             "num_lanes": int(r.num_lanes),
#             "speed_limit": int(r.speed_limit),
#             "bike_facility_type": r.bike_facility_type,
#             "lts_level": int(r.lts_level),
#             "crash_count_5yr": int(r.crash_count_5yr),
#             "ridescore_v1": float(r.ridescore_v1),
#         },
#         "geometry":{"type":"LineString","coordinates":[[float(x), float(y)] for x,y in coords]}
#     })
# geo = {"type":"FeatureCollection","features":features}

# m = folium.Map(location=[38.9072,-77.0369], zoom_start=12, tiles="OpenStreetMap")

# # Ranking
# fg1 = folium.FeatureGroup(name="Ranking (RideScore 0–100)", show=True)
# folium.GeoJson(
#     geo,
#     style_function=lambda f: {"color": color_rank(f["properties"]["ridescore_v1"]), "weight": 2.5},
#     tooltip=folium.GeoJsonTooltip(fields=["segment_id","ridescore_v1","lts_level","bike_facility_type"])
# ).add_to(fg1); fg1.add_to(m)

# # Crashes
# fg2 = folium.FeatureGroup(name="Crash history (5y)")
# folium.GeoJson(
#     geo,
#     style_function=lambda f: {"color": color_crash(f["properties"]["crash_count_5yr"]), "weight": 3},
#     tooltip=folium.GeoJsonTooltip(fields=["segment_id","crash_count_5yr"])
# ).add_to(fg2); fg2.add_to(m)

# # Car lanes
# def lane_style(f):
#     n = f["properties"]["num_lanes"]; key = "4+" if n >= 4 else str(n)
#     return {"color": lane_colors.get(key, "#999"), "weight": 3}
# fg3 = folium.FeatureGroup(name="Number of car lanes")
# folium.GeoJson(
#     geo,
#     style_function=lane_style,
#     tooltip=folium.GeoJsonTooltip(fields=["segment_id","num_lanes","speed_limit"])
# ).add_to(fg3); fg3.add_to(m)

# folium.LayerControl(collapsed=False).add_to(m)
# m.add_child(MeasureControl(primary_length_unit='meters', primary_area_unit='sqmeters'))

# m

In [None]:
# 13) Save as HTML (shareable)

# m.save(OUT_HTML)
# OUT_HTML

In [None]:
# alternative display in Jupyter

# display(m)