# INGEST.ipynb 
# Data Retrieval Tools

---
## Overview
- fetch_census_tracts.py
- fetch_nc_counties.py
- fetch_osm_outlets.py
- fetch_population.py
- fetch_usda_food_access.py

# 1 - fetch_census_tracts.py

This file retrieves North Carolina census tract data as a GeoPandas GeoDataFrame. 

NOTE: This project deals with some pretty heavy geographic data analysis, so here is a little bit of design background for a better understanding of how census tract data is constructed and how it is used in this application: 

Census tracts are small geographic units consisting of 1,200-8,000 people. They are designed for statistical analysis of localized areas, allowing for a deeper view than county or city-level analysis. Their small size and relatively permanent boundaries allow for easy comparison across decades, and they can be used to study socioeconomic disparities. This project uses Cartographic Boundary files, which have simplified geometries compared to the US Census Bureau's typical TIGER/Line files and favor visualization over precision. The project is already heavy, so optimizing for speed allows the application to run effectively at a state-wide level. 

This file makes use of caching. Repeatedly retrieving remote data increases runtime and risks network failures, so zip files are cached to avoid re-downloading them on every run. 

The zip file used in this analysis contains a shapefile, a widely used GIS format for storing geographic features and their attributes. A shapefile consists of a .shp file (feature geometry), a .shx file (geometry index), a .dbf file (dBASE table containing feature attributes), and an optional .prj file (coordinate system definition). 
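
As a rough illustration of the component files listed above, the following sketch checks a shapefile ZIP for its required members using only the standard library. The helper names `missing_shapefile_parts` and `_make_zip`, and the in-memory archives with placeholder member names, are assumptions made for this example and are not part of the project.

```python
import io
import zipfile
from pathlib import PurePosixPath
from typing import List, Set

REQUIRED_SUFFIXES = {".shp", ".shx", ".dbf"}  # .prj is optional


def missing_shapefile_parts(zip_bytes: bytes) -> Set[str]:
    """Return the required shapefile suffixes that are absent from the archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        present = {PurePosixPath(name).suffix.lower() for name in z.namelist()}
    return REQUIRED_SUFFIXES - present


def _make_zip(names: List[str]) -> bytes:
    """Build a small in-memory ZIP with empty placeholder members (demo only)."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        for name in names:
            z.writestr(name, b"")
    return buf.getvalue()


complete = _make_zip(["demo.shp", "demo.shx", "demo.dbf", "demo.prj"])
incomplete = _make_zip(["demo.shp", "demo.dbf"])

print(missing_shapefile_parts(complete))    # set()
print(missing_shapefile_parts(incomplete))  # {'.shx'}
```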

Shapefiles usually contain CRS metadata. CRS (Coordinate Reference System) brings context to coordinate data, explaining where coordinates are located, what units they use, and how they measure direction and angles. EPSG is a standardized registry of coordinate systems. Most US government census data uses EPSG:4269 (NAD83). However, this application uses EPSG:4326 instead, as do most web maps. An earlier version of this project used 4269, but the layers were incorrectly offset from their intended location. 

GEOIDs are hierarchical geographic identifiers. They are used across multiple datasets, including census datasets. They are the canonical join-keys for census geography. However, some shapefiles omit GEOIDs. Thus, this file generates one if it does not exist. 
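
To make the hierarchy concrete: a tract-level GEOID is 11 digits long, made up of a 2-digit state FIPS code, a 3-digit county FIPS code, and a 6-digit tract code. The sketch below splits one apart. The names `TractGeoid` and `split_tract_geoid` are hypothetical, and the sample GEOID is an illustrative value rather than one taken from the project data.

```python
from typing import NamedTuple


class TractGeoid(NamedTuple):
    state: str   # 2-digit state FIPS code
    county: str  # 3-digit county FIPS code
    tract: str   # 6-digit tract code


def split_tract_geoid(geoid: str) -> TractGeoid:
    """Split an 11-digit tract GEOID into its hierarchical parts."""
    if len(geoid) != 11 or not geoid.isdigit():
        raise ValueError(f"Expected 11 digits, got {geoid!r}")
    return TractGeoid(geoid[:2], geoid[2:5], geoid[5:])


parts = split_tract_geoid("37183050100")  # illustrative value
print(parts.state, parts.county, parts.tract)  # 37 183 050100
```

Because each level is a fixed-width prefix of the next, the first five characters of a tract GEOID identify its county, which is what makes GEOIDs convenient join keys across datasets.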

# 1.1 - Imports and Globals

This section covers necessary imports and global variables.

Annotations/type hints are incorporated for clarity. Path is imported to work with file systems. Zipfile is used for reading/extracting `.zip` files. Requests is used for remote data retrieval. GeoPandas is used for geospatial data management. Additionally, a useful function `ensure_dir` from the `utils.cache` module is imported. 

`TIGER_TRACT_ZIP_URL` is a global variable that points to the Census Cartographic Boundary shapefile ZIP for North Carolina census tracts in 2023, generalized for display at a scale of 1:500,000 (the `500k` suffix in the filename).

In [None]:
from __future__ import annotations

from pathlib import Path
import zipfile
import requests
import geopandas as gpd

from src.utils.cache import ensure_dir

TIGER_TRACT_ZIP_URL = "https://www2.census.gov/geo/tiger/GENZ2023/shp/cb_2023_37_tract_500k.zip"

# 1.2 - Download NC Tracts Zip

The function `download_nc_tracts_zip` returns a local copy of the census tract zip file, downloading it only when needed. The function accepts a Path `cache_dir` and a string `url` that defaults to `TIGER_TRACT_ZIP_URL`. To save time and bandwidth, the function first looks for the file in `cache_dir` and falls back to downloading from `url` only if no cached copy exists. The function returns a Path, which is the path to the zip file.

Line-by-line breakdown:
- Ensure that `cache_dir` exists before proceeding.
- Construct the output filename inside the cache directory and store it as `out`.
- If the zip file exists locally, skip downloading and return the cached file path.
- Otherwise, send an HTTP GET request. Set `stream` to True to read the data in chunks, and set the request to time out after 120 seconds without a response.
- If the response status is 4xx or 5xx (for example 404 or 500), raise an exception immediately.
- Open output file `out` in write-binary mode.
- Iterate through the response body in 1 MB chunks.
- Guard against keep-alive chunks, which are rare but possible.
- Write each chunk to disk.
- Return the Path to the downloaded zip file.


In [None]:
def download_nc_tracts_zip(cache_dir: Path, url: str = TIGER_TRACT_ZIP_URL) -> Path:
    ensure_dir(cache_dir)
    out = cache_dir / "cb_2023_37_tract_500k.zip"
    if out.exists():
        return out
    r = requests.get(url, stream=True, timeout=120)
    r.raise_for_status()
    with out.open("wb") as f:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            if chunk:
                f.write(chunk)
    return out
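
One possible hardening, which is not part of the function above: if a download is interrupted, a partial file is left at `out`, and the `out.exists()` check would treat it as a valid cache entry on the next run. The sketch below writes to a temporary sibling file and renames it only on success. The helper `write_chunks_atomically` and the `.part` suffix are assumptions for illustration; it accepts any iterable of byte chunks, such as the result of `r.iter_content(...)`.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable


def write_chunks_atomically(chunks: Iterable[bytes], out: Path) -> Path:
    """Write chunks to a temporary sibling file, then rename it to `out`."""
    tmp = out.with_name(out.name + ".part")
    try:
        with tmp.open("wb") as f:
            for chunk in chunks:
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)
        os.replace(tmp, out)  # atomic when both paths are on the same filesystem
    except BaseException:
        tmp.unlink(missing_ok=True)  # do not leave a partial file behind
        raise
    return out


def _failing_stream():
    """Simulate a download that dies partway through (demo only)."""
    yield b"partial"
    raise ConnectionError("simulated network failure")


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "demo.zip"
    try:
        write_chunks_atomically(_failing_stream(), target)
    except ConnectionError:
        pass
    failed_left_file = target.exists()  # no cache entry after a failed download

    write_chunks_atomically([b"abc", b"", b"def"], target)
    ok_contents = target.read_bytes()   # empty chunk skipped, rest concatenated

print(failed_left_file, ok_contents)  # False b'abcdef'
```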

# 1.3 - Extract Zip

This function extracts a zip file into a directory and returns the extraction directory path. The function accepts a Path `zip_path` and a Path `extract_dir`, and it returns a Path. In practice, `zip_path` is the location of the zip file to extract, `extract_dir` is the directory to extract into, and the return object is `extract_dir` which now contains the zip file contents.

Line-by-line breakdown:
- Ensure the extraction directory exists.
- Open the zip archive for reading.
- Extract all files from the zip file into `extract_dir`.
- Return the directory where files were extracted.

In [None]:
def extract_zip(zip_path: Path, extract_dir: Path) -> Path:
    ensure_dir(extract_dir)
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(extract_dir)
    return extract_dir

# 1.4 - Load NC Tracts GDF

This function downloads the zip if needed, extracts it, loads the tract shapefile into a GeoPandas dataframe, normalizes CRS, ensures a GEOID column exists, and returns a GeoDataFrame. The function accepts a Path `cache_dir` for use in the `download_nc_tracts_zip` function.

Line-by-line breakdown:
- Download the zip (or use cached) and store its path.
- Extract the zip contents into a subfolder inside `cache_dir`.
- Find shapefiles (.shp) in the extracted directory.
- If no shapefile exists, raise a clear error statement.
- Read the first shapefile found into a GeoDataFrame.
- If the shapefile lacks CRS metadata, then assign CRS EPSG:4269 (NAD83).
- Reproject to EPSG:4326 (WGS84 lat/lon), the standard for web maps.
- If a tract identifier column does not exist, then build a list of the component columns needed to construct GEOID (only include those that exist).
- If all three components exist, then construct GEOID by concatenating state FIPS, county FIPS, and tract code (all cast to strings to avoid numeric concatenation issues).
- If GEOID is missing and can't be built, raise a clear error statement.
- Return the resulting GeoPandas dataframe.

In [None]:
def load_nc_tracts_gdf(cache_dir: Path) -> gpd.GeoDataFrame:
    zip_path = download_nc_tracts_zip(cache_dir)
    shp_dir = extract_zip(zip_path, cache_dir / "cb_2023_37_tract_500k")
    shp_files = list(shp_dir.glob("*.shp"))
    if not shp_files:
        raise FileNotFoundError(f"No .shp found in {shp_dir}")
    gdf = gpd.read_file(shp_files[0])
    if gdf.crs is None:
        gdf = gdf.set_crs("EPSG:4269", allow_override=True)
    gdf = gdf.to_crs("EPSG:4326")
    if "GEOID" not in gdf.columns:
        cols = [c for c in ["STATEFP", "COUNTYFP", "TRACTCE"] if c in gdf.columns]
        if len(cols) == 3:
            gdf["GEOID"] = (
                gdf["STATEFP"].astype(str)
                + gdf["COUNTYFP"].astype(str)
                + gdf["TRACTCE"].astype(str)
            )
        else:
            raise KeyError("GEOID not found and cannot be constructed from STATEFP/COUNTYFP/TRACTCE.")
    return gdf
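
A caveat on the GEOID fallback above: casting to string preserves leading zeros only when the component columns were read as strings in the first place. As a hedged sketch of a stricter variant, the hypothetical helper `build_tract_geoid` below pads each component to its fixed width (2, 3, and 6 digits). The sample values are illustrative.

```python
def build_tract_geoid(statefp, countyfp, tractce) -> str:
    """Concatenate FIPS components, left-padding each to its fixed width."""
    return (
        str(statefp).zfill(2)
        + str(countyfp).zfill(3)
        + str(tractce).zfill(6)
    )


# String inputs pass through unchanged.
print(build_tract_geoid("37", "001", "020100"))  # 37001020100
# Integer inputs, which would otherwise lose their leading zeros, are repaired.
print(build_tract_geoid(37, 1, 20100))           # 37001020100
```

On a GeoDataFrame column, the same padding could be applied with `.astype(str).str.zfill(n)`.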


# 2 - fetch_nc_counties.py

This file is responsible for retrieving North Carolina county data. The retrieved data is a GeoDataFrame containing county names and the associated geometry polygons. 

# 2.1 - Imports and Globals

This section covers necessary imports and global variables.

Path is imported to work with file systems. GeoPandas is used for geospatial data management. Requests is used for remote data retrieval. Zipfile is used for reading/extracting `.zip` files. Additionally, a useful function `ensure_dir` from the `utils.cache` module is imported. 

`TIGER_COUNTY_URL` is a global variable that points to the Census TIGER/Line shapefile ZIP for all US counties in 2022, which will be filtered to only NC later.

In [None]:
from pathlib import Path
import geopandas as gpd
import requests
import zipfile

from src.utils.cache import ensure_dir

TIGER_COUNTY_URL = (
    "https://www2.census.gov/geo/tiger/TIGER2022/COUNTY/tl_2022_us_county.zip"
)

# 2.2 - Load NC Counties

This function is responsible for loading North Carolina counties as a GeoPandas GeoDataFrame and returning it. The function accepts a Path `cache_dir`, which in practice is a cached directory that holds the county data for quick and easy retrieval. 

Line-by-line breakdown:
- Make sure `cache_dir` exists before saving zip file or extracted files.
- The path of the downloaded zip file in the cache directory is stored as `zip_path`.
- If the zip file is located in the cache, then skip the download. Otherwise, download county data with requests, throw an error if it fails, and write to output file (binary writing, 1 MB chunks).
- Create `extract_dir`, the folder to store extracted files.
- If already extracted, skip extraction.
- Open zip file and extract contents into `extract_dir`.
- Find all shapefiles in the extraction directory, and throw error if no shapefiles exist.
- Load shapefile as a GeoDataFrame and convert to EPSG:4326.
- Filter GeoDataFrame using the FIPS code "37" (a string, not an integer).
- Return only the county name and the geometry polygon (everything else is not needed for this project).

In [None]:
def load_nc_counties(cache_dir: Path) -> gpd.GeoDataFrame:
    ensure_dir(cache_dir)

    zip_path = cache_dir / "tl_2022_us_county.zip"

    # Download once
    if not zip_path.exists():
        print("   downloading TIGER/Line US counties…")
        r = requests.get(TIGER_COUNTY_URL, stream=True, timeout=120)
        r.raise_for_status()
        with zip_path.open("wb") as f:
            for chunk in r.iter_content(1024 * 1024):
                if chunk:
                    f.write(chunk)

    extract_dir = cache_dir / "tl_2022_us_county"
    if not extract_dir.exists():
        with zipfile.ZipFile(zip_path, "r") as z:
            z.extractall(extract_dir)

    shp_files = list(extract_dir.glob("*.shp"))
    if not shp_files:
        raise FileNotFoundError("County shapefile not found after extraction.")

    gdf = gpd.read_file(shp_files[0]).to_crs("EPSG:4326")

    # Filter to North Carolina (STATEFP = '37')
    gdf = gdf[gdf["STATEFP"] == "37"]

    # Keep only what we need
    return gdf[["NAME", "geometry"]]

# 3 - fetch_osm_outlets.py

This file is responsible for accessing OpenStreetMap and retrieving food location data, the key piece to determining food access conditions. Additionally, the file retrieves healthy and unhealthy outlets for nutritional comparison.

# 3.1 - Imports and Globals

This section is responsible for necessary imports and globals.

Annotations/type hints are included for clarity. Path is used for file system management. Json, dataclass, and Pandas are imported for data management. Requests is used for data retrieval, and time is used for separating queries. 

`OVERPASS_URLS` contains three URLs corresponding to different Overpass servers. Since Overpass frequently rate-limits requests, having several endpoints to fall back on makes retrieval more robust.

BoundingBox is an object holding south latitude, west longitude, north latitude, and east longitude. The default bounding box is the North Carolina region.

In [None]:
from __future__ import annotations
from pathlib import Path
import json
from dataclasses import dataclass
import requests
import time
import pandas as pd

OVERPASS_URLS = [
    "https://overpass-api.de/api/interpreter",
    "https://overpass.kumi.systems/api/interpreter",
    "https://overpass.nchc.org.tw/api/interpreter",
]


@dataclass(frozen=True)
class BoundingBox:
    south: float
    west: float
    north: float
    east: float

NC_BBOX = BoundingBox(south=33.75, west=-84.45, north=36.6, east=-75.4)
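
To illustrate how the frozen dataclass behaves, the sketch below repeats the definition with one hypothetical method, `contains`, added for demonstration (the original class has no methods). The test coordinates are approximate locations for Raleigh, NC and Richmond, VA, used only as examples.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class BoundingBox:
    south: float
    west: float
    north: float
    east: float

    def contains(self, lat: float, lon: float) -> bool:
        """Hypothetical helper: True if the point lies inside the box."""
        return self.south <= lat <= self.north and self.west <= lon <= self.east


NC_BBOX = BoundingBox(south=33.75, west=-84.45, north=36.6, east=-75.4)

print(NC_BBOX.contains(35.78, -78.64))  # True  (roughly Raleigh, NC)
print(NC_BBOX.contains(37.54, -77.44))  # False (roughly Richmond, VA)

try:
    NC_BBOX.south = 0.0  # frozen dataclasses reject attribute assignment
except FrozenInstanceError:
    mutated = False
else:
    mutated = True
print(mutated)  # False
```

Note that a bounding box is a rectangle, so it also covers parts of neighboring states; results that must be limited to North Carolina would need a later spatial filter.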

# 3.2 - Overpass Query

This function wraps a query body inside a full Overpass QL query. The function accepts a BoundingBox `bbox` (region) and a string `q_body` (query body), and it returns the complete Overpass QL query as a string. Note that `bbox` is not used inside the wrapper itself; the callers embed the bounding-box coordinates directly in `q_body`.

NOTE: Both ways and nodes are fetched in this file. `out center` ensures that Overpass returns a center coordinate for ways (polygons) and lat/lon for nodes (points). 

In [None]:
def _overpass_query(bbox: BoundingBox, q_body: str) -> str:
    return f"""[out:json][timeout:180];
(
{q_body}
);
out center;"""
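
To make the wrapper's output concrete, the sketch below renders one query. It redefines the wrapper under the hypothetical name `overpass_query` and drops the `bbox` parameter, which the original signature accepts but the function body never reads. The coordinates are the `NC_BBOX` values.

```python
def overpass_query(q_body: str) -> str:
    """Wrap a query body in the Overpass QL settings and output statements."""
    return f"""[out:json][timeout:180];
(
{q_body}
);
out center;"""


s, w, n, e = 33.75, -84.45, 36.6, -75.4
q_body = f"node[amenity=fast_food]({s},{w},{n},{e});"
query = overpass_query(q_body)
print(query)
# [out:json][timeout:180];
# (
# node[amenity=fast_food](33.75,-84.45,36.6,-75.4);
# );
# out center;
```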

# 3.3 - Fetch Overpass

This function actually fetches data from Overpass using a string `query`, and it returns a dictionary representing a JSON response. 

Line-by-line breakdown:
- `last_err` stores the most recent error for reporting.
- Loop through each URL, sending a POST request with a 5-minute client-side timeout.
- Throw an error for HTTP 4xx/5xx responses.
- Return the parsed JSON response.
- If an exception occurs, store the error and wait 5 seconds before trying the next server.
- Since a success would have led to an early return, the end of this function raises a RuntimeError to indicate response failure along with the exact error `last_err`.

In [None]:
def _fetch_overpass(query: str) -> dict:
    last_err = None
    for url in OVERPASS_URLS:
        try:
            r = requests.post(url, data={"data": query}, timeout=300)
            r.raise_for_status()
            return r.json()
        except Exception as e:
            last_err = e
            time.sleep(5)
    raise RuntimeError(f"All Overpass endpoints failed. Last error: {last_err}")

# 3.3 - Fetch With Cache

This function adds a local JSON cache in front of `_fetch_overpass`. `CACHE_DIR` is the cache directory location. The function accepts a string `cache_name` (the cache file name) and a string `query`, and it returns a dictionary (JSON data).

Line-by-line breakdown:
- Ensure cache directory exists. 
- If cached response exists, load JSON and return.
- Otherwise, fetch data from overpass using the `query` parameter.
- Write JSON to disk, and return data.

In [None]:
CACHE_DIR = Path(__file__).resolve().parents[3] / "data" / "raw" / "cache"

def _fetch_with_cache(cache_name: str, query: str) -> dict:
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    path = CACHE_DIR / cache_name
    if path.exists():
        return json.loads(path.read_text())
    data = _fetch_overpass(query)
    path.write_text(json.dumps(data))
    return data
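
The cache-then-fetch steps can be demonstrated offline. This is a sketch under stated assumptions: `fetch_with_cache` here takes an explicit path and a fetcher callable instead of the module-level `CACHE_DIR` and `_fetch_overpass`, and `_fake_fetch` is a hypothetical stand-in for the network call.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def fetch_with_cache(path: Path, fetch: Callable[[], dict]) -> dict:
    """Return cached JSON from `path` if present; otherwise fetch and store it."""
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    data = fetch()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data), encoding="utf-8")
    return data


calls = []


def _fake_fetch() -> dict:
    """Stand-in for a network call; records each invocation (demo only)."""
    calls.append(1)
    return {"elements": [{"id": 1}]}


with tempfile.TemporaryDirectory() as d:
    cache_file = Path(d) / "cache" / "demo.json"
    first = fetch_with_cache(cache_file, _fake_fetch)   # miss: calls the fetcher
    second = fetch_with_cache(cache_file, _fake_fetch)  # hit: reads from disk

print(len(calls), first == second)  # 1 True
```

One consequence of keying the cache on the file name alone: a call with a different query but the same `cache_name` returns the previously cached response, so the cache file would need to be deleted or renamed when the query changes.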

# 3.4 - Elements To Points

This function takes Overpass elements (nodes and ways), and returns a dataframe of normalized point records. The parameters are a list of dictionaries `elements` and a string `outlet_type`.

Line-by-line breakdown:
- Build a list to store row dictionaries.
- Iterate through each element.
- Overpass elements store metadata in a `tags` dictionary.
- Derive a human-readable name, using "Unknown" as a fallback.
- Handle both node and way geometry types, skipping elements without coordinates.
- Create a row consisting of a name/label, latitude, longitude, outlet type (healthy or unhealthy), and an `osm_id`, and append it to `rows`.
- After the loop exits, return a dataframe built from `rows`.

In [None]:
def _elements_to_points(elements: list[dict], outlet_type: str) -> pd.DataFrame:
    rows = []
    for el in elements:
        tags = el.get("tags", {})
        name = tags.get("name") or tags.get("brand") or tags.get("operator") or "Unknown"
        lat = el.get("lat") or (el.get("center") or {}).get("lat")
        lon = el.get("lon") or (el.get("center") or {}).get("lon")
        if lat is None or lon is None:
            continue
        rows.append({
            "name": name,
            "lat": float(lat),
            "lon": float(lon),
            "outlet_type": outlet_type,
            "osm_id": f"{el.get('type','')}/{el.get('id','')}",
        })
    return pd.DataFrame(rows)
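
The coordinate handling can be shown on its own with hand-written sample elements in the shape Overpass returns for `out center`. The helper `element_coords` is hypothetical and differs slightly from the function above: it uses `dict.get` defaults rather than `or`, so a coordinate of exactly `0.0` would not be mistaken for a missing value (harmless for North Carolina, but relevant elsewhere).

```python
from typing import Optional, Tuple


def element_coords(el: dict) -> Optional[Tuple[float, float]]:
    """Return (lat, lon) for a node, or the center of a way; None if absent."""
    center = el.get("center") or {}
    lat = el.get("lat", center.get("lat"))
    lon = el.get("lon", center.get("lon"))
    if lat is None or lon is None:
        return None
    return float(lat), float(lon)


# Made-up sample elements: a node, a way with a center, and a way without one.
elements = [
    {"type": "node", "id": 1, "lat": 35.78, "lon": -78.64, "tags": {"name": "A"}},
    {"type": "way", "id": 2, "center": {"lat": 35.99, "lon": -78.90}, "tags": {}},
    {"type": "way", "id": 3, "tags": {"brand": "B"}},  # no coordinates: skipped
]

points = [c for c in map(element_coords, elements) if c is not None]
print(points)  # [(35.78, -78.64), (35.99, -78.9)]
```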

# 3.5 - Fetch Healthy Outlets

This function retrieves healthy food outlets from OSM. The function accepts a BoundingBox `bbox` (defaulting to NC region) and returns a Pandas dataframe consisting of healthy food outlet point data.

Line-by-line breakdown:
- Extract bounding box coordinates.
- Build query body: pulling point and polygon features for supermarkets, grocery stores, and markets. 
- Wrap query body into a full Overpass query, using `_fetch_with_cache` to use cached JSON if it exists.
- Convert raw OSM elements into points and return.

In [None]:
def fetch_healthy_outlets(bbox: BoundingBox = NC_BBOX) -> pd.DataFrame:
    s,w,n,e = bbox.south, bbox.west, bbox.north, bbox.east
    q_body = f"""
node[shop=supermarket]({s},{w},{n},{e});
way[shop=supermarket]({s},{w},{n},{e});
node[shop=grocery]({s},{w},{n},{e});
way[shop=grocery]({s},{w},{n},{e});
node[amenity=marketplace]({s},{w},{n},{e});
way[amenity=marketplace]({s},{w},{n},{e});
"""
    data = _fetch_with_cache("osm_healthy.json", _overpass_query(bbox, q_body))
    return _elements_to_points(data.get("elements", []), "healthy")

# 3.6 - Fetch Unhealthy Outlets

This function retrieves unhealthy food outlets from OSM. The function accepts a BoundingBox `bbox` (defaulting to NC region) and returns a Pandas dataframe consisting of unhealthy food outlet point data.

Line-by-line breakdown:
- Extract bounding box coordinates.
- Build query body: pulling point and polygon features for fast food locations.
- Wrap query body into a full Overpass query, using `_fetch_with_cache` to use cached JSON if it exists.
- Convert raw OSM elements into points and return.

In [None]:
def fetch_unhealthy_outlets(bbox: BoundingBox = NC_BBOX) -> pd.DataFrame:
    s,w,n,e = bbox.south, bbox.west, bbox.north, bbox.east
    q_body = f"""
node[amenity=fast_food]({s},{w},{n},{e});
way[amenity=fast_food]({s},{w},{n},{e});
"""
    data = _fetch_with_cache("osm_unhealthy.json", _overpass_query(bbox, q_body))
    return _elements_to_points(data.get("elements", []), "unhealthy")

# 4 - fetch_population.py

This file is responsible for retrieving population data for each census tract. 

# 4.1 - Imports and Globals

This section covers necessary imports and global variables.

Annotations/type hints are incorporated for clarity. Path is imported to work with file systems. Requests is used for remote data retrieval. Pandas is used for data management. 

`ACS_URL` is a global variable that points to the ACS 5-year estimates population survey from 2022. This project specifically utilizes ACS 5-year because it covers all census tracts, it is more statistically stable than a 1-year estimate, and it is the standard choice for tract-level analysis. 

In [None]:
from __future__ import annotations
from pathlib import Path
import requests
import pandas as pd

ACS_URL = "https://api.census.gov/data/2022/acs/acs5"

# 4.2 - Fetch NC Tract Population

This function retrieves the total population for each census tract in North Carolina based on the ACS 5-year estimates. The function returns a Pandas dataframe containing a GEOID and the corresponding population.

Line-by-line breakdown:
- Initialize `params`, a dictionary containing a Census API query. "B01003_001E" is a total population table estimate, "tract:*" requests all tracts, and "state:37" refers to North Carolina. 
- Make the request, timing out after 60 seconds without a response. Raise an error if the request fails.
- Store the JSON object as `data`.
- Convert data into a dataframe `df`, using the first row as column names.
- Generate a GEOID manually by concatenating the state, county, and tract codes.
- Clean population values and ensure they are integers (non-numeric values become 0).
- Return the dataframe, including only GEOID and associated population values.

In [None]:
def fetch_nc_tract_population() -> pd.DataFrame:
    """
    Fetch total population per census tract in North Carolina
    using ACS 5-year estimates.
    """
    params = {
        "get": "B01003_001E",
        "for": "tract:*",
        "in": "state:37",
    }

    r = requests.get(ACS_URL, params=params, timeout=60)
    r.raise_for_status()
    data = r.json()

    df = pd.DataFrame(data[1:], columns=data[0])
    df["GEOID"] = (
        df["state"]
        + df["county"]
        + df["tract"]
    )

    df["population"] = pd.to_numeric(df["B01003_001E"], errors="coerce").fillna(0).astype(int)

    return df[["GEOID", "population"]]
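
For orientation, the Census API returns a JSON array of arrays: a header row followed by data rows, with values encoded as strings. The sketch below walks through the same GEOID and population steps in plain Python on made-up rows. The numbers are placeholders, not real ACS estimates.

```python
# Made-up rows in the shape the Census API returns (header row, then data rows).
data = [
    ["B01003_001E", "state", "county", "tract"],
    ["4321", "37", "001", "020100"],
    [None, "37", "003", "040100"],  # missing estimate (JSON null)
]

header, *rows = data
records = []
for row in rows:
    rec = dict(zip(header, row))
    # The codes are already zero-padded strings, so plain concatenation is safe.
    geoid = rec["state"] + rec["county"] + rec["tract"]
    try:
        population = int(rec["B01003_001E"])
    except (TypeError, ValueError):
        population = 0  # mirrors errors="coerce" followed by fillna(0)
    records.append({"GEOID": geoid, "population": population})

print(records)
# [{'GEOID': '37001020100', 'population': 4321}, {'GEOID': '37003040100', 'population': 0}]
```

Replacing missing values with 0 keeps the column an integer type, at the cost of making "no estimate" indistinguishable from "zero residents" downstream.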

# 5 - fetch_usda_food_access.py

This file is responsible for retrieving and loading USDA food access data. 

# 5.1 - Imports and Globals

This section covers necessary imports and global variables.

Annotations/type hints are incorporated for clarity. Path is imported to work with file systems. Pandas is used for data management. Requests is used for remote data retrieval. Additionally, a useful function `ensure_dir` from the `utils.cache` module is imported. 

`DEFAULT_LOCAL_NAME` is a global variable holding the file name of the local .csv file that contains food access data from the USDA. 

In [None]:
from __future__ import annotations
from pathlib import Path
import pandas as pd
import requests
from src.utils.cache import ensure_dir

DEFAULT_LOCAL_NAME = "usda_food_access.csv"

# 5.2 - Get USDA Food Access

This function ensures that the USDA Food Access CSV exists by finding it or writing it to disk. The function accepts a Path `cache_dir` (directory on disk to store the file), and an optional string `url` (URL to retrieve data from). The function returns a Path (local path to file).

Line-by-line breakdown:
- Guarantee a cache directory exists before writing.
- Construct the full path to the expected CSV file.
- Early return if file exists already.
- If file does not exist and no URL is provided, then throw a clear error.
- Otherwise, make request.
- Throw error if status is 400ish/500ish.
- Open destination file in binary write mode.
- Iterate through 1 MB chunks, writing only non-empty chunks.
- Return local path.

In [None]:
def get_usda_food_access(cache_dir: Path, url: str | None = None) -> Path:
    ensure_dir(cache_dir)
    out = cache_dir / DEFAULT_LOCAL_NAME
    if out.exists():
        return out
    if not url:
        raise FileNotFoundError(
            f"USDA food access CSV not found at {out}. "
            "Place it there or pass a download URL via url=..."
        )
    r = requests.get(url, stream=True, timeout=120)
    r.raise_for_status()
    with out.open("wb") as f:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            if chunk:
                f.write(chunk)
    return out

# 5.3 - Load USDA Food Access

This function loads the USDA csv from the disk. The function accepts a Path `cache_dir` (location of file on disk) and a string `url` (url to download data from), and it returns a Pandas dataframe (csv file of data, converted into a dataframe). This function simply wraps the get function above. 

In [None]:
def load_usda_food_access(cache_dir: Path, url: str | None = None) -> pd.DataFrame:
    return pd.read_csv(get_usda_food_access(cache_dir, url=url), low_memory=False)