In [3]:
# employment_commute_build.py
# ------------------------------------------------------------
# Creates employment_commute.csv from LEHD LODES OD + TIGER/CB shapefiles
# Schema:
#   home_geo_id (Census Block GEOID, 15 chars)
#   work_geo_id (Census Block GEOID, 15 chars)
#   commuter_count (S000)
#   net_commute_flow (inflow - outflow for the HOME block)
#   data_year
#   source
# Optional extras (can be dropped later): home_zcta, work_zcta
#
# Requirements:
#   pip install pandas geopandas requests shapely pyproj
#   (geopandas requires system deps: GEOS/PROJ; on Windows use conda-forge)
# ------------------------------------------------------------

import os
import io
import gzip
import zipfile
import requests
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import tempfile

# ---------- Parameters ----------
STATE_ABBR = "md"         # 2-letter lower-case for LODES (e.g., "md")
STATE_FIPS = "24"         # Maryland FIPS
YEAR = 2022               # LODES OD year (e.g., 2022)
JOBTYPE = "JT00"          # LODES job type (JT00: all jobs; JT01: primary; JT02: secondary)
CRS_EPSG = 4326           # Work in WGS84 for simplicity

OUT_DIR = "out_md_lodes"
os.makedirs(OUT_DIR, exist_ok=True)

# LODES v8 OD URL (update if needed)
LODES_OD_URL = f"https://lehd.ces.census.gov/data/lodes/LODES8/{STATE_ABBR}/od/{STATE_ABBR}_od_main_{JOBTYPE}_{YEAR}.csv.gz"

# TIGER/Line 2020 Blocks (statewide)
# (Statewide blocks shapefile for 2020)
BLOCKS_URL = f"https://www2.census.gov/geo/tiger/TIGER2020/TABBLOCK20/tl_2020_{STATE_FIPS}_tabblock20.zip"

# Cartographic Boundary (generalized, lightweight) ZCTA5 2020 (national)
# Much smaller than full TIGER ZCTA; we’ll clip by state bounding box after load
ZCTA_URL = "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip"

# ---------- Helpers ----------
def download_to_bytes(url: str) -> bytes:
    print(f"Downloading: {url}")
    r = requests.get(url, stream=True, timeout=120)
    r.raise_for_status()
    return r.content

def read_lodes_od(url: str) -> pd.DataFrame:
    raw = download_to_bytes(url)
    with gzip.GzipFile(fileobj=io.BytesIO(raw), mode='rb') as gz:
        df = pd.read_csv(gz, dtype={"h_geocode": str, "w_geocode": str})
    # Keep only needed columns: S000 = total jobs in OD pair
    if "S000" not in df.columns:
        raise ValueError("LODES file missing S000 column (total jobs).")
    df = df[["h_geocode", "w_geocode", "S000"]].rename(columns={"S000": "commuter_count"})
    # Strip whitespace just in case
    df["h_geocode"] = df["h_geocode"].str.strip()
    df["w_geocode"] = df["w_geocode"].str.strip()
    # Filter non-positive
    df = df[df["commuter_count"] > 0].copy()
    return df

def read_zipped_shapefile(url: str) -> gpd.GeoDataFrame:
    """
    Download a zipped shapefile, extract to a temp directory, and read the .shp.
    This avoids pyogrio/fiona issues with 'zip://<BytesIO>' on some systems.
    """
    print(f"Downloading: {url}")
    import requests
    r = requests.get(url, stream=True, timeout=180)
    r.raise_for_status()
    content = r.content

    # Extract to a temp directory
    tmpdir = tempfile.mkdtemp(prefix="shp_")
    with zipfile.ZipFile(io.BytesIO(content)) as zf:
        zf.extractall(tmpdir)

    # Find the .shp file inside the zip
    shp_candidates = []
    for root, _, files in os.walk(tmpdir):
        for f in files:
            if f.lower().endswith(".shp"):
                shp_candidates.append(os.path.join(root, f))
    if not shp_candidates:
        raise RuntimeError("No .shp file found inside the ZIP.")
    shp_path = shp_candidates[0]  # take the first layer by default

    # Read with GeoPandas (pyogrio engine if available)
    return gpd.read_file(shp_path)

# ---------- Pipeline ----------
# 1) Read LODES OD
od = read_lodes_od(LODES_OD_URL)
print(f"LODES rows: {len(od):,}")

# 2) Compute net flow per HOME block: (inflow - outflow)
# Inflow by block = sum of commuter_count where block appears as w_geocode
inflow = od.groupby("w_geocode")["commuter_count"].sum().rename("inflow")
# Outflow by block = sum of commuter_count where block appears as h_geocode
outflow = od.groupby("h_geocode")["commuter_count"].sum().rename("outflow")

# Build a table of all blocks appearing in either role
all_blocks = pd.Index(sorted(set(od["h_geocode"]) | set(od["w_geocode"])), name="block_geoid")
net_tbl = pd.DataFrame(index=all_blocks).join(inflow, how="left", on="block_geoid").join(outflow, how="left", on="block_geoid")
net_tbl = net_tbl.fillna(0)
net_tbl["net_commute_flow"] = net_tbl["inflow"] - net_tbl["outflow"]
print(f"Unique blocks in OD: {len(net_tbl):,}")

# Map net flow back to OD rows as the HOME block’s net (you can also attach WORK block’s net if desired)
od = od.join(net_tbl["net_commute_flow"], how="left", on="h_geocode")

# 3) Optional: Attach ZCTAs via spatial join
#    This is helpful for ZIP-level reporting, but not part of the required schema.
print("Loading TIGER Blocks (2020, statewide)…")
blocks = read_zipped_shapefile(BLOCKS_URL)
# Standardize cols
# GEOID20 is the full 15-digit Census Block GEOID for 2020
if "GEOID20" not in blocks.columns:
    # Some releases use GEOID or GEOID10; try to discover automatically
    candidates = [c for c in ["GEOID20", "GEOID10", "GEOID"] if c in blocks.columns]
    if not candidates:
        raise ValueError("Blocks shapefile missing GEOID column.")
    geoid_col = candidates[0]
else:
    geoid_col = "GEOID20"

blocks = blocks[[geoid_col, "geometry"]].rename(columns={geoid_col: "block_geoid"})
blocks = blocks.to_crs(epsg=CRS_EPSG)

# Keep only blocks appearing in OD to lighten the spatial join
blocks_sub = blocks[blocks["block_geoid"].isin(net_tbl.index)].copy()
print(f"Blocks subset for OD: {len(blocks_sub):,}")

print("Loading ZCTA5 (2020, generalized)…")
zcta = read_zipped_shapefile(ZCTA_URL)
zcta = zcta.to_crs(epsg=CRS_EPSG)
# Keep only fields we need
zcta = zcta[["ZCTA5CE20", "geometry"]].rename(columns={"ZCTA5CE20": "zcta5"})

# Spatial join: block centroid within ZCTA polygon
print("Computing block centroids for spatial join…")
blocks_sub["centroid"] = blocks_sub.geometry.centroid
blocks_pts = blocks_sub.set_geometry("centroid", drop=False)[["block_geoid", "centroid"]].rename(columns={"centroid": "geometry"})
blocks_pts = gpd.GeoDataFrame(blocks_pts, geometry="geometry", crs=f"EPSG:{CRS_EPSG}")

print("Spatial join block → ZCTA…")
blk2zcta = gpd.sjoin(blocks_pts, zcta, how="left", predicate="within")[["block_geoid", "zcta5"]]

# Build lookups for home/work blocks
home_lookup = blk2zcta.set_index("block_geoid")["zcta5"].to_dict()
work_lookup = home_lookup  # same dict; mapping from any block to its ZCTA

od["home_zcta"] = od["h_geocode"].map(home_lookup)
od["work_zcta"] = od["w_geocode"].map(work_lookup)

# 4) Finalize schema and export
employment_commute = pd.DataFrame({
    "home_geo_id": od["h_geocode"],
    "work_geo_id": od["w_geocode"],
    "commuter_count": od["commuter_count"].astype("int64"),
    # Net for the HOME block (inflow - outflow); negative means net exporter of commuters
    "net_commute_flow": od["net_commute_flow"].astype("int64"),
    "data_year": YEAR,
    "source": f"LEHD LODES8 OD {JOBTYPE}",
    # Optional extras (drop if you want the minimal schema)
    "home_zcta": od["home_zcta"],
    "work_zcta": od["work_zcta"],
})

# Minimal schema only (uncomment to drop ZCTAs)
# employment_commute = employment_commute[[
#     "home_geo_id", "work_geo_id", "commuter_count", "net_commute_flow", "data_year", "source"
# ]]

out_csv = os.path.join(OUT_DIR, f"employment_commute_{STATE_ABBR}_{YEAR}_{JOBTYPE}.csv")
employment_commute.to_csv(out_csv, index=False)
print(f"✅ Wrote: {out_csv}")

Downloading: https://lehd.ces.census.gov/data/lodes/LODES8/md/od/md_od_main_JT00_2022.csv.gz
LODES rows: 1,979,851
Unique blocks in OD: 67,546
Loading TIGER Blocks (2020, statewide)…
Downloading: https://www2.census.gov/geo/tiger/TIGER2020/TABBLOCK20/tl_2020_24_tabblock20.zip
Blocks subset for OD: 67,546
Loading ZCTA5 (2020, generalized)…
Downloading: https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip
Computing block centroids for spatial join…



  blocks_sub["centroid"] = blocks_sub.geometry.centroid
`geo_col_name = gdf.active_geometry_name; gdf.set_geometry(new_geo_col).drop(columns=geo_col_name).rename_geometry(geo_col_name)`.
  blocks_pts = blocks_sub.set_geometry("centroid", drop=False)[["block_geoid", "centroid"]].rename(columns={"centroid": "geometry"})


Spatial join block → ZCTA…
✅ Wrote: out_md_lodes\employment_commute_md_2022_JT00.csv
