# 1.&nbsp;Install and Import Statements

In [None]:
!pip -q install geemap geopandas shapely

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.6/1.6 MB[0m [31m59.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m23.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# PEP 8 imports: std-lib first, then 3rd-party alphabetized within each group.

from datetime import datetime, timedelta
import pandas as pd
import pprint
import sys
import logging
from dataclasses import dataclass, field
from collections.abc import Callable
from pathlib import Path
import time

import ee
import geemap
import geopandas as gpd
from google.colab import output, drive

#below is for testing only, delete for production
from shapely.geometry import Point

In [None]:
PROJECT_NAME = # ex.: "my_remote_sensing_project"

ee.Authenticate()
ee.Initialize(project=PROJECT_NAME)

# 2.&nbsp;Configuration

## 2.1 Configuration

In [None]:
TARGET_DATE = '2025-01-21' # 'yyyy-DD-mm'
LOG_LEVEL = "INFO" # "PROGRESS"
EXPORT_IMAGE_CHIPS = False

FOCAL_COUNTY_IDS = [ # use any list of county geoids
    "55009","55015","55029","55061","55071","55087","55005","55011",
    "55013","55033","55091","55093","55095","55109","55021","55025","55045"
]


# ---- this can be swapped out for any FC
AOI_FC: ee.FeatureCollection = (ee.FeatureCollection("TIGER/2018/Counties")
      .filter(ee.Filter.inList("GEOID", FOCAL_COUNTY_IDS))
)

@dataclass(frozen=True)
class DataImportConfig:
    id_col: str   = "id"
    date_col: str = "use_date"

    type_col: str = "type"
    manure_label: str      = "manure"
    non_manure_label: str  = "fp"
    class_col: str = "class"

    # Parsing/normalization for Python-side dates in training files
    py_date_in: str  = "%Y-%m-%d"     # what you expect to read
    py_date_out: str = "%Y-%m-%d"     # how you normalize/store

    # Behavior toggles
    enforce_unique_ids: bool = True
    geodesic: bool           = True
    copy_gdf: bool           = True


@dataclass(frozen=True) # for reporting duplicate IDs in error
class DuplicateIDsReport:
    ids: list[str]
    rows: gpd.GeoDataFrame
    def is_empty(self) -> bool: return not self.ids
    def head(self, n: int = 20): return self.rows.head(n)

class DuplicateIDsError(ValueError):
    def __init__(self, message: str, report: DuplicateIDsReport):
        super().__init__(message)
        self.report = report

@dataclass(frozen=True)
class DateFormats:
    # Python strftime/strptime
    py_iso: str     = "%Y-%m-%d"
    py_compact: str = "%Y%m%d"
    # Earth Engine (Joda-time) patterns
    ee_iso: str     = "yyyy-MM-dd"
    ee_compact: str = "yyyyMMdd"

@dataclass(frozen=True)
class Paths:
    gt: Path
    so: Path
    sm: Path
    rf: Path | None

@dataclass(frozen=True)
class Kernel:
    size: int
    shape: str
    units: str

@dataclass(frozen=True)
class SensorConfig:
    pretty          : str
    ic              : ee.ImageCollection
    optical_bands   : list[str]
    band_lookup     : dict[str, str]
    cloud_band      : str
    vis_brightness_thresh     : float
    clear_cluster   : int
    cloud_kernel    : Kernel
    paths           : Paths
    scalar          : float
    cloud_pct_prop  : str | None
    cirrus_pct_prop : str | None
    min_area_after_subtraction: int ## if there is a prior dection near the new detection, how large must the area of the new detection be?
    rgb_scale       : int
    export_scale    : int
    cluster_thresh  : int


@dataclass(frozen=True)
class RFParams:
    num_trees: int = 400
    vars_per_split: int = 5
    min_leaf_pop: int = 5
    bag_frac: float = 0.6
    max_nodes: int | None = None
    seed: int = 0


@dataclass(frozen=True)
class FeatureExportParams:
# ---- feature collection property names ----
# ---- core properties ----
    sensor_prop_name: str = "sensor"
    date_prop_name: str = "detection_date"
    id_prop_name: str = "id"

    prior_spread_adjacency_prop_name: str = "touches_prior_spread"

# ---- core property format ---
    sensor_prop_delimiter: str = "_"
    date_fmt: str = "MMddYYYY"
    pad: int = 4 #leading zeros for ID

# ---- enriched properties ----
    county_prop_name: str = "county"
    lat_prop_name: str = "lat"
    lon_prop_name: str = "lon"
    acres_prop_name: str = "acres"

    ## compute list of tuble of export properties
    @property
    def export_fields(self) -> tuple[str, ...]:
        return (
            self.id_prop_name,
            self.sensor_prop_name,
            self.date_prop_name,
            self.county_prop_name,
            self.lat_prop_name,
            self.lon_prop_name,
            self.acres_prop_name,
            self.prior_spread_adjacency_prop_name
        )

#------image export properties
    bands: tuple[str,str,str] = 'red', 'green', 'blue'
    pct_low: int = 2
    pct_high: int = 98
    gamma: float = 1.2
    interpolation: str = 'bicubic'
    buffer: int = 30
    export_folder: str = "GEE_snapshot_exports"
    crs: str = 'EPSG:3857'
    dimension: int = 1200
    bbox_thickness: int = 4
    bbox_color: str = '19df00'

@dataclass
class FallCompositeResult:
    img: ee.Image | None
    messages: list[str]
    task: ee.batch.Task | None


@dataclass(frozen=True)
class GlobalConfig:

    ############################
    ## USER-SET CONFIGURATION ##
    ############################

    target_date    : str | None = None

    log_level      : str = "INFO"

    export_image_chips: bool = EXPORT_IMAGE_CHIPS

    gee_asset_root : Path = Path("projects/my_remote_sensing_project/assets") # configure accordingly

    fall_composite : Path | None = Path("projects/my_remote_sensing_project/assets/fall_composite") # configure accordingly

    mount_point: str = "/content/drive"
    drive_root : str = "MyDrive"
    drive_base : str = "Colab/manure"
    table_export_folder: str = "manure_spotter_table_exports"

    s2_paths       : Paths = Paths(
        gt=Path("training_data/s2_groundtruth.gpkg"),
        so=Path("training_data/s2_non_manure.gpkg"),
        sm=Path("training_data/s2_sat_manure.gpkg"),
        rf=Path("projects/my_remote_sensing_project/assets/rf_s2"), #configure accordingly
    )
    l89_paths      : Paths = Paths(
        gt=Path("training_data/l89_groundtruth.gpkg"),
        so=Path("training_data/l89_non_manure.gpkg"),
        sm=Path("training_data/l89_sat_manure.gpkg"),
        rf=Path("projects/my_remote_sensing_project/assets/rf_l89"), #configure accordingly
    )

    ############################
    #### DEFAULT PARAMETERS ####
    ############################

    post_detection_suppression_days : int = 0 #9 # after a detection is found in a location, algorithm will ignore that location for this number of days.

    fall_comp_rng  : tuple[str, str] | None = ("2024-10-15","2025-01-30")

    date_format    : DateFormats = field(default_factory=DateFormats)

    data_import_params: DataImportConfig = DataImportConfig()

    max_cloud_cover: int = 90

    min_masked_pct : int = 1 # 1-100. loop will terminate if snow_crop_cloud mask is below this

    rf_params      : RFParams = RFParams()

    coarse_scale   : int = 200
    road_buffer    : int = 30

    noise_thresh   : int = 10

    post_kernel    : Kernel = Kernel(10, "square", "meters") ## controls how much larger the vector is than the actual classified pixels

    geo_blocks     : str = "TIGER/2020/BG" #"TIGER/2010/Blocks" ##

    feature_export_params: FeatureExportParams = FeatureExportParams()
    export_prefix  : str = "manure_detections"
    poll_seconds   : int = 120
    timeout_minutes: int = 40


    ############################
    # CONSTANTS (DO NOT TOUCH) #
    ############################

    nbsi_constant  : float = 0.36

    m2acres        : ee.Number = field(
        default_factory=lambda: ee.Number(0.000_247_105_381)
        )

    counties_all: ee.FeatureCollection = field(
        default_factory=lambda: ee.FeatureCollection("TIGER/2018/Counties")
    )

    ############################
    ### METHODS & PROPERTIES ###
    ############################

    @property
    def drive_root_path(self) -> Path:
        return Path(self.mount_point) / Path(self.drive_root)

    @property
    def data_root(self) -> Path:
        return self.drive_root_path / Path(self.drive_base)

    @property
    def table_export_path(self) -> Path:
        return self.drive_root_path / self.table_export_folder

    def ensure_table_export_path(self) -> Path:
        p = self.table_export_path
        p.mkdir(parents=True, exist_ok=True)
        return p

    def resolve(self, p: str | Path) -> Path:
        p = Path(p)
        return p if p.is_absolute() else (self.data_root / p)


CFG = GlobalConfig(target_date=TARGET_DATE, log_level= LOG_LEVEL)

SENSORS = {
    "s2": SensorConfig(
        pretty="Sentinel‑2",
        ic = ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED"),
        optical_bands=[
            "B2","B3","B4","B5","B6","B7",
            "B8","B8A","B9","B11","B12","B1"
        ],
        band_lookup={
            "aerosols":"B1", "blue":"B2", "green":"B3", "red":"B4",
            "red edge 1":"B5", "red edge 2":"B6", "red edge 3":"B7",
            "nir":"B8", "red edge 4":"B8A", "water vapour":"B9",
            "swir 1":"B11", "swir 2":"B12"
        },
        cloud_band="SCL",
        vis_brightness_thresh= .644, #.608, # 0.701476,
        clear_cluster =400,
        cloud_kernel= Kernel(3, "square", "pixels"),
        paths=CFG.s2_paths,
        scalar=0.0001,
        cloud_pct_prop= 'CLOUDY_PIXEL_PERCENTAGE',
        cirrus_pct_prop= 'THIN_CIRRUS_PERCENTAGE',
        min_area_after_subtraction = 3000, # ~50% of main 60 pixel thresh
        rgb_scale = 10,
        export_scale = 3,
        cluster_thresh = 65
    ),

    "l89": SensorConfig(
        pretty="Landsat‑8/9",
        ic = (ee.ImageCollection("LANDSAT/LC08/C02/T1_RT_TOA")
            .merge(ee.ImageCollection("LANDSAT/LC09/C02/T1_TOA"))
        ),
        optical_bands=[
            "B2","B3","B4","B5","B6","B7",
            "B8","B9","B10","B11","B1"
        ],
        band_lookup={
            "aerosols":"B1", "blue":"B2", "green":"B3", "red":"B4",
            "nir":"B5", "swir 1":"B6", "swir 2":"B7", "panchromatic":"B8",
            "cirrus":"B9", "thermal 1":"B10", "thermal 2":"B11"
        },
        cloud_band="QA_PIXEL",
        vis_brightness_thresh= 0.701476,
        clear_cluster =400,
        cloud_kernel=Kernel(0, "square", "pixels"),
        paths=CFG.l89_paths,
        scalar=1,
        cloud_pct_prop = 'CLOUD_COVER_LAND',
        cirrus_pct_prop = None,
        min_area_after_subtraction = 6000, # ~100% of main 60 pixel thresh
        rgb_scale = 30,
        export_scale = 10,
        cluster_thresh = 85

    ),
}



@dataclass(frozen=True)
class CloudCullResult:
    filtered: dict[str, ee.ImageCollection]
    available: list[str]
    counts: dict[str, int]

@dataclass(frozen=True)
class AOIContext:
    counties: ee.FeatureCollection
    date_str: str
    ee_date: ee.Date
    geom: ee.Geometry
    acres: ee.Number
    m2acres: ee.Number

# if counties are used to define AOI, aoi_fc and counties are the same variable
def make_aoi_context( aoi_fc: ee.FeatureCollection,
                      *,
                      date_str: str,
                      m2acres: ee.Number,
                      counties: ee.FeatureCollection) -> AOIContext:
    geom  = aoi_fc.geometry().dissolve()
    acres = geom.area(maxError=1000).multiply(m2acres)
    return AOIContext(
        date_str=date_str,
        ee_date=ee.Date(date_str),
        geom=geom,
        acres=acres,
        m2acres=m2acres,
        counties=counties
    )

@dataclass(frozen=True)
class SensorCoverage:
    date: str
    sensor: str
    geom: ee.Geometry
    mask: ee.Image
    acres: ee.Number
    percent_aoi: ee.Number
    m2acres: ee.Number = CFG.m2acres

@dataclass(frozen=True)
class ClearskyMaskParams:
    sensor_pretty   : str
    coarse_scale    : int
    buffer_kernel   : Kernel
    cluster_thresh  : int

@dataclass(frozen=True)
class ClearskyMaskResult:
    mask: ee.Image
    acres: ee.Number
    coverage: SensorCoverage
    params: ClearskyMaskParams

    @property
    def clear_pct_of_coverage(self) -> ee.Number:
        return self.acres.divide(self.coverage.acres).multiply(100)

@dataclass(frozen=True)
class VisBrightnessParams:
    band_mapping: dict[str, str]
    vis_brightness_thresh: float
    coarse_scale: int
    geo_blocks: ee.FeatureCollection
    k: float = CFG.nbsi_constant

@dataclass(frozen=True)
class VisBrightnessOutput:
    img: ee.Image
    stats: ee.FeatureCollection
    high_fc: ee.FeatureCollection
    high_mask: ee.Image
    pct_high: ee.Number
    high_acres: ee.Number

@dataclass
class SensorOutputs:

    ic              : ee.ImageCollection
    aoi_coverage    : SensorCoverage
    clearsky        : ClearskyMaskResult
    crop_mask       : ee.Image
    vis_brightness  : VisBrightnessOutput

    inference_bands : list[str] | None
    target_image    : ee.Image | None
    rf_output       : ee.Image | None
    noise_removed   : ee.Image | None
    detection_polys : ee.FeatureCollection | None



## 2.2 Logging Setup

In [None]:
PROGRESS_LEVEL = 15
logging.addLevelName(PROGRESS_LEVEL, "PROGRESS")

def _resolve_level(name: str) -> int:
    """Map a level name/string to its numeric value, incl. custom 'PROGRESS'."""
    if name.upper() == "PROGRESS":
        return PROGRESS_LEVEL
    return getattr(logging, name.upper(), logging.INFO)

logging.basicConfig(
    level= _resolve_level(CFG.log_level),
    format="%(asctime)s — %(levelname)s — %(message)s",
    datefmt = "%Y-%m-%d %H:%M",
    stream=sys.stdout,
    force=True
)
logger = logging.getLogger(__name__)


def progress(msg: str, *args, **kwargs) -> None:
    if logger.isEnabledFor(PROGRESS_LEVEL):
        logger.log(PROGRESS_LEVEL, msg, *args, **kwargs)


def ee2float(obj, *, prec: int = 2, level: int = logging.INFO) -> float:
    """
    Convert a server-side ee.Number to a client-side float
    only if the chosen level is enabled.  Prevents accidental
    getInfo() calls when verbose logging is off.
    """
    if logger.isEnabledFor(level):
        return round(obj.getInfo(), prec)
    raise RuntimeError(
        f"ee2float called for level {level} which is below the active logger threshold"
    )

def number_agree(n: int, singular: str, plural: str | None = None, zero_word: str | None = None) -> str:

    word = singular if n == 1 else (plural or singular + "s")
    num  = str(n) if (n != 0 or zero_word is None) else zero_word
    return f"{num} {word}"

## 2.3 Setup Drive

In [None]:
def setup_drive(
                drive_base: str,
                mount_point: str,
                ) -> Path:

    # Cache the mount step on the function itself
    if not getattr(setup_drive, "_mounted", False):
        drive.mount(mount_point, force_remount=False)
        setup_drive._mounted = True

    base = Path(drive_base)
    workdir = base if base.is_absolute() else Path(mount_point) / base
    return workdir

## 2.4 Get or Create Fall **Composite**

In [None]:
def clear_snowfree_s2_composite(date_range, aoi_geom, cfg):

    start_date = ee.Date(date_range[0])
    end_date = ee.Date(date_range[1])

    early_start_date = start_date.advance(-1, 'month')

    print(f'early_start_date: {early_start_date.format("yyyy-MM-dd").getInfo()}')

    def _prep_s2(img):
        scaled_optical_bands = img.select(cfg.optical_bands).divide(10000)
        return scaled_optical_bands.addBands(img.select('SCL'))

    #helper masking function
    def _mask_clear(img):
        scl  = img.select('SCL')
        clear_mask = (
            scl.eq(4)  # vegetation
            .Or(scl.eq(5))  # bare soil
            .Or(scl.eq(6))  # water
            .Or(scl.eq(11)) # snow / ice
        )
        img = img.updateMask(clear_mask)

        return img

    def _mask_lax(img):
        scl  = img.select('SCL')
        clear_mask = (
            scl.eq(1)  # saturated/defective
            .Or(scl.eq(2))  # dark area
            .Or(scl.eq(3))  # cloud shadow
            .Or(scl.eq(4)) # vegetated
            .Or(scl.eq(5))  # non vegetated
            .Or(scl.eq(6))  # water
            .Or(scl.eq(7))  # Unclassified
            #.Or(scl.eq(8))  # Cloud Medium
        )
        img = img.updateMask(clear_mask)

        return img

    base_coll = (cfg.ic
              .filterDate(start_date, end_date)
              .filterBounds(aoi_geom))

    early_coll = (cfg.ic
              .filterDate(early_start_date, end_date)
              .filterBounds(aoi_geom))

    strict_coll = (base_coll
                  .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 30))
                  .filter(ee.Filter.lt('SNOW_ICE_PERCENTAGE', 1))
                  .map(_prep_s2)
                   )

    lax_coll = (base_coll
                  .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 70))
                  .filter(ee.Filter.lt('SNOW_ICE_PERCENTAGE', 10))
                  .map(_prep_s2)
                   )

    early_lax_coll = (early_coll
                  .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 70))
                  .filter(ee.Filter.lt('SNOW_ICE_PERCENTAGE', 10))
                  .map(_prep_s2)
    )


    clear_image = strict_coll.map(_mask_clear).median()
    tier_2_image = strict_coll.map(_mask_lax).median()
    tier_3_image = lax_coll.map(_mask_clear).median()
    tier_4_image = lax_coll.map(_mask_lax).median()
    early_lax_image = early_lax_coll.map(_mask_lax).median()


    final_composite = (clear_image
                  .unmask(tier_2_image)      # Fill gaps with winn_2
                  .unmask(tier_3_image)      # Fill remaining gaps with winn_3
                  .unmask(tier_4_image)
                  .unmask(early_lax_image) # Fill remaining gaps with winn_early
)

    return final_composite.select(cfg.optical_bands).clip(aoi_geom)

def get_or_create_s2_fall_composite(
    fall_composite_img: Path | None,
    fall_composite_date_range: tuple[str, str],
    aoi_geom: ee.Geometry,
    *,
    cfg: SensorConfig,
    ):

    def _prep_fall_comp_band_names(composite):

        return composite.select(cfg.optical_bands).rename(
                [b + '_NS' for b in cfg.optical_bands] # NS = non snow
                )

    msgs: list[str] = []
    task: ee.batch.Task | None = None

    if fall_composite_img:
        composite_asset_name = fall_composite_img.name

        try:
            ee.data.getAsset(str(fall_composite_img))
            msgs.append(f'fall composite asset \'{composite_asset_name}\' already exists')

            fall_composite = ee.Image(str(fall_composite_img))
            msgs.append(f'fall composite asset \'{composite_asset_name}\' successfully fetched \n')

            return(FallCompositeResult(
                        _prep_fall_comp_band_names(fall_composite),
                        msgs,
                        task))

        except ee.ee_exception.EEException:
            raise ValueError(f"Asset '{fall_composite_img}' doesn’t exist or you lack access"
                              "set to False if you need it created")
    else:
        # JavaScript confirm dialog; returns true (Yes) or false (Cancel)
        user_wants_build = output.eval_js(
            """
            (async () => {
              // Show dialog in browser
              return confirm("User has not indicated a fall composite image in configuration. If asset exists, press 'cancel' and add to configuration. If you want to build an asset, press 'okay'. Note that assets can exceed 50gb in size.");
            })()
            """
        )

        if user_wants_build:

            fall_composite = clear_snowfree_s2_composite(fall_composite_date_range, aoi_geom, cfg)

            task = ee.batch.Export.image.toAsset(
                image=fall_composite,
                description='fall_composite_export',

                assetId='projects/remote-sensing-test-project/assets/2024_fall_composite_wi_v2',
                region=aoi_geom,
                scale=10,
                maxPixels=1e13
            )
            task.start()

            print(f"Building asset using {cfg.pretty}... \n\ncheck progress in GEE code editor")

            msgs.append(f"Building fall-composite asset from {cfg.pretty}...")
            msgs.append("Monitor the export in the GEE Code Editor")

            return FallCompositeResult(
                _prep_fall_comp_band_names(fall_composite),
                msgs,
                task)


        else:
            msgs.append(
                        "User cancelled — fall composite will NOT be built. "
                        "Please specify an existing asset and rerun the script."
                        )

            return FallCompositeResult(None, msgs, None)




## 2.5 Load Prior Detections

In [None]:
def load_newest_geojson_for_date(date_str: str,
                                  table_export_folder: Path,
                                  prefix: str) -> gpd.GeoDataFrame | None:
    """
    Find the most recent GeoJSON whose filename contains the target date token
    """
    pattern = f"{prefix}*_{date_str}.geojson"
    matches = sorted(table_export_folder.glob(pattern),
                     key=lambda p: p.stat().st_mtime,
                     reverse=True)
    if not matches:
        return None

    return gpd.read_file(matches[0])

def load_prior_detections(target_date: str,
                          table_export_folder: Path,
                          prefix: str,
                          lookback_days: int,
                          date_format: str
                          ) -> ee.FeatureCollection | None:
    if lookback_days <= 0:
        logger.info("lookback_days <= 0, not attempting to load prior detections\n")
        return None

    t_date = datetime.strptime(target_date, date_format).date()

    lookback_dates = []
    for i in range(1, lookback_days + 1):
        d_str = (t_date - timedelta(days=i)).strftime(date_format)
        lookback_dates.append(d_str)

    available_dates = []
    lookback_gdfs = []
    for date in lookback_dates:
        gdf = load_newest_geojson_for_date(date, table_export_folder, prefix)
        if gdf is not None:
            lookback_gdfs.append(gdf)
            available_dates.append(date)

    if lookback_gdfs == []:
        logger.info("no prior detection data found in %i-day lookback window \n",
                    lookback_days)
        return None

    logger.info("prior detection data found in %i-day lookback window: %s",
                 lookback_days,
                 ", ".join(available_dates)
                )


    merged_gdf = gpd.GeoDataFrame(pd.concat(lookback_gdfs, ignore_index=True))

    merged_fc = geemap.gdf_to_ee(merged_gdf, geodesic=True)

    return merged_fc

# 3.&nbsp;Check for valid imagery for target date

## 3.0 Build Image Collection

In [None]:
def daily_ic(
              sensor_cfg: SensorConfig,
              date: ee.Date,
              geom: ee.Geometry,
              ):

    ic = sensor_cfg.ic
    return (ic
            .filterDate(date, date.advance(1, "day"))
            .filterBounds(geom))

In [None]:
def annotate_effective_cloud(ic: ee.ImageCollection, sensor_cfg: SensorConfig) -> ee.ImageCollection:

    def _set(img):
        cloud_cover = ee.Number(img.get(sensor_cfg.cloud_pct_prop))

        if sensor_cfg.cirrus_pct_prop:
            cirrus_cover = ee.Number(img.get(sensor_cfg.cirrus_pct_prop))
            effective_cover = cloud_cover.subtract(cirrus_cover)
        else:
            effective_cover = cloud_cover

        return img.set({'EFFECTIVE_CLOUD': ee.Number(effective_cover)})

    return ic.map(_set)



In [None]:
def log_effective_cloud_per_image(ics: dict[str, ee.ImageCollection],
                                  ) -> None:
    """Log each image's EFFECTIVE_CLOUD at DEBUG level"""
    if not logger.isEnabledFor(logging.DEBUG):
        return

    for skey, ic in ics.items():
        ic2 = ic

        count = ic2.size().getInfo()
        if count == 0:
            logger.debug("%s: no images", SENSORS[skey].pretty)
            continue

        ids  = ic2.aggregate_array('system:index').getInfo()
        vals = ic2.aggregate_array('EFFECTIVE_CLOUD').getInfo()
        for pid, val in zip(ids, vals):
            logger.debug("%s image %s — EFFECTIVE_CLOUD = %.2f%%",
                         SENSORS[skey].pretty, pid, float(val))

In [None]:
def drop_and_log_high_cloud(
                            ics: dict[str, ee.ImageCollection],
                            sensors: dict[str, SensorConfig],
                            max_cloud: int,
                            logger: logging.Logger
                            ) -> CloudCullResult:
    filtered: dict[str, ee.ImageCollection] = {}
    for k, ic in ics.items():
        over = ic.filter(ee.Filter.greaterThanOrEquals('EFFECTIVE_CLOUD', max_cloud))
        n = over.size().getInfo()
        if n:
            ids  = over.aggregate_array('system:index').getInfo()
            vals = over.aggregate_array('EFFECTIVE_CLOUD').getInfo()
            logger.info(
                "Dropping %d %s image(s) with EFFECTIVE_CLOUD ≥ %d%%: %s",
                n, sensors[k].pretty, max_cloud,
                ", ".join(f"{i}:{round(v,2)}%" for i, v in zip(ids, vals))
            )
        filtered[k] = ic.filter(ee.Filter.lt('EFFECTIVE_CLOUD', max_cloud))

    available = [k for k, ic in filtered.items() if ic.size().getInfo() > 0]
    counts = {k: filtered[k].size().getInfo() for k in available}
    return CloudCullResult(filtered, available, counts)

## 3.1 Build Image (And Optional Export)

**In further review, check if _apply_masks is necessary**

In [None]:
def get_img_for_date(
                      date_str,
                      aoi_geom,
                      sensor_cfg: SensorConfig
                      ):

    start_date = ee.Date(date_str)
    end_date = start_date.advance(1, 'day')

    images = (
              sensor_cfg.ic
              .filterDate(start_date, end_date)
              .filterBounds(aoi_geom)
    )

    return (images.mosaic()
                  .select(sensor_cfg.optical_bands)
                  .multiply(sensor_cfg.scalar)
                  )

In [None]:
def export_image(image: ee.Image,
                 add_timestamp: bool = True,
                 *,
                 aoi: ee.Geometry,
                 filename: str,
                 scale: int = 10,
                 folder = 'GEE_Exports'):

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    if add_timestamp:
        filename="_".join([filename, timestamp])

    image_export_task = ee.batch.Export.image.toDrive(
        image=image.clip(aoi),
        description=filename,
        folder= folder,
        fileNamePrefix=filename,
        region=aoi,
        scale=scale,
        maxPixels=1e13
    )
    image_export_task.start()

## 3.2 Get Sensor AOI Coverage
(intersection of sensor coverage and AOI)

In [None]:
def get_sensor_aoi_coverage(skey: str,
                            image_collection: ee.ImageCollection,
                            aoi: AOIContext):

    def _get_ic_geom(image_collection):
        img_footprints = image_collection.map(lambda img: ee.Feature(img.geometry()))

        ic_coverage = (ee.FeatureCollection(img_footprints)
                      .union()
                      .geometry()
                      )

        ic_aoi_coverage = ic_coverage.intersection(aoi.geom, 1)

        return ic_aoi_coverage

    def _keep_only_polygons(geom):

        parts = ee.Geometry(geom).geometries()

        def _tag(g):
            g = ee.Geometry(g)
            return ee.Feature(g).set({
                'isPoly': ee.List(['Polygon', 'MultiPolygon']).contains(g.type())
            })

        fc = ee.FeatureCollection(parts.map(_tag))

        poly_fc = fc.filter(ee.Filter.eq('isPoly', True))
        return poly_fc.geometry()


    target_date_aoi_coverage = _get_ic_geom(image_collection)

    clean_geom = _keep_only_polygons(target_date_aoi_coverage)

    aoi_sensor_intersect = clean_geom.intersection(aoi.geom, maxError = 10)

    focal_mask = (ee.Image()
               .paint(aoi_sensor_intersect, 1)   # burn 1s inside the geometry
               .selfMask()

               )
    acres = aoi_sensor_intersect.area(maxError=1000).multiply(aoi.m2acres)

    return SensorCoverage(
      date = aoi.date_str,
      sensor = skey,
      geom = aoi_sensor_intersect,
      mask = focal_mask,
      acres = acres,
      percent_aoi = acres.divide(aoi.acres).multiply(100)
      )


## 3.3 Check for Clouds

This code should eventually be refactored

In [None]:
def _clean_and_analyze_cloud_mask(cloud_mask: ee.Image,
                                  coverage: SensorCoverage,
                                  params: ClearskyMaskParams,
                                  qa_mosaic = False):

    kernel = params.buffer_kernel

    # reprojecting saves us thousands of eecu-seconds
    coarse = cloud_mask.reproject(cloud_mask.projection(), None, params.coarse_scale)
    coarse_dilated   = coarse.focal_max(
        kernel.size,
        kernel.shape,
        kernel.units
        )

    inverse_mask = (coarse_dilated
                .unmask()
                .Not()
                .selfMask()
                .updateMask(coverage.mask)
                )

    if qa_mosaic:
        inverse_mask = inverse_mask.updateMask(qa_mosaic.mask())

    inv_cluster_size = (inverse_mask
                        .connectedPixelCount(maxSize=params.cluster_thresh,
                                            eightConnected=False))

    inverse_clean = inverse_mask.updateMask(inv_cluster_size.gte(params.cluster_thresh))

    # converting to and from vectors saves compute
    inv_vec = (inverse_clean
           .selfMask()
           .reduceToVectors(
               geometry       = coverage.geom,
               scale          = params.coarse_scale,
               geometryType   = 'polygon',
               eightConnected = False,
               labelProperty  = 'value',
               maxPixels      = 1e13))

    clearsky_mask = (ee.Image()
                .paint(inv_vec, 1)
                .rename('mask')
                .updateMask(coverage.mask)
                .reproject(inverse_clean.projection()))  # ensure 10 m grid

    clearsky_mask_acres = inv_vec.geometry().area(maxError=10_000).multiply(coverage.m2acres)

    return clearsky_mask, clearsky_mask_acres


# is the reliance on separate focal_mask and focal_geom redundant
def s2_build_buffered_cloud_mask(image_collection: ee.ImageCollection,
                                 coverage: SensorCoverage,
                                 params: ClearskyMaskParams):

    scl_only = image_collection.select('SCL')

    scl_mosaic = (scl_only
                  .mosaic()
                  .updateMask(coverage.mask)
                  )

    cloud_classes = [3, 7, 9] # shadow, unclassified, high (not touching medium)


    cloud_mask   = (scl_mosaic
                    .remap(cloud_classes, [1] * len(cloud_classes), 0)
                    .selfMask())

    clearsky_mask, clearsky_mask_acres = _clean_and_analyze_cloud_mask(
                                  cloud_mask = cloud_mask,
                                  coverage = coverage,
                                  params = params
                                  )


    return clearsky_mask, clearsky_mask_acres




def l89_build_buffered_cloud_mask(  image_collection: ee.ImageCollection,
                                    coverage: SensorCoverage,
                                    params: ClearskyMaskParams):
    # bit helpers
    def _bit(img, bit):
        return img.bitwiseAnd(1 << bit).neq(0)

    def _conf(img, start_bit):
        return img.rightShift(start_bit).bitwiseAnd(3)

    qa_only = image_collection.select('QA_PIXEL')

    qa_mosaic = (qa_only
              .mosaic()
              .updateMask(coverage.mask)
              )

    core_cloud = _bit(qa_mosaic, 3) # might be unnecessary to use both of these
    high_conf = _conf(qa_mosaic, 8).eq(3)

    cloud_flag = core_cloud.And(high_conf)
    cloud_mask = cloud_flag.selfMask()

    clearsky_mask, clearsky_mask_acres = _clean_and_analyze_cloud_mask(
                                  cloud_mask = cloud_mask,
                                  coverage = coverage,
                                  params = params,
                                  qa_mosaic = qa_mosaic
                                  )


    return clearsky_mask, clearsky_mask_acres

def build_clearsky_mask( ic: ee.ImageCollection,
                      sensor_aoi_coverage: SensorCoverage,
                      params: ClearskyMaskParams):

    if sensor_aoi_coverage.sensor == "s2":
        clearsky_mask, clearsky_mask_acres = s2_build_buffered_cloud_mask(
                                      image_collection = ic,
                                      coverage = sensor_aoi_coverage,
                                      params = params
                                      )

    elif sensor_aoi_coverage.sensor == "l89":
        clearsky_mask, clearsky_mask_acres = l89_build_buffered_cloud_mask(
                                      image_collection = ic,
                                      coverage = sensor_aoi_coverage,
                                      params = params
                            )

    return ClearskyMaskResult(
        mask = clearsky_mask,
        acres = clearsky_mask_acres,
        coverage = sensor_aoi_coverage,
        params = params
    )

## 3.4 Cropland Mask

In [None]:
# before inspecting for snow, we want to crop to just croplands for the date in question
# this masks out wc_2021 non-crop pixels AND tiger 2016 roads

def build_cropland_mask(aoi_geom):

    wc_2021 = ee.Image('ESA/WorldCover/v200/2021')

    # Remap: 40 to 1 (cropland), everything else to 0
    cropland_mask = (
        wc_2021
          .remap(
              [10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 100],
              [ 0,  0,  0,  1,  0,  0,  0,  0,  0,  0,   0]
          )
          .rename('cropland')
          .selfMask()
          )

    roads_fc   = ee.FeatureCollection('TIGER/2016/Roads')
    roads_aoi = roads_fc.filterBounds(aoi_geom)
    roads_aoi_buff = roads_aoi.map(lambda f: f.buffer(CFG.road_buffer).set({'burn':1}))

    non_roads_mask = (ee.Image(1)
                  .byte()
                  .paint(roads_aoi_buff, 0)
                  .selfMask())

    cropland_mask_img = cropland_mask.And(non_roads_mask)

    return cropland_mask_img

## 3.5 Check for Snow

In [None]:

def calculate_vis_brightness(img: ee.Image,
                             params: VisBrightnessParams):
    b = img.select
    band_map = params.band_mapping
    blue = b(band_map["blue"])
    green = b(band_map["green"])
    red = b(band_map["red"])
    return blue.add(green).add(red).divide(3).rename('VIS_BRIGHT')

def vis_brightness_by_tract(image: ee.Image,
                            mask: ee.Image,
                            aoi: SensorCoverage,
                            params: VisBrightnessParams):

    nbsi_img = (calculate_vis_brightness(image, params)
            .clip(aoi.geom)
            .updateMask(mask)
            .reproject(
                crs   = image.select(params.band_mapping["blue"]).projection(),
                scale = params.coarse_scale))

    geo_blocks = params.geo_blocks.filterBounds(aoi.geom)

    stats = nbsi_img.reduceRegions(
          collection = geo_blocks,
          reducer    = ee.Reducer.mean(),
          scale      = params.coarse_scale
          )

    return nbsi_img, stats

def get_vis_brightness_outputs(img: ee.Image,
                              mask: ee.Image,
                              aoi: SensorCoverage,
                              params: VisBrightnessParams):

    brightness_img, stats = vis_brightness_by_tract(img, mask, aoi, params)
    high_fc   = stats.filter(ee.Filter.gt("mean", params.vis_brightness_thresh))
    high_mask = ee.Image().paint(high_fc.union().geometry(), 1).selfMask()

    high_nbsi_acres = (high_fc.geometry()
                .intersection(aoi.geom, 1)
                .area(maxError=10_000)
                .multiply(aoi.m2acres)
                )

    return VisBrightnessOutput(
                  img=brightness_img,
                  stats=stats,
                  high_fc= high_fc,
                  high_mask=high_mask,
                  pct_high=high_nbsi_acres.divide(aoi.acres).multiply(100),
                  high_acres=high_nbsi_acres
                  )

#4.&nbsp; Load or Train Classifier

## 4.1 Load Classifier

In [None]:
def asset_exists(asset_id: str) -> bool:
    try:
        ee.data.getAsset(asset_id)
        return True
    except ee.ee_exception.EEException:
        return False

## 4.2 Load Training Data

In [None]:
def find_duplicate_ids(gdf: gpd.GeoDataFrame,
                       *,
                       id_col: str ):

    counts = gdf[id_col].value_counts()        # how many times each id appears
    dup_ids = counts[counts > 1].index.tolist()    # only those >1
    dup_rows = gdf[gdf[id_col].isin(dup_ids)].sort_values(id_col)

    return DuplicateIDsReport(dup_ids, dup_rows)

def _read_gdf(path: Path) -> gpd.GeoDataFrame:
    try:
        gdf = gpd.read_file(path)
    except Exception as e:
        raise IOError(f"Failed to read vector data from {path}") from e
    logger.debug("Loaded %s: %d rows, CRS=%s", path.name, len(gdf), getattr(gdf.crs, "to_string", lambda: gdf.crs)())
    return gdf

def _raise_if_invalid_schema( gdf: gpd.GeoDataFrame,
                              *,
                              params: DataImportConfig):
    id_col = params.id_col
    date_col = params.date_col

    if id_col not in gdf.columns:
        raise ValueError(f"'{id_col}' column not found in GeoDataFrame.")

    report = find_duplicate_ids(gdf, id_col = id_col)
    if not report.is_empty():
        raise DuplicateIDsError(f"'{id_col}' values are not unique", report)

    if date_col not in gdf.columns:
        raise ValueError(f"'{date_col}' column not found in GeoDataFrame.")

def path_to_ee_fc(path: Path,
                  *,
                  params: DataImportConfig):

    gdf = _read_gdf(path)

    if params.copy_gdf:
        gdf = gdf.copy()

    _raise_if_invalid_schema(gdf, params = params)

    gdf[params.date_col] = gdf[params.date_col].astype(str)

    fc = geemap.gdf_to_ee(gdf, geodesic=params.geodesic)

    return fc

def safe_path_to_ee_fc(path: Path,
                       *,
                       params: DataImportConfig):
    try:
        return path_to_ee_fc(path, params = params)
    except DuplicateIDsError as e:
        print(f"Duplicate IDs (n={len(e.report.ids)}): {e.report.ids[:10]}",
              "…" if len(e.report.ids) > 10 else "")
        try:
            from IPython.display import display
            display(e.report.rows.head(20))
        finally:
            raise

In [None]:
def add_binary_class_from_label(fc: ee.FeatureCollection,
                                *,
                                params: DataImportConfig)->ee.FeatureCollection:
    manure_label = params.manure_label
    non_manure_label = params.non_manure_label
    type_prop = params.type_col
    class_prop = params.class_col

    allowed_types = ee.List([manure_label, non_manure_label])

    # look for bad labels
    bad_types = fc.filter(
        ee.Filter.Not(ee.Filter.inList(type_prop, allowed_types))
    )

    if bad_types.size().getInfo() > 0:          # client-side check
        bad_vals = bad_types.aggregate_array(type_prop).distinct().getInfo()
        raise ValueError(
            f"Unexpected '{type_prop}' values found: {bad_vals}"
        )

    # map the two legal strings to 1 and 0 classes
    def _add_class(feat):
        is_manure = ee.String(feat.get(type_prop)).equals(manure_label) #boolean

        class_num = ee.Algorithms.If(is_manure, ee.Number(1), ee.Number(0))
        return feat.set(class_prop, class_num)

    return fc.map(_add_class)

## 4.3 Build Samples

In [None]:
def build_training_img_dict(
                            training_polys,
                            sensor_cfg: SensorConfig
                            ):

    # fetch all the different daily geometries for the training data
    training_days = training_polys.aggregate_array('use_date').distinct().getInfo()

    day_geoms = {
        day: (training_polys
                .filter(ee.Filter.eq('use_date', day))
                .union()        # single feature
                .geometry())    # convert to Geometry
        for day in training_days
    }


    training_images = {
        day: get_img_for_date(day, day_geoms[day], sensor_cfg)
                        for day in training_days}

    return training_images


In [None]:
def sample_regions_for_date(date_str, img, training_polys, fall_composite):

    filtered_polys = training_polys.filter(ee.Filter.eq('use_date', date_str))

    img_stack = img.addBands(fall_composite)

    sampled_pix = (img_stack
                  .sampleRegions(
                      collection=filtered_polys,
                      properties=['class'],
                      scale=10)
                  )


    return sampled_pix

def create_flat_sample(training_images, training_polys, fall_composite):
    sample_complete = [sample_regions_for_date(
                                                date,
                                                training_images[date],
                                                training_polys,
                                                fall_composite)
                        for date in training_images.keys()
                      ]

    return ee.FeatureCollection(sample_complete).flatten()

## 4.4 Train Random Forest Classifer

In [None]:
def train_rf_classifier(sample_fc: ee.FeatureCollection,
                        bands: list[str],
                        params: RFParams,
                        class_prop: str = 'class',
                        ):

    classifier = ee.Classifier.smileRandomForest(
        numberOfTrees=params.num_trees,
        variablesPerSplit=params.vars_per_split,
        minLeafPopulation=params.min_leaf_pop,
        bagFraction=params.bag_frac,
        maxNodes=params.max_nodes,
        seed=params.seed
    ).train(
        features=sample_fc,
        classProperty=class_prop,
        inputProperties=bands
    )

    return classifier

In [None]:
def save_classifier(classifier, gee_asset_root, poly_count, sensor):

    datestamp = datetime.now().strftime('%Y%m%d')

    asset_path = gee_asset_root / f'rf_{sensor}_{poly_count}_polys_{datestamp}'

    task_rf_export = ee.batch.Export.classifier.toAsset(
        classifier = classifier,
        description = f'export_rf_{sensor}_{poly_count}',
        assetId     = str(asset_path)
    )

    task_rf_export.start()

    return asset_path

#5.&nbsp;Inference on Target Date Imagery
no helpers


#6.&nbsp;Post Processing

## 6.1 Convert raw classifier output to cleanup-up features

In [None]:
def drop_clusters_below_thresh( two_class_img: ee.Image,
                                cluster_thresh: int
                 ):
    class_1_mask = two_class_img.selfMask()

    cluster_size = class_1_mask.connectedPixelCount(
        maxSize=cluster_thresh, eightConnected=True)

    cleaned = class_1_mask.updateMask(cluster_size.gte(cluster_thresh))
    return cleaned

In [None]:
def merge_and_clean_clusters(two_class_img: ee.Image,
                             kernel: Kernel,
                             cluster_thresh: int,
                             aoi: ee.Geometry
                             ):
    ## Dilation
    closed = (two_class_img.unmask(0)
                .focal_max(kernel.size + 20, kernel.shape, kernel.units)
                .focal_min(20,               kernel.shape, kernel.units)
    )

    ## Drop small clusters
    closed_cleaned = drop_clusters_below_thresh(closed, cluster_thresh)

    ## Convert to vectors

    cluster_polys = (
        closed_cleaned
          .selfMask()
          .reduceToVectors(
              reducer        = ee.Reducer.countEvery().setOutputs(['pixel_count']), #test!!
              geometry       = aoi,  # or any AOI
              scale          = 10,
              geometryType   = 'polygon',
              eightConnected = True,
              labelProperty  = None,
              maxPixels      = 1e13
                  )
                )
    return cluster_polys

## 6.2 Merge S2 and L89 detections from same day

In [None]:
def merge_overlapping_features(*feature_collections: ee.FeatureCollection,
                               params: FeatureExportParams) -> ee.FeatureCollection:
    """
    Merges any features that overlap or touch. Accepts an arbitrary number of
    feature collections, and returns a single feature collection. Metadata will
    be stripped
    """
    sensor_property = params.sensor_prop_name
    delimiter = params.sensor_prop_delimiter

    if not feature_collections:
      raise ValueError("Provide at least one FeatureCollection.")

    combined_fc = feature_collections[0]
    for fc in feature_collections[1:]:
        combined_fc = combined_fc.merge(fc)


    dissolved_geom = combined_fc.geometry().dissolve(maxError=1)

    parts_geoms = dissolved_geom.geometries()

    parts_fc = ee.FeatureCollection(parts_geoms.map(lambda g: ee.Feature(ee.Geometry(g))))


    # add back metadata
    matches_key = "intersecting_spreads" # GEE argument for join.saveAll, creates property

    # first find features in combined_fc intersecting features in parts_fc
    spatial_filter = ee.Filter.intersects(leftField='.geo', rightField='.geo')

    saveall_join = ee.Join.saveAll(matchesKey=matches_key)

    parts_with_source_matches = ee.FeatureCollection(
        saveall_join.apply(parts_fc, combined_fc, spatial_filter)
        )

    # second create a sensor tag
    def _create_sensor_tag(f):
        f = ee.Feature(f)
        matching_feats = ee.List(f.get(matches_key))

        matching_sensors = ee.List(matching_feats
                                   .map(lambda m: ee.Feature(m).get(sensor_property))
                                   .distinct()
                                   .sort())


        sensor_label = ee.List(matching_sensors).join(delimiter)


        return f.set(sensor_property, sensor_label)

    labeled_fc = parts_with_source_matches.map(_create_sensor_tag)

    return labeled_fc

## 6.3 Clean up detection metadata

In [None]:
def make_feature_enricher(*,
                          params: FeatureExportParams,
                          aoi_context: AOIContext):

    def _add_attrs(feature):

        geom   = feature.geometry()
        centroid    = geom.centroid(10)
        coords      = centroid.coordinates()

        acres       = geom.area(10).multiply(aoi_context.m2acres)
        lon         = ee.Number(coords.get(0))
        lat         = ee.Number(coords.get(1))
        county_name = ee.Feature(aoi_context.counties
                                .filterBounds(centroid)
                                .first()
                                .get('NAME'))

        return feature.set({
            params.acres_prop_name    : acres,
            params.lat_prop_name      : lat,
            params.lon_prop_name      : lon,
            params.county_prop_name   : county_name
        })
    return _add_attrs

In [None]:
def set_global_ids(fc: ee.FeatureCollection,
                    params: FeatureExportParams) -> ee.FeatureCollection:
    """
    Make nice IDs like s2_01282025_0034 and set them as `id` properties.
    """
    #sort east/west then north/south
    fc_sorted = fc.sort(params.lat_prop_name).sort(params.lon_prop_name)

    n = fc_sorted.size()
    sorted_list = fc_sorted.toList(n)
    idx = ee.List.sequence(1, n)

    zeros = '0' * params.pad

    def _mk(i):
        i = ee.Number(i)
        f = ee.Feature(sorted_list.get(i.subtract(1)))

        county_name = ee.String(f.get(params.county_prop_name)).toLowerCase().split('[^a-z0-9]+').join('')
        sensor_name = ee.String(f.get(params.sensor_prop_name))
        date_str    = ee.String(f.get(params.date_prop_name)).split('-').join('')
        num_suffix  = ee.String(zeros).cat(i.format('%.0f')).slice(-params.pad)

        pretty_id   = county_name.cat('_').cat(date_str).cat('_').cat(num_suffix).cat('_').cat(sensor_name)
        return f.set(params.id_prop_name, pretty_id)

    return ee.FeatureCollection(idx.map(_mk))

# 7.&nbsp;Image Chip Export

## 7.1 Image Export

In [None]:
# helper that returns an RGB image visualised with per-image stretch

def dynamic_rgb_vis(img: ee.Image,
                    *,
                    aoi: ee.Geometry,
                    export_params: FeatureExportParams,
                    sensor_config: SensorConfig
                    ) -> ee.Image:
    """
    Compute per-band percentile stretch (e.g. p2–p98) for the given image
    over aoi, then visualise with gamma adjustment.
    """

    bands = [sensor_config.band_lookup[b] for b in export_params.bands]
    pct_low = export_params.pct_low
    pct_high = export_params.pct_high

    # Get per-band percentiles (returns server-side ee.Dictionary)
    percentiles = img.select(bands).reduceRegion(
        reducer=ee.Reducer.percentile([pct_low, pct_high]),
        geometry=aoi,
        scale=sensor_config.rgb_scale,
        bestEffort=True,
        maxPixels=1e8
    )

    # build per-band min/max lists
    mins = [ee.Number(percentiles.get(f'{b}_p{pct_low}'))  for b in bands]
    maxs = [ee.Number(percentiles.get(f'{b}_p{pct_high}')) for b in bands]

    # visualise with those dynamic limits and a gamma lift
    return img.visualize(
        bands=bands,
        min=mins,
        max=maxs,
        gamma=[export_params.gamma] * len(bands)
    )

def export_one(feature: ee.Feature,
               *,
               image_collection: ee.ImageCollection,
               export_params: FeatureExportParams,
               sensor_config: SensorConfig,
               filename_prefix: str):

    collection_size = image_collection.size().getInfo()

    if collection_size == 0:
        logger.warning("the image collection is empty for %s", # fix to include sensor
                       sensor_config.pretty)
        return

    geom = feature.geometry()
    buffered_geom = geom.buffer(export_params.buffer)
    bbox = buffered_geom.bounds()
    centroid = geom.centroid()
    bands = [sensor_config.band_lookup[b] for b in export_params.bands]

    scene_image = image_collection.filterBounds(geom).first().select(bands)

    scene_image_upsampled = (scene_image
            .resample(export_params.interpolation)
            .reproject(crs=export_params.crs, scale=sensor_config.export_scale))

    chip_geom  = centroid.buffer(export_params.dimension).bounds()

    chip = dynamic_rgb_vis(scene_image_upsampled,
                              aoi = chip_geom,
                              export_params = export_params,
                              sensor_config = sensor_config)


    detection_bbox = (ee.Image()
                          .paint(bbox, 0, export_params.bbox_thickness)
                          .visualize(palette=[export_params.bbox_color])
                      )
    chip_with_bbox = chip.blend(detection_bbox)


    export_image(chip_with_bbox,
           add_timestamp = False,
           aoi = chip_geom,
           filename = filename_prefix,
           scale = sensor_config.export_scale,
           folder = export_params.export_folder)

def export_all(fc: ee.FeatureCollection,
                *,
               image_collection: ee.ImageCollection,
               export_params: FeatureExportParams,
               sensor_config: SensorConfig):

    """export RGB chips with bboxes for every feature in a collection."""

    id_field = export_params.id_prop_name

    ## just for testing, delete
    n = fc.size().getInfo()
    print(f"Features in collection: {n}")

    # Pull IDs once
    ids = fc.aggregate_array(id_field).getInfo()

    ## Just for testing
    print(f"IDs fetched: {len(ids)}")

    # this will need to be map function instead. think lamba should do the trick
    # Launch exports
    for pid in ids:
        feat = ee.Feature(fc.filter(ee.Filter.eq(id_field, pid)).first())
        export_one(feat,
                   image_collection=image_collection,
                   export_params= export_params,
                   sensor_config=sensor_config,
                   filename_prefix=str(pid))

## 7.2 Wait for GEE

In [None]:
def wait_for_detection_geojson(geojson_name: str,
                                *,
                               folder_path: Path,
                               poll_seconds: int,
                               timeout_minutes: int,
                               logger=None) -> Path | None:

    start = time.time()
    file_name = f"{geojson_name}.geojson"

    if logger:
        logger.info("Watching %s for '%s' (poll=%ss, timeout=%smin)",
                    folder_path, file_name, poll_seconds, timeout_minutes)

    while True:
        matches = sorted(folder_path.glob(file_name), key=lambda p: p.stat().st_mtime, reverse=True)
        if matches:
            if logger:
                logger.info("Detected file: %s", matches[0].name)
            return matches[0]

        if time.time() - start > timeout_minutes * 60:
            if logger:
                logger.error("Timed out waiting for %s in %s", file_name, folder_path)
            return None

        if logger:
            logger.info("Not found yet. Checking again in %s seconds...", poll_seconds)
        time.sleep(poll_seconds)

# Main Script

### Validate configuration

In [None]:
if CFG.fall_composite is None and CFG.fall_comp_rng is None:
    raise ValueError("fall_composite and fall_comp_rng cannot both be None")

### Orchestrator

In [None]:
# as configured, aoi_fc and counties point to the same variable, but in cases
# where aoi is not defined by counties, they will be different
aoi = make_aoi_context( AOI_FC,
                        date_str = CFG.target_date,
                        m2acres = CFG.m2acres,
                        counties = CFG.counties_all)

datestamp = datetime.now().strftime('%Y%m%d')

if logger.isEnabledFor(PROGRESS_LEVEL):
    progress(
        "User’s area of interest (AOI) is %.2f million acres",
        ee2float(aoi.acres.divide(1_000_000), level=PROGRESS_LEVEL),
    )

logger.info("searching for satellite images intersecting user's AOI...\n")

# Check for satellite coverage for target date, add "effective_cloud" attribute
ics = {
    skey: annotate_effective_cloud(
        daily_ic(SENSORS[skey], aoi.ee_date, aoi.geom),
        SENSORS[skey]
        )
    for skey in SENSORS
}

log_effective_cloud_per_image(ics)

cull = drop_and_log_high_cloud(ics, SENSORS, CFG.max_cloud_cover,logger)
ics = cull.filtered
available = cull.available
acceptable_img_counts = cull.counts

if not available:
    logger.error("No AOI imagery available for %s. Aborting.",
                aoi.date_str)
    sys.exit(1)


logger.info(
    "AOI imagery available for %s on %s \n",
    ", ".join(
        f"{SENSORS[k].pretty} ({number_agree(acceptable_img_counts[k], 'scene')})"
        for k in available
    ),
    aoi.date_str
)

###########################################
# 2.3 Setup Deive
###########################################

workdir = setup_drive(
                      drive_base = CFG.drive_base,
                      mount_point = CFG.mount_point)

logger.info("working directory resolved to '%s'", workdir)

CFG.ensure_table_export_path()

###########################################
# 2.4 Fall Composite
###########################################
fall_composite_result = get_or_create_s2_fall_composite(
                                              CFG.fall_composite,
                                              CFG.fall_comp_rng,
                                              aoi.geom,
                                              cfg = SENSORS["s2"])


for line in fall_composite_result.messages:
    logger.info(line)

if fall_composite_result.img is None:
    logger.error("Fall composite could neither be found nor created. Aborting.")
    sys.exit(1)

fall_comp_img = fall_composite_result.img


###########################################
# 2.5 Load Prior Detections
###########################################

prior_detections = load_prior_detections(CFG.target_date,
                                          CFG.table_export_path,
                                          CFG.export_prefix,
                                          CFG.post_detection_suppression_days,
                                          CFG.date_format.py_iso)

## if there are prior detections, convert to raster mask
if prior_detections is not None:
    prior_detections_geom = prior_detections.geometry().dissolve(maxError=1)

    no_prior_detections_mask = (
                          ee.Image().byte().paint(prior_detections_geom, 1)
                          .unmask(0)
                          .eq(0)
                          .rename('no_prior_detection_mask')
                            )

###########################################
# 3 Preprocess Target Date Imagery
###########################################

sensor_outputs: dict[str, SensorOutputs] = {}

for skey in available:
    sensor_cfg = SENSORS[skey]
    sensor_txt = sensor_cfg.pretty

    logger.info("processing %s imagery...", sensor_txt)

    sensor_ic = ics[skey]

    ########################################
    # 3.2 raw coverage (no secondary helpers)
    ########################################

    logger.info("evaluating %s coverage of AOI on %s...",
                sensor_txt,
                aoi.date_str)

    sensor_aoi_coverage = get_sensor_aoi_coverage(skey, sensor_ic, aoi)

    if logger.isEnabledFor(PROGRESS_LEVEL):
      progress(
              "%s imagery is available for %.2f million acres (%.2f%%) "
              "of user's AOI on %s \n",
              sensor_txt,
              ee2float(sensor_aoi_coverage.acres.divide(1_000_000), level=PROGRESS_LEVEL),
              ee2float(sensor_aoi_coverage.percent_aoi, level=PROGRESS_LEVEL),
              aoi.date_str
      )


    logger.info("evaluating cloud conditions for imagery...")

    ########################################
    # 3.3 clouds
    ########################################
    clearsky_mask_params = ClearskyMaskParams(
        sensor_pretty=sensor_txt,
        coarse_scale=CFG.coarse_scale,
        buffer_kernel=sensor_cfg.cloud_kernel,
        cluster_thresh=sensor_cfg.clear_cluster,
        )

    clearsky = build_clearsky_mask(
                                    sensor_ic,
                                    sensor_aoi_coverage,
                                    clearsky_mask_params)


    if logger.isEnabledFor(PROGRESS_LEVEL):
      progress(
          "%.2f%% of the AOI %s imagery (%.2f million acres) is clear sky for %s \n",
          ee2float(clearsky.clear_pct_of_coverage),
          sensor_txt,
          ee2float(clearsky.acres.divide(1_000_000)),
          aoi.date_str
          )

    logger.info("evaluating snow cover in clear sky imagery...")

    ########################################
    # 3.4 cropland + cloud overlap
    ########################################

    crop_mask          = build_cropland_mask(sensor_aoi_coverage.geom)
    clear_crop_mask    = (clearsky.mask).And(crop_mask).And(sensor_aoi_coverage.mask)


    ########################################
    # 3.5 NBSI / snow
    ########################################

    mosaic            = sensor_ic.mosaic().multiply(sensor_cfg.scalar)

    brightness_params = VisBrightnessParams(
                            band_mapping=sensor_cfg.band_lookup,
                            vis_brightness_thresh=sensor_cfg.vis_brightness_thresh,
                            coarse_scale=CFG.coarse_scale,
                            geo_blocks=ee.FeatureCollection(CFG.geo_blocks)
                            )

    vis_brightness = get_vis_brightness_outputs(
                        mosaic,
                        clear_crop_mask,
                        sensor_aoi_coverage,
                        brightness_params)

    if logger.isEnabledFor(PROGRESS_LEVEL):

      pct_high_nbsi = ee2float(vis_brightness.pct_high)

      progress(
          "%.2f%% of %s AOI imagery (%.2f million acres) is snowy (NBSI > %.2f) on %s \n",
          pct_high_nbsi,
          sensor_txt,
          ee2float(vis_brightness.high_acres.divide(1_000_000)),
          sensor_cfg.vis_brightness_thresh,
          aoi.date_str
          )

      if pct_high_nbsi < CFG.min_masked_pct:
        progress("Skipping %s on %s: snow cover less than min snow threshold (%.2f%%). \n",
                  sensor_txt, aoi.date_str, CFG.min_masked_pct)

        artifact = SensorOutputs(
            ic = sensor_ic,
            aoi_coverage = sensor_aoi_coverage,
            clearsky = clearsky,
            crop_mask = crop_mask,
            vis_brightness = vis_brightness,
            inference_bands = None,
            target_image = None,
            rf_output = None,
            noise_removed = None,
            detection_polys = None
            )

        sensor_outputs[skey] = artifact

        continue

    clear_crop_snow_mask = clear_crop_mask.And(vis_brightness.high_mask)


    ###########################################
    # 4 Load or Train Classifier
    ###########################################

    #### 4.1 Load Classifier (if possible) ####
    classifier = None

    rf_path = sensor_cfg.paths.rf

    if rf_path and asset_exists(str(rf_path)):
        logger.info('%s classifier already exists. Loading classifier %s... \n',
                    sensor_txt,
                    rf_path.name)

        classifier = ee.Classifier.load(str(rf_path))



    elif rf_path and not asset_exists(str(rf_path)):
        logger.info("The assest '%s' does not exist. Aborting script...",
                    rf_path.name
                    )
        sys.exit(1)

    else:
        logger.info('no %s classifier indicated. training new classifier '
                    'attempting to import training data...',
                    sensor_txt)

        #### 4.2 Load Training Data and Add Binary Class Labels###########
        logger.info('attempting to upload %s training data to Google Earth Engine',
                    sensor_txt)

        groundtruth_fc = safe_path_to_ee_fc(CFG.resolve(sensor_cfg.paths.gt),
                                            params = CFG.data_import_params)
        sat_nonmanure_fc = safe_path_to_ee_fc(CFG.resolve(sensor_cfg.paths.so),
                                            params = CFG.data_import_params)
        sat_manure_fc = safe_path_to_ee_fc(CFG.resolve(sensor_cfg.paths.sm),
                                           params = CFG.data_import_params)

        groundtruth_fc = add_binary_class_from_label(groundtruth_fc, params = CFG.data_import_params)

        # sat_nonmanure is all non-manure or all manure, so easy
        sat_nonmanure_fc = sat_nonmanure_fc.map(lambda f: f.set('class', 0))
        sat_manure_fc = sat_manure_fc.map(lambda f: f.set('class', 1))

        training_polys = groundtruth_fc.merge(sat_manure_fc).merge(sat_nonmanure_fc)


        count_class_0 = training_polys.filter(ee.Filter.eq('class', 0)).size().getInfo()
        count_class_1 = training_polys.filter(ee.Filter.eq('class', 1)).size().getInfo()

        logger.info("Number of %s features with class 0: %i",
                    sensor_txt,
                    count_class_0)


        logger.info("Number of %s features with class 1: %i",
                    sensor_txt,
                    count_class_1)

        #### 4.3 Build Samples ###########
        training_images = build_training_img_dict(training_polys, sensor_cfg)

        sample_collection = create_flat_sample( training_images,
                                                training_polys,
                                                fall_comp_img
                                                )
        training_bands = sensor_cfg.optical_bands + fall_comp_img.bandNames().getInfo()

        classifier = train_rf_classifier(sample_collection,
                                         training_bands,
                                         CFG.rf_params)

        rf_asset_path = save_classifier(classifier,
                                        CFG.gee_asset_root,
                                        count_class_0 + count_class_1,
                                        skey)

        logger.info(
            "Classifier is being saved as %s. Check progress in code editor",
            rf_asset_path)

    if classifier is None:
      raise RuntimeError("Classifier was not created or loaded; cannot continue.")

    ###########################################
    # 5 Inference on Target Date Imagery
    ###########################################

    if prior_detections is not None:
        clear_crop_snow_mask = clear_crop_snow_mask.And(no_prior_detections_mask)


    target_image = (get_img_for_date(aoi.date_str,
                                     aoi.geom,
                                     sensor_cfg)
                    .updateMask(clear_crop_snow_mask)
                    .select(sensor_cfg.optical_bands)
                    .addBands(fall_comp_img)
                    )

    inference_bands = sensor_cfg.optical_bands + fall_comp_img.bandNames().getInfo()

    rf_output = target_image.select(inference_bands).classify(classifier)

    ###########################################
    # 6 Post Processing
    ###########################################

    noise_removed = drop_clusters_below_thresh(rf_output, CFG.noise_thresh)

    cluster_polys = merge_and_clean_clusters(
                                              noise_removed,
                                              CFG.post_kernel,
                                              sensor_cfg.cluster_thresh,
                                              sensor_aoi_coverage.geom)

    ## even though we masked prior detections, due to the dilation/erosion we
    ## need to clean this up a bit more
    if prior_detections is not None:
        cluster_polys_geom = cluster_polys.geometry().dissolve(maxError=1)

        new_detections = cluster_polys_geom.difference(prior_detections_geom, 1)
        new_detections_parts = ee.FeatureCollection(
                                    ee.List(new_detections.geometries())
                                    .map(lambda g: ee.Feature(ee.Geometry(g)))
                                      )

        parts_above_threshold = new_detections_parts.map(
            lambda f: f.set('area_m2', f.geometry().area())
            ).filter(ee.Filter.gte('area_m2', sensor_cfg.min_area_after_subtraction))

        cluster_polys = parts_above_threshold

    # unique run numbers could be here too
    cluster_polys_with_sensor = cluster_polys.map(
        lambda f: f.set('sensor', ee.String(skey))
        )

    sensor_output = SensorOutputs(
        ic = sensor_ic,
        aoi_coverage = sensor_aoi_coverage,
        clearsky = clearsky,
        crop_mask = crop_mask,
        vis_brightness = vis_brightness,
        inference_bands = inference_bands,
        target_image = target_image,
        rf_output = rf_output,
        noise_removed = noise_removed,
        detection_polys = cluster_polys_with_sensor
        )

    sensor_outputs[skey] = sensor_output


detection_fcs: list[ee.FeatureCollection] = [
    output.detection_polys
    for output in sensor_outputs.values()
    if output.detection_polys is not None
]

if len(detection_fcs) == 0:
    logger.info("no images had suitable atmospheric and surface conditions for inferencing")

else:
    # if multiple sensors collect imagery for this date, results are merged
    if len(detection_fcs) > 1:
        logger.info("multiple sensors with usable imagery. merging outputs to single feature collection")

        flat_detections = merge_overlapping_features(
                                          *detection_fcs,
                                          params = CFG.feature_export_params)

    else:
        flat_detections = detection_fcs[0]

    # tag detections that are expansions of previously detected spreads
    if prior_detections is not None:
        flat_detections = flat_detections.map(
            lambda f: f.set(CFG.feature_export_params.prior_spread_adjacency_prop_name,
                            f.geometry().intersects(
                                prior_detections_geom.buffer(10), maxError = 1
                                )))
    else:
        flat_detections = flat_detections.map(
            lambda f: f.set(CFG.feature_export_params.prior_spread_adjacency_prop_name, False))


    flat_detections_with_date = flat_detections.map(
        lambda f: f.set('detection_date', ee.String(aoi.date_str)))


    feature_enricher = make_feature_enricher(params = CFG.feature_export_params,
                                             aoi_context = aoi)

    detections_enriched = flat_detections_with_date.map(feature_enricher)

    detections_with_pretty_ids = set_global_ids(
                                      detections_enriched,
                                      CFG.feature_export_params
    )

    geojson_name = f"{CFG.export_prefix}_{CFG.target_date}"

    task = ee.batch.Export.table.toDrive(
        collection=detections_with_pretty_ids,
        description=geojson_name,
        folder=CFG.table_export_folder,
        fileNamePrefix= geojson_name,
        fileFormat="GeoJSON",
        selectors=[".geo"] + list(CFG.feature_export_params.export_fields)
        )

    task.start()

    logger.info('inferencing and postprocessing tasks initiated in cloud')
    logger.info('check GEE code for progress \n')



2025-11-07 15:20 — INFO — searching for satellite images intersecting user's AOI...

2025-11-07 15:20 — INFO — AOI imagery available for Sentinel‑2 (4 scenes), Landsat‑8/9 (2 scenes) on 2025-01-28 

Mounted at /content/drive
2025-11-07 15:21 — INFO — working directory resolved to '/content/drive/Colab/manure/ELPC_manure'
2025-11-07 15:22 — INFO — fall composite asset '2024_fall_composite_wi_v2' already exists
2025-11-07 15:22 — INFO — fall composite asset '2024_fall_composite_wi_v2' successfully fetched 

2025-11-07 15:22 — INFO — lookback_days <= 0, not attempting to load prior detections

2025-11-07 15:22 — INFO — processing Sentinel‑2 imagery...
2025-11-07 15:22 — INFO — evaluating Sentinel‑2 coverage of AOI on 2025-01-28...
2025-11-07 15:22 — INFO — evaluating cloud conditions for imagery...
2025-11-07 15:22 — INFO — evaluating snow cover in clear sky imagery...
2025-11-07 15:22 — INFO — Sentinel‑2 classifier already exists. Loading classifier rf_s2_846_polys_20250925... 

2025-11-

In [None]:
# sensor_outputs['s2'].vis_brightness.stats

## Image Chip Export (Optional)


In [None]:
if CFG.export_image_chips:

    wait_for_detection_geojson(geojson_name,
                              folder_path=CFG.table_export_path,
                              poll_seconds=CFG.poll_seconds,
                              timeout_minutes=CFG.timeout_minutes,
                              logger=logger
                              )

    detections_gdf = load_newest_geojson_for_date(CFG.target_date,
                                                  CFG.table_export_path,
                                                  CFG.export_prefix)
    if detections_gdf is None:
        logger.warning("No detection GeoJSON available for %s after waiting. Check task status in the GEE Code Editor.",
                      CFG.target_date)

    else:
        logger.info("There are %i features to export", len(detections_gdf))

        s2_detections_gdf = detections_gdf[detections_gdf["sensor"]
                                  .astype("string")
                                  .str.contains("s2", case=False, na=False)]
        s2_count = len(s2_detections_gdf)

        l89_detections = detections_gdf[detections_gdf["sensor"].eq("l89")]
        l89_count = len(l89_detections)

        if s2_count + l89_count != len(detections_gdf):
            logger.warning("sensor labels appear corrupted. check geojson")

        if s2_count > 0:
            logger.info("exporting detections with s2 imagery (%i total)", s2_count)

            s2_detections_fc = geemap.gdf_to_ee(s2_detections_gdf, geodesic=True)
            export_all(s2_detections_fc,
                      image_collection = ics['s2'],
                      export_params = CFG.feature_export_params,
                      sensor_config = SENSORS['s2']
                      )
        else:
            logger.info("no detections with s2 imagery to export")

        if l89_count > 0:
            logger.info("exporting detections with only l89 imagery (%i total)",
                        l89_count)

            l89_detections_fc = geemap.gdf_to_ee(l89_detections, geodesic=True)
            export_all(l89_detections_fc,
                      image_collection = ics['l89'],
                      export_params = CFG.feature_export_params,
                      sensor_config = SENSORS['l89']
                      )
        else:
            logger.info("no detections with only l89 imagery to export")

else:
    logger.info("image chips not exported, set to True if desired")


2025-09-25 20:14 — INFO — image chips not exported, set to True if desired
