In [1]:
# Importing required libraries
import os

import sys
import logging
from typing import Literal
import shutil

import pandas as pd
import numpy as np
import geopandas as gpd
import json
import shapely.geometry
import duckdb
from shapely.geometry import mapping
from shapely.geometry import box
import fiona
import pyarrow.parquet as pq
import pyarrow as pa


from google.cloud import storage

# Import utility constants and functions
import utils

In [2]:
# Define constants for the project
#POC_DATASET = 'encumbered_parcels'
#POC_TABLE = 'parcels'
GEO_CRS = "EPSG:4326"
PROJECTED_CRS = "EPSG:3857" 
ENCUMBRANCES = [
    'roadways',
    'railways',
    'protected_lands',
    'wetlands',
    'transmission_lines',
    ]
EncumbranceType = Literal[
    'roadways',
    'railways',
    'protected_lands',
    'wetlands',
    'transmission_lines',
]

STATE_ABBREV_TO_FIPS = {
    'AK': '02',
    'AL': '01',
    'AZ': '04',
    'AR': '05',
    'CA': '06',
    'CO': '08',
    'CT': '09',
    'DE': '10',
    'FL': '12',
    'GA': '13',
    'HI': '15',
    'ID': '16',
    'IL': '17',
    'IN': '18',
    'IA': '19',
    'KS': '20',
    'KY': '21',
    'LA': '22',
    'ME': '23',
    'MD': '24',
    'MA': '25',
    'MI': '26',
    'MN': '27',
    'MS': '28',
    'MO': '29',
    'MT': '30',
    'NE': '31',
    'NV': '32',
    'NH': '33',
    'NJ': '34',
    'NM': '35',
    'NY': '36',
    'NC': '37',
    'ND': '38',
    'OH': '39',
    'OK': '40',
    'OR': '41',
    'PA': '42',
    'RI': '44',
    'SC': '45',
    'SD': '46',
    'TN': '47',
    'TX': '48',
    'UT': '49',
    'VT': '50',
    'VA': '51',
    'WA': '53',
    'WV': '54',
    'WI': '55',
    'WY': '56',
}
# Define paths for local data
LOCAL_DATA_FOLDER = r"C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data" # This is the local path where data files are stored

# Define paths for various data files
COUNTY_DATA = r"counties\tl_2024_us_county\tl_2024_us_county.shp" # This is the path to the shapefile with county boundaries from CENSUS

# Line datasets
RAILWAYS_DATA = r"NTAD_North_American_Rail_Network_Lines\NARN.gdb" # This is the path to the railways data from North American Rail Network (Dept. of Transportation)
TRANSMISSION_LINES_DATA = r"transmission_lines\Transmission_Lines.shp" # This is the path to the transmission lines data downloaded from Homeland Security's Data Downloads
ROADWAYS_DATA = r"NTAD_North_American_Roads\North_American_Roads.shp" # This is the path to the roadways data from North American Roads (Dept. of Transportation)

# Polygon datasets
PROTECTED_LANDS_NATIONAL = r"protected_lands_national\PADUS4_1VectorAnalysis_PADUS_Only\PADUS4_1VectorAnalysis_PADUS_Only.gdb" # This is the path to the protected lands data downloaded from PAD-US (Protected Areas Database of the US)
WETLANDS = r"C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data\Wetlands" # USGS Wetlands data is stored at a state-level granularity in this folder
WETLAND_ATTRIBUTES = r"C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data\Wetlands\NWI-Code-Definitions\NWI-Code-Definitions\NWI_Code_Definitions.gdb"

# Output paths
PARQUET_INGESTION_PATH = r"C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data\ingestion_parquets" # This is the path where the parquet files will be stored for ingestion into BigQuery
WETLAND_COUNTY_FILES = os.path.join(PARQUET_INGESTION_PATH, "wetland_county_level") # This is the path where the county-level wetland files will be stored. These are then uploaded to BQ using code in this file

In [None]:
# DuckDB doesn't have something along the lines of geopandas for CRS conversion
# So check the CRS of each data source here and manually define it in config object below for later use
# Sample code for roadways
path = os.path.join(LOCAL_DATA_FOLDER, ROADWAYS_DATA)
gdf = gpd.read_file(path)
gdf.crs

# Another way to do this is load the file in QGIS and copy paste the CRS from there

<Geographic 2D CRS: EPSG:4326>
Name: WGS 84
Axis Info [ellipsoidal]:
- Lat[north]: Geodetic latitude (degree)
- Lon[east]: Geodetic longitude (degree)
Area of Use:
- name: World.
- bounds: (-180.0, -90.0, 180.0, 90.0)
Datum: World Geodetic System 1984 ensemble
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich

In [21]:
# Define Config object to hold configuration settings
# Projection conversions are super tricky to handle in DuckDb and need some manual QA 
# always_xy := TRUE is a duckdb-specific argument to ensure that the first coordinate is always treated as X (longitude) and the second as Y (latitude), regardless of the CRS definition.
DATASET_CONFIG = {
    'transmission_lines': {
        'path': TRANSMISSION_LINES_DATA,
        'source_crs': 'EPSG:3857', # Main pitfall of DudkDB is we have to manually set the source CRS. I did this using gdf.crs from the geopandas read_file method. X[east]. Y[north] 
        'force_xy': True,
        'columns_to_drop': ['OBJECTID', 'SOURCE', 'SOURCEDATE', 'VAL_METHOD', 'VOLTAGE', 'INFERRED', 'SUB_1', 'SUB_2']
    },
    'railways': {
        'path': RAILWAYS_DATA,
        'read_kwargs': {'layer': 'North_American_Rail_Network_Lines'},
        'source_crs': 'EPSG:3857', # Same note as in transmission lines 
        'force_xy': True,
        'columns_to_drop': ['FRFRANODE', 'TOFRANODE', 'STFIPS', 'CNTYFIPS', 'STATEAB', 'COUNTRY',
                            'FRADISTRCT', 'RROWNER1', 'RROWNER2', 'RROWNER3', 'TRKRGHTS1', 'TRKRGHTS2',
                            'TRKRGHTS3', 'TRKRGHTS4', 'TRKRGHTS5', 'TRKRGHTS6', 'TRKRGHTS7', 'TRKRGHTS8',
                            'TRKRGHTS9', 'DIVISION', 'SUBDIV', 'BRANCH', 'YARDNAME', 'PASSNGR', 'STRACNET',
                            'TRACKS', 'NET', 'MILES', 'TIMEZONE', 'SHAPE_Length']
    },
    'roadways': {
        'path': ROADWAYS_DATA,
        'source_crs': 'EPSG:4326', # Lat North; Long East
        'force_xy': True,
        'filter_clause': "COUNTRY = 2", # Filter for USA roads
        'columns_to_drop': ['DIR', 'LINKID', 'JURISCODE', 'ROADNUM', 'CLASS', 'NHS', 'COUNTRY']
    },
    # Projection information: https://www.fws.gov/node/264848
   'wetlands': {
        'gdb_config': lambda state: {'folder': WETLANDS, 'subfolder': f"{state}_geodatabase_wetlands", 'gdb_name': f"{state}_geodatabase_wetlands.gdb"},
        'source_crs_lookup': lambda state: {
            'AK': ('EPSG:3338', True),
            'HI': ('PROJCS["NAD_1983_Albers",BASEGEOGCRS["NAD83",DATUM["North American Datum 1983",ELLIPSOID["GRS 1980",6378137,298.257222101,LENGTHUNIT["metre",1]],ID["EPSG",6269]],PRIMEM["Greenwich",0,ANGLEUNIT["Degree",0.0174532925199433]]],CONVERSION["unnamed",METHOD["Albers Equal Area",ID["EPSG",9822]],PARAMETER["Latitude of false origin",3,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8821]],PARAMETER["Longitude of false origin",-157,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8822]],PARAMETER["Latitude of 1st standard parallel",8,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8823]],PARAMETER["Latitude of 2nd standard parallel",18,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8824]],PARAMETER["Easting at false origin",0,LENGTHUNIT["metre",1],ID["EPSG",8826]],PARAMETER["Northing at false origin",0,LENGTHUNIT["metre",1],ID["EPSG",8827]]],CS[Cartesian,2],AXIS["(E)",east,ORDER[1],LENGTHUNIT["metre",1,ID["EPSG",9001]]],AXIS["(N)",north,ORDER[2],LENGTHUNIT["metre",1,ID["EPSG",9001]]]]', True)
        }.get(state, ('EPSG:5070', True))
    },
    'protected_lands_national': {
        'path': PROTECTED_LANDS_NATIONAL,
        'read_kwargs': {'layer':'PADUS4_1VectorAnalysis_PADUS_Only_Simp_SingP'},
        'source_crs': 'EPSG:5070', # NAD83 Albers
        'force_xy': True,
        'columns_to_drop': ['FID_GAP_Sts14_13_12_12_11', 'Agg_Src', 'ShL_ShA', 'DupShL_ShA', 
                            'RevOID', 'Shp_AreaDup', 'GIS_Acres', 'BndryName', 'BndryExten', 
                            'BndryID', 'GIS_AcrsDb', 'InPoly_FID', 'SimPgnFlag', 
                            'MaxSimpTol', 'MinSimpTol', 'Shape_Length'
                            ]
    }
}

In [22]:
# Processor class to load a dataset, make valid geometries, transform CRS, drop columns and save to Parquet
# This class handles both national datasets and state-specific datasets based on the provided configuration
# Data can be saved for state even if the dataset is national (e.g. roads)
# To save datasets at a state level, provide the state abbreviation (e.g. 'CA') when initializing the class
# This class leverages county polygons from CENSUS to then assign fips codes to the layer for the assigned state
class GeospatialProcessor:
    """
    A class to process large geospatial datasets using a memory-efficient,
    config-driven DuckDB pipeline.
    """
    def __init__(self, dataset, state=None):
        self.dataset = dataset
        self.state = state
        self.config = DATASET_CONFIG.get(dataset)
        if not self.config:
            raise ValueError(f"Unsupported dataset: {self.dataset}")

        filename_prefix = f"{self.state}_{self.dataset}" if self.state else self.dataset
        self.db_file = os.path.join(PARQUET_INGESTION_PATH, f"{filename_prefix}.duckdb")
        self.output_parquet_path = os.path.join(PARQUET_INGESTION_PATH, f"{filename_prefix}.parquet")
        self.target_crs = GEO_CRS
        
        os.makedirs(PARQUET_INGESTION_PATH, exist_ok=True)
        self.con = self._connect_db()

    def _connect_db(self):
        print(f"Connecting to DuckDB database: {self.db_file}")
        con = duckdb.connect(database=self.db_file, read_only=False)
        con.execute("INSTALL spatial; LOAD spatial;")
        print("DuckDB connection established and spatial extension loaded.")
        return con

    def run(self):
        """Dispatcher method to run the correct pipeline based on config."""
        try:
            if self.state:
                self._run_state_pipeline()
            else:
                self._run_national_pipeline()
        except Exception as e:
            print(f"\nERROR: An error occurred during processing for {self.dataset}: {e}")
            raise
        finally:
            self.close()

    # This function deals with DuckDB specific quirks around reading different file formats
    # gdb files typically seem to have geometry stored in a column named 'shape' while shapefiles have it in geometry or geom
    # The quirk is using "keep_wkb=TRUE" in the ST_Read function changes geometry column name to wkb_geometry for shape formats but not so for gdb files
    def _create_standardized_source_table(self, full_path, output_table_name, layer_name=None):
        """
        Reads a source file and creates a table with a standardized 'geom_wkb' column.
        """
        print(f"Reading source file '{os.path.basename(full_path)}' and standardizing schema...")
        layer_sql = f", layer='{layer_name}'" if layer_name else ""
        
        self.con.execute(f"CREATE OR REPLACE TEMP VIEW temp_schema_view AS SELECT * FROM ST_Read('{full_path}'{layer_sql}, keep_wkb=TRUE);")
        df_schema = self.con.execute("DESCRIBE temp_schema_view;").fetchdf()
        
        # For new datasets, we may need to expand this list of possible geometry column names
        geom_col_name = None
        for col_name in df_schema['column_name']:
            if col_name.lower() in ['shape', 'wkb_geometry', 'geometry', 'geom']:
                geom_col_name = col_name
                break
        
        if not geom_col_name:
            raise ValueError(f"Could not find a recognized geometry column in {full_path}")
            
        print(f"-> Detected raw geometry column: '{geom_col_name}'")

        self.con.execute(f"""
            CREATE OR REPLACE TABLE {output_table_name} AS
            SELECT 
                * EXCLUDE ("{geom_col_name}"), 
                "{geom_col_name}" AS geom_wkb
            FROM ST_Read('{full_path}'{layer_sql}, keep_wkb=TRUE);
        """)
        print(f"-> Standardized raw data loaded into '{output_table_name}' table.")

    # Make valid geometries, filter out null values, transform to target CRS (EPSG 4326), 
    # drop unwanted columns and save to Parquet
    def _run_national_pipeline(self):
        """Executes the processing pipeline for a single national dataset."""
        print(f"\n--- Running National Pipeline for '{self.dataset}' ---")
        
        full_path = os.path.join(LOCAL_DATA_FOLDER, self.config['path'])
        layer_name = self.config.get('read_kwargs', {}).get('layer')
        source_crs = self.config['source_crs']
        force_xy_flag = self.config.get('force_xy', False)
        transform_params = ", always_xy := TRUE" if force_xy_flag else ""

        self._create_standardized_source_table(full_path, 'raw_source_data', layer_name)
        
        query = f"""
            CREATE OR REPLACE TABLE processed_data AS
            WITH casted_data AS (
                SELECT *, TRY_CAST(geom_wkb AS GEOMETRY) as geom_obj FROM raw_source_data
            ),
            validated_data AS (
                SELECT *, ST_MakeValid(geom_obj) as validated_geom FROM casted_data 
                WHERE validated_geom IS NOT NULL AND NOT ST_IsEmpty(validated_geom)
            ),
            transformed_data AS (
                SELECT *, ST_Transform(validated_geom, '{source_crs}', '{self.target_crs}'{transform_params}) as transformed_geom
                FROM validated_data
            )
            SELECT * EXCLUDE (geom_wkb, geom_obj, validated_geom), transformed_geom as geometry 
            FROM transformed_data;
        """
        self.con.execute(query)

        current_table = 'processed_data'
        if self.config.get('filter_clause'):
            self.con.execute(f"CREATE OR REPLACE TABLE filtered_data AS SELECT * FROM {current_table} WHERE {self.config['filter_clause']}")
            current_table = 'filtered_data'

        columns_to_drop = self.config.get('columns_to_drop', [])
        all_columns = self.con.execute(f"DESCRIBE {current_table}").fetchdf()['column_name'].tolist()
        final_columns = [f'"{col}"' for col in all_columns if col not in columns_to_drop and col != 'geometry']
        final_select_sql = ", ".join(final_columns)

        save_query = f"""
        COPY (
            SELECT {final_select_sql}, ST_AsWKB(geometry) AS geometry
            FROM {current_table}
        ) TO '{self.output_parquet_path}' (FORMAT PARQUET);
        """
        
        print("Executing final query and saving to Parquet...")
        self.con.execute(save_query)
        print(f"-> SUCCESS: Saved processed data to '{self.output_parquet_path}'")
    
    # Everything in national-level pipeline plus county fips assignment using spatial join on CENSUS boundaries 
    def _run_state_pipeline(self):
        """Executes the multi-step pipeline for state-based data."""
        print(f"\n--- Running State-Level Pipeline for '{self.dataset}' in State: '{self.state}' ---")
        self._load_source_data()
        self._load_boundaries()
        self._perform_spatial_join()
        self._save_state_level_result()
        self._cleanup_intermediate_tables()
        print(f"-> SUCCESS: End-to-end processing for '{self.dataset}' in {self.state} complete.")

    # Only for state-level processing
    def _load_source_data(self):
        """Loads and validates the source data for the state-level pipeline."""
        layer_name = self.config.get('read_kwargs', {}).get('layer')
        
        # This logic is now cleaner and more robust
        if 'gdb_config' in self.config:
            gdb_config = self.config['gdb_config'](self.state)
            full_path = os.path.join(LOCAL_DATA_FOLDER, gdb_config['folder'], gdb_config['subfolder'], gdb_config['gdb_name'])
            if not layer_name:
                layer_name = self._find_largest_layer(full_path)
            # Unpack the CRS code and the flip flag from the config
            source_crs, force_xy_flag = self.config['source_crs_lookup'](self.state)
        else:
            full_path = os.path.join(LOCAL_DATA_FOLDER, self.config['path'])
            source_crs = self.config['source_crs']
            force_xy_flag = self.config.get('force_xy', False)

        # Dynamically build the transformation parameter string
        transform_params = ", always_xy := TRUE" if force_xy_flag else ""

        self._create_standardized_source_table(full_path, 'raw_source_data', layer_name)
        
        print(f"Step 1: Transforming and validating source data '{self.dataset}'")
        query = f"""
            CREATE OR REPLACE TABLE source_data AS
            WITH casted_data AS (
                SELECT *, TRY_CAST(geom_wkb AS GEOMETRY) as geom_obj FROM raw_source_data
            ),
            validated_data AS (
                SELECT *, ST_MakeValid(geom_obj) as validated_geom FROM casted_data 
                WHERE validated_geom IS NOT NULL
            )
            SELECT 
                * EXCLUDE (geom_wkb, geom_obj, validated_geom),
                ST_Transform(validated_geom, '{source_crs}', '{self.target_crs}'{transform_params}) as geometry
            FROM validated_data;
        """
        self.con.execute(query)
        count = self.con.execute("SELECT COUNT(*) FROM source_data").fetchone()[0]
        print(f"-> Loaded and validated {count:,} features into 'source_data' table.")

    # Loads census county boundaries, filters for the state, transforms to target CRS
    # CENSUS CRS IS MANUALLY DEFINED HERE
    def _load_boundaries(self):
        print("Step 2: Loading county boundaries...")
        boundaries_path = os.path.join(LOCAL_DATA_FOLDER, COUNTY_DATA)
        self._create_standardized_source_table(boundaries_path, 'raw_boundaries_data')
        
        boundary_crs = 'EPSG:4269' 
        state_fips = STATE_ABBREV_TO_FIPS.get(self.state)
        if not state_fips: raise ValueError(f"State FIPS code not found for {self.state}")

        # The county data uses a standard CRS that does not require axis flipping.
        query = f"""
            CREATE OR REPLACE TABLE county_boundaries AS
            WITH casted_data AS (
                SELECT *, TRY_CAST(geom_wkb AS GEOMETRY) as geom_obj FROM raw_boundaries_data
            )
            SELECT 
                GEOID as fips, 
                NAME, 
                NAMELSAD, 
                ST_Transform(geom_obj, '{boundary_crs}', '{self.target_crs}') AS geometry
            FROM casted_data 
            WHERE STATEFP = '{state_fips}';
        """
        self.con.execute(query)
        count = self.con.execute("SELECT COUNT(*) FROM county_boundaries").fetchone()[0]
        print(f"-> Loaded {count} county boundaries for state {self.state}.")

    # Only for state-level processing
    def _perform_spatial_join(self):
        print("Step 3: Performing spatial join...")
        self.con.execute(f"""
            CREATE OR REPLACE TABLE data_by_county AS
            SELECT s.*, b.fips, b.NAME, b.NAMELSAD
            FROM source_data s
            JOIN county_boundaries b ON ST_Intersects(s.geometry, b.geometry);
        """)
        count = self.con.execute("SELECT COUNT(*) FROM data_by_county").fetchone()[0]
        print(f"-> Spatial join complete. {count:,} features assigned to counties.")

    # Only for state-level processing
    def _save_state_level_result(self):
        """Saves the final joined data, handling potential memory errors."""
        print("Step 4: Saving final state-level data...")
        save_query = f"""
        COPY (
            SELECT * REPLACE (ST_AsWKB(geometry) AS geometry)
            FROM data_by_county
        ) TO '{self.output_parquet_path}' (FORMAT PARQUET);
        """
        try:
            self.con.execute(save_query)
            print(f"-> Final state-level data saved to {self.output_parquet_path}")
        except Exception as e:
            print(f"-> WARNING: Failed to save final data directly due to error: {e}")
            debug_db_path = self.db_file.replace(".duckdb", "_error_debug.duckdb")
            print(f"-> Preserving database state for debugging at: {debug_db_path}")
            self.con.close()
            self.con = None 
            shutil.copy2(self.db_file, debug_db_path)
            raise
    
    # Only for state-level processing
    def _cleanup_intermediate_tables(self):
        print("Step 5: Cleaning up intermediate tables...")
        tables_to_drop = ['source_data', 'county_boundaries', 'raw_source_data', 'raw_boundaries_data', 'processed_data', 'filtered_data']
        for table in tables_to_drop:
            self.con.execute(f"DROP TABLE IF EXISTS {table};")
        print("-> Intermediate tables dropped.")

    def close(self):
        """Closes the DuckDB connection and cleans up the database file."""
        if self.con:
            self.con.close()
        # Only remove the file if the process was successful and it's a national file
        if not self.state and os.path.exists(self.db_file):
             os.remove(self.db_file)
        print("DuckDB connection closed.")
    
    # Identifies the largest layer in a gdb file      
    # Relevant for wetlands data where layer name might not be known upfront  
    def _find_largest_layer(self, gdb_path):
        with fiona.open(gdb_path) as collection:
            return max(fiona.listlayers(gdb_path), key=lambda layer: len(fiona.open(gdb_path, layer=layer)))

In [None]:
# Execute the pipeline for a national dataset and state
processor = GeospatialProcessor(
    dataset='wetlands',
    state='HI' 
    )
processor.run()

Connecting to DuckDB database: C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data\ingestion_parquets\HI_wetlands.duckdb
DuckDB connection established and spatial extension loaded.

--- Running State-Level Pipeline for 'wetlands' in State: 'HI' ---
Reading source file 'HI_geodatabase_wetlands.gdb' and standardizing schema...
-> Detected raw geometry column: 'Shape'
-> Standardized raw data loaded into 'raw_source_data' table.
Step 1: Transforming and validating source data 'wetlands'
-> Loaded and validated 9,191 features into 'source_data' table.
Step 2: Loading county boundaries...
Reading source file 'tl_2024_us_county.shp' and standardizing schema...
-> Detected raw geometry column: 'wkb_geometry'
-> Standardized raw data loaded into 'raw_boundaries_data' table.
-> Loaded 5 county boundaries for state HI.
Step 3: Performing spatial join...
-> Spatial join complete. 9,197 features assigned to counties.
Step 4: Saving final state-level data...


In [None]:
# Execute the main processing pipeline for wetlands data
# This script processes wetlands data for specified states, handling retries for specific states if needed.
state_list = list(STATE_ABBREV_TO_FIPS.keys())

# OR use specific states for retry
state_retry = ['AK']

# Loop through states as needed
for state in state_retry:
    # Skip if parquet file already exists for state
    if os.path.exists(os.path.join(PARQUET_INGESTION_PATH, f"{state}_wetlands.parquet")):
        print(f"Skipping state {state} as it has already been processed.")
        continue
    else:
        try:
            # 1. Instantiate the processor for the desired dataset and state
            processor = GeospatialProcessor(
                dataset='wetlands', 
                state=state
                )  
            # 2. Run the entire pipeline
            processor.run()
        except Exception as e:
            print(f"The main script caught an error: {e}")

In [None]:
# In case console crashes and we want to retrieve the last state of the database for debugging
state = 'ak'
db_file = os.path.join(PARQUET_INGESTION_PATH, f"{state}_wetlands.duckdb")
debug_db_path = db_file.replace(".duckdb", "_error_debug.duckdb")
print(f"-> Preserving database state for debugging at: {debug_db_path}")
try:
    # Now copy the file
    shutil.copy2(db_file, debug_db_path)
except Exception as copy_e:
    print(f"-> ERROR: Could not copy debug database file: {copy_e}")
    raise
print(f"-> Final state-level data saved as {debug_db_path}")

-> Preserving database state for debugging at: C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data\ingestion_parquets\ak_wetlands_error_debug.duckdb
-> Final state-level data saved as C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data\ingestion_parquets\ak_wetlands_error_debug.duckdb


<u>NOTES</u>:

1. Wetlands has a more extensive workflow as a part of which, we need to merge certain attributes to data and save parquet files at a county-level for ingestion in BQ from GCS
2. All other data sources can straight-away be uploaded to GCS (either national file or state level file). Note if national-file is uploaded, fips assignment will need to be done in BQ using intersection with county boundaries.

In [22]:
# Once a state level wetlands file is created, merge the attributes
# This function merges wetlands attributes into the state-level wetlands data files.
def merge_wetland_attributes(state_list):
    """
    Merges wetlands attributes into the state-level wetlands data files.
    This function creates a persistent DuckDB file for the attributes once,
    and then attaches that file to process each state's data.

    Args:
        state_list (list): A list of state abbreviations to process (e.g., ['AK', 'MT']).
    """
    print("--- Starting Wetland Attribute Merge Process ---")

    # --- 1. Create or Verify the Persistent Attributes Database ---
    attributes_db_file = os.path.join(PARQUET_INGESTION_PATH, "wetlands_attributes.duckdb")
    
    if not os.path.exists(attributes_db_file):
        print(f"Attributes database not found. Creating at: {attributes_db_file}")
        try:
            con = duckdb.connect(database=attributes_db_file, read_only=False)
            con.execute("INSTALL spatial; LOAD spatial;")
            load_attributes_query = f"""
            CREATE OR REPLACE TABLE wetland_attributes AS
            SELECT
                ATTRIBUTE, SUBSYSTEM_NAME, CLASS_NAME, SUBCLASS_NAME,
                SPLIT_CLASS_NAME, WATER_REGIME_NAME, WATER_REGIME_SUBGROUP
            FROM ST_Read('{WETLAND_ATTRIBUTES}');
            """
            con.execute(load_attributes_query)
            print("-> Successfully created and loaded wetlands attributes database.")
        except Exception as e:
            print(f"-> FATAL ERROR: Could not create attributes database: {e}")
            return # Stop the process if attributes can't be loaded
        finally:
            if 'con' in locals() and con:
                con.close()
    else:
        print(f"Using existing attributes database: {attributes_db_file}")

    # --- 2. Loop through each state and process its file ---
    for state in state_list:
        print(f"\n--- Processing State: {state} ---")
        
        source_parquet = os.path.join(PARQUET_INGESTION_PATH, f"{state}_wetlands.parquet")
        source_duckdb = os.path.join(PARQUET_INGESTION_PATH, f"{state}_wetlands_error_debug.duckdb")
        
        if os.path.exists(source_parquet):
            print(f"Found source Parquet file: {source_parquet}")
            # For Parquet, we can use an in-memory DB and attach the attributes file
            con = duckdb.connect(database=':memory:', read_only=False)
            try:
                con.execute(f"ATTACH '{attributes_db_file}' AS attributes_db (READ_ONLY);")
                temp_output_file = source_parquet.replace(".parquet", "_merged_temp.parquet")
                
                join_query = f"""
                COPY (
                    SELECT s.*, a.* EXCLUDE (ATTRIBUTE)
                    FROM read_parquet('{source_parquet}') s
                    LEFT JOIN attributes_db.wetlands_attributes a ON s.ATTRIBUTE = a.ATTRIBUTE
                ) TO '{temp_output_file}' (FORMAT PARQUET);
                """
                
                print(f"-> Performing join and writing to temporary file...")
                con.execute(join_query)
                
                os.remove(source_parquet)
                os.rename(temp_output_file, source_parquet)
                print(f"-> SUCCESS: Replaced original file with merged data at: {source_parquet}")

            except Exception as e:
                print(f"-> ERROR processing Parquet for state {state}: {e}")
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            finally:
                if con: con.close()

        elif os.path.exists(source_duckdb):
            print(f"Found source DuckDB debug file: {source_duckdb}")
            # For DuckDB files, connect directly and attach the attributes file
            con = duckdb.connect(database=source_duckdb, read_only=False)
            try:
                con.execute(f"ATTACH '{attributes_db_file}' AS attributes_db (READ_ONLY);")
                
                print("-> Performing join and updating 'data_by_county' table in-place...")
                con.execute("""
                    CREATE OR REPLACE TABLE merged_data AS
                    SELECT s.*, a.* EXCLUDE (ATTRIBUTE)
                    FROM data_by_county s
                    LEFT JOIN attributes_db.wetlands_attributes a ON s.ATTRIBUTE = a.ATTRIBUTE;
                """)
                
                con.execute("DROP TABLE data_by_county;")
                con.execute("ALTER TABLE merged_data RENAME TO data_by_county;")
                
                print(f"-> SUCCESS: Updated 'data_by_county' table in {source_duckdb}")

            except Exception as e:
                print(f"-> ERROR processing DuckDB file for state {state}: {e}")
            finally:
                if con:
                    con.execute("DETACH attributes_db;")
                    con.close()

        else:
            print(f"-> No source file found for state {state}. Skipping.")
            continue
        
    print("\n--- Attribute Merge Process Finished ---")

In [None]:
# Execute the merge_wetland_attributes function for all states
# This will merge attributes into the state-level wetlands data files.
state_list = list(STATE_ABBREV_TO_FIPS.keys())
state_retry = ['AK']
for state in state_retry: # Check which states need to be reprocessed
    try:# Process all counties for the given state
        merge_wetland_attributes(state_list=[state])
    except Exception as e:
        print(f"The main script caught an error while processing state {state}: {e}")

In [73]:
# Function to extract a single county's data in chunks
# This function is designed to handle large datasets efficiently by processing them in smaller chunks.
def extract_single_county_chunked(
        con,
        from_clause,
        state_abbrev, 
        fips_code,
        chunk_size=40000):
    """
     Args:
        con: An active DuckDB connection object.
        state_abbrev (str): The two-letter state abbreviation.
        fips_code (str): The specific county FIPS code to extract.
        chunk_size (int): The number of rows to process in each chunk.
    """
    print(f"  -> Starting extraction for FIPS: {fips_code}")
    output_file = os.path.join(WETLAND_COUNTY_FILES, f"{state_abbrev}_{fips_code}_wetlands.parquet")
    
    # This query selects all data for the county. 
    query = f"SELECT \
        NWI_ID, \
        '{state_abbrev}' AS state,\
        fips,\
        NAME AS county,\
        NAMELSAD AS county_full_name,\
        ATTRIBUTE, \
        WETLAND_TYPE,\
        ACRES,\
        SUBSYSTEM_NAME, \
        CLASS_NAME,\
        SUBCLASS_NAME,\
        SPLIT_CLASS_NAME,\
        WATER_REGIME_NAME,\
        WATER_REGIME_SUBGROUP,\
        ST_AsWKB(geometry) AS geometry\
        FROM {from_clause} WHERE fips = '{fips_code}'"
    
    # Use fetch_record_batch for maximum efficiency with pyarrow
    reader = con.execute(query).fetch_record_batch(chunk_size)
    print(f"  -> Query executed. Processing data for FIPS: {fips_code} in chunks of {chunk_size} rows.")

    # Initialize a counter for total rows processed
    writer = 0
    total_rows = 0
    
    try:
        for i, chunk in enumerate(reader):
            if i == 0:
                print(f"Processing chunk {i+1}...")
                # For the first chunk, create the Parquet file and writer
                writer = pq.ParquetWriter(output_file, chunk.schema)
            else:
                print(f"Processing chunk {i+1}...")

            writer.write_batch(chunk)
            total_rows += len(chunk)

        if total_rows > 0:
            print(f"  -> SUCCESS: Wrote {total_rows:,} rows to: {output_file}")
        else:
            print(f"  -> No data found for FIPS {fips_code}. No file was created.")

    except Exception as e:
        print(f"  -> ERROR for FIPS {fips_code}: {e}")
        # If an error occurs, clean up the partially written file
        if writer:
            writer.close()
            os.remove(output_file)
        raise
    finally:
        if writer:
            writer.close()


########################################################
def process_all_counties(state_abbrev):
    """
    Orchestrates the extraction process for all counties in a state.

    Args:
        state_abbrev (str): Two-letter state abbreviation (e.g., 'AK').
    """
    print(f"\n--- Starting Final Extraction Process for State: {state_abbrev} ---")
    
    # --- 1. Determine which source file to use ---
    source_parquet = os.path.join(PARQUET_INGESTION_PATH, f"{state_abbrev}_wetlands.parquet")
    source_duckdb = os.path.join(PARQUET_INGESTION_PATH, f"{state_abbrev}_wetlands_error_debug.duckdb")
    
    source_path = None
    #source_is_parquet = False
    con = None
    from_clause = None

    if os.path.exists(source_parquet):
        print(f"Found Parquet source file: {source_parquet}")
        source_path = source_parquet
        #source_is_parquet = True
        con = duckdb.connect(read_only=False) # Use in-memory DB for reading Parquet
        from_clause = f"read_parquet('{source_path}')"
    elif os.path.exists(source_duckdb):
        print(f"Parquet file not found. Found DuckDB debug file: {source_duckdb}")
        source_path = source_duckdb
        con = duckdb.connect(database=source_path, read_only=False)
        from_clause = "data_by_county"
    else:
        print(f"ERROR: No source file (.parquet or .duckdb) found for state {state_abbrev}. Exiting.")
        return

    # --- 2. Connect and get list of counties ---
    try:
        con.execute("INSTALL spatial; LOAD spatial;")
        print("Fetching list of all counties from source...")
        fips_codes_result = con.execute(f"SELECT DISTINCT fips FROM {from_clause} ORDER BY fips;").fetchall()
        fips_codes = [fips[0] for fips in fips_codes_result]
        print(f"Found {len(fips_codes)} counties to process.")

        # --- 3. Loop through each county and process it ---
        for fips_code in fips_codes:
            extract_single_county_chunked(
                con, 
                from_clause, 
                state_abbrev, 
                fips_code
                )

    except Exception as e:
        print(f"A critical error occurred during the extraction process: {e}")
        raise
    finally:
        if con:
            con.close()
        print("\n--- Extraction Process Finished ---")

In [None]:
# Execute process by county 
state_list = list(STATE_ABBREV_TO_FIPS.keys())
state_retry = ['AK']  # List of states to retry processing
for state in state_retry:
    try:
        process_all_counties(state_abbrev=state)
    except Exception as e:
        print(f"The main script caught an error while processing state {state}: {e}")

In [90]:
# Uploading to GCS
# First, validate the authentication token
CREDENTIALS_PATH =  r"C:\Users\eprashar\AppData\Roaming\gcloud\application_default_credentials.json"
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = str(CREDENTIALS_PATH)

# Verify credentials
utils.check_and_authenticate(CREDENTIALS_PATH)

Credentials file is older than 24 hours. Re-authenticating...
Trying reauthentication on gcloud server using shell command...
Login window opened...please complete authentication
Waiting for credentials file to update...
Authentication confirmed! Credentials file updated.


In [97]:
# Define constants for GCS upload
GIS_PROJECT = 'clgx-gis-app-dev-06e3'
BUCKET = 'geospatial-projects'
DATASET = 'infra_parcels'

# Function to upload county-level Parquet files to Google Cloud Storage
def upload_to_gcs(
        bucket_name,
        source_folder, 
        destination_folder,
        remove_local_files=False):
    """
    Uploads all Parquet files from a local folder to a Google Cloud Storage bucket.

    Args:
        bucket_name (str): The name of the GCS bucket.
        source_folder (str): Local folder containing Parquet files.
        destination_folder (str): Destination folder in the GCS bucket.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    for filename in os.listdir(source_folder):
        if filename.endswith('.parquet') or filename.__contains__('_error_debug.duckdb'):
            print(f"Preparing to upload {filename}...")
            try:
                local_file_path = os.path.join(source_folder, filename)
                blob = bucket.blob(os.path.join(destination_folder, filename))
                blob.upload_from_filename(local_file_path)
                print(f"Uploaded {filename} to gs://{bucket_name}/{destination_folder}/")
                # Optionally, remove the local file after upload
                if remove_local_files:
                    os.remove(local_file_path)
                    print(f"Removed local file: {local_file_path}")
            except Exception as e:
                print(f"Failed to upload {filename}: {e}")
                raise

In [98]:
# Upload all county-level Parquet files to GCS
upload_to_gcs(
    bucket_name=BUCKET,
    source_folder=r"C:\Users\eprashar\OneDrive - CoreLogic Solutions, LLC\github\jan_25_proj_infra_parcels\data\ingestion_parquets",
    destination_folder=f"{DATASET}/wetlands_v2/state/",
    remove_local_files=False
)



Preparing to upload MN_wetlands_error_debug.duckdb...
Uploaded MN_wetlands_error_debug.duckdb to gs://geospatial-projects/infra_parcels/wetlands_v2/state//
Preparing to upload TX_wetlands_error_debug.duckdb...
Uploaded TX_wetlands_error_debug.duckdb to gs://geospatial-projects/infra_parcels/wetlands_v2/state//
