<h1><strong>1. Project overview</strong></h1>
<ol type="a">
  <li>Documentation is found in the Project documentation folder.</li>
  <li>This notebook is the data pipeline that enriches the provider collection in the MongoDB database.</li>
</ol>


<hr style="border: none; height: 2px; background-color: blue; margin-top: 6px; margin-bottom: 0;" />
<small>Add all imports and get the DB</small>

In [2]:
from __future__ import annotations

import csv
import os
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from bson.decimal128 import Decimal128
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.collection import Collection

from Utilities.ChatHealthyMongoUtilities import ChatHealthyMongoUtilities


# Load settings from a local .env file before reading the environment.
load_dotenv()  # <-- REQUIRED for .env files

# Fail fast when the connection string is absent or empty.
conn_str = os.environ.get("MONGO_connectionString")
if conn_str is None or conn_str == "":
    raise EnvironmentError("MONGO_connectionString is not set")

# Build the project Mongo helper once and keep the handle it returns.
DbUtil = ChatHealthyMongoUtilities(conn_str)
DB = DbUtil.getConnection()



MongoDB connection successfully established and validated.


Load the ZIP-to-county crosswalk CSV into the database, and also load the FIPS county-name file so that the ZIP data can be enriched with county names via the county-name crosswalk.

In [5]:

# -----------------------------
# Results
# -----------------------------
@dataclass(frozen=True)
class LoadResults:
    """
    Immutable report returned by ``load_county_zip_with_census_collections``.

    All fields are plain counters. The normalization counters
    (skipped / considered / written / missing-county-name) are only computed
    when the normalized collection is actually loaded; when that collection
    already holds data they are reported as 0.
    """
    # --- HUD source (rows from ZIP_COUNTY_*.csv) ---
    hud_rows_read: int                      # number of rows in the HUD CSV
    hud_rows_skipped_missing_required: int  # missing ZIP/COUNTY/TOT_RATIO after parsing
    hud_rows_skipped_non_county_fips: int   # COUNTY not present in county-level FIPS map (Summary Level != 050)
    hud_rows_considered_for_normalization: int  # rows that passed validation and county-only filter
    hud_rows_written: int                  # number of normalized ZIP docs written (unique ZIPs)

    # --- County lookup source (Census all-geocodes) ---
    county_lookup_rows_read: int           # rows read from the county lookup file
    county_lookup_rows_written: int        # rows inserted into the lookup collection (0 if it was not empty)
    county_lookup_counties_in_map: int     # how many county FIPS were captured (Summary Level 050)

    # --- Raw HUD crosswalk (loaded AS-IS) ---
    census_crosswalk_rows_read: int        # same HUD rows, counted for the raw collection
    census_crosswalk_rows_written: int     # rows inserted into the raw collection (0 if it was not empty)

    # --- Enrichment metric ---
    normalized_zips_missing_county_name: int  # should be 0 when county-only filter is enforced

    # --- Validation metrics ---
    raw_hud_distinct_zips: int             # distinct ZIP5 values seen in the HUD source file
    normalized_distinct_zips: int          # distinct "zip" values present in the normalized collection


# -----------------------------
# Helpers
# -----------------------------
def _is_collection_empty(col: Collection) -> bool:
    return col.estimated_document_count() == 0


def _s(v: Any) -> Optional[str]:
    if v is None:
        return None
    s = str(v).strip()
    return s if s != "" else None


def _zip5(value: Any) -> Optional[str]:
    if value is None:
        return None
    s = str(value).strip()
    if s == "":
        return None
    digits = "".join(ch for ch in s if ch.isdigit())
    if len(digits) == 0:
        return None
    if len(digits) > 5:
        digits = digits[:5]
    return digits.zfill(5)


def _to_decimal128(value: Any) -> Optional[Decimal128]:
    """
    Convert a numeric-looking value into BSON Decimal128.
    Returns None if the value is blank/unparseable.
    """
    if value is None:
        return None
    s = str(value).strip()
    if s == "":
        return None
    try:
        return Decimal128(Decimal(s))
    except (InvalidOperation, ValueError):
        return None


def _read_csv_rows(path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
    with open(path, "r", newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        if not reader.fieldnames:
            raise ValueError(f"No header row found in CSV: {path}")
        headers = list(reader.fieldnames)
        rows = [r for r in reader]
        return headers, rows


def _read_xlsx_rows(path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
    from openpyxl import load_workbook  # type: ignore

    wb = load_workbook(path, read_only=True, data_only=True)
    ws = wb.active

    rows_iter = ws.iter_rows(values_only=True)
    try:
        header_row = next(rows_iter)
    except StopIteration:
        raise ValueError(f"Empty XLSX: {path}")

    headers = [str(h).strip() if h is not None else "" for h in header_row]
    if not any(headers):
        raise ValueError(f"Header row appears empty in XLSX: {path}")

    out: List[Dict[str, Any]] = []
    for row in rows_iter:
        doc: Dict[str, Any] = {}
        for i, h in enumerate(headers):
            if h == "":
                continue
            doc[h] = row[i] if i < len(row) else None
        if any(v is not None and str(v).strip() != "" for v in doc.values()):
            out.append(doc)

    return headers, out


def _read_tabular_rows(path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
    ext = os.path.splitext(path.lower())[1]
    if ext in [".csv", ".txt"]:
        return _read_csv_rows(path)
    if ext in [".xlsx", ".xlsm"]:
        return _read_xlsx_rows(path)
    raise ValueError(f"Unsupported file type '{ext}'. Use .csv or .xlsx/.xlsm: {path}")


def _insert_many_in_batches(
    col: Collection,
    docs: List[Dict[str, Any]],
    batch_size: int = 1000,
) -> int:
    if not docs:
        return 0
    total = 0
    for i in range(0, len(docs), batch_size):
        chunk = docs[i : i + batch_size]
        col.insert_many(chunk, ordered=False)
        total += len(chunk)
    return total


def _normalize_key(s: str) -> str:
    return "".join(ch.lower() for ch in s if ch.isalnum())


def _find_col(headers: List[str], candidates: List[str]) -> Optional[str]:
    norm_map = {_normalize_key(h): h for h in headers if h is not None}
    for c in candidates:
        key = _normalize_key(c)
        if key in norm_map:
            return norm_map[key]
    return None


def _build_fips_to_county_name_map_from_rows(
    headers: List[str],
    rows: List[Dict[str, Any]],
) -> Dict[str, str]:
    """
    Build a map: 5-digit county FIPS -> county name.

    Two layouts are recognised (column names are matched via ``_find_col``):

    1. "all-geocodes" layout, used when all four of these columns exist:
         - Summary Level
         - State Code (FIPS)
         - County Code (FIPS)
         - Area Name (including legal/statistical area description)
       Only rows with Summary Level '050' are used, and
       countyFips = zfill2(State Code) + zfill3(County Code).

    2. Generic fallback layout: a name column (COUNTYNAME / COUNTY_NAME /
       NAMELSAD / NAME) plus either STATEFP + COUNTYFP style columns or a
       single FIPS / GEOID style column, of which the last five digits are used.

    When a FIPS code appears more than once, the first name seen is kept.

    Raises:
        ValueError: if an all-geocodes file yields no '050' rows, if the
            fallback layout has no name column, or if the resulting map is empty.
    """

    def _digits_only(raw: Any) -> str:
        # Keep only digit characters of the stringified value.
        return "".join(ch for ch in str(raw) if ch.isdigit())

    summary_col = _find_col(headers, ["Summary Level"])
    state_code_col = _find_col(headers, ["State Code (FIPS)"])
    county_code_col = _find_col(headers, ["County Code (FIPS)"])
    area_name_col = _find_col(headers, ["Area Name (including legal/statistical area description)"])

    is_all_geocodes = all([summary_col, state_code_col, county_code_col, area_name_col])

    out: Dict[str, str] = {}

    if is_all_geocodes:
        for row in rows:
            # zfill(3) lets a numeric cell (e.g. 50 read from a spreadsheet)
            # match the county summary level '050'.
            summary = str(row.get(summary_col, "")).strip().zfill(3)
            if summary != "050":
                continue

            st_raw = row.get(state_code_col)
            co_raw = row.get(county_code_col)
            nm_raw = row.get(area_name_col)

            st = _digits_only(st_raw) if st_raw is not None else ""
            co = _digits_only(co_raw) if co_raw is not None else ""
            name = str(nm_raw).strip() if nm_raw is not None else ""

            if st == "" or co == "" or name == "":
                continue

            county_fips = st.zfill(2) + co.zfill(3)
            out.setdefault(county_fips, name)

        if not out:
            raise ValueError(
                "Detected an all-geocodes style file, but found zero county rows (Summary Level '050'). "
                "Inspect the 'Summary Level' column and tell me the value used for counties if it differs."
            )
        return out

    # Fallback generic schema
    statefp_col = _find_col(headers, ["STATEFP", "STATEFP20", "STATEFP10", "StateFP"])
    countyfp_col = _find_col(headers, ["COUNTYFP", "COUNTYFP20", "COUNTYFP10", "CountyFP"])
    fips_col = _find_col(headers, ["FIPS", "GEOID", "COUNTYFIPS", "COUNTY_FIPS", "COUNTYFP"])
    name_col = _find_col(headers, ["COUNTYNAME", "COUNTY_NAME", "NAMELSAD", "NAME"])

    if name_col is None:
        raise ValueError(
            "Could not find a county name column in the county lookup file. "
            f"Headers seen: {headers}"
        )

    for row in rows:
        nm_raw = row.get(name_col)
        county_name = str(nm_raw).strip() if nm_raw is not None else ""
        if county_name == "":
            continue

        fips: Optional[str] = None

        if statefp_col and countyfp_col:
            st = row.get(statefp_col)
            co = row.get(countyfp_col)
            if st is not None and co is not None:
                st_digits = _digits_only(st)
                co_digits = _digits_only(co)
                # Blank codes must not be zero-padded into a bogus "00000";
                # leaving fips as None lets the single-column fallback run.
                if st_digits and co_digits:
                    st_s = st_digits.zfill(2)
                    co_s = co_digits.zfill(3)
                    if len(st_s) == 2 and len(co_s) == 3:
                        fips = st_s + co_s

        if fips is None and fips_col:
            v = row.get(fips_col)
            if v is not None:
                digits = _digits_only(v)
                # Shorter values (e.g. a bare 3-digit county code) cannot be a full FIPS.
                if len(digits) >= 5:
                    fips = digits[-5:]

        if fips:
            out.setdefault(fips, county_name)

    if not out:
        raise ValueError(
            "Built an empty county FIPS -> county name map. Headers may not match expected patterns."
        )

    return out


# -----------------------------
# Main loader
# -----------------------------
def load_county_zip_with_census_collections(
    argDbConnection: MongoClient,
    argDatabaseName: str,

    # 2 source files
    argHudCountyZipCsvPath: str,
    argCountyFipsNameFilePath: str,

    # 3 collections
    argHudNormalizedCollectionName: str,
    argCountyLookupCollectionName: str,
    argCensusCrosswalkCollectionName: str,

    argBatchSize: int = 1000,
) -> LoadResults:
    """
    Creates/loads THREE collections, using TWO source files.

    Source file #1: argHudCountyZipCsvPath (HUD ZIP-County crosswalk CSV)
      - Loaded AS-IS into argCensusCrosswalkCollectionName
      - Also normalized into argHudNormalizedCollectionName (highest TOT_RATIO
        per ZIP; on a tie the row that appears first in the file is kept)
      - Normalized collection is restricted to COUNTY FIPS that exist in the
        county-only lookup map (Summary Level 050).

    Source file #2: argCountyFipsNameFilePath (County FIPS -> County Name reference)
      - Loaded AS-IS into argCountyLookupCollectionName
      - Used to enrich normalized records with countyName
      - Supports Census "all-geocodes" CSV format

    Normalized fields written:
      ZIP                 -> zip (ZIP5 string)
      COUNTY              -> countyFips (5-digit string)
      (derived)           -> countyName (string, from Summary Level 050 rows only)
      USPS_ZIP_PREF_CITY  -> mainCity
      USPS_ZIP_PREF_STATE -> state
      TOT_RATIO           -> percentOfZipInCounty (BSON Decimal128)

    Each collection is loaded only if it is empty. Indexes are ensured on
    every call. When the normalized collection already holds data, the
    normalization counters in the returned report are left at 0 because they
    are not recomputed from the database.

    Args:
        argDbConnection: Client object indexed by database name.
        argDatabaseName: Name of the database holding the three collections.
        argHudCountyZipCsvPath: Path to the HUD ZIP-County CSV.
        argCountyFipsNameFilePath: Path to the county lookup file (.csv/.txt/.xlsx/.xlsm).
        argHudNormalizedCollectionName: Collection for one-document-per-ZIP output.
        argCountyLookupCollectionName: Collection for the raw county lookup rows.
        argCensusCrosswalkCollectionName: Collection for the raw HUD rows.
        argBatchSize: Documents per insert batch.

    Returns:
        A ``LoadResults`` report.

    Raises:
        ValueError: if the HUD CSV lacks a required column, or if a source
            file cannot be read or mapped by the helper functions.
    """
    db = argDbConnection[argDatabaseName]

    normalized_col = db[argHudNormalizedCollectionName]
    county_lookup_col = db[argCountyLookupCollectionName]
    census_crosswalk_col = db[argCensusCrosswalkCollectionName]

    # Read HUD crosswalk once
    hud_headers, hud_rows = _read_csv_rows(argHudCountyZipCsvPath)

    required = ["ZIP", "COUNTY", "USPS_ZIP_PREF_CITY", "USPS_ZIP_PREF_STATE", "TOT_RATIO"]
    missing = [h for h in required if h not in hud_headers]
    if missing:
        raise ValueError(
            "HUD CSV is missing required columns: "
            f"{missing}. Headers seen: {hud_headers}"
        )

    # Compute raw distinct zips in HUD (for validation)
    raw_zip_set = {z for z in (_zip5(row.get("ZIP")) for row in hud_rows) if z}
    raw_hud_distinct_zips = len(raw_zip_set)

    # 1) Load HUD crosswalk AS-IS into raw collection
    census_crosswalk_rows_read = len(hud_rows)
    census_crosswalk_rows_written = 0
    if _is_collection_empty(census_crosswalk_col):
        census_crosswalk_rows_written = _insert_many_in_batches(
            census_crosswalk_col, hud_rows, batch_size=argBatchSize
        )

    # 2) Load county lookup AS-IS into lookup collection
    county_headers, county_rows = _read_tabular_rows(argCountyFipsNameFilePath)

    county_lookup_rows_read = len(county_rows)
    county_lookup_rows_written = 0
    if _is_collection_empty(county_lookup_col):
        county_lookup_rows_written = _insert_many_in_batches(
            county_lookup_col, county_rows, batch_size=argBatchSize
        )

    # Build county-only map (Summary Level 050)
    fips_to_name = _build_fips_to_county_name_map_from_rows(county_headers, county_rows)
    county_lookup_counties_in_map = len(fips_to_name)

    # 3) Normalize HUD + enrich with countyName; enforce county-only via fips_to_name membership
    hud_rows_read = len(hud_rows)

    # These stay 0 when the normalized collection is already populated:
    # they are not recomputed from the DB, to avoid false claims.
    hud_rows_skipped_missing_required = 0
    hud_rows_skipped_non_county_fips = 0
    hud_rows_considered_for_normalization = 0
    hud_rows_written = 0
    normalized_zips_missing_county_name = 0

    if _is_collection_empty(normalized_col):
        best_by_zip: Dict[str, Dict[str, Any]] = {}

        for row in hud_rows:
            zip5 = _zip5(row.get("ZIP"))
            county_digits = "".join(ch for ch in str(row.get("COUNTY", "")).strip() if ch.isdigit())
            county_fips = county_digits.zfill(5) if county_digits else None
            ratio128 = _to_decimal128(row.get("TOT_RATIO"))
            ratio = ratio128.to_decimal() if ratio128 is not None else None

            # A NaN ratio cannot be ordered (the '>' comparison below would
            # raise) and an infinite one is not a usable share, so both are
            # treated like a missing TOT_RATIO.
            if zip5 is None or county_fips is None or ratio is None or not ratio.is_finite():
                hud_rows_skipped_missing_required += 1
                continue

            # COUNTY-ONLY FILTER: Only accept counties present in the 050-only map
            county_name = fips_to_name.get(county_fips)
            if county_name is None:
                hud_rows_skipped_non_county_fips += 1
                continue

            hud_rows_considered_for_normalization += 1

            # Keep the existing winner unless this row has a strictly larger ratio.
            current = best_by_zip.get(zip5)
            if current is not None and ratio <= current["percentOfZipInCounty"].to_decimal():
                continue

            best_by_zip[zip5] = {
                "zip": zip5,
                "countyFips": county_fips,
                "countyName": county_name,  # should always be present due to filter
                "mainCity": _s(row.get("USPS_ZIP_PREF_CITY")),
                "state": _s(row.get("USPS_ZIP_PREF_STATE")),
                "percentOfZipInCounty": ratio128,  # Decimal128
            }

        docs = list(best_by_zip.values())

        # Should be zero now, but keep metric anyway
        normalized_zips_missing_county_name = sum(1 for d in docs if not d.get("countyName"))

        hud_rows_written = _insert_many_in_batches(
            normalized_col, docs, batch_size=argBatchSize
        )

    # Helpful indexes for common queries. Ensured on every call rather than
    # only when the normalized collection was just loaded, so a raw or lookup
    # collection that is (re)loaded on a later run is still indexed.
    normalized_col.create_index("zip")
    normalized_col.create_index([("zip", 1), ("countyFips", 1)])
    county_lookup_col.create_index("Summary Level")
    county_lookup_col.create_index("State Code (FIPS)")
    county_lookup_col.create_index("County Code (FIPS)")
    census_crosswalk_col.create_index("ZIP")
    census_crosswalk_col.create_index("COUNTY")

    # Validation: distinct zips in normalized collection
    normalized_distinct_zips = normalized_col.distinct("zip")
    normalized_distinct_zips_count = len(normalized_distinct_zips)

    return LoadResults(
        hud_rows_read=hud_rows_read,
        hud_rows_skipped_missing_required=hud_rows_skipped_missing_required,
        hud_rows_skipped_non_county_fips=hud_rows_skipped_non_county_fips,
        hud_rows_considered_for_normalization=hud_rows_considered_for_normalization,
        hud_rows_written=hud_rows_written,
        county_lookup_rows_read=county_lookup_rows_read,
        county_lookup_rows_written=county_lookup_rows_written,
        county_lookup_counties_in_map=county_lookup_counties_in_map,
        census_crosswalk_rows_read=census_crosswalk_rows_read,
        census_crosswalk_rows_written=census_crosswalk_rows_written,
        normalized_zips_missing_county_name=normalized_zips_missing_county_name,
        raw_hud_distinct_zips=raw_hud_distinct_zips,
        normalized_distinct_zips=normalized_distinct_zips_count,
    )



Driver

In [6]:
# Source file locations for this run.
zipCountyCSVPath = r"C:\chatHealthy\Resources\ZIP_COUNTY_092025.csv"
CountyFipsNameFilePath = r"C:\chatHealthy\Resources\all-geocodes-v2021.csv"

# Run the loader; `results` and `stats` both refer to the same report object.
results = load_county_zip_with_census_collections(
    argDbConnection=DbUtil.getConnection(),
    argDatabaseName="PublicHealthData",
    argHudCountyZipCsvPath=zipCountyCSVPath,
    argCountyFipsNameFilePath=CountyFipsNameFilePath,
    argHudNormalizedCollectionName="CountyZipNormalized",
    argCountyLookupCollectionName="CountyFipsLookup",
    argCensusCrosswalkCollectionName="HudZipCountyRaw",
)
stats = results

# Print one "field: value" line per metric in the report.
for key, value in vars(stats).items():
    print(f"{key}: {value}")

LoadResults(hud_rows_read=54574, hud_rows_skipped_missing_required=0, hud_rows_skipped_non_county_fips=0, hud_rows_considered_for_normalization=0, hud_rows_written=0, county_lookup_rows_read=43833, county_lookup_rows_written=0, county_lookup_counties_in_map=3221, census_crosswalk_rows_read=54574, census_crosswalk_rows_written=0, normalized_zips_missing_county_name=0, raw_hud_distinct_zips=39493, normalized_distinct_zips=39493)
