In [0]:
%pip install geopandas
%pip install shapely
%pip install fiona

## STEP 1: Prepare the Environment

In [0]:
import requests
import os
import dlt
import zipfile
import geopandas as gpd
import pandas as pd
from shapely import wkt
from pyspark.sql import SparkSession
from datetime import datetime
from dateutil.parser import isoparse



## STEP 2: Read Zipped Shapefiles from the Volume into a Spark DataFrame

In [0]:
# Spark session for the current thread (expected to be provided by the Databricks runtime)
spark = SparkSession.getActiveSession()

# Volume directory; each sub-directory is expected to be one download run named
# with an ISO 8601 timestamp.
zip_dir = "/Volumes/emory_cidmath_data_hub/geography/census_tigerline/state"

# Candidate run directories: sub-directories whose names start with a four-digit year.
# Sorted so the result (and tie-breaking in max() below) does not depend on listdir order.
timestamp_dirs = sorted(
    d for d in os.listdir(zip_dir)
    if os.path.isdir(os.path.join(zip_dir, d))
    and d[:4].isdigit()
)

# Parse the names into datetimes. A name that starts with four digits but is not
# valid ISO 8601 is reported and skipped instead of aborting the whole run.
parsed_dirs = []
for d in timestamp_dirs:
    try:
        parsed_dirs.append((d, isoparse(d)))
    except (ValueError, OverflowError):
        print(f"⚠️ Skipping directory that is not an ISO 8601 timestamp: {d}")

if not parsed_dirs:
    raise FileNotFoundError(
        f"No ISO 8601 timestamped run directories found under {zip_dir}"
    )

# Most recent run wins.
# NOTE(review): mixing timezone-aware and naive names would make max() raise TypeError.
latest_dir = max(parsed_dirs, key=lambda x: x[1])[0]

# Full path to the latest run directory
latest_dir_path = os.path.join(zip_dir, latest_dir)

# Shapefile extraction directory. This is NOT local temp storage: it is a
# "shapefiles" sub-folder inside the run directory on the Volume.
extract_root = os.path.join(latest_dir_path, "shapefiles")
os.makedirs(extract_root, exist_ok=True)

# Helper: extract shapefile and read with geopandas
def read_shapefile_from_zip(zip_path, extract_dir=None):
    """Extract a zipped shapefile and return its attributes plus WKT geometry.

    Args:
        zip_path: Path to a ``.zip`` archive containing a shapefile (the ``.shp``
            and whatever sidecar files it ships with).
        extract_dir: Directory to extract the archive into. Defaults to the
            module-level ``extract_root``.

    Returns:
        A plain ``pandas.DataFrame`` with the shapefile's attribute columns and a
        ``geometry`` column of WKT strings (``None`` where a geometry is missing),
        so Spark can ingest it without geometry types.

    Raises:
        ValueError: if the archive contains no ``.shp`` member.

    If the archive holds several ``.shp`` files, the first one listed is read.
    """
    if extract_dir is None:
        extract_dir = extract_root

    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # Find the .shp before extracting so a malformed archive fails fast with
        # a clear message instead of an IndexError.
        shp_members = [
            name for name in zip_ref.namelist() if name.lower().endswith(".shp")
        ]
        if not shp_members:
            raise ValueError(f"No .shp file found in {zip_path}")
        zip_ref.extractall(extract_dir)

    gdf = gpd.read_file(os.path.join(extract_dir, shp_members[0]))

    # Drop to a plain DataFrame first so the geometry column can hold strings,
    # then serialize each geometry to WKT for Spark (null geometries stay None).
    pdf = pd.DataFrame(gdf)
    pdf["geometry"] = pdf["geometry"].apply(
        lambda geom: geom.wkt if geom is not None else None
    )
    return pdf

# Vintage-specific attribute suffixes: these files append a two-digit year to
# their column names; stripping it lets every vintage share one schema.
SUFFIXED_STATE_FILES = {
    "tl_2000_us_state.zip": "00",
    "tl_2010_us_state.zip": "10",
}

# Load every zip in the latest run directory into a pandas DataFrame, tagged with its
# source file name. Sorted so the concatenation order is deterministic.
all_dfs = []
failed_zips = []  # names of archives that could not be processed
for zip_file in sorted(os.listdir(latest_dir_path)):
    if not zip_file.endswith(".zip"):
        continue
    zip_path = os.path.join(latest_dir_path, zip_file)
    try:
        pdf = read_shapefile_from_zip(zip_path)
        suffix = SUFFIXED_STATE_FILES.get(zip_file)
        if suffix:
            # Strip only the trailing suffix; str.replace would also delete the same
            # digits wherever else they occur in a column name.
            rename_map = {
                col: col[: -len(suffix)]
                for col in pdf.columns
                if col.endswith(suffix) and len(col) > len(suffix)
            }
            pdf = pdf.rename(columns=rename_map)
        pdf["zip_filename"] = zip_file
        all_dfs.append(pdf)
    except Exception as e:
        # Best-effort: keep going with the remaining archives, but remember the failure.
        failed_zips.append(zip_file)
        print(f"❌ Failed to process {zip_file}: {e}")

# Combine all vintages and convert to Spark. pd.concat aligns on column names, so a
# column absent from one vintage is filled with NaN for that vintage's rows.
if all_dfs:
    combined_pdf = pd.concat(all_dfs, ignore_index=True)
    df_spark = spark.createDataFrame(combined_pdf)
else:
    # Include the columns the bronze table sorts on (zip_filename, GEOID) so an
    # empty run still produces a valid, empty DataFrame instead of failing later.
    df_spark = spark.createDataFrame([], schema="GEOID STRING, zip_filename STRING")

Name,Type
REGION,string
DIVISION,string
STATEFP,string
STATENS,string
GEOID,string
STUSPS,string
NAME,string
LSAD,string
MTFCC,string
FUNCSTAT,string


## STEP 3: Write the Delta Live Tables Materialized View

In [0]:
@dlt.table(
    name="bronze_census_tigerline_state",
    comment="Raw ingestion of Census TigerLine state geography data",
)
def bronze_census_tigerline_state():
    """Bronze table: ``df_spark`` without ``GEOIDFQ``, ordered by source file then GEOID.

    Spark's ``DataFrame.drop`` with a column name is a no-op when that column is
    absent, so vintages lacking ``GEOIDFQ`` pass through unchanged. Both sort keys
    are ascending.
    """
    # NOTE(review): consumers of a materialized table generally should not rely on
    # this row order being preserved — confirm whether the sort is needed.
    trimmed = df_spark.drop("GEOIDFQ")
    return trimmed.orderBy("zip_filename", "GEOID", ascending=True)