# River mask development using DSWE/Landsat

The following code uses the DSWE water-masking method and Landsat remote sensing data to develop two-dimensional water masks of rivers within the given study area, drawing on locally stored shapefiles for study boundaries and Google Earth Engine for Landsat imagery. Executing the export_masks() function in step 7 generates median annual composite water masks for the specified range of years for the given watershed, downloading them as .tif files to a local output folder.

Author: James (Huck) Rees;
        PhD Student, UCSB Geography
        
Date: May 9th, 2024

## Import packages

In [1]:
import ee
import geemap
import geopandas as gpd
import os
from datetime import datetime
import math
ee.Initialize()

## Initialize functions for DSWE water masking

In [2]:
# Modified Normalized Difference Water Index (MNDWI)
def Mndwi(image):
    """
    Compute the Modified Normalized Difference Water Index (MNDWI).

    Parameters:
    image (ee.Image): Image exposing bands named 'Green' and 'Swir1'.

    Returns:
    ee.Image: Single-band image named 'mndwi' holding the normalized
        difference of the 'Green' and 'Swir1' bands.
    """
    green_vs_swir1 = image.normalizedDifference(['Green', 'Swir1'])
    return green_vs_swir1.rename('mndwi')

# Modified Bare Soil Reflectance Variables
def Mbsrv(image):
    """
    Compute the visible-band bare-soil term (MBSRV): 'Green' plus 'Red'.

    Parameters:
    image (ee.Image): Image exposing bands named 'Green' and 'Red'.

    Returns:
    ee.Image: Single-band image named 'mbsrv'.
    """
    green = image.select(['Green'])
    red = image.select(['Red'])
    return green.add(red).rename('mbsrv')

def Mbsrn(image):
    """
    Compute the infrared bare-soil term (MBSRN): 'Nir' plus 'Swir1'.

    Parameters:
    image (ee.Image): Image exposing bands named 'Nir' and 'Swir1'.

    Returns:
    ee.Image: Single-band image named 'mbsrn'.
    """
    near_infrared = image.select(['Nir'])
    shortwave_infrared1 = image.select(['Swir1'])
    return near_infrared.add(shortwave_infrared1).rename('mbsrn')

# Normalized Difference Vegetation Index (NDVI)
def Ndvi(image):
    """
    Compute the Normalized Difference Vegetation Index (NDVI).

    Parameters:
    image (ee.Image): Image exposing bands named 'Nir' and 'Red'.

    Returns:
    ee.Image: Single-band image named 'ndvi' holding the normalized
        difference of the 'Nir' and 'Red' bands.
    """
    nir_vs_red = image.normalizedDifference(['Nir', 'Red'])
    return nir_vs_red.rename('ndvi')

# Automated Water Extraction Index, shadow variant (AWEsh)
def Awesh(image):
    """
    Compute the Automated Water Extraction Index (AWEsh).

    The index is evaluated as
    Blue + 2.5 * Green - 1.5 * MBSRN - 0.25 * Swir2,
    where MBSRN is obtained from Mbsrn(image).

    Parameters:
    image (ee.Image): Image exposing the 'Blue', 'Green', 'Swir2' bands and
        the bands Mbsrn() needs ('Nir', 'Swir1').

    Returns:
    ee.Image: Single-band image named 'awesh'.
    """
    # Map each variable used in the expression string to its source band.
    band_inputs = {
        'Blue': image.select(['Blue']),
        'Green': image.select(['Green']),
        'mbsrn': Mbsrn(image).select(['mbsrn']),
        'Swir2': image.select(['Swir2'])
    }
    formula = 'Blue + 2.5 * Green + (-1.5) * mbsrn + (-0.25) * Swir2'
    return image.expression(formula, band_inputs).rename('awesh')

# Decision Tree for Surface Water Extent (DSWE)
def Dswe(image):
    """
    Classify each pixel into a DSWE class (0-4) from five spectral tests.

    Each test contributes one decimal digit (1, 10, 100, 1000, 10000) to a
    combined code; the code is then looked up against fixed lists to assign:
    0 = no water, 1 = high-confidence water, 2 = moderate-confidence water,
    3 = potential wetland, 4 = low-confidence water.

    Parameters:
    image (ee.Image): Image exposing the 'Blue', 'Green', 'Red', 'Nir',
        'Swir1' and 'Swir2' bands; the thresholds below are expressed in
        reflectance units (e.g. 0.09, 0.15).

    Returns:
    ee.Image: Single-band classification image named 'dswe'.
    """
    # Spectral indices and raw bands consumed by the tests
    mndwi = Mndwi(image)
    ndvi = Ndvi(image)
    mbsrv = Mbsrv(image)
    mbsrn = Mbsrn(image)
    awesh = Awesh(image)
    blue = image.select(['Blue'])
    nir = image.select(['Nir'])
    swir1 = image.select(['Swir1'])
    swir2 = image.select(['Swir2'])

    # The five decision-tree tests (each yields a 0/1 image)
    test1 = mndwi.gt(0.124)
    test2 = mbsrv.gt(mbsrn)
    test3 = awesh.gt(0)
    test4 = mndwi.gt(-0.44).And(swir1.lt(0.09)).And(nir.lt(0.15)).And(ndvi.lt(0.7))
    test5 = (mndwi.gt(-0.5).And(blue.lt(0.1)).And(swir1.lt(0.3))
             .And(swir2.lt(0.1)).And(nir.lt(0.25)))

    # Pack the test outcomes into one code, one decimal digit per test
    code = (test1
            .add(test2.multiply(10))
            .add(test3.multiply(100))
            .add(test4.multiply(1000))
            .add(test5.multiply(10000)))

    def _matches_any(values):
        # 1 where the packed code equals any of the listed values
        hit = code.eq(values[0])
        for value in values[1:]:
            hit = hit.Or(code.eq(value))
        return hit

    # Lookup lists mapping packed codes to DSWE classes
    no_water = _matches_any([0, 1, 10, 100, 1000])
    high_conf = _matches_any([1111, 10111, 11011, 11101, 11110, 11111])
    moderate_conf = _matches_any([111, 1011, 1101, 1110,
                                  10011, 10101, 10110,
                                  11001, 11010, 11100])
    potential_wetland = code.eq(11000)
    low_conf = _matches_any([11, 101, 110, 1001,
                             1010, 1100, 10000,
                             10001, 10010, 10100])

    # Weight each membership image by its class number and sum
    classified = (no_water.multiply(0)
                  .add(high_conf.multiply(1))
                  .add(moderate_conf.multiply(2))
                  .add(potential_wetland.multiply(3))
                  .add(low_conf.multiply(4)))

    return classified.rename(['dswe'])

# Generate Binary Water Mask Based on DSWE
def ClassifyWaterJones2019(image, max_level):
    """
    Build a binary water mask from the DSWE classification of an image.

    Args:
        image: ee.Image with the bands Dswe() requires.
        max_level: int, highest DSWE class (inclusive) to count as water.
            Class 1 is always included; values below 2 add nothing further.

    Returns:
        ee.Image: Single-band image named 'waterMask' that is 1 where the
        DSWE class lies between 1 and max_level.
    """
    dswe_classes = Dswe(image)

    # Class 1 (high-confidence water) is the baseline of the mask
    combined = dswe_classes.eq(1)

    # Union in each additional class up to the requested level
    extra_levels = [dswe_classes.eq(level) for level in range(2, max_level + 1)]
    for level_mask in extra_levels:
        combined = combined.Or(level_mask)

    return combined.rename(['waterMask'])

Error in callback <function _set_css_in_cell_output at 0x000001D763F53600> (for pre_run_cell), with arguments args (<ExecutionInfo object at 1d763e629d0, raw_cell="# Normalized Difference Water Index (MNDWI)
def Mn.." store_history=True silent=False shell_futures=True cell_id=None>,),kwargs {}:


TypeError: _set_css_in_cell_output() takes 0 positional arguments but 1 was given

## Initialize functions to curate Landsat imagery from GEE (collection 2)

In [3]:
def maskL8sr(image):
    """
    Masks out clouds and cloud shadows using the Landsat Collection 2
    QA_PIXEL bitmask, which getLandsatCollection() renames to 'BQA'.

    Args:
        image: ee.Image, the input Landsat image; must expose a 'BQA' band
            holding the unscaled QA_PIXEL bit flags.

    Returns:
        ee.Image: The input image with cloud and cloud-shadow pixels masked.
    """
    # Collection 2 QA_PIXEL layout: bit 3 = cloud, bit 4 = cloud shadow.
    # (Bits 3/5 were shadow/cloud in the Collection 1 pixel_qa band; in
    # Collection 2 bit 5 flags snow, so using it here would leave shadows
    # unmasked and mask snow instead.)
    cloud_bit_mask = (1 << 3)
    cloud_shadow_bit_mask = (1 << 4)
    # Select the QA band
    qa = image.select('BQA')
    # Both bits must be zero for clear conditions
    mask = qa.bitwiseAnd(cloud_shadow_bit_mask).eq(0).And(
        qa.bitwiseAnd(cloud_bit_mask).eq(0)
    )
    return image.updateMask(mask)

def getLandsatCollection():
    """
    Build one Landsat 5/7/8/9 surface-reflectance collection
    (Collection 2, Tier 1, Level 2) with harmonised band names.

    Landsat 7 is restricted to 1999-04-15 through 2003-05-30 (presumably to
    avoid scan-line-corrector-off scenes -- confirm). Landsat 5/7 have no
    ultra-blue band, so 'SR_B1' is selected twice to fill both the 'uBlue'
    and 'Blue' slots.

    Returns:
        ee.ImageCollection: Merged collection whose bands are named
        'uBlue', 'Blue', 'Green', 'Red', 'Nir', 'Swir1', 'Swir2', 'BQA'.
    """
    # Source band order per sensor family, aligned with unified_names below
    oli_bands = ['SR_B1', 'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B6', 'SR_B7', 'QA_PIXEL']
    tm_etm_bands = ['SR_B1', 'SR_B1', 'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B7', 'QA_PIXEL']
    unified_names = ['uBlue', 'Blue', 'Green', 'Red', 'Nir', 'Swir1', 'Swir2', 'BQA']

    def _load(asset_id, source_bands, date_window=None):
        # Open a collection, optionally restrict its dates, then rename bands
        collection = ee.ImageCollection(asset_id)
        if date_window is not None:
            collection = collection.filterDate(date_window[0], date_window[1])
        return collection.select(source_bands, unified_names)

    landsat5 = _load("LANDSAT/LT05/C02/T1_L2", tm_etm_bands)
    landsat7 = _load("LANDSAT/LE07/C02/T1_L2", tm_etm_bands, ('1999-04-15', '2003-05-30'))
    landsat8 = _load("LANDSAT/LC08/C02/T1_L2", oli_bands)
    landsat9 = _load("LANDSAT/LC09/C02/T1_L2", oli_bands)

    return landsat5.merge(landsat7).merge(landsat8).merge(landsat9)

def rescale(image):
    """
    Rescale Landsat Collection 2 surface-reflectance digital numbers to
    reflectance so the DSWE thresholds can be applied.

    Only the optical bands are scaled (DN * 0.0000275 - 0.2). The 'BQA'
    band is a bit-flag mask, not a reflectance, so it is passed through
    unchanged rather than being turned into meaningless floats.

    Parameters:
    image (ee.Image): Image with the standardized band names 'uBlue',
        'Blue', 'Green', 'Red', 'Nir', 'Swir1', 'Swir2' (and 'BQA').

    Returns:
    ee.Image: The input image with its optical bands overwritten by their
        rescaled values; all other bands are untouched.
    """
    optical_names = ['uBlue', 'Blue', 'Green', 'Red', 'Nir', 'Swir1', 'Swir2']
    optical_bands = image.select(optical_names).multiply(0.0000275).add(-0.2)
    # overwrite=True replaces the original bands of the same name in place
    return image.addBands(optical_bands, None, True)

Error in callback <function _set_css_in_cell_output at 0x000001D763F53600> (for pre_run_cell), with arguments args (<ExecutionInfo object at 1d765c12e50, raw_cell="def maskL8sr(image):
    """
    Masks out clouds .." store_history=True silent=False shell_futures=True cell_id=None>,),kwargs {}:


TypeError: _set_css_in_cell_output() takes 0 positional arguments but 1 was given

## Function to import study area polygon/s or reaches.

In [4]:
def load_watershed_shapefile(folder_name, base_path):
    """
    Loads a watershed shapefile or reach shapefile into an Earth Engine FeatureCollection.

    The shapefile is expected at <base_path>/<folder_name>/<folder_name>.shp.

    Parameters:
        - folder_name: str, the name of the specific watershed/reach folder/shapefile. This can include one or more features.
        - base_path: str, base directory containing the watershed folders (required; there is no default).

    Returns:
        - ee.FeatureCollection representing the shapefile's features, or
          None when the file is missing, cannot be read, or cannot be
          converted (a message is printed in each case). Callers must
          check for None.
    """
    # Construct the full path to the shapefile
    shapefile_path = os.path.join(base_path, folder_name, f"{folder_name}.shp")

    # Check if the shapefile exists
    if not os.path.exists(shapefile_path):
        print(f"No shapefile found at {shapefile_path}")
        return None

    # Load the shapefile into a GeoDataFrame (best-effort: report and bail out)
    try:
        gdf = gpd.read_file(shapefile_path)
    except Exception as e:
        print(f"Error loading shapefile: {e}")
        return None

    # Initialize the Earth Engine API if not already done. `_credentials` is a
    # private attribute of ee.data, so read it defensively: if it is absent,
    # fall back to calling ee.Initialize() rather than raising AttributeError.
    if not getattr(ee.data, '_credentials', None):
        ee.Initialize()

    # Convert the GeoDataFrame to an Earth Engine FeatureCollection
    try:
        feature_collection = geemap.geopandas_to_ee(gdf)
    except Exception as e:
        print(f"Error converting GeoDataFrame to FeatureCollection: {e}")
        return None

    return feature_collection

Error in callback <function _set_css_in_cell_output at 0x000001D763F53600> (for pre_run_cell), with arguments args (<ExecutionInfo object at 1d75f186910, raw_cell="def load_watershed_shapefile(folder_name, base_pat.." store_history=True silent=False shell_futures=True cell_id=None>,),kwargs {}:


TypeError: _set_css_in_cell_output() takes 0 positional arguments but 1 was given

## Functions to generate DSWE water masks for the provided study/area or reaches

In [5]:
# Function to generate a water mask for a given year and polygon feature
def get_water_mask_for_feature(year, max_level, polygon):
    """
    Generate a water mask for a given year and polygon feature using Landsat imagery.

    The merged Landsat collection is limited to the calendar year and the
    polygon, cloud-masked, rescaled to reflectance, reduced to a per-pixel
    median composite, clipped to the polygon and classified with DSWE.

    Parameters:
    year (int): The year for which to generate the water mask.
    max_level (int): The highest DSWE class (inclusive) to treat as water.
    polygon (ee.Geometry): The geometry defining the area of interest.

    Returns:
    ee.Image: The binary water mask ('waterMask' band) for the specified
        year and polygon.
    """
    # filterDate() treats the end date as exclusive, so the window must end
    # on 1 January of the following year to keep 31 December acquisitions.
    start_date = f'{year}-01-01'
    end_date = f'{int(year) + 1}-01-01'

    # Filter first so the per-image maps are only declared on the scenes
    # that are actually needed; the resulting images are the same.
    imagery = (getLandsatCollection()
               .filterDate(start_date, end_date)
               .filterBounds(polygon)
               .map(maskL8sr)
               .map(rescale))

    image_composite = imagery.median().clip(polygon)
    water_mask = ClassifyWaterJones2019(image_composite, max_level)

    return water_mask

# Main function to generate water masks for each feature in a FeatureCollection
def get_water_masks(year, folder_name, max_level, base_path=r'C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\Reaches'):
    """
    Build one water mask per feature of the study-area shapefile for a year.

    Parameters:
    year (int): The year for which to generate the water masks.
    folder_name (str): Name of the folder (and shapefile) holding the study area.
    max_level (int): Highest DSWE class (inclusive) to treat as water.
    base_path (str): Directory containing the study-area folders.

    Returns:
    The result of mapping over the loaded FeatureCollection; each element is
    the water-mask image for one feature, tagged with a 'polygon_id'
    property set to that feature's id.
    """
    study_features = load_watershed_shapefile(folder_name, base_path)

    def _mask_for(feature):
        # Water mask for this feature's geometry, labelled with its id
        mask = get_water_mask_for_feature(year, max_level, feature.geometry())
        return mask.set('polygon_id', feature.id())

    return study_features.map(_mask_for)

# Helper: normalise a user-supplied range selector into inclusive bounds
def _resolve_inclusive_range(value, all_bounds, label):
    """
    Translate "All", a single int, or a (start, end) tuple into an
    inclusive (start, end) pair.

    Parameters:
        value: the user-supplied selector.
        all_bounds (tuple): (start, end) to use when value is "All".
        label (str): parameter name used in the error message.

    Raises:
        ValueError: if value is none of the accepted forms.
    """
    if value == "All":
        return all_bounds
    if isinstance(value, int):
        return value, value
    if isinstance(value, tuple) and len(value) == 2:
        return value[0], value[1]
    raise ValueError(f"{label} must be 'All', an int, or a tuple (start, end).")


# Function to export masks for a range of years and features within a polygon feature collection
def export_masks(folder_name, max_level, year_range="All", reach_range="All", 
                 base_path=r'C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\Reaches', 
                 local_output_path=r'C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\RiverMasks'):
    """
    Generates water masks for a given river and specified range of years and reaches.

    For every feature whose 'ds_order' property falls inside the reach range,
    one GeoTIFF per year is downloaded to
    <local_output_path>/<folder_name>/reach_<id>/Raw/.

    Parameters:
        folder_name (str): Name of the folder containing watershed shapefile.
        max_level (int): Maximum DSWE level for water mask generation.
        year_range (str, int, tuple, optional): Range of years (start, end), a single year, or "All"
            ("All" expands to 1984-2025).
        reach_range (str, int, tuple, optional): Range of reaches (start, end), a single reach, or "All"
            ("All" expands to 1-9999).
        base_path (str, optional): Base path to the folder containing watershed shapefiles.
        local_output_path (str, optional): Local path where the output masks will be saved.

    Raises:
        ValueError: if year_range or reach_range has an unsupported form.
        RuntimeError: if the study-area shapefile could not be loaded.

    Outputs:
        Exports water masks as TIFF files to the specified local output path.
    """
    # Validate the selectors before doing any file or network work
    year_start, year_end = _resolve_inclusive_range(year_range, (1984, 2025), "year_range")
    reach_start, reach_end = _resolve_inclusive_range(reach_range, (1, 9999), "reach_range")

    # Load watershed shapefile; the loader signals failure by returning None
    polygon_fc = load_watershed_shapefile(folder_name, base_path)
    if polygon_fc is None:
        raise RuntimeError(
            f"Could not load the study-area shapefile for '{folder_name}' from {base_path}."
        )

    # Define the target CRS
    target_crs = 'EPSG:4326'

    for feature in polygon_fc.getInfo()['features']:
        # Reach identifier stored on each feature
        feature_id = int(feature['properties']['ds_order'])

        # Skip reaches outside the requested range
        if not (reach_start <= feature_id <= reach_end):
            continue

        feature_geom = ee.Feature(feature).geometry()

        # Create the reach subfolder if it does not exist
        reach_folder_path = os.path.join(local_output_path, folder_name, f"reach_{feature_id}", "Raw")
        os.makedirs(reach_folder_path, exist_ok=True)

        for year in range(year_start, year_end + 1):
            # Generate the water mask for the current year and feature
            water_mask = get_water_mask_for_feature(year, max_level, feature_geom)

            # Reproject the water mask to WGS 84
            water_mask_reprojected = water_mask.reproject(crs=target_crs, scale=30)

            # Set the output file path based on the folder, year, and feature ID
            output_file_name = f"{folder_name}_reach_{feature_id}_{year}_DSWE_level_{max_level}.tif"
            output_file_path = os.path.join(reach_folder_path, output_file_name)

            # Export the water mask to the local folder
            geemap.ee_export_image(
                water_mask_reprojected,
                filename=output_file_path,
                scale=30,
                region=feature_geom,
                file_per_band=False
            )

            # NOTE(review): geemap appears to report download errors by
            # printing rather than raising, so only claim success when the
            # file is actually on disk (e.g. years with no imagery fail).
            if os.path.exists(output_file_path):
                print(f"Exported {output_file_name} to {reach_folder_path}.")
            else:
                print(f"Export failed for {output_file_name}; no file was written to {reach_folder_path}.")

Error in callback <function _set_css_in_cell_output at 0x000001D763F53600> (for pre_run_cell), with arguments args (<ExecutionInfo object at 1d76051d490, raw_cell="# Function to generate a water mask for a given ye.." store_history=True silent=False shell_futures=True cell_id=None>,),kwargs {}:


TypeError: _set_css_in_cell_output() takes 0 positional arguments but 1 was given

## 7) Input parameters and generate masks, download to local folder:

In [6]:
# ---- Required inputs ----
# River name; must be identical to the shapefile folder name.
Folder_name = 'Rakaia'

# Highest DSWE class (1-4) counted as water. Higher values capture more
# water but tend to overestimate its extent; see Jones et al. (2019) for
# the class definitions.
DSWE_threshold_level = 2

# Years to process: a (start, end) tuple such as (1997, 2017), a single
# year such as 2017, or "All". "All" attempts 1984 - 2025, i.e. the Landsat
# record plus some more.
year_range = 2022

# Reaches to process: a (start, end) tuple such as (1, 40), a single reach
# such as 8, or "All" for every reach in the river reach shapefile.
reach_range = (1, 6)

# ---- Optional inputs ----
base_path = r'C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\Reaches'
local_output_path = r'C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\RiverMasks'

# Generate the masks and download them to the local output folder
export_masks(
    folder_name=Folder_name,
    max_level=DSWE_threshold_level,
    year_range=year_range,
    reach_range=reach_range,
    base_path=base_path,
    local_output_path=local_output_path,
)

Error in callback <function _set_css_in_cell_output at 0x000001D763F53600> (for pre_run_cell), with arguments args (<ExecutionInfo object at 1d765c13c90, raw_cell="# Required input variables
# Name of river (must m.." store_history=True silent=False shell_futures=True cell_id=None>,),kwargs {}:


TypeError: _set_css_in_cell_output() takes 0 positional arguments but 1 was given

Generating URL ...
Downloading data from https://earthengine.googleapis.com/v1/projects/earthengine-legacy/thumbnails/ff80609f91f9f9593b3acfd0dac0b18d-8ace6990cad563e028555e493316cec3:getPixels
Please wait ...
Data downloaded to C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\RiverMasks\Rakaia\reach_6\Raw\Rakaia_reach_6_2022_DSWE_level_2.tif
Exported Rakaia_reach_6_2022_DSWE_level_2.tif to C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\RiverMasks\Rakaia\reach_6\Raw.
Generating URL ...
Downloading data from https://earthengine.googleapis.com/v1/projects/earthengine-legacy/thumbnails/d175f1f3bcf15eba2ad8800745ab6620-5154c34857488aa4a5087bafa3362411:getPixels
Please wait ...
Data downloaded to C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\RiverMasks\Rakaia\reach_5\Raw\Rakaia_reach_5_2022_DSWE_level_2.tif
Exported Rakaia_reach_5_2022_DSWE_level_2.tif to C:\Users\huckr\Desktop\UCSB\Dissertation\Data\RiverMapping\RiverMasks\Rakaia\reach_5\Raw.
Generating U