Mount Drive

In [None]:
# =========================
# Cell 1 — Mount Drive
# =========================
from google.colab import drive
drive.mount('/content/drive')


Imports

In [None]:

# =========================
# Cell 2 — Imports
# =========================
import os, math, re, warnings
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import LineString, MultiLineString, Polygon, MultiPolygon, Point
from pandas.api.types import is_datetime64_any_dtype, is_datetime64tz_dtype

warnings.filterwarnings("ignore", category=UserWarning, module="shapely")


Set the Parameters and Paths. Do parameters for what it takes to be a crosswalk, clustering radius sizes, and # of segments it takes to be considered an intersection

In [None]:
# =================================
# Cell 3 — Paths & Parameters
# =================================

# Paths
CRASHES_PATH        = "/content/drive/My Drive/Crashes_in_DC.csv"
SPEED_GEOJSON_PATH  = "/content/drive/My Drive/Roadway_SubBlock.geojson"  # DDOT lines
PEDESTRIAN_PATH     = "/content/drive/My Drive/Sidewalks.geojson"         # sidewalks + crosswalks (lines/polys)
OUT_DIR             = "/content/drive/My Drive/outputs_intersections"
os.makedirs(OUT_DIR, exist_ok=True)

# Filters
DATE_START, DATE_END = "2020-01-01", "2025-04-30"
MAR_MIN = 100  # set None to disable

# CRS
CRS_LL = 4326
CRS_M  = 3857

# Road model
DEFAULT_ROAD_HALF_WIDTH_M = 15.0  # for road buffers in crosswalk test

# Crosswalk geometry rules
MIN_CROSSWALK_LEN_M = 2.5
MAX_CROSSWALK_LEN_M = 60.0
ANGLE_MIN_DEG, ANGLE_MAX_DEG = 40, 140   # ~perpendicular to road


Helpers:
_bearing_deg: gets a direction (0–180°) for any line, ignoring which way it was drawn.

_angle_diff: returns the smallest angle between two bearings (0–90°).

_midpoint / _endpoints: quickly grab the middle point and the two ends of a line.

_lines_from_any: converts polygons/multilines to simple LineStrings (2D), preserving attributes — ensures everything is line geometry for later checks.

In [None]:
# =========================
# Cell 4 — Helpers
# =========================

def _drop_zm_line(ls: LineString):
    """Return a 2D LineString (drop Z/M). None if degenerate."""
    if ls is None or not hasattr(ls, "coords"):
        return None
    coords2d = [(c[0], c[1]) for c in ls.coords if len(c) >= 2]
    if len(coords2d) < 2:
        return None
    if all(coords2d[0] == c for c in coords2d[1:]):
        return None
    return LineString(coords2d)

def lines_from_any_preserve_attrs(gdf):
    """
    Convert polygons/multilines to LineStrings while preserving non-geometry columns.
    Explodes Multi* into single LineStrings; forces 2D.
    """
    if gdf is None or gdf.empty:
        return gpd.GeoDataFrame(geometry=gpd.GeoSeries([], crs=gdf.crs))
    rows = []
    cols = [c for c in gdf.columns if c != "geometry"]
    for _, row in gdf.iterrows():
        geom = row.geometry
        base = row[cols].to_dict()
        if geom is None:
            continue
        if isinstance(geom, (Polygon, MultiPolygon)):
            b = geom.boundary
            geoms = list(b.geoms) if isinstance(b, MultiLineString) else [b]
        elif isinstance(geom, MultiLineString):
            geoms = list(geom.geoms)
        elif isinstance(geom, LineString):
            geoms = [geom]
        else:
            continue
        for ls in geoms:
            ls2 = _drop_zm_line(ls)
            if ls2 is not None and not ls2.is_empty and ls2.length > 0:
                rec = dict(base)
                rec["geometry"] = ls2
                rows.append(rec)
    if not rows:
        return gpd.GeoDataFrame(geometry=gpd.GeoSeries([], crs=gdf.crs))
    out = gpd.GeoDataFrame(rows, crs=gdf.crs).reset_index(drop=True)
    return out

def _bearing_degrees(ls: LineString) -> float:
    """Return 0..180 bearing using endpoints (directionless, 2D)."""
    if ls is None or not hasattr(ls, "coords") or len(ls.coords) < 2:
        return np.nan
    x0, y0 = ls.coords[0][0], ls.coords[0][1]
    x1, y1 = ls.coords[-1][0], ls.coords[-1][1]
    if (x1 == x0) and (y1 == y0):
        return np.nan
    ang = math.degrees(math.atan2((x1 - x0), (y1 - y0)))
    return abs(ang) % 180.0

def _angle_diff(a, b):
    """Smallest angle between two bearings (0..90)."""
    if pd.isna(a) or pd.isna(b):
        return np.nan
    d = abs(a - b) % 180.0
    return d if d <= 90 else 180 - d

def _segment_midpoint(ls: LineString) -> Point:
    try:
        return ls.interpolate(0.5, normalized=True)
    except Exception:
        x0, y0 = ls.coords[0][0], ls.coords[0][1]
        x1, y1 = ls.coords[-1][0], ls.coords[-1][1]
        return Point((x0+x1)/2.0, (y0+y1)/2.0)

def _endpoints(ls: LineString):
    x0, y0 = ls.coords[0][0], ls.coords[0][1]
    x1, y1 = ls.coords[-1][0], ls.coords[-1][1]
    return Point(x0, y0), Point(x1, y1)

def _dedupe_left(df):
    """Keep first left row after sjoin_nearest ties/dups."""
    return df[~df.index.duplicated(keep="first")].copy()


Load & Prep the Pedestrian Crashes and apply filtering. Also, only keep accidents that involve pedestrians

In [None]:
# =====================================
# Cell 5 — Load & Prep Crashes (ped-only)
# =====================================

df = pd.read_csv(CRASHES_PATH, dtype={"STREETSEGID": str}, low_memory=False)

# Clean lat/lon
df = df.dropna(subset=["LATITUDE","LONGITUDE"]).copy()
df["LATITUDE"]  = pd.to_numeric(df["LATITUDE"], errors="coerce")
df["LONGITUDE"] = pd.to_numeric(df["LONGITUDE"], errors="coerce")
df = df.dropna(subset=["LATITUDE","LONGITUDE"]).copy()

# Date filter (UTC-aware safe)
df["FROMDATE"] = pd.to_datetime(df["FROMDATE"], errors="coerce", utc=True)
df = df[(df["FROMDATE"] >= DATE_START) & (df["FROMDATE"] <= DATE_END)].copy()

# MAR filter
if "MAR_SCORE" in df.columns and MAR_MIN is not None:
    df["MAR_SCORE"] = pd.to_numeric(df["MAR_SCORE"], errors="coerce")
    df = df[df["MAR_SCORE"] >= MAR_MIN].copy()

# Injury fields fill (for later, optional)
injury_cols = [
    "MAJORINJURIES_BICYCLIST","MINORINJURIES_BICYCLIST","UNKNOWNINJURIES_BICYCLIST","FATAL_BICYCLIST",
    "MAJORINJURIES_DRIVER","MINORINJURIES_DRIVER","UNKNOWNINJURIES_DRIVER","FATAL_DRIVER",
    "MAJORINJURIES_PEDESTRIAN","MINORINJURIES_PEDESTRIAN","UNKNOWNINJURIES_PEDESTRIAN","FATAL_PEDESTRIAN",
    "FATALPASSENGER","MAJORINJURIESPASSENGER","MINORINJURIESPASSENGER",
    "MAJORINJURIESOTHER","MINORINJURIESOTHER","FATALOTHER"
]
for c in injury_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)
    else:
        df[c] = 0

# Pedestrian-only subset
ped_cols = ["MAJORINJURIES_PEDESTRIAN","MINORINJURIES_PEDESTRIAN","UNKNOWNINJURIES_PEDESTRIAN","FATAL_PEDESTRIAN"]
df_ped = df[df[ped_cols].sum(axis=1) > 0].copy()

# GeoDataFrame to meters
gdf_ped_4326 = gpd.GeoDataFrame(
    df_ped, geometry=gpd.points_from_xy(df_ped["LONGITUDE"], df_ped["LATITUDE"]), crs=CRS_LL
)
gdf_ped_m = gdf_ped_4326.to_crs(CRS_M)

print(f"Ped crashes loaded: {len(gdf_ped_m)}")


Load DDOT lines

In [None]:
# ==========================================================
# Cell 6 — Load DDOT lines (Roadway_SubBlock) → to 3857 lines
# ==========================================================

roads = gpd.read_file(SPEED_GEOJSON_PATH)
if roads.crs is None:
    roads = roads.set_crs(CRS_LL)
roads_m = roads.to_crs(CRS_M)

roads_lines = lines_from_any_preserve_attrs(roads_m)
roads_lines["road_bearing"] = roads_lines.geometry.apply(_bearing_degrees)

print(f"DDOT road lines: {len(roads_lines)}")


Load in the crosswalks+sidewalks

In [None]:
# ===============================================
# Cell 7 — Load crosswalk/sidewalk layer → to 3857
# ===============================================

ped_raw = gpd.read_file(PEDESTRIAN_PATH)
if ped_raw.crs is None:
    ped_raw = ped_raw.set_crs(CRS_LL)
ped_m_raw = ped_raw.to_crs(CRS_M)

# force to single LineStrings (handles polys/multilines)
ped_lines = lines_from_any_preserve_attrs(ped_m_raw)
ped_lines["length_m"] = ped_lines.length
ped_lines["bearing"]  = ped_lines.geometry.apply(_bearing_degrees)

print(f"Pedestrian lines (raw→lines): {len(ped_lines)}")



Finds the nearest road for each pedestrian line and brings over that road’s bearing.

Runs the three tests:



Angle: pedestrian line is roughly perpendicular to the road (within your angle band).

Length: between your min/max length thresholds.

Placement: line’s midpoint inside a buffered roadway; endpoints outside (curbs).



Marks lines that pass all three as crosswalks; everything else stays as sidewalk/other.

Produces two working sets: crosswalks and sidewalks.

In [None]:
# ==============================================================
# Cell 8 — Figure out if crosswalk or not (attr + geometry test)
# ==============================================================

# (A) Attribute hint: look for text cues such as "crosswalk/crossing/xing/zebra/marked x"
text_cols = [c for c in ped_lines.columns if c != "geometry" and pd.api.types.is_string_dtype(ped_lines[c])]
needle = re.compile(r"(crosswalk|crossing|xing|zebra|marked\s*x)", re.I)

def _attr_crosswalk(row):
    for c in text_cols:
        v = row.get(c)
        if isinstance(v, str) and needle.search(v):
            return True
    return False

ped_lines["attr_is_crosswalk"] = ped_lines.apply(_attr_crosswalk, axis=1)

# (B) Geometry test: nearest road, check angle & length, and that the segment MIDPOINT sits over the road,
# but endpoints lie off the road buffer (i.e., spans across)
join = gpd.sjoin_nearest(
    ped_lines, roads_lines[["geometry","road_bearing"]],
    how="left", distance_col="dist_to_road_m"
)
join = _dedupe_left(join)
idx_cols = [c for c in join.columns if c.startswith("index_right")]
if idx_cols:
    join = join.rename(columns={idx_cols[0]: "road_idx"})
else:
    join["road_idx"] = pd.NA

join["road_geom"]    = join["road_idx"].map(roads_lines["geometry"]) if join["road_idx"].notna().any() else None
join["road_bearing"] = join["road_idx"].map(roads_lines["road_bearing"]) if join["road_idx"].notna().any() else np.nan

def _classify_row(r):
    ls: LineString = r.geometry
    rg = r.get("road_geom", None)
    rb = r.get("road_bearing", np.nan)
    if ls is None or rg is None or pd.isna(rb):
        return pd.Series({"angle_diff": np.nan, "mid_in": False, "a_in": False, "b_in": False,
                          "len_ok": False, "ang_ok": False, "geo_crosswalk": False})

    ang_ped = r["bearing"]
    ad = _angle_diff(ang_ped, rb)

    # Road buffer to approximate carriageway
    road_buf = rg.buffer(DEFAULT_ROAD_HALF_WIDTH_M, cap_style=2, join_style=2)

    mid = _segment_midpoint(ls)
    a, b = _endpoints(ls)
    mid_in = road_buf.contains(mid)
    a_in   = road_buf.contains(a)
    b_in   = road_buf.contains(b)

    length_ok = (r["length_m"] >= MIN_CROSSWALK_LEN_M) and (r["length_m"] <= MAX_CROSSWALK_LEN_M)
    angle_ok  = (not pd.isna(ad)) and (ad >= ANGLE_MIN_DEG) and (ad <= ANGLE_MAX_DEG)

    geo_cross = bool(length_ok and angle_ok and mid_in and (not a_in) and (not b_in))
    return pd.Series({"angle_diff": ad, "mid_in": mid_in, "a_in": a_in, "b_in": b_in,
                      "len_ok": length_ok, "ang_ok": angle_ok, "geo_crosswalk": geo_cross})

tests = join.apply(_classify_row, axis=1)
ped_class = pd.concat([join, tests], axis=1)

# Final label
ped_class["CLASS"] = np.where(ped_class["attr_is_crosswalk"] | ped_class["geo_crosswalk"], "CROSSWALK", "SIDEWALK")

crosswalks_m = ped_class[ped_class["CLASS"] == "CROSSWALK"].copy()
sidewalks_m  = ped_class[ped_class["CLASS"] == "SIDEWALK"].copy()

print("\n=== Crosswalk classification summary ===")
print(f"Total ped segments: {len(ped_class)}")
print(f"  Attribute-hinted crosswalks: {int(ped_class['attr_is_crosswalk'].sum())}")
print(f"  Geometry-pass crosswalks:    {int(ped_class['geo_crosswalk'].sum())}")
print(f"  FINAL CROSSWALKS:            {len(crosswalks_m)}")
print(f"  FINAL SIDEWALKS:             {len(sidewalks_m)}")




build the intersections using ddot

In [None]:
# =========================
# Cell 9 — Build intersections (snap endpoints & compute degree)
# =========================

# Safe defaults in case not set earlier
SNAP_TOL_M          = globals().get("SNAP_TOL_M", 6.0)
NODE_TOUCH_TOL_M    = globals().get("NODE_TOUCH_TOL_M", 1.5)
MIN_INTERSECTION_DEG= globals().get("MIN_INTERSECTION_DEG", 3)
CRS_M               = globals().get("CRS_M", 3857)

# 1) Extract endpoints from road LineStrings
end_pts = []
for i, geom in enumerate(roads_lines.geometry):
    if geom is None or geom.is_empty or not isinstance(geom, LineString) or len(geom.coords) < 2:
        continue
    x0, y0 = geom.coords[0]
    x1, y1 = geom.coords[-1]
    end_pts.append({"road_idx": i, "geometry": Point(x0, y0)})
    end_pts.append({"road_idx": i, "geometry": Point(x1, y1)})

end_pts_g = gpd.GeoDataFrame(end_pts, crs=CRS_M)

if end_pts_g.empty:
    nodes_deg = gpd.GeoDataFrame(columns=["degree"], geometry=gpd.GeoSeries([], crs=CRS_M))
else:
    # 2) SNAP: cluster endpoints within SNAP_TOL_M using DBSCAN (min_samples=1)
    from sklearn.cluster import DBSCAN
    XY = np.c_[end_pts_g.geometry.x.values, end_pts_g.geometry.y.values]
    labels = DBSCAN(eps=SNAP_TOL_M, min_samples=1, metric="euclidean", n_jobs=-1).fit_predict(XY)
    end_pts_g["node_id"] = labels

    # Cluster centroids
    nodes = (
        end_pts_g.groupby("node_id")
                 .agg(x=("geometry", lambda s: np.mean([p.x for p in s])),
                      y=("geometry", lambda s: np.mean([p.y for p in s])))
                 .reset_index()
    )
    nodes_g = gpd.GeoDataFrame(nodes, geometry=gpd.points_from_xy(nodes["x"], nodes["y"], crs=CRS_M))

    # 3) DEGREE: count distinct road segments that intersect a tiny buffer around node
    nodes_buf = nodes_g.copy()
    nodes_buf["geometry"] = nodes_buf.buffer(NODE_TOUCH_TOL_M)
    join_deg = gpd.sjoin(roads_lines[["geometry"]].reset_index().rename(columns={"index":"road_idx"}),
                         nodes_buf[["node_id","geometry"]],
                         how="right", predicate="intersects")
    deg = (join_deg.groupby("node_id")["road_idx"].nunique()
                    .rename("degree").reset_index())
    nodes_deg = nodes_g.merge(deg, on="node_id", how="left").fillna({"degree": 0})
    nodes_deg["degree"] = nodes_deg["degree"].astype(int)

# Keep real intersections (degree ≥ MIN_INTERSECTION_DEG)
intersections_m = nodes_deg[nodes_deg["degree"] >= MIN_INTERSECTION_DEG].copy()

print(f"Built intersections: {len(intersections_m)} (degree ≥ {MIN_INTERSECTION_DEG}) / all nodes: {len(nodes_deg)}")


Tag interesections with crosswalks nearby or not(within 25 meters which was just a value that made sense because it is just a little bigger than a typical intersection, bigger lanes are about 12 feet in width, so 25 meters is about the size of 7 lanes. This would be the size of a massive intersection, but it makes sense for it to be larger because that crosswalk would definitely be part of the same intersection because it is 25 meters away)

In [None]:
# ===== Cell 10 — HARD RESET + recompute =====

CRS_M = 3857
XWALK_THRESHOLD_M = 25.0

def _strip_indexish(df):
    return df.drop(columns=[c for c in df.columns if c=="index" or c.startswith("index_") or c in ("index_left","index_right")],
                   errors="ignore").reset_index(drop=True)

# 0) Rebuild CROSSWALK POINTS from ped_class (robust to prior state)
#    (Assumes you already ran Cells 7–8 so ped_class exists)
assert 'ped_class' in globals(), "ped_class not found; run Cells 7–8 first."
crosswalks_m = ped_class[ped_class["CLASS"] == "CROSSWALK"].copy()
if not crosswalks_m.empty:
    if "midpoint" not in crosswalks_m.columns:
        crosswalks_m["midpoint"] = crosswalks_m.geometry.apply(_segment_midpoint)
    crosswalk_pts_m = gpd.GeoDataFrame(
        crosswalks_m.drop(columns=["geometry"]).copy(),
        geometry=gpd.GeoSeries(crosswalks_m["midpoint"], crs=crosswalks_m.crs)
    )
else:
    crosswalk_pts_m = gpd.GeoDataFrame(geometry=gpd.GeoSeries([], crs=CRS_M))

# 1) Rebuild INTERSECTIONS from nodes_deg (prevents using a previously filtered set)
#    (Assumes Cell 9 ran and nodes_deg exists)
assert 'nodes_deg' in globals(), "nodes_deg not found; run Cell 9 first."
MIN_INTERSECTION_DEG = globals().get("MIN_INTERSECTION_DEG", 3)
intersections_m = nodes_deg[nodes_deg["degree"] >= MIN_INTERSECTION_DEG].copy()

# 2) Clean + align CRS
crosswalk_pts_m = _strip_indexish(crosswalk_pts_m)
intersections_m = _strip_indexish(intersections_m)

if crosswalk_pts_m.crs != CRS_M and crosswalk_pts_m.crs is not None:
    crosswalk_pts_m = crosswalk_pts_m.to_crs(CRS_M)
if intersections_m.crs != CRS_M and intersections_m.crs is not None:
    intersections_m = intersections_m.to_crs(CRS_M)
if intersections_m.crs is None:
    intersections_m.set_crs(CRS_M, inplace=True)
if crosswalk_pts_m.crs is None:
    crosswalk_pts_m.set_crs(CRS_M, inplace=True)

# 3) Nearest distance (NO max_distance; always get a distance)
if intersections_m.empty or crosswalk_pts_m.empty:
    intersections_m["dist_to_crosswalk_mid_m"] = np.nan
else:
    tag = gpd.sjoin_nearest(
        intersections_m,
        crosswalk_pts_m[["geometry"]],
        how="left",
        distance_col="dist_to_crosswalk_mid_m"
    )
    intersections_m = _strip_indexish(tag)

# 4) Threshold → flag
intersections_m["dist_to_crosswalk_mid_m"] = pd.to_numeric(intersections_m["dist_to_crosswalk_mid_m"], errors="coerce")
intersections_m["has_crosswalk_nearby"] = intersections_m["dist_to_crosswalk_mid_m"].le(XWALK_THRESHOLD_M)

# 5) Quick sanity
print(f"Crosswalk segments: {len(crosswalks_m)} | Crosswalk points: {len(crosswalk_pts_m)}")
print(f"Intersections total: {len(intersections_m)}")
print(f"Within {XWALK_THRESHOLD_M} m: {int(intersections_m['has_crosswalk_nearby'].sum())}")
print(intersections_m["dist_to_crosswalk_mid_m"].describe())



split intersections into what has crosswalks nearby and what does not

In [None]:
# ===========================================================
# Cell 11 — Split nodes into with-crosswalk vs no-crosswalk
# ===========================================================

ints_with_xwalk = intersections_m[intersections_m["has_crosswalk_nearby"]].copy()
ints_no_xwalk   = intersections_m[~intersections_m["has_crosswalk_nearby"]].copy()

print(f"Intersections with crosswalk: {len(ints_with_xwalk)}")
print(f"Intersections without crosswalk: {len(ints_no_xwalk)}")


Only look at interesections that have no crosswalks

In [None]:
# --- Keep only intersections without crosswalks ---
intersections_m = intersections_m[~intersections_m["has_crosswalk_nearby"]].copy()
print(f"Remaining intersections (no crosswalk nearby): {len(intersections_m)}")

Cell 12 — Make Buffers at No-Crosswalk Intersections

-Creates circular buffers (e.g., 30 m radius) around every intersection without a nearby crosswalk.

-Each buffer represents a small “zone of influence” for that intersection.

-These will later be used to see how many pedestrian crashes fall inside.-

In [None]:
# =========================
# Cell 12 — Buffers at no-crosswalk intersections
# =========================

BUFFER_RADIUS_M = globals().get("BUFFER_RADIUS_M", 25.0)  # tweak as you like

def _make_buffers(gdf_pts, radius_m, id_col):
    if gdf_pts.empty:
        return gpd.GeoDataFrame(columns=[id_col], geometry=gpd.GeoSeries([], crs=gdf_pts.crs))
    out = gdf_pts.copy()
    out[id_col] = np.arange(len(out), dtype=int)
    out["geometry"] = out.buffer(radius_m)
    return out

buf_no_xwalk = _make_buffers(ints_no_xwalk, BUFFER_RADIUS_M, "buf_id")
print(f"No-crosswalk buffers: {len(buf_no_xwalk)} (radius={BUFFER_RADIUS_M} m)")


Cell 13 — Join Pedestrian Crashes to Buffers + Compute Severity

-Spatially joins all pedestrian crashes to those no-crosswalk buffers.

-Calculates a severity score for each crash (weighted by fatal/major/minor injuries).

-Aggregates per buffer → gives each buffer:

  -number of crashes (n_crashes)

  -total crash severity (severity_sum).

In [None]:
# =========================
# Cell 13 — Join ped crashes to buffers + severity (FIXED)
# =========================

def _ped_severity_single(r):
    """
    Assign ONE severity score per crash, using priority:
    - any fatal pedestrian injury -> 7
    - else any major pedestrian injury -> 4
    - else any minor pedestrian injury -> 1
    - else 0
    """
    fatal = float(r.get("FATAL_PEDESTRIAN", 0) or 0)
    major = float(r.get("MAJORINJURIES_PEDESTRIAN", 0) or 0)
    minor = float(r.get("MINORINJURIES_PEDESTRIAN", 0) or 0)

    if fatal > 0:
        return 7
    elif major > 0:
        return 4
    elif minor > 0:
        return 1
    else:
        return 0

# Work on a copy to avoid chained-assignment weirdness
gdf_ped_m = gdf_ped_m.copy()
gdf_ped_m["SEVERITY_SCORE_PED"] = gdf_ped_m.apply(_ped_severity_single, axis=1)

# Spatial join: crashes that fall within no-crosswalk buffers
crash_in_no = gpd.sjoin(
    gdf_ped_m,
    buf_no_xwalk[["buf_id", "geometry"]],
    how="inner",
    predicate="within"
)

# --- Make sure we only count each crash once per buffer ---

# Try to detect a crash ID column
possible_crash_cols = ["CRASH_ID", "crash_id", "CRASH_INDEX", "crash_index", "OBJECTID"]
crash_col = None
for c in possible_crash_cols:
    if c in crash_in_no.columns:
        crash_col = c
        break

# If we can't find one, fall back to using the row index as a unique ID
if crash_col is None:
    crash_in_no = crash_in_no.reset_index().rename(columns={"index": "crash_row"})
    crash_col = "crash_row"

# Collapse to ONE row per (buffer, crash), using max severity in case of duplicates
per_crash = (
    crash_in_no
    .groupby(["buf_id", crash_col], as_index=False)
    .agg(SEVERITY_SCORE_PED=("SEVERITY_SCORE_PED", "max"))
)

# Aggregate per buffer:
# - n_crashes = number of unique crashes in that buffer
# - severity_sum = sum of severity scores (each crash contributes at most 7)
agg_no = (
    per_crash
    .groupby("buf_id", as_index=False)
    .agg(
        n_crashes=(crash_col, "nunique"),
        severity_sum=("SEVERITY_SCORE_PED", "sum")
    )
)

# Attach back to buffers
buf_no_xwalk = buf_no_xwalk.merge(agg_no, on="buf_id", how="left").fillna(
    {"n_crashes": 0, "severity_sum": 0}
)

print("No-crosswalk buffer crash stats (head):")
print(buf_no_xwalk[["buf_id", "n_crashes", "severity_sum"]].head())



Make a table based on severity

In [None]:
# =========================
#  Crosswalk Severity Table (RANK, N_CRASHES, SEVERITY_SUM, AVG_LON, AVG_LAT)
# =========================

import os
import numpy as np
import pandas as pd
import geopandas as gpd
from IPython.display import display

MIN_CRASHES_TO_SHOW = globals().get("MIN_CRASHES_TO_SHOW", 0)

# --- 1. Start from buf_no_xwalk produced in the previous cell ---
table_no_xwalk = buf_no_xwalk.copy()

# --- 2. Make sure it's a GeoDataFrame with a CRS ---
if not isinstance(table_no_xwalk, gpd.GeoDataFrame):
    table_no_xwalk = gpd.GeoDataFrame(table_no_xwalk, geometry="geometry")

if table_no_xwalk.crs is None:
    # If you KNOW your buffers are in a projected CRS (e.g. EPSG:32618), set that instead.
    # For now we assume they are already in lon/lat:
    table_no_xwalk = table_no_xwalk.set_crs(epsg=4326)

# --- 3. Compute centroids in a projected CRS, then back to lon/lat ---
# Use Web Mercator as a simple planar CRS for centroids
g_proj = table_no_xwalk.to_crs(epsg=3857)
g_proj["centroid"] = g_proj.geometry.centroid

# Reproject centroids back to WGS84 (lon/lat)
centroids_ll = g_proj.set_geometry("centroid").to_crs(epsg=4326).geometry

# Store lon/lat in the original table
table_no_xwalk["avg_lon"] = centroids_ll.x
table_no_xwalk["avg_lat"] = centroids_ll.y

# --- 4. Optional: filter by crash count threshold ---
if MIN_CRASHES_TO_SHOW > 0:
    table_no_xwalk = table_no_xwalk[
        table_no_xwalk["n_crashes"] >= MIN_CRASHES_TO_SHOW
    ].copy()

# --- 5. Build display table with desired columns ---
table_no_xwalk_display = table_no_xwalk[["n_crashes", "severity_sum", "avg_lon", "avg_lat"]]
table_no_xwalk_display = table_no_xwalk_display.rename(columns=str.upper)

# Sort by severity, then crashes
table_no_xwalk_display = table_no_xwalk_display.sort_values(
    ["SEVERITY_SUM", "N_CRASHES"], ascending=[False, False]
).reset_index(drop=True)

# Add rank column
table_no_xwalk_display.insert(0, "RANK", np.arange(1, len(table_no_xwalk_display) + 1))

print(f"No-Crosswalk Table (min crashes ≥ {MIN_CRASHES_TO_SHOW}):")
display(table_no_xwalk_display.head(10))

# --- 6. Export full table ---
table_no_xwalk_display.to_csv(
    os.path.join(OUT_DIR, "no_crosswalk_intersections_table.csv"),
    index=False
)
print(f"Saved full table to {OUT_DIR}/no_crosswalk_intersections_table.csv")


Map

In [None]:
# ===========================================================================================
# Final Map — Crashes (gray), Top 10 Clusters (circles), and Crosswalks (green)
# ===========================================================================================

import folium
from branca.colormap import linear

def gdf_geom_only(gdf):
    if gdf is None or gdf.empty:
        return None
    return gpd.GeoDataFrame(geometry=gdf.geometry, crs=gdf.crs).to_crs(4326).to_json()

# --- Prep layers in 4326 ---
crashes_4326 = gdf_ped_m.to_crs(4326)

clusters_src = (
    buf_no_xwalk.copy()
    if "buf_no_xwalk" in globals() and not buf_no_xwalk.empty else
    gpd.GeoDataFrame(geometry=gpd.GeoSeries([], crs="EPSG:4326"))
)

crosswalks_4326 = (
    crosswalks_m.to_crs(4326)
    if "crosswalks_m" in globals() and not crosswalks_m.empty else
    gpd.GeoDataFrame(geometry=gpd.GeoSeries([], crs="EPSG:4326"))
)

# --- Map ---
MAP_CENTER = globals().get("MAP_CENTER", [38.9072, -77.0369])
MAP_ZOOM   = globals().get("MAP_ZOOM", 12)
m = folium.Map(location=MAP_CENTER, zoom_start=MAP_ZOOM, tiles="cartodbpositron")

# 1) All crashes (gray) — slightly smaller dots
fg_crashes = folium.FeatureGroup(name="All pedestrian crashes (gray)", show=True)
for _, r in crashes_4326.iterrows():
    folium.CircleMarker(
        location=[r.geometry.y, r.geometry.x],
        radius=2.0,          # was 2.5 → a tiny bit smaller
        color="#0d6efd",
        fill=True,
        fill_opacity=0.7
    ).add_to(fg_crashes)
fg_crashes.add_to(m)

# 2) Top 10 clusters (≥2 crashes) — solid circles like complete-linkage map
fg_clusters = folium.FeatureGroup(name="Top 10 clusters (by severity sum)", show=True)

if not clusters_src.empty and "n_crashes" in clusters_src.columns and "severity_sum" in clusters_src.columns:
    # Keep only clusters with ≥2 crashes
    clusters_2p = clusters_src[clusters_src["n_crashes"] >= 2].copy()

    if not clusters_2p.empty:
        # Sort by severity_sum then n_crashes, take top 10
        clusters_top = (
            clusters_2p
            .sort_values(["severity_sum", "n_crashes"], ascending=[False, False])
            .head(10)
            .to_crs(4326)
        )

        # Colormap based on severity_sum (like the other cell)
        vmin = float(clusters_top["severity_sum"].min())
        vmax = float(clusters_top["severity_sum"].max())
        if vmin == vmax:
            vmin = 0.0
        cmap = linear.Reds_09.scale(vmin, vmax)

        for _, r in clusters_top.iterrows():
            sev = float(r.get("severity_sum", 0))
            ncr = int(r.get("n_crashes", 0))

            cen = r.geometry.centroid
            lat, lon = float(cen.y), float(cen.x)

            # Radius scaled exactly like the complete-linkage centroids:
            radius = 6 if vmax == vmin else 6 + 20 * (sev - vmin) / (vmax - vmin)

            popup_html = (
                f"<b>Crashes:</b> {ncr}<br>"
                f"<b>Severity sum:</b> {sev:.1f}<br>"
                f"<b>Center lon:</b> {lon:.5f}<br>"
                f"<b>Center lat:</b> {lat:.5f}"
            )

            folium.CircleMarker(
                location=[lat, lon],
                radius=radius,
                color=cmap(sev),
                fill=True,
                fill_color=cmap(sev),
                fill_opacity=0.9,
                popup=folium.Popup(popup_html, max_width=360),
                tooltip=f"Sev {sev:.1f} | Cr {ncr}"
            ).add_to(fg_clusters)

        # Add legend for severity
        cmap.caption = "Cluster severity (severity_sum)"
        cmap.add_to(m)

fg_clusters.add_to(m)

# 3) All crosswalks (green) — geometry-only to avoid Timestamp serialization issues
if not crosswalks_4326.empty:
    fg_xwalks = folium.FeatureGroup(name="All crosswalks (green)", show=False)
    gj_xwalks = gdf_geom_only(crosswalks_4326)
    folium.GeoJson(
        gj_xwalks,
        style_function=lambda f: {"color": "#4caf50", "weight": 2.5, "opacity": 0.9}
    ).add_to(fg_xwalks)
    fg_xwalks.add_to(m)

# Controls
folium.LayerControl(collapsed=False).add_to(m)
m


