# In [1]:
import ee

# In [8]:
# -*- coding: utf-8 -*-
"""
Daily LST fusion over buffered monitoring points — TABLE-FIRST (Python/EE)

Sensors:
- VIIRS (VNP21A1D), MODIS (MOD11A1 & MYD11A1), Landsat 8/9 L2 ST, ERA5-Land DAILY

Strategy (same as the JS script):
- Per-image reduce over buffered monitoring points
- Tag each row with *local* date (timezone offset)
- groupBy(key=site|date) → daily means per sensor
- Left-join daily tables on key
- Fuse (L8/9 > VIIRS > MODIS Terra > MODIS Aqua), then per-site ERA5 bias correction
- Final LST series: fused_obs if present else ERA5 (bias corrected)
"""

import ee

# If you haven't authenticated this machine before, run once:
# ee.Authenticate()

# Initialize the Earth Engine client against the GCP project named below.
# The project must have the Earth Engine API enabled; this call runs at import
# time, before any ee.* object in this script is constructed.
ee.Initialize(project="cuenca-soil-moisture")

# ---------------------------------------------------------------------
# ========== CONFIG ==========
# ---------------------------------------------------------------------
# Study period in calendar years; both endpoints are included.
YEARS = {"start": 2013, "end": 2025}

# Hours added to a UTC timestamp to obtain local time (e.g., Ecuador ~ UTC-5).
TZ_OFFSET_H = -5

# Description of the export task; only the first study year is embedded in it.
EXPORT_DESC = "LST_monitoring" + str(YEARS["start"])

# Buffer radius around each monitoring point, per sensor (meters).
BUF = {"L8_9": 60, "MODIS": 600, "VIIRS": 600, "ERA5": 5000}

# Nominal reduction scale, per sensor (meters).
SCALE = {"L8_9": 60, "MODIS": 1000, "VIIRS": 1000, "ERA5": 9000}

# tileScale argument forwarded to reduceRegions.
TILE_SCALE = 2

# maxPixelsPerRegion argument forwarded to reduceRegions (safeguard for large polygons).
MAX_PIX_PER_REGION = 1e9

# Monitoring points as (asset id, label) pairs. Add/remove freely.
# The script normalizes a 'site' name per point and tags each point with the label as 'asset'.
MONITOR_ASSETS = [
    ("projects/cuenca-soil-moisture/assets/Zhurucay_monitoring", "Zhurucay"),
    ("projects/cuenca-soil-moisture/assets/Quinuas_monitoring", "Quinuas"),
]

# ---------------------------------------------------------------------
# ========== DATE HELPERS ==========
# ---------------------------------------------------------------------
# Study window as ee.Date values: January 1 of the first year up to (but not
# including) January 1 of the year after the last one.
startLocal, endLocal = (
    ee.Date.fromYMD(YEARS["start"], 1, 1),
    ee.Date.fromYMD(YEARS["end"] + 1, 1, 1),  # exclusive
)

def local_date_str_from_image(img: ee.Image) -> ee.String:
    """
    Return the local calendar date of an image as a 'YYYY-MM-dd' string.

    The image's 'system:time_start' is shifted by TZ_OFFSET_H hours before
    formatting, so the result is the local-day bucket the timestamp falls in.
    """
    utc_start = ee.Date(img.get('system:time_start'))
    local_start = utc_start.advance(TZ_OFFSET_H, 'hour')
    return local_start.format('YYYY-MM-dd')

# ---------------------------------------------------------------------
# ========== MONITORING POINTS & REGIONS ==========
# ---------------------------------------------------------------------
def normalize_points(asset_id: str, label: str) -> ee.FeatureCollection:
    """
    Load a point asset and give every feature a uniform set of naming properties.

    Each returned feature keeps its geometry and original properties and gains:
      - 'site':  the first available of 'site', 'Codigo', 'name', else the feature id
      - 'asset': the given label
      - 'site2': '<label>:<site>', the identifier used for joins/keys downstream
    """
    def _with_site_fields(feature):
        feature = ee.Feature(feature)
        names = feature.propertyNames()

        # Resolve the fallback chain from the last resort outwards.
        from_name = ee.Algorithms.If(
            names.contains('name'), feature.get('name'), feature.id())
        from_codigo = ee.Algorithms.If(
            names.contains('Codigo'), feature.get('Codigo'), from_name)
        site = ee.String(ee.Algorithms.If(
            names.contains('site'), feature.get('site'), from_codigo))

        combined = ee.String(label).cat(':').cat(site)
        rebuilt = ee.Feature(feature.geometry(), feature.toDictionary())
        return rebuilt.set({'site': site, 'asset': label, 'site2': combined})

    return ee.FeatureCollection(asset_id).map(_with_site_fields)

# Normalize every configured asset, then chain-merge them into one collection.
pts_list = [normalize_points(asset_id, label) for asset_id, label in MONITOR_ASSETS]
if not pts_list:
    raise ValueError("MONITOR_ASSETS is empty.")
PTS = pts_list[0]
for extra_pts in pts_list[1:]:
    PTS = PTS.merge(extra_pts)

# Build buffered regions per sensor (copy properties so 'site2' is available during reduction)
def buffer_with_props(f: ee.Feature, dist: float) -> ee.Feature:
    """
    Return a feature whose geometry is f's geometry buffered by `dist`,
    carrying over f's properties (so 'site2' is available during reduction).

    Args:
      f: source feature (cast to ee.Feature internally).
      dist: distance forwarded to geometry().buffer(); the script's BUF values
            are given in meters.

    Returns:
      The buffered ee.Feature with the source properties copied onto it.
    """
    src = ee.Feature(f)
    buffered = ee.Feature(src.geometry().buffer(dist))
    # copyProperties() returns a generic ee.Element; re-cast so the result
    # really is the ee.Feature promised by the return annotation.
    return ee.Feature(buffered.copyProperties(src))

def _buffered_regions(dist):
    """Buffer every monitoring point in PTS by `dist`, keeping its properties."""
    return PTS.map(lambda f: buffer_with_props(f, dist))


regions_L8    = _buffered_regions(BUF["L8_9"])
regions_MOD   = _buffered_regions(BUF["MODIS"])
regions_VIIRS = _buffered_regions(BUF["VIIRS"])
regions_ERA5  = _buffered_regions(BUF["ERA5"])

# Geometry used to spatially filter the image collections: all points, buffered by 10000
bounds = PTS.geometry().buffer(10000)

# ---------------------------------------------------------------------
# ========== PROPERTY SAFETY HELPERS ==========
# ---------------------------------------------------------------------
def keep_props(src_img: ee.Image, out_img: ee.Image) -> ee.Image:
    """
    Copy every property listed by src_img.propertyNames() onto out_img.

    Args:
      src_img: image whose properties are copied.
      out_img: image that receives the properties.

    Returns:
      out_img with the copied properties, as an ee.Image.
    """
    copied = out_img.copyProperties(src_img, src_img.propertyNames())
    # copyProperties() returns a generic ee.Element; re-cast so callers get
    # the ee.Image promised by the return annotation.
    return ee.Image(copied)

def ensure_time(img: ee.Image) -> ee.Image:
    """
    Return the image with a 'system:time_start' property guaranteed to be set.

    If the property is already present the image is returned as-is. Otherwise
    the timestamp is parsed from 'system:index' using the pattern 'YYYY_MM_dd';
    if the index is empty/null the epoch (1970-01-01) is used instead.

    NOTE(review): the parse assumes the index is exactly 'YYYY_MM_dd' — confirm
    for any collection that could actually reach this fallback.
    """
    index_str = ee.String(img.get('system:index'))
    parsed_ms = ee.Date.parse('YYYY_MM_dd', index_str).millis()
    epoch_ms = ee.Date('1970-01-01').millis()
    fallback_ms = ee.Algorithms.If(index_str, parsed_ms, epoch_ms)
    stamped = img.set('system:time_start', fallback_ms)

    already_stamped = img.propertyNames().contains('system:time_start')
    return ee.Image(ee.Algorithms.If(already_stamped, img, stamped))

# ---------------------------------------------------------------------
# ========== DATASETS & PER-IMAGE MAPPERS ==========
# ---------------------------------------------------------------------
# Raw (unfiltered) source collections, referenced by Earth Engine asset id.
# Spatial/temporal filtering and band selection happen later, per sensor.
viirs      = ee.ImageCollection('NASA/VIIRS/002/VNP21A1D')        # daily L3
modisTerra = ee.ImageCollection('MODIS/061/MOD11A1')              # daily L3
modisAqua  = ee.ImageCollection('MODIS/061/MYD11A1')              # daily L3
l8         = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
l9         = ee.ImageCollection('LANDSAT/LC09/C02/T1_L2')
# Landsat 8 and 9 are handled as a single collection from here on
landsat    = l8.merge(l9)
era5d      = ee.ImageCollection('ECMWF/ERA5_LAND/DAILY_AGGR')     # daily

def viirs_LST(img: ee.Image) -> ee.Image:
    """
    Convert a VIIRS VNP21A1D image to a single band 'VIIRS' in Kelvin.

    'LST_1KM' is stored with scale 0.02 K and 0 as fill, so only values > 0
    are kept (relaxed mask) before scaling. Source properties are carried over
    and 'system:time_start' is ensured.
    """
    lst_dn = img.select('LST_1KM')
    valid = lst_dn.gt(0)
    kelvin = lst_dn.updateMask(valid).multiply(0.02).rename('VIIRS')
    with_props = keep_props(img, kelvin)
    return ensure_time(with_props)

def modis_LST(img: ee.Image, out_name: str) -> ee.Image:
    """
    Convert a MODIS (MOD11A1/MYD11A1) image to a single band `out_name` in Kelvin.

    'LST_Day_1km' is stored with scale 0.02 K and 0 as fill, so only values > 0
    are kept before scaling. Source properties are carried over and
    'system:time_start' is ensured.
    """
    lst_dn = img.select('LST_Day_1km')
    valid = lst_dn.gt(0)
    kelvin = lst_dn.updateMask(valid).multiply(0.02).rename(out_name)
    with_props = keep_props(img, kelvin)
    return ensure_time(with_props)

def landsat_ST(img: ee.Image) -> ee.Image:
    """
    Convert a Landsat L2 image to a single band 'L8_9' in Kelvin.

    Applies the USGS conversion ST[K] = DN * 0.00341802 + 149.0 to 'ST_B10'.
    No QA/cloud mask is applied here (relaxed mask, to avoid over-masking).
    Source properties are carried over and 'system:time_start' is ensured.
    """
    thermal_dn = img.select('ST_B10')
    kelvin = thermal_dn.multiply(0.00341802).add(149.0).rename('L8_9')
    with_props = keep_props(img, kelvin)
    return ensure_time(with_props)

def era5_daily(img: ee.Image) -> ee.Image:
    """
    Convert an ERA5-Land DAILY_AGGR image to a single band 'ERA5'.

    'skin_temperature' is already in Kelvin, so it is only renamed. Source
    properties are carried over and 'system:time_start' is ensured.
    """
    skin_k = img.select('skin_temperature').rename('ERA5')
    with_props = keep_props(img, skin_k)
    return ensure_time(with_props)

# ---------------------------------------------------------------------
# ========== PER-IMAGE → TABLE (no clipping) ==========
# ---------------------------------------------------------------------
def per_image_table(ic: ee.ImageCollection,
                    map_fn,
                    band_name: str,
                    regions_fc: ee.FeatureCollection,
                    scale: float,
                    date_stamped: bool = False) -> ee.FeatureCollection:
    """
    Reduce every image of a collection over the buffered regions and emit one
    table row per (image, region). No clipping is performed.

    Args:
      ic: source image collection (filtered here by `bounds` and study window).
      map_fn: per-image mapper producing a single-band image named `band_name`.
      band_name: name of the output column holding the regional mean.
      regions_fc: buffered polygons; each must carry a 'site2' property.
      scale: reduction scale forwarded to reduceRegions.
      date_stamped: how 'system:time_start' is turned into the row's date.
        False (default, previous behavior): the timestamp is treated as a real
        acquisition instant and shifted by TZ_OFFSET_H to get the local date.
        True: the timestamp is the 00:00 UTC stamp of a daily product's nominal
        date, so the date is taken as-is. Shifting such a stamp by a negative
        offset would label every image with the *previous* calendar day and
        misalign it by one day against instant-stamped sensors.

    Returns:
      FeatureCollection of rows {key=site2|date, site=<site2>, date=<YYYY-MM-dd>,
      <band_name>=mean}, with rows whose band value is null removed. Rows are
      per image, not yet aggregated per day.
    """
    if date_stamped:
        # Nominal dates are compared directly against the study window.
        win_start, win_end = startLocal, endLocal
    else:
        # Express the local-midnight window bounds as UTC instants.
        win_start = startLocal.advance(-TZ_OFFSET_H, 'hour')
        win_end = endLocal.advance(-TZ_OFFSET_H, 'hour')

    ic0 = (ic.filterBounds(bounds)
             .filterDate(win_start, win_end)
             .map(map_fn))

    def per_img(img):
        img = ee.Image(img)
        if date_stamped:
            date_local = ee.Date(img.get('system:time_start')).format('YYYY-MM-dd')
        else:
            date_local = local_date_str_from_image(img)

        def to_row(ft):
            site2 = ee.String(ft.get('site2'))
            return ee.Feature(None, {
                'site': site2,
                'date': date_local,
                'key':  site2.cat('|').cat(date_local),
                band_name: ft.get('mean')
            })

        reduced = img.reduceRegions(
            collection=regions_fc,
            reducer=ee.Reducer.mean(),
            scale=scale,
            tileScale=TILE_SCALE,
            maxPixelsPerRegion=MAX_PIX_PER_REGION
        )
        return reduced.map(to_row)

    # Build the per-image rows and flatten them into a single table
    fc_all = ee.FeatureCollection(ic0.map(per_img)).flatten()
    # Drop rows where this band is null (region had no valid pixels)
    return fc_all.filter(ee.Filter.notNull([band_name]))

# Build per-image tables per sensor (not yet daily-aggregated).
# VIIRS, MODIS and ERA5-Land are daily products stamped with their nominal date,
# so they are flagged date_stamped=True; Landsat scenes carry their acquisition
# time and keep the timezone shift.
tab_VIIRS_img = per_image_table(viirs,      viirs_LST,              'VIIRS',     regions_VIIRS, SCALE['VIIRS'], date_stamped=True)
tab_MT_img    = per_image_table(modisTerra, lambda i: modis_LST(i, 'MOD_TERRA'), 'MOD_TERRA',   regions_MOD,   SCALE['MODIS'], date_stamped=True)
tab_MA_img    = per_image_table(modisAqua,  lambda i: modis_LST(i, 'MOD_AQUA'),  'MOD_AQUA',    regions_MOD,   SCALE['MODIS'], date_stamped=True)
tab_L8_img    = per_image_table(landsat,    landsat_ST,              'L8_9',      regions_L8,    SCALE['L8_9'])
tab_E5_img    = per_image_table(era5d,      era5_daily,              'ERA5',      regions_ERA5,  SCALE['ERA5'], date_stamped=True)

# ---------------------------------------------------------------------
# ========== GROUP BY key → DAILY MEANS ==========
# ---------------------------------------------------------------------
def group_mean_by_key(table: ee.FeatureCollection, prop_name: str, out_name: str) -> ee.FeatureCollection:
    """
    Collapse per-image rows into one row per key by averaging `prop_name`.

    A grouped mean reducer is run over the ('key', prop_name) columns; every
    resulting group is re-emitted as a geometry-less feature with properties
    {key, site, date, <out_name>}, where site and date are recovered by
    splitting the key on '|'. An empty input table yields an empty collection.
    """
    def _row_from_group(group):
        group = ee.Dictionary(group)
        key = ee.String(group.get('key'))
        # split() takes a regex, so the '|' is escaped to match it literally
        pieces = key.split('\\|')
        return ee.Feature(None, {
            'key': key,
            'site': ee.String(pieces.get(0)),
            'date': ee.String(pieces.get(1)),
            out_name: ee.Number(group.get('mean')),
        })

    per_key_mean = ee.Reducer.mean().group(groupField=0, groupName='key')
    reduced = ee.Dictionary(
        table.reduceColumns(selectors=['key', prop_name], reducer=per_key_mean)
    )
    is_empty = table.size().eq(0)
    groups = ee.List(ee.Algorithms.If(is_empty, ee.List([]), reduced.get('groups')))
    return ee.FeatureCollection(groups.map(_row_from_group))

def _daily(per_image_rows, band):
    """Daily means for one sensor; the output column keeps the input band name."""
    return group_mean_by_key(per_image_rows, band, band)


# Daily tables (ERA5 is already daily, but grouping it is harmless)
tab_E5    = _daily(tab_E5_img,    'ERA5')
tab_VIIRS = _daily(tab_VIIRS_img, 'VIIRS')
tab_MT    = _daily(tab_MT_img,    'MOD_TERRA')
tab_MA    = _daily(tab_MA_img,    'MOD_AQUA')
tab_L8    = _daily(tab_L8_img,    'L8_9')

# ---------------------------------------------------------------------
# ========== LEFT JOINS BY key ==========
# ---------------------------------------------------------------------
def left_attach(base_fc: ee.FeatureCollection, other_fc: ee.FeatureCollection, other_prop: str) -> ee.FeatureCollection:
    """
    Left-attach the property `other_prop` from other_fc onto base_fc by 'key'.

    A key -> value dictionary is built from other_fc and looked up for every
    base row; base rows without a match get `other_prop` set to null.

    Args:
      base_fc: rows to keep (every base row appears in the result).
      other_fc: rows providing `other_prop`, matched on their 'key' property.
      other_prop: name of the property to attach.

    Returns:
      base_fc with `other_prop` attached where a matching key exists. If
      other_fc has no usable rows, base_fc is returned unchanged.
    """
    # aggregate_array() skips features whose property is null/missing. Keep only
    # rows that have BOTH 'key' and the value so the two lists below have the
    # same length and stay index-aligned.
    complete = other_fc.filter(ee.Filter.notNull(['key', other_prop]))

    lookup = ee.Dictionary.fromLists(
        complete.aggregate_array('key'),
        complete.aggregate_array(other_prop)
    )

    def _map(f):
        f = ee.Feature(f)
        k = ee.String(f.get('key'))
        v = ee.Algorithms.If(lookup.contains(k), lookup.get(k), None)
        return f.set(other_prop, v)

    # Nothing to attach -> return the base untouched; else map and attach
    return ee.FeatureCollection(
        ee.Algorithms.If(complete.size().eq(0), base_fc, base_fc.map(_map))
    )

# ERA5 is the base table (regular daily coverage); each sensor's daily table
# is attached to it in turn, matched on 'key'.
t1 = left_attach(base_fc=tab_E5, other_fc=tab_VIIRS, other_prop='VIIRS')
t2 = left_attach(base_fc=t1, other_fc=tab_MT, other_prop='MOD_TERRA')
t3 = left_attach(base_fc=t2, other_fc=tab_MA, other_prop='MOD_AQUA')
tbl = left_attach(base_fc=t3, other_fc=tab_L8, other_prop='L8_9')

# ---------------------------------------------------------------------
# ========== FUSE OBS (L8/9 > VIIRS > MT > MA) ==========
# ---------------------------------------------------------------------
def add_fused_obs(ft):
    """
    Set 'fused_obs' to the first available observation in priority order
    L8_9 > VIIRS > MOD_TERRA > MOD_AQUA, or null when none is available.
    """
    ft = ee.Feature(ft)
    # Walk from lowest to highest priority: each step wraps the previous
    # result as the fallback branch of a new conditional.
    fused = None
    for column in ('MOD_AQUA', 'MOD_TERRA', 'VIIRS', 'L8_9'):
        value = ft.get(column)
        fused = ee.Algorithms.If(value, value, fused)
    return ft.set('fused_obs', fused)


tbl = ee.FeatureCollection(tbl.map(add_fused_obs))

# ---------------------------------------------------------------------
# ========== PER-SITE BIAS (fused - ERA5) & FINAL SERIES ==========
# ---------------------------------------------------------------------
# Compute bias per site on overlapping days with both fused_obs and ERA5
def site_bias(siteName):
    """
    Build a feature {site, bias_obs_minus_era5} for one site.

    The bias is mean(fused_obs) - mean(ERA5) over the rows of `tbl` for this
    site where both values are present; it is 0 when there are no such rows.
    """
    siteName = ee.String(siteName)
    same_site = ee.Filter.eq('site', siteName)
    both_present = ee.Filter.notNull(['fused_obs', 'ERA5'])
    overlap = tbl.filter(same_site).filter(both_present)

    mean_diff = ee.Number(overlap.aggregate_mean('fused_obs')) \
        .subtract(overlap.aggregate_mean('ERA5'))
    bias = ee.Number(ee.Algorithms.If(overlap.size().gt(0), mean_diff, 0))
    return ee.Feature(None, {'site': siteName, 'bias_obs_minus_era5': bias})

# One bias feature per distinct site present in the joined table
_site_names = ee.List(tbl.aggregate_array('site')).distinct()
biasFC = ee.FeatureCollection(_site_names.map(site_bias))

# site -> bias lookup used when correcting ERA5
_bias_sites = biasFC.aggregate_array('site')
_bias_values = biasFC.aggregate_array('bias_obs_minus_era5')
biasDict = ee.Dictionary.fromLists(_bias_sites, _bias_values)

# Final table: ERA5 bias-corrected + select fused if available else ERA5_bc
def add_final_series(ft):
    """
    Add the bias-corrected ERA5 value and the final daily LST to a row.

    Sets:
      - 'ERA5_bias_corrected': ERA5 plus the site's bias from biasDict (0 if the
        site has no entry)
      - 'LST_daily_final': 'fused_obs' when present, else the corrected ERA5
    """
    ft = ee.Feature(ft)
    raw_fused = ft.get('fused_obs')
    site_offset = ee.Number(biasDict.get(ee.String(ft.get('site')), 0))
    corrected = ee.Number(ft.get('ERA5')).add(site_offset)
    chosen = ee.Number(
        ee.Algorithms.If(raw_fused, ee.Number(raw_fused), corrected)
    )
    return ft.set({
        'ERA5_bias_corrected': corrected,
        'LST_daily_final': chosen
    })

finalTable = ee.FeatureCollection(tbl.map(add_final_series))

# Optional: keep only rows whose 'date' string lies inside the study period
# (start inclusive, end exclusive)
_first_day = startLocal.format('YYYY-MM-dd')
_end_day = endLocal.format('YYYY-MM-dd')
finalTable = (finalTable
              .filter(ee.Filter.greaterThanOrEquals('date', _first_day))
              .filter(ee.Filter.lessThan('date', _end_day)))

# ---------------------------------------------------------------------
# ========== EXPORT ==========
# ---------------------------------------------------------------------
# Export the final table to Google Drive as CSV and start the batch task
_export_options = {
    'collection': finalTable,
    'description': EXPORT_DESC,
    'fileFormat': 'CSV',
    'folder': 'GoogleEarthEngine',  # change or remove as you prefer
}
task = ee.batch.Export.table.toDrive(**_export_options)
task.start()
print(f"Started export task: {EXPORT_DESC}")


# Output: Started export task: LST_monitoring2013
