# Input Layer paths

In [None]:
# User-supplied configuration. Every value is an empty placeholder that must be
# filled in before the cells below are run.

# Not referenced by any code visible in this notebook; presumably a Google
# Earth Engine project name — TODO confirm.
EE_PROJECT_NAME = ''
# CLART layer (e.g. an MBTiles raster) that is converted to a shapefile and
# then used to clip the rasters below.
INPUT_CLART_LAYER = ''
# Raster binned with bin_surface_catchment.
SURFACE_CATCHMENT_AREA_LAYER = ''
# Raster binned with bin_subsurface_catchment.
SUBSURFACE_CATCHMENT_AREA_LAYER = ''
# Raster binned with bin_nat_dep.
NATURAL_DEPRESSION_LAYER = ''
# Raster binned with bin_distance.
DISTANCE_TO_UPSTREAM_DL_LAYER = ''
# Folder that receives the CLART shapefile and the two efficiency GeoTIFFs.
OUTPUT_FOLDER_NAME = ''

# Import Statements

In [None]:
from google.colab import drive
from osgeo import gdal, ogr
from rasterio.mask import mask
from pathlib import Path
import numpy as np
import subprocess
import os
import rasterio
import geopandas as gpd

# Mount Google Drive into the Colab runtime so files under it can be read/written.
drive.mount('/content/drive')
# Root of the mounted drive. Note the space in "My Drive": any path built from
# this must be quoted when passed to a shell command.
# (Plain string literal: the original f-string had no placeholders.)
# NOTE(review): drive_path is not referenced by the code visible in this notebook.
drive_path = Path('/content/drive/My Drive/')


# Utility Functions: Convert MBTiles to shapefile

These functions define a workflow to convert raster maps (including MBTiles) to vector shapefiles. The process includes reprojecting and resampling, mapping RGBA values to classified categories, and converting classified rasters into shapefiles via polygonization. This is useful for converting map tile visualizations into geospatially analyzable layers.

In [None]:
def execute_gdal_command(input_file, output_file):
    """
    Converts an input raster (e.g., MBTiles) to GeoTIFF and reprojects it to EPSG:4326
    using the GDAL command-line tools.

    ``gdal_translate`` streams a GeoTIFF to ``/vsistdout/`` and ``gdalwarp`` reads it
    back from ``/vsistdin/``, so no intermediate file is written. The target pixel
    size is 0.000277778 degrees (one arc-second, roughly 30 m at the equator).

    Parameters:
        input_file (str | os.PathLike): Path to the input raster file.
        output_file (str | os.PathLike): Path to the output GeoTIFF file.

    Raises:
        subprocess.CalledProcessError: If the pipeline exits with a non-zero status
            (the shell reports the status of the last command, ``gdalwarp``).
    """
    import shlex

    # Quote both paths: the command goes through a shell, so an unquoted path
    # containing spaces (e.g. ".../My Drive/...") or shell metacharacters would
    # be split or interpreted instead of being passed as one argument.
    quoted_input = shlex.quote(os.fspath(input_file))
    quoted_output = shlex.quote(os.fspath(output_file))

    gdal_command = (
        f'gdal_translate -of GTiff {quoted_input} /vsistdout/ | '
        f'gdalwarp -t_srs EPSG:4326 -tr 0.000277778 0.000277778 /vsistdin/ {quoted_output}'
    )
    # check=True surfaces a failed conversion here instead of as an obscure
    # error when a later step tries to open a missing output file.
    subprocess.run(gdal_command, shell=True, check=True)


def combine(input_raster_path, output_raster_path):
    """
    Reads an RGBA raster, identifies specific color combinations, and maps them to
    categorical values (1–5). Creates a new single-band Byte GeoTIFF holding those
    classes, with the geotransform and projection copied from the input.

    Pixels whose band values match none of the known colors are written as 0,
    which is also registered as the nodata value. If the input does not have
    exactly as many bands as the color tuples (4), no pixel can match and the
    output is entirely 0.

    Parameters:
        input_raster_path (str | os.PathLike): Path to the input RGBA raster.
        output_raster_path (str | os.PathLike): Path where the output classified
            raster will be saved.

    Raises:
        RuntimeError: If the input raster cannot be opened or the output raster
            cannot be created.
    """
    input_raster = gdal.Open(os.fspath(input_raster_path))
    if input_raster is None:
        raise RuntimeError(f"Could not open raster: {input_raster_path}")

    width = input_raster.RasterXSize
    height = input_raster.RasterYSize
    num_bands = input_raster.RasterCount

    # Output band initialized to 0 (nodata / unclassified)
    new_band_data = np.zeros((height, width), dtype=np.uint8)

    # RGB-A value to class label mapping
    value_mapping = {
        (0, 255, 0, 255): 1,     # Green
        (255, 255, 0, 255): 2,   # Yellow
        (255, 0, 0, 255): 3,     # Red
        (138, 43, 226, 255): 4,  # Purple
        (30, 144, 255, 255): 5   # Blue
    }

    # Only colors with one component per band can ever match a pixel.
    candidates = {
        color: label for color, label in value_mapping.items() if len(color) == num_bands
    }

    if candidates:
        # Stack all bands into one (num_bands, height, width) array so each
        # color is matched with a single vectorized comparison instead of a
        # Python-level loop over every pixel.
        bands = np.stack(
            [input_raster.GetRasterBand(i + 1).ReadAsArray() for i in range(num_bands)]
        )
        for color, label in candidates.items():
            reference = np.array(color).reshape(num_bands, 1, 1)
            # A pixel belongs to the class only if every band equals the color.
            new_band_data[(bands == reference).all(axis=0)] = label

    # Save the classified raster
    driver = gdal.GetDriverByName("GTiff")
    output_raster = driver.Create(
        os.fspath(output_raster_path), width, height, 1, gdal.GDT_Byte
    )
    if output_raster is None:
        raise RuntimeError(f"Could not create raster: {output_raster_path}")

    output_band = output_raster.GetRasterBand(1)
    output_band.WriteArray(new_band_data)
    output_band.SetNoDataValue(0)
    output_raster.SetGeoTransform(input_raster.GetGeoTransform())
    output_raster.SetProjection(input_raster.GetProjection())

    # Flush and release the handles explicitly so the file is complete on disk
    # before any caller re-opens it.
    output_raster.FlushCache()
    output_band = None
    output_raster = None
    input_raster = None


def raster_to_shapefile(raster_path, shapefile_path):
    """
    Converts band 1 of a raster to a vector shapefile using GDAL's polygonization.

    Each polygon stores the pixel value it was built from in the integer field
    ``CLART``. An existing shapefile at ``shapefile_path`` is deleted first.

    Parameters:
        raster_path (str | os.PathLike): Path to the input raster.
        shapefile_path (str | os.PathLike): Path where the output shapefile will
            be stored.

    Raises:
        RuntimeError: If the raster cannot be opened or the shapefile cannot be
            created.
    """
    # Import osr explicitly rather than reaching it through ``ogr.osr``.
    from osgeo import osr

    # Coerce to plain strings so pathlib.Path arguments are accepted as well.
    raster_path = os.fspath(raster_path)
    shapefile_path = os.fspath(shapefile_path)

    raster = gdal.Open(raster_path)
    if raster is None:
        raise RuntimeError(f"Could not open raster: {raster_path}")
    band = raster.GetRasterBand(1)
    driver = ogr.GetDriverByName("ESRI Shapefile")

    if os.path.exists(shapefile_path):
        driver.DeleteDataSource(shapefile_path)

    out_datasource = driver.CreateDataSource(shapefile_path)
    if out_datasource is None:
        raise RuntimeError(f"Could not create shapefile: {shapefile_path}")

    srs = osr.SpatialReference()
    srs.ImportFromWkt(raster.GetProjection())
    out_layer = out_datasource.CreateLayer("polygonized", srs=srs)

    # Add field for storing class values
    field = ogr.FieldDefn("CLART", ogr.OFTInteger)
    out_layer.CreateField(field)

    # Convert raster to polygons; field index 0 is the CLART field added above.
    # NOTE(review): the mask band argument is None, so pixels holding the
    # nodata value are polygonized too (as CLART = 0 polygons) — confirm that
    # this is intended, since the shapefile is later used as a clip boundary.
    gdal.Polygonize(band, None, out_layer, 0, [], callback=None)
    out_datasource.FlushCache()

    # Release the handles so the shapefile is fully written and closed before
    # another library reads it.
    out_layer = None
    out_datasource = None
    raster = None
    print(f"Shapefile saved to {shapefile_path}")


def mbtiles_to_shp(input_mbtiles_path, output_shp_path):
    """
    Converts MBTiles to a shapefile by first converting to GeoTIFF,
    classifying pixel colors, and polygonizing the result.

    The classified single-band raster is kept next to the input, using the
    input's path with its extension replaced by ``.tiff``. The intermediate
    RGBA GeoTIFF is written to a private temporary directory that is removed
    even when one of the steps fails.

    Parameters:
        input_mbtiles_path (str | os.PathLike): Path to the input MBTiles raster.
        output_shp_path (str | os.PathLike): Output shapefile path.
    """
    import tempfile

    base_filename, _ = os.path.splitext(input_mbtiles_path)
    output_tiff_path = base_filename + '.tiff'

    # A fresh temporary directory avoids two problems of a fixed file name in
    # the working directory: a stale file from an earlier (failed) run being
    # picked up by gdalwarp, and concurrent runs overwriting each other.
    with tempfile.TemporaryDirectory() as scratch_dir:
        temp = os.path.join(scratch_dir, "converted_rgb_raster.tiff")

        execute_gdal_command(input_mbtiles_path, temp)
        combine(temp, output_tiff_path)
        raster_to_shapefile(output_tiff_path, output_shp_path)


# Utility Functions: Binning various datasets
These functions define binning strategies for different geospatial layers (e.g., depressions, surface/subsurface catchment, and distance). Each function maps continuous values into discrete classes for easier interpretation and visualization.

In [None]:
# Binning logic for classifying continuous raster data into categories
def bin_nat_dep(arr):
    """
    Classify natural-depression values into three bins.

    Values below 1 map to 1, values in [1, 5) map to 2, and values of 5 or
    more map to 3. Elements that satisfy none of the comparisons (NaN) stay NaN.

    Parameters:
        arr (np.ndarray): Array of values to classify.

    Returns:
        np.ndarray: Float array of the same shape holding 1, 2, 3 or NaN.
    """
    conditions = [
        arr < 1,
        (arr >= 1) & (arr < 5),
        arr >= 5,
    ]
    labels = [1.0, 2.0, 3.0]
    return np.select(conditions, labels, default=np.nan)

def bin_surface_catchment(arr):
    """
    Classify surface-catchment values into three bins.

    Values below 1000 map to 1, values in [1000, 5000) map to 2, and values of
    5000 or more map to 3. Elements that satisfy none of the comparisons (NaN)
    stay NaN.

    Parameters:
        arr (np.ndarray): Array of values to classify.

    Returns:
        np.ndarray: Float array of the same shape holding 1, 2, 3 or NaN.
    """
    is_small = arr < 1000
    is_medium = (arr >= 1000) & (arr < 5000)
    is_large = arr >= 5000

    # Start from all-NaN and layer each class on top of the previous result.
    classes = np.where(is_small, 1.0, np.nan)
    classes = np.where(is_medium, 2.0, classes)
    return np.where(is_large, 3.0, classes)

def bin_subsurface_catchment(arr):
    """
    Classify subsurface-catchment values into three bins.

    Uses the same thresholds as the surface-catchment binning: below 1000 is
    class 1, [1000, 5000) is class 2, and 5000 or more is class 3. Elements
    that satisfy none of the comparisons (NaN) stay NaN.

    Parameters:
        arr (np.ndarray): Array of values to classify.

    Returns:
        np.ndarray: Float array of the same shape holding 1, 2, 3 or NaN.
    """
    selectors = (
        arr < 1000,
        (arr >= 1000) & (arr < 5000),
        arr >= 5000,
    )
    classified = np.full(arr.shape, np.nan)
    # Class labels are simply the 1-based position of each selector.
    for label, selector in enumerate(selectors, start=1):
        classified[selector] = label
    return classified

def bin_distance(arr):
    """
    Classify distance values into three bins, with smaller distances scoring higher.

    Values below 50 map to 3, values in [50, 150) map to 2, and values of 150
    or more map to 1. Elements that satisfy none of the comparisons (NaN) stay NaN.

    Parameters:
        arr (np.ndarray): Array of values to classify.

    Returns:
        np.ndarray: Float array of the same shape holding 1, 2, 3 or NaN.
    """
    near = arr < 50
    middle = (arr >= 50) & (arr < 150)
    far = arr >= 150

    scores = np.full(arr.shape, np.nan)
    # The three masks are mutually exclusive, so assignment order is irrelevant.
    np.putmask(scores, far, 1)
    np.putmask(scores, middle, 2)
    np.putmask(scores, near, 3)
    return scores


# Utility Functions: Raster Processing

These utility functions handle raster processing, including clipping with shapefiles, applying binning functions, validating raster shape consistency, and saving processed rasters.

In [None]:
def load_and_bin(path, bin_func, clip_shapefile_path=None):
    """
    Loads band 1 of a raster, optionally clips it using a shapefile, and applies binning.

    Invalid pixels are converted to NaN before binning, so ``bin_func`` sees
    NaN wherever the raster has no data. When clipping, pixels outside the
    shapefile geometries are treated as invalid as well; this holds even if
    the raster declares no nodata value.

    No reprojection is performed: the clip geometries are used as read, so the
    shapefile is assumed to be in the raster's CRS.

    Parameters:
        path (str): Path to the input raster.
        bin_func (function): Function to bin the raster values.
        clip_shapefile_path (str, optional): Shapefile path for clipping.

    Returns:
        tuple: Binned data, raster profile (with height, width and transform
        updated to the clipped window when clipping), and a boolean mask that
        is True for invalid pixels.
    """
    with rasterio.open(path) as src:
        profile = src.profile.copy()
        nodata = src.nodata

        if clip_shapefile_path:
            gdf = gpd.read_file(clip_shapefile_path)
            geom = [feature["geometry"] for feature in gdf.__geo_interface__["features"]]
            # filled=False returns a masked array instead of writing a fill
            # value into the data. With the default filled=True, a raster
            # without a nodata value has its clipped-out pixels filled with 0,
            # which would then be binned as if it were real data.
            clipped, transform = mask(src, geom, crop=True, filled=False)
            band = clipped[0]
            profile.update({
                "height": band.shape[0],
                "width": band.shape[1],
                "transform": transform
            })
            data = band.astype(np.float64).filled(np.nan)
        else:
            data = src.read(1)
            if nodata is None:
                # Nothing to mask; just make the dtype float so the result
                # matches the nodata branch.
                data = data.astype(np.float64)
            else:
                data = np.where(data == nodata, np.nan, data)

    mask_array = np.isnan(data)
    binned = bin_func(data)

    return binned, profile, mask_array


def assert_same_meta(*rasters):
    """
    Verify that every raster array passed in has the same shape.

    Only ``.shape`` is compared; no other metadata is inspected.

    Parameters:
        *rasters (np.ndarray): Raster arrays to compare.

    Returns:
        bool: True when all shapes agree (also when no rasters are given).

    Raises:
        ValueError: If shapes do not match.
    """
    distinct_shapes = {raster.shape for raster in rasters}
    if len(distinct_shapes) > 1:
        raise ValueError("All rasters must have the same shape.")
    return True


def write_output_raster(data, profile, output_path):
    """
    Writes a single-band uint8 raster to disk with nodata set to 0.

    The caller's profile is copied and its dtype, band count and nodata value
    are overridden so the file metadata agrees with the uint8 data actually
    written, rather than inheriting the source raster's dtype and nodata.
    NaN values are written as 0 (nodata).

    NOTE(review): the uint8 cast truncates fractional values (e.g. 1.5 becomes
    1). Confirm this is acceptable for averaged inputs.

    Parameters:
        data (np.ndarray): The raster data to write.
        profile (dict): Raster metadata profile. Not modified.
        output_path (str): File path for output raster.
    """
    out_profile = profile.copy()
    out_profile.update(dtype=rasterio.uint8, count=1, nodata=0)

    # NaN has no uint8 representation; map it to the nodata value explicitly
    # instead of relying on the platform-dependent result of the cast.
    cleaned = np.where(np.isnan(data), 0, data).astype(rasterio.uint8)

    with rasterio.open(output_path, 'w', **out_profile) as dst:
        dst.write(cleaned, 1)
    print(f"Output written to {output_path}")


# CLART++
This block converts the CLART classification layer into a shapefile and uses it to clip and bin multiple input rasters. It computes two composite indicators—Recharge Efficiency and Storage Efficiency—by averaging relevant binned layers. The results are saved as GeoTIFF rasters for further visualization or analysis.

In [None]:
# Make sure the output folder exists before anything is written into it.
output_dir = Path(OUTPUT_FOLDER_NAME)
output_dir.mkdir(parents=True, exist_ok=True)

# Derive shapefile name from the input CLART layer's base name
# (e.g. ".../clart.mbtiles" -> "clart.shp"). Only the stem is used: keeping
# directory components would, for an absolute input path, make the join below
# ignore the output folder entirely.
clart_filename = Path(INPUT_CLART_LAYER).stem
clart_shp_name = f"{clart_filename}.shp"
clart_shp = output_dir / clart_shp_name

# Convert CLART raster (e.g., MBTiles) to a vector shapefile.
# The path is passed as a plain string because it ends up in GDAL/OGR calls.
mbtiles_to_shp(INPUT_CLART_LAYER, str(clart_shp))

# Load and bin each raster, clipping them using the CLART shapefile.
# The profile and nodata mask of the natural-depression layer are reused for
# both outputs.
binned_nat_dep, profile, nodata_mask = load_and_bin(NATURAL_DEPRESSION_LAYER, bin_nat_dep, str(clart_shp))
binned_catch, _, _ = load_and_bin(SURFACE_CATCHMENT_AREA_LAYER, bin_surface_catchment, str(clart_shp))
binned_dist, _, _ = load_and_bin(DISTANCE_TO_UPSTREAM_DL_LAYER, bin_distance, str(clart_shp))
binned_subsurface, _, _ = load_and_bin(SUBSURFACE_CATCHMENT_AREA_LAYER, bin_subsurface_catchment, str(clart_shp))

# Ensure all binned rasters have the same array shape (only the shape is
# checked; resolution and alignment are not verified here)
assert_same_meta(binned_nat_dep, binned_catch, binned_dist, binned_subsurface)

# Compute Recharge Efficiency as average of surface catchment and natural depression bins
recharge_efficiency = (binned_catch + binned_nat_dep) / 2

# Compute Storage Efficiency as average of subsurface catchment, natural depression, and distance bins
storage_efficiency = (binned_subsurface + binned_nat_dep + binned_dist) / 3

# Apply nodata mask to set invalid values as NaN
recharge_efficiency[nodata_mask] = np.nan
storage_efficiency[nodata_mask] = np.nan

# NOTE(review): write_output_raster casts to uint8, so the fractional averages
# computed above (e.g. 1.5 or 2.33) lose their fractional part on disk —
# confirm this is intended.

# Save recharge efficiency to GeoTIFF
output_recharge_path = output_dir / 'recharge_efficiency.tif'
write_output_raster(recharge_efficiency, profile, output_recharge_path)

# Save storage efficiency to GeoTIFF
output_storage_path = output_dir / 'storage_efficiency.tif'
write_output_raster(storage_efficiency, profile, output_storage_path)
