# Green & Ampt Infiltration Processor

The FLO-2D model includes the Green-Ampt infiltration method as one of its core infiltration engines. This approach is used in hydrologic modeling because it effectively captures transmission losses as water infiltrates into the soil.

In the FLO-2D model, rainfall is distributed to the computational grid, where it infiltrates into the soil until reaching the saturation depth or fill volume. This continuous process accounts for the dynamic movement of water through the watershed as flood routing progresses.



In [None]:
# Automatically set base path to the project directory where the notebook is running
from pathlib import Path

# This gets the directory where the current notebook is located
base_path = Path.cwd()

print(f"📂 Base path automatically set to: {base_path}")

# Find the fields 

In [None]:
"""
Print the fields of vector files in the project directory. This script prints the 
fields of shapefiles and GeoPackage files in the specified directory.  The resulting 
list can be used to fill the fields list in the next processor which will write a 
raster file for each field.
"""

import os
from osgeo import ogr, gdal
import shapefile  # pyshp
from pathlib import Path
import warnings

# Suppress GDAL exception warnings
ogr.UseExceptions()
gdal.UseExceptions()


# File paths
soil_shapefile = base_path / 'Data' / 'Infiltration' / 'Soil 2023.shp'
landuse_shapefile = base_path / 'Data' / 'Infiltration' / 'Land Use 2018.shp'
grid_layer_path = base_path / 'Data' / 'GeoPackage' / 'selfhelp.gpkg'

def print_fields(filepath):
    if filepath.suffix == '.shp':
        # Use pyshp for shapefiles
        sf = shapefile.Reader(str(filepath))
        fields = [f[0] for f in sf.fields[1:]]  # Skip deletion flag
        sf.close()
    elif filepath.suffix == '.gpkg':
        # Use ogr for GeoPackage
        ds = ogr.Open(str(filepath))
        layer = ds.GetLayer()
        fields = [field.name for field in layer.schema]
        ds = None
    else:
        print(f"⚠️ Unsupported file type: {filepath}")
        return

    # Print the fields
    print(f"\nFields in {filepath.name}:")
    for field in fields:
        print(f" - {field}")

# Print the fields for each vector file
print_fields(soil_shapefile)
print_fields(landuse_shapefile)
print_fields(grid_layer_path)


# Rasterize the data

In [None]:
import os
from osgeo import gdal, ogr
import shapefile  # pyshp
import tempfile

# Define input paths
soil_shapefile = base_path / 'Data' / 'Infiltration' / 'Soil 2023.shp'
landuse_shapefile = base_path / 'Data' / 'Infiltration' / 'Land Use 2018.shp'
grid_layer_path = base_path / 'Data' / 'GeoPackage' / 'selfhelp.gpkg'
output_folder = base_path / 'Data' / 'Infiltration' / 'Rasters'

# Fields to rasterize.  Make sure these fields are in the results printed by the previous cell.
soil_fields = ['hydc', 'soil_depth', 'psif', 'dthetad', 'dthetan', 'dthetaw']
landuse_fields = ['IA', 'RTIMP', 'VC', 'InitSat']

# EPSG code for NAD83 Central AZ
target_epsg = 2223

import shapefile
import tempfile
import os

def preprocess_sat_field(shapefile_path, field_name="InitSat"):
    """
    Preprocess the InitSat field in the shapefile to convert categorical values to numeric.  
    Converts:
    - "wet" to 0
    - "normal" to 1
    - "dry" to 2
    - Anything else to -1 (unknown)
    """
    # Load the shapefile
    reader = shapefile.Reader(shapefile_path)
    fields = reader.fields[1:]  # skip DeletionFlag
    field_names = [f[0] for f in fields]

    if field_name not in field_names:
        raise ValueError(f"Field '{field_name}' not found in {shapefile_path}")

    # Find the index of the InitSat field
    sat_index = field_names.index(field_name)

    # Create a temporary directory for the processed shapefile
    temp_dir = tempfile.mkdtemp()
    print(f"Temp directory for Sat processing: {temp_dir}")
    temp_shp = os.path.join(temp_dir, "processed.shp")

    # Create the new shapefile with the updated InitSat field
    writer = shapefile.Writer(temp_shp)
    for field in fields:
        if field[0] == field_name:
            writer.field(field_name, 'N', decimal=0)
        else:
            writer.field(*field)

    # Process each record
    record_count = 0
    for sr in reader.shapeRecords():
        rec = list(sr.record)
        val = str(rec[sat_index]).strip()
        if val == "wet":
            rec[sat_index] = 0
        elif val == "normal":
            rec[sat_index] = 1
        elif val == "dry":
            rec[sat_index] = 2
        else:
            rec[sat_index] = -1  # unknown category

        writer.shape(sr.shape)
        writer.record(*rec)
        record_count += 1

    writer.close()
    print(f"Processed {record_count} records to {temp_shp}")
    return temp_shp


# Rasterization properties calculated from the grid layer.
def rasterize_field(input_vector, field, reference_layer, output_path):
    """"
    Rasterizes a field from a vector file to a raster file using the specified reference layer.
    """

    grid_ds = gdal.OpenEx(reference_layer, gdal.OF_VECTOR)
    grid_layer = grid_ds.GetLayer()
    xmin, xmax, ymin, ymax = grid_layer.GetExtent()
    xres = 30
    yres = 30
    cols = int((xmax - xmin) / xres)
    rows = int((ymax - ymin) / yres)

    driver = gdal.GetDriverByName('GTiff')
    out_raster = driver.Create(output_path, cols, rows, 1, gdal.GDT_Float32)
    out_raster.SetGeoTransform((xmin, xres, 0, ymax, 0, -yres))

    srs = ogr.osr.SpatialReference()
    srs.ImportFromEPSG(target_epsg)
    out_raster.SetProjection(srs.ExportToWkt())

    vector_ds = ogr.Open(input_vector)
    vector_layer = vector_ds.GetLayer()

    gdal.RasterizeLayer(out_raster, [1], vector_layer, options=[
        f"ATTRIBUTE={field}",
        "ALL_TOUCHED=TRUE"
    ])
    print(f"Raster saved: {output_path}")

# Make sure output directory exists
os.makedirs(output_folder, exist_ok=True)

# Rasterize Soil fields
for field in soil_fields:
    out_path = os.path.join(output_folder, f"{field}.tif")
    rasterize_field(soil_shapefile, field, grid_layer_path, out_path)

# Rasterize Land Use fields
for field in landuse_fields:
    input_path = landuse_shapefile

    # Preprocess InitSat specifically
    if field == "InitSat":
        print(f"Preprocessing {field} for numeric conversion...")
        # Corrected shapefile path
        input_path = preprocess_sat_field(str(landuse_shapefile), field)

    out_path = os.path.join(output_folder, f"{field}.tif")
    print(f"Rasterizing {field} from {input_path} to {out_path}")
    rasterize_field(input_path, field, grid_layer_path, out_path)



# Process GeoPackage

In [22]:
# Green-Ampt Raster Sampler
# 
# This script extracts centroid coordinates from a grid layer in a GeoPackage, samples raster values at those centroids, 
# and saves the results to a new GeoPackage layer. It is designed for large datasets and avoids dependencies on rasterio or geopandas.
#
# Author: ChatGPT

import os
from osgeo import ogr, gdal, osr
from pathlib import Path

# Set paths and configuration
# Paths to input files and output location
base_path = Path("C:/Users/Karen/VS Code Projects/ASFPM-LLM-Data-Management-Workshop")
grid_path = base_path / 'Data' / 'GeoPackage' / 'selfhelp.gpkg'
raster_folder = base_path / 'Data' / 'Infiltration' / 'Rasters'
grid_layer_name = "grid"
output_gpkg = grid_path
output_layer_name = "green_ampt_sampled"
epsg = 2223  # NAD83 / Arizona Central (ft)

# Open the grid GeoPackage and get the grid layer
# This loads the vector data so we can extract centroids
grid_ds = gdal.OpenEx(str(grid_path), gdal.OF_VECTOR)
grid_layer = grid_ds.GetLayerByName(grid_layer_name)

# Verify the field names in the grid layer
layer_defn = grid_layer.GetLayerDefn()
field_names = [layer_defn.GetFieldDefn(i).GetName() for i in range(layer_defn.GetFieldCount())]
print(f"Available fields in the layer: {field_names}")

# Use the known field name 'fid' if it exists, otherwise raise an error
actual_field_name = 'fid'
if actual_field_name not in field_names:
    raise KeyError(f"Field '{actual_field_name}' not found in the layer. Available fields: {field_names}. Note: 'fid' might be a special field and not listed explicitly.")

# Extract centroids from each grid feature
centroids = []
grid_layer.ResetReading()
for feat in grid_layer:
    # Get the centroid of each feature
    geom = feat.GetGeometryRef().Centroid()
    x = geom.GetX()
    y = geom.GetY()
    # Use the known field name 'fid'
    grid_id = feat.GetField(actual_field_name)
    centroids.append({"grid_id": grid_id, "x": x, "y": y})

# Close the grid data source to free up memory
grid_ds = None

# Create the output GeoPackage layer
# This layer will store the sampled points and their raster values
driver = ogr.GetDriverByName("GPKG")
if os.path.exists(output_gpkg):
    ds_out = driver.Open(str(output_gpkg), update=1)
else:
    ds_out = driver.CreateDataSource(str(output_gpkg))

# Remove existing layer if it exists (to avoid duplicate layers)
if ds_out.GetLayerByName(output_layer_name):
    ds_out.DeleteLayer(output_layer_name)

# Create a spatial reference object for the new layer
srs = osr.SpatialReference()
srs.ImportFromEPSG(epsg)
out_layer = ds_out.CreateLayer(output_layer_name, srs, ogr.wkbPoint)

# Add basic coordinate and ID fields
out_layer.CreateField(ogr.FieldDefn("grid_id", ogr.OFTInteger))
out_layer.CreateField(ogr.FieldDefn("x", ogr.OFTReal))
out_layer.CreateField(ogr.FieldDefn("y", ogr.OFTReal))

# Add fields for each raster (limited to 10 characters for GPKG compatibility)
raster_files = [f for f in os.listdir(raster_folder) if f.endswith(".tif")]
for raster_file in raster_files:
    field_name = os.path.splitext(raster_file)[0][:10]  # Limit to 10 characters for GPKG compatibility
    out_layer.CreateField(ogr.FieldDefn(field_name, ogr.OFTReal))

# Sample raster values at each centroid
for center in centroids:
    x, y = center["x"], center["y"]

    # Create the point feature
    pt = ogr.Geometry(ogr.wkbPoint)
    pt.AddPoint(x, y)

    feat = ogr.Feature(out_layer.GetLayerDefn())
    feat.SetField("grid_id", int(center["grid_id"]))
    feat.SetField("x", x)
    feat.SetField("y", y)
    feat.SetGeometry(pt)

    # Sample each raster at this point
    for raster_file in raster_files:
        raster_path = raster_folder / raster_file
        raster_name = os.path.splitext(raster_file)[0][:10]  # Use the truncated name as the field name

        # Open the raster and get the transform
        ds = gdal.Open(str(raster_path))
        gt = ds.GetGeoTransform()
        band = ds.GetRasterBand(1)

        # Convert point coordinates to pixel coordinates
        px = int((x - gt[0]) / gt[1])
        py = int((y - gt[3]) / gt[5])

        try:
            # Read the single pixel value
            value = band.ReadAsArray(px, py, 1, 1)[0][0]
            feat.SetField(raster_name, value)
        except Exception as e:
            # Use None for out-of-bounds or nodata errors
            feat.SetField(raster_name, None)

        ds = None  # Close the raster

    # Create the feature in the output layer
    out_layer.CreateFeature(feat)
    feat = None  # Clean up

ds_out = None  # Close the output GeoPackage
print(f"✅ Sample table saved to layer '{output_layer_name}' in {output_gpkg}")


Available fields in the layer: ['col', 'row', 'n_value', 'elevation', 'water_elevation', 'flow_depth']


KeyError: "Field 'fid' not found in the layer. Available fields: ['col', 'row', 'n_value', 'elevation', 'water_elevation', 'flow_depth']. Note: 'fid' might be a special field and not listed explicitly."