In [1]:
import sys
from pathlib import Path
# Put the parent of the current working directory on the import path; the
# `src.*` and `utils.*` imports used later presumably resolve from there —
# TODO confirm the notebook is always started from its own folder.
sys.path.append(str(Path().absolute().parent))

In [2]:
import ee 
import geemap

# Start the Earth Engine client session against the "thurgau-irrigation" project.
ee.Initialize(project="thurgau-irrigation")

*** Earth Engine *** Share your feedback by taking our Annual Developer Satisfaction Survey: https://google.qualtrics.com/jfe/form/SV_0JLhFqfSY1uiEaW?source=Init


In [13]:
from src.data_processing.downscaling import resample_image
from src.data_processing.sentinel_preprocessing import load_sentinel2_data

from utils.date_utils import set_to_first_of_month, print_collection_dates, create_centered_date_ranges
from utils.ee_utils import harmonized_ts
from utils.harmonic_regressor import HarmonicRegressor

from typing import List


---

In [4]:
# Earth Engine asset that holds the border features of the study area.
PATH_TO_AOI = "projects/thurgau-irrigation/assets/Thurgau/thrugau_borders_2024"

# Merge the border features into one geometry and simplify its outline
# (maxError=500, presumably metres — TODO confirm).
aoi_feature_collection = ee.FeatureCollection(PATH_TO_AOI)
aoi_geometry = aoi_feature_collection.geometry().simplify(maxError=500)

# Area of interest used by the rest of the notebook: the simplified outline
# grown by a buffer distance of 100.
AOI = aoi_geometry.buffer(distance=100)

In [5]:
# Landsat ET images for calendar year 2022.
# filterDate() treats its end date as exclusive, so the range has to end on
# 2023-01-01; ending on "2022-12-31" would drop any image stamped on the last
# day of the year.
landsat_ET_30m = ee.ImageCollection(
    "projects/thurgau-irrigation/assets/ETlandsatmonthly"
).filterDate("2022-01-01", "2023-01-01")

# Project helper; presumably re-stamps every image to the first day of its
# month — TODO confirm against utils.date_utils.
landsat_ET_30m = set_to_first_of_month(landsat_ET_30m)

# Resample the "ET" band of every image with the project helper (second
# argument 100, presumably the target scale in metres — TODO confirm) and keep
# a server-side list of the results for building the date windows later.
landsat_ET_100m = landsat_ET_30m.map(lambda img: resample_image(img, 100, ["ET"]))
landsat_ET_100m_list = landsat_ET_100m.toList(landsat_ET_100m.size())

In [6]:
# Settings handed to harmonized_ts: which bands to keep and how the images
# inside one time window are combined.
bands = ["B3", "B4", "B8", "B11", "B12"]
options = dict(agg_type="mosaic", mosaic_type="least_cloudy", band_name="NDVI")

# Sentinel-2 input for 2022 over the area of interest (project loader).
s2_collection = load_sentinel2_data(2022, aoi=AOI)

# Date windows derived from the Landsat ET image list with buffer_days=15.
time_intervals = create_centered_date_ranges(landsat_ET_100m_list, buffer_days=15)

# Aggregate the Sentinel-2 collection over those windows.
s2_harmonized = harmonized_ts(
    masked_collection=s2_collection,
    band_list=bands,
    time_intervals=time_intervals,
    options=options,
)

In [7]:
def add_temporal_bands(collection: ee.ImageCollection) -> ee.ImageCollection:
    """Return the collection with a time band and a constant band on every image.

    Args:
        collection (ee.ImageCollection): Images carrying a
            'system:time_start' property.

    Returns:
        ee.ImageCollection: The same images with two extra bands: 't', the
            number of years elapsed since 1970-01-01 (as float), and
            'constant', equal to 1. Both bands get the projection of the
            image's first band as their default projection.
    """
    epoch = ee.Date('1970-01-01')

    def _with_covariates(image: ee.Image) -> ee.Image:
        acquisition = ee.Date(image.get('system:time_start'))
        elapsed_years = acquisition.difference(epoch, 'year')

        # Constant images have no native grid, so borrow the one of the first band.
        reference_projection = image.select([0]).projection()

        extra_bands = [
            ee.Image(elapsed_years).float().rename('t'),
            ee.Image.constant(1).rename('constant'),
        ]
        return image.addBands(
            [band.setDefaultProjection(reference_projection) for band in extra_bands]
        )

    return collection.map(_with_covariates)

# Attach the 't' and 'constant' bands, then list the bands of the first image
# as a quick check.
s2_harmonized = add_temporal_bands(s2_harmonized)
first_image_band_names = s2_harmonized.first().bandNames()
first_image_band_names.getInfo()

['B3', 'B4', 'B8', 'B11', 'B12', 't', 'constant']

In [8]:
def compute_vegetation_indexes(image: ee.Image) -> ee.Image:
    """
    Append the NDVI, NDWI and NDBI bands to an image.

    Each index is the normalized difference (a - b) / (a + b) of a band pair:
    NDVI from B8/B4, NDWI from B3/B8 and NDBI from B11/B8.

    Args:
        image (ee.Image): Image providing the bands B3, B4, B8 and B11

    Returns:
        ee.Image: The input image followed by the bands NDVI, NDWI and NDBI,
            in that order

    """
    # Output band name -> (first, second) input bands of the normalized difference.
    band_pairs = {
        "NDVI": ["B8", "B4"],
        "NDWI": ["B3", "B8"],
        "NDBI": ["B11", "B8"],
    }

    with_indexes = image
    for index_name, pair in band_pairs.items():
        # Always derive the index from the unmodified input image.
        index_band = image.normalizedDifference(pair).rename(index_name)
        with_indexes = with_indexes.addBands(index_band)
    return with_indexes

# Append the NDVI, NDWI and NDBI bands to every image of the harmonized collection.
s2_harmonized_w_vegetation_indexes = s2_harmonized.map(compute_vegetation_indexes)

In [9]:
# Quick check: list the band names of the first image.
s2_harmonized_w_vegetation_indexes.first().bandNames().getInfo()    

['B3', 'B4', 'B8', 'B11', 'B12', 't', 'constant', 'NDVI', 'NDWI', 'NDBI']

In [None]:
indexes = ["NDVI", "NDWI", "NDBI"]

# Start from the index collection; one "fitted_<index>" band is appended per pass.
s2_harmonized_gaps_filled = s2_harmonized_w_vegetation_indexes

for index in indexes:
    fitted_band_name = f"fitted_{index}"

    # Fit one harmonic model for this index on the full collection.
    regressor = HarmonicRegressor(
        omega=1, max_harmonic_order=1, band_to_harmonize=index
    )
    regressor.fit(s2_harmonized_w_vegetation_indexes)

    # Keep only the prediction band and give it an index-specific name.
    fitted_collection = regressor.predict(s2_harmonized_w_vegetation_indexes)
    fitted_collection = fitted_collection.map(
        lambda img: img.select(["fitted"]).rename(fitted_band_name)
    )

    def _attach_fitted_band(img):
        """Add the fitted band of the image that shares this image's date."""
        # Assumes the fitted images keep the time stamps of their inputs —
        # TODO confirm against HarmonicRegressor.predict.
        same_date_fit = fitted_collection.filterDate(img.date()).first()
        return img.addBands(same_date_fit.select([fitted_band_name]))

    # Earth Engine calls the mapped function while building the request, so
    # the loop variables are read during this iteration.
    s2_harmonized_gaps_filled = s2_harmonized_gaps_filled.map(_attach_fitted_band)

In [11]:
# Quick check: the first image should now also list the fitted_* bands.
s2_harmonized_gaps_filled.first().bandNames().getInfo()

['B3',
 'B4',
 'B8',
 'B11',
 'B12',
 't',
 'constant',
 'NDVI',
 'NDWI',
 'NDBI',
 'fitted_NDVI',
 'fitted_NDWI',
 'fitted_NDBI']

In [14]:
def fill_gaps(
    img: ee.Image, source_band: str, fill_band: str, output_name: str
) -> ee.Image:
    """Fill gaps in a band with values from another band.

    Pixels where the source band is valid keep their value. Pixels where the
    source band is masked take the value of the fill band. Pixels where both
    bands are masked stay masked instead of receiving a made-up value.

    Args:
        img (ee.Image): Input image containing both bands
        source_band (str): Name of band containing gaps to fill
        fill_band (str): Name of band to use for filling gaps
        output_name (str): Name for the output gap-filled band

    Returns:
        ee.Image: Single-band image named `output_name` holding the
            gap-filled values
    """
    # Get the source band and fill band
    source = img.select(source_band)
    fill = img.select(fill_band)

    # unmask(image) copies both the value and the mask of `fill` wherever
    # `source` is masked. The previous `unmask().where(gap_mask, fill)` first
    # wrote 0 into every gap, and `where` keeps that 0 when `fill` is masked
    # too, which produced fake index values of 0.
    return source.unmask(fill).rename(output_name)


def apply_gap_filling(img: ee.Image, indexes: List[str]) -> ee.Image:
    """Append one gap-filled band per index to an image.

    For every name in `indexes`, the band `<name>` is gap-filled from the band
    `fitted_<name>` and the result is added as `gap_filled_<name>`.

    Args:
        img (ee.Image): Image holding `<name>` and `fitted_<name>` for every
            requested index
        indexes (list[str]): Index names to process (e.g., ['NDVI', 'NDWI', 'NDBI'])

    Returns:
        ee.Image: The input image followed by the gap-filled bands, in the
            order given by `indexes`
    """
    # Every gap-filled band is computed from the unmodified input image.
    gap_filled_bands = [
        fill_gaps(
            img=img,
            source_band=name,
            fill_band=f"fitted_{name}",
            output_name=f"gap_filled_{name}",
        )
        for name in indexes
    ]

    # Append the new bands one after another to keep the requested order.
    augmented = img
    for band in gap_filled_bands:
        augmented = augmented.addBands(band)
    return augmented


# Collection-level entry point for the gap filling
def process_collection(
    collection: ee.ImageCollection, indexes: List[str]
) -> ee.ImageCollection:
    """Gap-fill the given indexes on every image of a collection.

    Args:
        collection (ee.ImageCollection): Images to process
        indexes (List[str]): Index names handed to apply_gap_filling

    Returns:
        ee.ImageCollection: The mapped collection; each image additionally
            carries the bands added by apply_gap_filling
    """

    def _gap_fill_image(image: ee.Image) -> ee.Image:
        return apply_gap_filling(image, indexes)

    return collection.map(_gap_fill_image)


# Example usage: add a gap_filled_* band for every index, then list the bands
# of the first image.
# NOTE(review): this cell reassigns its own input, so running it twice adds
# the gap-filled bands a second time.
indexes = ["NDVI", "NDWI", "NDBI"]
s2_harmonized_gaps_filled = process_collection(
    collection=s2_harmonized_gaps_filled,
    indexes=indexes,
)

s2_harmonized_gaps_filled.first().bandNames().getInfo()

['B3',
 'B4',
 'B8',
 'B11',
 'B12',
 't',
 'constant',
 'NDVI',
 'NDWI',
 'NDBI',
 'fitted_NDVI',
 'fitted_NDWI',
 'fitted_NDBI',
 'gap_filled_NDVI',
 'gap_filled_NDWI',
 'gap_filled_NDBI']

In [15]:
def _single_band_vis_params(band_name):
    """Visualization settings for one band on a white-to-green ramp from 0 to 1."""
    return {
        "bands": [band_name],
        "min": 0,
        "max": 1,
        "palette": ["white", "green"],
    }


Map = geemap.Map()

# Same colour ramp for the observed, the fitted and the gap-filled NDVI so the
# three layers can be compared directly.
sentinel_vis_params = _single_band_vis_params("NDVI")
fitted_vis_params = _single_band_vis_params("fitted_NDVI")
gap_filled_vis_params = _single_band_vis_params("gap_filled_NDVI")

# Element at index 8 of the first 12 images that intersect the AOI.
s2_image = ee.Image(s2_harmonized_gaps_filled.filterBounds(AOI).toList(12).get(8))

layers = (
    (sentinel_vis_params, "Sentinel 2"),
    (fitted_vis_params, "Sentinel 2 Fitted"),
    (gap_filled_vis_params, "Sentinel 2 Gap Filled"),
)
for vis_params, layer_name in layers:
    Map.addLayer(s2_image, vis_params, layer_name)

Map.centerObject(AOI, 12)
Map

Map(center=[47.56858787382066, 9.092720596553875], controls=(WidgetControl(options=['position', 'transparent_b…