# Census TIGER/Line Data Loader

This notebook loads US Census TIGER/Line shapefiles into Delta tables with the following features:
* **Parameterized** - Ready for job/pipeline execution
* **Prerequisite** - Catalog and schemas must be created first (use the setup notebook)

## Parameters

| Parameter | Description | Example |
|-----------|-------------|----------|
| `catalog` | Unity Catalog name | `odi_datalake` |
| `schema_name` | Schema name | `odi_bronze` |
| `tiger_year` | Census year (4 digits) | `2020` |
| `state` | US state abbreviation | `CA` |

## Prerequisites

* Unity Catalog and schemas must already exist
* Run the setup notebook first to create:
  * Catalog: `odi_datalake`
  * Schemas: `odi_bronze`, `odi_silver`, `odi_gold`
  * Volume: `supporting_geometry_files`

## Notes

* This notebook assumes the catalog and schema already exist
* TIGER/Line data will be loaded into the specified catalog and schema
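* Tables are prefixed with `tiger_` (e.g., `tiger_counties`, `tiger_tracts`)
* Every run overwrites the target tables, so state-specific tables (e.g., `tiger_counties`) contain only the state loaded by the most recent run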

In [0]:
# Install the packages required to load the Census TIGER/Line shapefiles.
# Note: specific package versions can be pinned for reproducibility.
# All packages already exist in the serverless runtime except pygris;
# the additional packages are installed here for illustration.
%pip install pygris geopandas shapely --quiet
# Restart the Python process so the newly installed packages are picked up
# by the cells that follow.
dbutils.library.restartPython()

In [0]:
from pygris import (
    block_groups,
    blocks,
    coastline,
    combined_statistical_areas,
    core_based_statistical_areas,
    counties,
    county_subdivisions,
    divisions,
    nation,
    native_areas,
    places,
    primary_roads,
    primary_secondary_roads,
    pumas,
    rails,
    regions,
    states,
    tracts,
    tribal_block_groups,
    tribal_subdivisions_national,
    urban_areas,
)
import geopandas as gpd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

In [0]:
# Get parameters from widgets (supplied by the job/pipeline or the notebook UI).
# Values are stripped of surrounding whitespace and validated up front, so a bad
# parameter fails the run immediately. Otherwise it would surface only as
# per-table load errors during the download loop.
CATALOG = dbutils.widgets.get("catalog").strip()
schema_name = dbutils.widgets.get("schema_name").strip()
_tiger_year_raw = dbutils.widgets.get("tiger_year").strip()
STATE = dbutils.widgets.get("state").strip()

if not CATALOG or not schema_name:
    raise ValueError("Widgets 'catalog' and 'schema_name' must be non-empty")
# The documented format is a 4-digit census year; isascii() rejects non-ASCII
# digit characters that isdigit() would accept but int() cannot parse.
if not (len(_tiger_year_raw) == 4 and _tiger_year_raw.isascii() and _tiger_year_raw.isdigit()):
    raise ValueError(f"Widget 'tiger_year' must be a 4-digit year, got {_tiger_year_raw!r}")
YEAR = int(_tiger_year_raw)
if not STATE:
    raise ValueError("Widget 'state' must be non-empty")

In [0]:
def gdf_to_delta(gdf: gpd.GeoDataFrame, catalog: str, schema: str, table_name: str, year: int) -> None:
    """Write a GeoDataFrame to a Unity Catalog Delta table, replacing any existing table.

    The active geometry is reprojected to WGS84 (EPSG:4326) when the frame has
    a CRS other than EPSG:4326. It is then serialized to WKB bytes in a
    ``geometry_wkb`` column, and the original geometry column is dropped,
    because Spark cannot ingest shapely geometries directly. A ``year`` column
    records the data vintage. The table is OPTIMIZEd after the write.

    Args:
        gdf: GeoDataFrame with an active geometry column. It is not modified;
            a reindexed copy is written.
        catalog: Unity Catalog name.
        schema: Schema name.
        table_name: Table name (without catalog/schema qualification).
        year: Census year for the data, stored in the ``year`` column.
    """
    # Reproject only when a CRS is known; a GeoDataFrame without a CRS is written as-is.
    if gdf.crs is not None and gdf.crs.to_epsg() != 4326:
        gdf = gdf.to_crs(epsg=4326)

    # reset_index returns a new frame, so the column assignments below never
    # touch the caller's GeoDataFrame. It also avoids non-standard index issues.
    gdf = gdf.reset_index(drop=True)

    # Track the data vintage.
    gdf["year"] = year

    # Use the active geometry column's real name instead of assuming "geometry".
    geom_col = gdf.geometry.name
    gdf["geometry_wkb"] = gdf.geometry.to_wkb()
    gdf_no_geom = gdf.drop(columns=[geom_col])

    spark_df = spark.createDataFrame(gdf_no_geom)

    full_table_name = f"{catalog}.{schema}.{table_name}"

    # Overwrite the table (creates if missing, replaces if present). overwriteSchema
    # is needed because a plain Delta overwrite fails on a schema mismatch, and the
    # TIGER column set can differ between census years. Without it, rerunning with
    # a different year would fail.
    (
        spark_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(full_table_name)
    )

    print(f"✓ Wrote {len(gdf)} rows to {full_table_name}")

    # Compact small files for better query performance.
    spark.sql(f"OPTIMIZE {full_table_name}")
    print(f"  ✓ Optimized {full_table_name}")

In [0]:
def _load_tables(
    loaders: dict,
    catalog: str,
    schema: str,
    year: int,
    **loader_kwargs,
) -> list[str]:
    """Load each dataset in ``loaders`` into a ``tiger_<name>`` Delta table.

    Args:
        loaders: Mapping of dataset name to pygris loader function.
        catalog: Unity Catalog name.
        schema: Schema name.
        year: Census year passed to each loader and recorded in the table.
        **loader_kwargs: Extra keyword arguments passed to every loader (e.g. ``state``).

    Returns:
        Names of the tables that failed to load. Failures are printed and
        collected rather than raised, so one bad dataset does not stop the rest.
    """
    failed = []
    for name, loader in loaders.items():
        table_name = f"tiger_{name}"
        print(f"  Loading {table_name}...", end=" ")
        try:
            gdf = loader(year=year, **loader_kwargs)
            gdf_to_delta(gdf, catalog, schema, table_name, year)
        except ValueError as e:
            print(f"\n  ✗ ValueError for {table_name}: {e}")
            failed.append(table_name)
        except Exception as e:  # best-effort: report, then continue with the next dataset
            print(f"\n  ✗ Error for {table_name}: {type(e).__name__}: {e}")
            failed.append(table_name)
    return failed


def load_geo_data(
    catalog: str,
    schema: str,
    year: int,
    state: str = "CA",
    *,
    raise_on_error: bool = False,
) -> None:
    """Download TIGER/Line datasets with pygris and write each one to a Delta table.

    State-specific datasets are loaded for ``state``; US-wide datasets are loaded
    without a state filter. Each dataset is written to
    ``{catalog}.{schema}.tiger_<dataset>`` via ``gdf_to_delta``, which overwrites
    the table. State-specific tables therefore hold only the state from the most
    recent run.

    Args:
        catalog: Unity Catalog name.
        schema: Schema name.
        year: Census year to download.
        state: State identifier passed to the state-specific pygris loaders.
        raise_on_error: If True, raise ``RuntimeError`` after all datasets have
            been attempted if any of them failed. Defaults to False, which only
            prints the failures.

    Raises:
        RuntimeError: If ``raise_on_error`` is True and at least one table failed.
    """
    print(f"Downloading data for {state} in year {year}")
    print(f"Target location: {catalog}.{schema}")
    print("=" * 60)

    # Loaders that take a ``state`` argument.
    state_loaders = {
        "counties": counties,
        "tracts": tracts,
        "block_groups": block_groups,
        "blocks": blocks,
        "places": places,
        "pumas": pumas,
        "county_subdivisions": county_subdivisions,
        "primary_secondary_roads": primary_secondary_roads,
    }

    # US-wide loaders (called with ``year`` only).
    us_loaders = {
        "coastline": coastline,
        "divisions": divisions,
        "nation": nation,
        "native_areas": native_areas,
        "primary_roads": primary_roads,
        "rails": rails,
        "regions": regions,
        "states": states,
        "tribal_block_groups": tribal_block_groups,
        "tribal_subdivisions_national": tribal_subdivisions_national,
        "urban_areas": urban_areas,
        "core_based_statistical_areas": core_based_statistical_areas,
        "combined_statistical_areas": combined_statistical_areas,
    }

    print(f"\nLoading {state}-specific datasets...")
    failed = _load_tables(state_loaders, catalog, schema, year, state=state)

    print("\nLoading US-wide datasets...")
    failed += _load_tables(us_loaders, catalog, schema, year)

    print("\n" + "=" * 60)
    if not failed:
        print("✓ Data loading complete!")
        return

    # Don't report success when some tables were not loaded.
    failed_list = ", ".join(failed)
    print(f"⚠ Data loading finished with {len(failed)} failed table(s): {failed_list}")
    if raise_on_error:
        raise RuntimeError(f"Failed to load {len(failed)} TIGER table(s): {failed_list}")

In [0]:
# Run the data loading
load_geo_data(
    catalog=CATALOG,
    schema=schema_name,
    year=YEAR,
    state=STATE
)