# Model-Agnostic Input Data Preprocessing in CONFLUENCE

## Introduction

This notebook focuses on the model-agnostic preprocessing steps for input data in CONFLUENCE. Model-agnostic preprocessing involves tasks that are common across different hydrological models, such as data acquisition, quality control, and initial formatting.

Key steps covered in this notebook include:

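1. Spatial resampling of forcing data to match the model domain
2. Calculation of zonal statistics for the domain geospatial attributes
3. Processing of observed streamflow data to the model time step
4. Calculation of zonal statistics for the glacier attributes (debris thickness, domain type, and elevation)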

In this preprocessing stage we ensure that our input data is consistent, complete, and properly formatted before we move on to model-specific preprocessing steps. By the end of this notebook, you will have clean, standardized datasets ready for further model-specific processing.

## First we import the libraries and functions we need

In [1]:
import sys
from pathlib import Path
from typing import Dict, Any
import logging
import yaml # type: ignore

current_dir = Path.cwd()
parent_dir = current_dir.parent.parent
sys.path.append(str(parent_dir))

from utils.dataHandling_utils.agnosticPreProcessor_util import forcingResampler, geospatialStatistics # type: ignore
from utils.dataHandling_utils.data_utils import ObservedDataProcessor # type: ignore  

# Set up logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

## Check configurations

Now we should print our configuration settings and make sure that we have defined all the settings we need. 

In [2]:
config_path = Path('../../0_config_files/config_active.yaml')
with open(config_path, 'r') as config_file:
    config = yaml.safe_load(config_file)
    print(f"FORCING_DATASET: {config['FORCING_DATASET']}")
    print(f"EASYMORE_CLIENT: {config['EASYMORE_CLIENT']}")
    print(f"FORCING_VARIABLES: {config['FORCING_VARIABLES']}")
    print(f"EXPERIMENT_TIME_START: {config['EXPERIMENT_TIME_START']}")
    print(f"EXPERIMENT_TIME_END: {config['EXPERIMENT_TIME_END']}")

FORCING_DATASET: ERA5
EASYMORE_CLIENT: easymore cli
FORCING_VARIABLES: longitude,latitude,time,LWRadAtm,SWRadAtm,pptrate,airpres,airtemp,spechum,windspd
EXPERIMENT_TIME_START: 2008-01-01 00:00
EXPERIMENT_TIME_END: 2022-12-31 23:00
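
Printing the settings shows their values, but a missing key would only surface as a `KeyError` later on. A minimal sketch of an explicit check follows. The helper `find_missing_keys` and the list `REQUIRED_KEYS` are hypothetical names introduced for illustration, and the list only contains keys that this notebook reads directly; the CONFLUENCE utilities may need more.

```python
from typing import Any, Dict, List

# Keys read directly by the cells in this notebook (assumed list, not exhaustive)
REQUIRED_KEYS = [
    "CONFLUENCE_DATA_DIR", "DOMAIN_NAME", "FORCING_DATASET", "EASYMORE_CLIENT",
    "FORCING_VARIABLES", "EXPERIMENT_TIME_START", "EXPERIMENT_TIME_END",
]

def find_missing_keys(cfg: Dict[str, Any], required: List[str]) -> List[str]:
    """Return the required keys that are absent or empty in cfg (hypothetical helper)."""
    return [key for key in required if cfg.get(key) in (None, "")]

# Stand-in dictionary for illustration; in the notebook you would pass `config`
example_config = {
    "CONFLUENCE_DATA_DIR": "/path/to/CONFLUENCE_data",
    "DOMAIN_NAME": "Wolverine",
    "FORCING_DATASET": "ERA5",
    "EASYMORE_CLIENT": "easymore cli",
    "FORCING_VARIABLES": "",                      # present but empty
    "EXPERIMENT_TIME_START": "2008-01-01 00:00",
    # EXPERIMENT_TIME_END is deliberately left out
}

missing = find_missing_keys(example_config, REQUIRED_KEYS)
print(f"Missing or empty settings: {missing}")
```

In the notebook itself, calling `find_missing_keys(config, REQUIRED_KEYS)` right after loading the YAML file would flag gaps before any processing starts.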


## Define default paths

Next, we define the paths to the data directories and create any output directories that do not yet exist before running the preprocessing scripts.

In [3]:
# Main project directory
data_dir = config['CONFLUENCE_DATA_DIR']
project_dir = Path(data_dir) / f"domain_{config['DOMAIN_NAME']}"

# Data directories
raw_data_dir = project_dir / 'forcing' / 'raw_data'
basin_averaged_data = project_dir / 'forcing' / 'basin_averaged_data'
catchment_intersection_dir = project_dir / 'shapefiles' / 'catchment_intersection'

# Make sure the new directories exist
basin_averaged_data.mkdir(parents=True, exist_ok=True)
catchment_intersection_dir.mkdir(parents=True, exist_ok=True)

## 1. Pre process forcing data

Now let's resample the forcing data onto our model domain. We use the EASYMORE resampling tool (Gharari et al., 2023).

In [None]:
# Initialize forcingResampler class
fr = forcingResampler(config, logger)

# Run resampling
fr.run_resampling()
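
To illustrate what resampling onto the model domain means, here is a minimal sketch of area-weighted remapping with NumPy. It is not the `forcingResampler` or EASYMORE implementation. All numbers are made up, and the variable names (`grid_values`, `intersect_area`) are assumptions introduced for this example.

```python
import numpy as np

# Hypothetical forcing values on four grid cells at one time step (e.g. air temperature in K)
grid_values = np.array([270.0, 272.0, 274.0, 276.0])

# Hypothetical intersection areas (km^2) between each HRU (rows) and each grid cell (columns)
intersect_area = np.array([
    [3.0, 1.0, 0.0, 0.0],   # HRU 1 overlaps cells 0 and 1
    [0.0, 2.0, 2.0, 4.0],   # HRU 2 overlaps cells 1, 2 and 3
])

# Normalize each row so that the weights of one HRU sum to 1
weights = intersect_area / intersect_area.sum(axis=1, keepdims=True)

# Area-weighted mean of the grid values for each HRU
hru_values = weights @ grid_values
print(hru_values)
```

Because the weights depend only on geometry, they can be computed once and reused for every time step and variable, which is why remapping tools typically store them in a separate file.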

## 2. Pre process geospatial data

Now let's calculate the zonal statistics of the geospatial attributes we need for our model.

In [None]:
# Initialize geospatialStatistics class
gs = geospatialStatistics(config, logger)

# Run zonal statistics
gs.run_statistics()
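
As a rough picture of what a zonal statistic is, the sketch below computes the mean of a raster attribute per zone with plain NumPy. It assumes the zones have already been rasterized onto the same grid as the attribute. The arrays and the helper `zonal_mean` are hypothetical and are not part of `geospatialStatistics`.

```python
import numpy as np

# Hypothetical 3x4 attribute raster (e.g. elevation in m); -9999 marks missing cells
values = np.array([
    [100.0, 110.0, 200.0, 210.0],
    [120.0, -9999.0, 220.0, 230.0],
    [130.0, 140.0, 240.0, -9999.0],
])

# Hypothetical zone raster on the same grid: each cell holds the ID of the HRU covering it
zones = np.array([
    [1, 1, 2, 2],
    [1, 1, 2, 2],
    [1, 1, 2, 2],
])

def zonal_mean(values: np.ndarray, zones: np.ndarray, nodata: float) -> dict:
    """Mean of the valid cells in each zone (hypothetical helper)."""
    valid = values != nodata
    return {int(z): float(values[(zones == z) & valid].mean()) for z in np.unique(zones)}

result = zonal_mean(values, zones, nodata=-9999.0)
print(result)
```

Excluding the nodata cells before averaging matters: including a single -9999 cell would pull a zone mean far below any real value.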

## 3. Pre process observed streamflow data

Process the observed streamflow data onto the same time step that the model will run at.

In [4]:
# Initialize ObservedDataProcessor class
odp = ObservedDataProcessor(config, logger)

# Process the streamflow data
odp.process_streamflow_data()

2025-03-21 14:30:37,714 - INFO - Processing USGS streamflow data
2025-03-21 14:30:39,153 - INFO - Processed streamflow data saved to: /Users/amedin/Research/Confluence/CONFLUENCE_data/domain_Wolverine/observations/streamflow/preprocessed/Wolverine_streamflow_processed.csv
2025-03-21 14:30:39,154 - INFO - Total rows in processed data: 126774
2025-03-21 14:30:39,155 - INFO - Number of non-null values: 65770
2025-03-21 14:30:39,156 - INFO - Number of null values: 61004
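
The log reports both non-null and null values, which is what you would expect when an irregular record is placed on a regular time grid and gaps are left unfilled. A minimal pandas sketch of that idea follows. It assumes an hourly model time step and uses made-up 15-minute data; it is not the `ObservedDataProcessor` implementation, and the series name `discharge_cms` is hypothetical.

```python
import pandas as pd

# Hypothetical 15-minute discharge record (m^3/s) with a gap between 01:15 and 03:00
times = pd.to_datetime([
    "2020-01-01 00:00", "2020-01-01 00:15", "2020-01-01 00:30", "2020-01-01 00:45",
    "2020-01-01 01:00", "2020-01-01 01:15",
    "2020-01-01 03:00", "2020-01-01 03:30",
])
flow = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0, 7.0, 9.0, 11.0], index=times, name="discharge_cms")

# Average onto an hourly grid; hours without observations stay NaN instead of being filled
hourly = flow.resample("60min").mean()

print(hourly)
print(f"Total rows: {len(hourly)}, non-null: {hourly.notna().sum()}, null: {hourly.isna().sum()}")
```

Leaving gaps as NaN keeps missing observations out of later model evaluation, whereas interpolating them would introduce values that were never measured.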


## 4. Pre process geospatial data for glaciers

Now let's calculate the zonal statistics of the geospatial attributes we need for our model over the glaciers.

In [4]:
import numpy as np
import geopandas as gpd # type: ignore
import rasterio
import matplotlib.pyplot as plt
import pandas as pd 
from shapely.geometry import mapping
from rasterio import features
from rasterstats import zonal_stats # type: ignore

# Set up paths
data_dir = config['CONFLUENCE_DATA_DIR']
domain_name = config['DOMAIN_NAME']
project_dir = Path(data_dir) / f"domain_{config['DOMAIN_NAME']}"
dem_name = config['DEM_NAME']
if dem_name == "default":
    dem_name = f"domain_{config['DOMAIN_NAME']}_elv.tif"

debris_path = project_dir / 'attributes' / 'glaciers' / f"domain_{config['DOMAIN_NAME']}_debris_thickness.tif"
type_path = project_dir / 'attributes' / 'glaciers' / f"domain_{config['DOMAIN_NAME']}_domain_type_extend.tif"
dem_path = project_dir / 'attributes' / 'elevation' / 'dem' / dem_name  # use the resolved DEM name from the config

catchment_path = project_dir / 'shapefiles' / 'catchment'
catchment_name = config['CATCHMENT_SHP_NAME']
if catchment_name == 'default':
    catchment_name = f"{config['DOMAIN_NAME']}_HRUs_{config['DOMAIN_DISCRETIZATION']}.shp"
catchment_gdf0 = gpd.read_file(catchment_path / catchment_name)

In [5]:
# Calculate debris thickness statistics
catchment_gdf = catchment_gdf0.copy()
with rasterio.open(debris_path) as src:
    affine = src.transform
    debris_data = src.read(1)
stats = zonal_stats(catchment_gdf, debris_data, affine=affine, stats=['mean'], nodata=0)
result_df = pd.DataFrame(stats)

# Both cases assign the same values; only the message differs
if 'debri_mean' in catchment_gdf.columns:
    print("Updating existing 'debri_mean' column")
else:
    print("Adding new 'debri_mean' column")
catchment_gdf['debri_mean'] = result_df['mean']

intersect_path = project_dir / 'shapefiles' / 'catchment_intersection' / 'with_debris_thickness'
intersect_name = 'catchment_with_debris.shp'
intersect_path.mkdir(parents=True, exist_ok=True)
catchment_gdf.to_file(intersect_path / intersect_name)

print(f"Debris thickness statistics saved to {intersect_path / intersect_name}")

2025-09-08 16:38:53,123 - INFO - Created 3 records


Adding new 'debri_mean' column
Debris thickness statistics saved to /Users/amedin/Research/Confluence/CONFLUENCE_data/domain_Gulkana/shapefiles/catchment_intersection/with_debris_thickness/catchment_with_debris.shp


In [6]:
# Calculate domain type statistics
catchment_gdf = catchment_gdf0.copy()
with rasterio.open(type_path) as src:
    affine = src.transform
    type_data = src.read(1)
stats = zonal_stats(catchment_gdf, type_data, affine=affine, stats=['count'], categorical=True, nodata=0)
result_df = pd.DataFrame(stats).fillna(0)

def rename_column(x):
    if x == 'count':
        return x
    try:
        return f'domType_{int(float(x))}'
    except ValueError:
        return x
result_df = result_df.rename(columns=rename_column)
for col in result_df.columns:
    if col != 'count':
        result_df[col] = result_df[col].astype(int)
catchment_gdf = catchment_gdf.join(result_df)
intersect_path = project_dir / 'shapefiles' / 'catchment_intersection' / 'with_domain_type'
intersect_name = 'catchment_with_domain_type.shp'
intersect_path.mkdir(parents=True, exist_ok=True)
catchment_gdf.to_file(intersect_path / intersect_name)

print(f"Domain type statistics saved to {intersect_path / intersect_name}")

2025-09-08 16:38:53,157 - INFO - Created 3 records


Domain type statistics saved to /Users/amedin/Research/Confluence/CONFLUENCE_data/domain_Gulkana/shapefiles/catchment_intersection/with_domain_type/catchment_with_domain_type.shp


In [7]:
# Calculate mean elevation per domain type; reuses type_data from the domain type cell above
catchment_gdf = catchment_gdf0.copy()
with rasterio.open(dem_path) as src:
    nodata_value = src.nodatavals[0]
    if nodata_value is None:
        nodata_value = -9999
    affine = src.transform
    dem_data = src.read(1)

for domain_type in range(1, 6):  # currently only 5 domain types
    # Keep only DEM cells of this domain type; assumes the type raster
    # (type_data, read in the previous cell) is on the same grid as the DEM
    dem_domain_data = dem_data.copy()
    dem_domain_data[type_data != domain_type] = nodata_value
    if (dem_domain_data == nodata_value).all():
        continue  # skip if no data in this domain type
    stats = zonal_stats(catchment_gdf, dem_domain_data, affine=affine, stats=['mean'], nodata=nodata_value)
    col = f'elv_mean_{domain_type}'
    # Both cases assign the same values; only the message differs
    if col in catchment_gdf.columns:
        print(f"Updating existing '{col}' column")
    else:
        print(f"Adding new '{col}' column")
    catchment_gdf[col] = pd.DataFrame(stats)['mean']

intersect_path = project_dir / 'shapefiles' / 'catchment_intersection' / 'with_dem_domain'
intersect_name = 'catchment_with_dem_domain.shp'
intersect_path.mkdir(parents=True, exist_ok=True)
catchment_gdf.to_file(intersect_path / intersect_name)

print(f"Elevation statistics saved to {intersect_path / intersect_name}")

2025-09-08 16:38:53,235 - INFO - Created 3 records


Adding new 'elv_mean_1' column
Adding new 'elv_mean_2' column
Adding new 'elv_mean_3' column
Adding new 'elv_mean_4' column
Elevation statistics saved to /Users/amedin/Research/Confluence/CONFLUENCE_data/domain_Gulkana/shapefiles/catchment_intersection/with_dem_domain/catchment_with_dem_domain.shp


In [8]:
# Create a new raster with HRU ID
catchment_gdf = catchment_gdf0.copy()
with rasterio.open(dem_path) as src:
    affine = src.transform
    dem_data = src.read(1)
    meta = src.meta.copy()

hru_id = catchment_gdf['HRU_ID'].values
nodata_value = -9999
hru_id_raster = np.ones_like(dem_data, dtype=np.int32) * nodata_value

# Update the metadata to include the correct nodata value
meta.update({
    'dtype': 'int32',
    'nodata': nodata_value
})

# Create a list of (geometry, value) tuples for rasterization
shapes = [(mapping(geom), hru) for geom, hru in zip(catchment_gdf.geometry, hru_id)]

# Rasterize all HRU IDs
hru_id_raster = features.rasterize(
    shapes,
    out_shape=hru_id_raster.shape,
    transform=affine,
    fill=nodata_value,
    all_touched=True,
    dtype=np.int32,
)

# Save the raster
hru_id_path = project_dir / 'attributes' / 'glaciers' / f"domain_{config['DOMAIN_NAME']}_hru_id.tif"
with rasterio.open(hru_id_path, 'w', **meta) as dst:
    dst.write(hru_id_raster, 1)