# Ecosystem Services Valuation Workflow

This notebook follows the 11-phase roadmap from the project README. Fill in each section sequentially to maintain provenance and reproducibility.

## 0. Setup
- Activate the `biomass` environment (`conda activate biomass`).
- Update the project path, AOI coordinates, and CDSE credentials in the cells below.

In [14]:
import json
import numpy as np
import pandas as pd
import geopandas as gpd
import rasterio
import matplotlib.pyplot as plt
from rasterio.plot import show
from rasterio.mask import mask
from rasterio.io import MemoryFile
from pathlib import Path
from typing import Optional

print("Core libraries imported.")

Core libraries imported.


In [15]:
PROJECT_ROOT = Path.cwd()
DATA_DIR = PROJECT_ROOT / "data"
DATA_DIR.mkdir(exist_ok=True)
print(f"Data directory: {DATA_DIR}")

SAFE_PATTERN = "S2C_MSIL2A_*.SAFE"
SAFE_ROOTS = sorted(DATA_DIR.glob(SAFE_PATTERN))
if not SAFE_ROOTS:
    raise FileNotFoundError(
        f"No Sentinel-2 SAFE directories found via pattern '{SAFE_PATTERN}' in {DATA_DIR}"
    )
print(f"Found {len(SAFE_ROOTS)} SAFE folders inside data/:")
for safe in SAFE_ROOTS:
    print(" •", safe.name)

ANCILLARY_DIR = DATA_DIR / "ancillary"
ANCILLARY_DIR.mkdir(exist_ok=True)
print(f"Ancillary data directory: {ANCILLARY_DIR}")

AOI_PATH = DATA_DIR / "aoi.geojson"
if AOI_PATH.exists():
    print(f"AOI file detected: {AOI_PATH}")
else:
    print(f"AOI file not found (expected at {AOI_PATH}). Drop a GeoJSON/shapefile copy there when ready.")


Data directory: /home/fred/Projects/Feishu-Hack/Biomass_fun/data
Found 2 SAFE folders inside data/:
 • S2C_MSIL2A_20251113T024511_N0511_R089_T50RQV_20251113T054013.SAFE
 • S2C_MSIL2A_20251113T024511_N0511_R089_T51RTQ_20251113T054013.SAFE
Ancillary data directory: /home/fred/Projects/Feishu-Hack/Biomass_fun/data/ancillary
AOI file detected: /home/fred/Projects/Feishu-Hack/Biomass_fun/data/aoi.geojson


In [16]:
def load_optional_vector(path: Path) -> Optional[gpd.GeoDataFrame]:
    if not path.exists():
        return None
    gdf = gpd.read_file(path)
    print(f"Loaded vector data from {path} ({len(gdf)} features, CRS={gdf.crs})")
    return gdf.to_crs("EPSG:4326") if gdf.crs else gdf

AOI_GDF = load_optional_vector(AOI_PATH)
if AOI_GDF is None:
    print("AOI_GDF not loaded yet. Provide a GeoJSON/GeoPackage/Shapefile in data/.")
else:
    AOI_CENTROID = AOI_GDF.geometry.centroid.unary_union
    print(f"AOI centroid: ({AOI_CENTROID.y:.4f}, {AOI_CENTROID.x:.4f})")


Loaded vector data from /home/fred/Projects/Feishu-Hack/Biomass_fun/data/aoi.geojson (1 features, CRS=EPSG:4326)
AOI centroid: (31.1209, 119.8241)



  AOI_CENTROID = AOI_GDF.geometry.centroid.unary_union
  AOI_CENTROID = AOI_GDF.geometry.centroid.unary_union


In [17]:
ANCILLARY_RASTERS = {
    "dem": ANCILLARY_DIR / "dem.tif",
    "slope": ANCILLARY_DIR / "slope.tif",
    "precip": ANCILLARY_DIR / "precipitation_mm_per_year.tif",
}


def inspect_raster(path: Path):
    if not path.exists():
        print(f"[missing] {path}")
        return None
    with rasterio.open(path) as ds:
        print(
            f"[loaded] {path.name}: {ds.width}x{ds.height} px, res={ds.res}, crs={ds.crs}, nodata={ds.nodata}"
        )
        return ds.profile

ANCILLARY_PROFILES = {name: inspect_raster(path) for name, path in ANCILLARY_RASTERS.items()}



[missing] /home/fred/Projects/Feishu-Hack/Biomass_fun/data/ancillary/dem.tif
[missing] /home/fred/Projects/Feishu-Hack/Biomass_fun/data/ancillary/slope.tif
[missing] /home/fred/Projects/Feishu-Hack/Biomass_fun/data/ancillary/precipitation_mm_per_year.tif


## Phase 0: CDSE API & Download
- Implement a reusable CDSE authentication helper (env vars or config file).
- Load AOI centroid coordinates and build a 25 km buffer polygon (`geopandas`/`shapely`).
- Use the CDSE search endpoint to query Sentinel-2 L2A products intersecting the AOI.
- Filter candidates by acquisition date, cloud cover, and processing baseline.
- Log the chosen product(s) with metadata (UUID, title, footprint, cloud %, download URL).
- Download the `.SAFE` archives into `DATA_DIR` and verify checksums if provided.

In [None]:
from datetime import datetime

try:
    SAFE_ROOTS
except NameError as exc:
    raise RuntimeError("SAFE_ROOTS not initialized. Run the setup cell above first.") from exc


def summarize_safe(path: Path) -> dict:
    tokens = path.name.split("_")
    return {
        "safe_name": path.name,
        "acquisition_utc": pd.to_datetime(tokens[2], format="%Y%m%dT%H%M%S"),
        "processing_baseline": tokens[3],
        "relative_orbit": tokens[4],
        "tile": tokens[5],
        "local_path": path.resolve().as_posix(),
    }

product_log = pd.DataFrame([summarize_safe(p) for p in SAFE_ROOTS])
product_log


Unnamed: 0,safe_name,acquisition_utc,processing_baseline,relative_orbit,tile,local_path
0,S2C_MSIL2A_20251113T024511_N0511_R089_T50RQV_2...,2025-11-13 02:45:11,N0511,R089,T50RQV,/home/fred/Projects/Feishu-Hack/Biomass_fun/S2...
1,S2C_MSIL2A_20251113T024511_N0511_R089_T51RTQ_2...,2025-11-13 02:45:11,N0511,R089,T51RTQ,/home/fred/Projects/Feishu-Hack/Biomass_fun/S2...


## Phase 1: Data Loading & Preprocessing
- Parse the `.SAFE` structure to locate band JP2 paths (10 m & 20 m resolutions).
- Load B2, B3, B4, B8, B11, B12 arrays; resample 20 m bands to the 10 m reference grid.
- Apply the scaling factor (÷10,000) and cast to `float32` reflectance.
- Generate nodata masks (<=0) and cloud masks using `MSK_CLDPRB_20m` or SCL rasters.
- Combine masks into a single `valid` mask to keep clean pixels for downstream phases.
- Build quick-look products: RGB, SWIR composites, cloud overlays for QA/QC.

In [None]:
from typing import Dict, Tuple
from contextlib import ExitStack
from rasterio.merge import merge
from rasterio.enums import Resampling

BAND_SPECS = {
    "B02": {"label": "blue", "folder": "R10m", "suffix": "10m"},
    "B03": {"label": "green", "folder": "R10m", "suffix": "10m"},
    "B04": {"label": "red", "folder": "R10m", "suffix": "10m"},
    "B08": {"label": "nir", "folder": "R10m", "suffix": "10m"},
    "B11": {"label": "swir1", "folder": "R20m", "suffix": "20m"},
    "B12": {"label": "swir2", "folder": "R20m", "suffix": "20m"},
}

SCALING_FACTOR = 10000.0
PIXEL_AREA_HA_10M = 0.01


def get_granule_dir(safe_dir: Path) -> Path:
    granules = sorted((safe_dir / "GRANULE").iterdir())
    if not granules:
        raise FileNotFoundError(f"No GRANULE directory found in {safe_dir}")
    return granules[0]


def find_band_path(safe_dir: Path, band_id: str) -> Path:
    spec = BAND_SPECS[band_id]
    granule = get_granule_dir(safe_dir)
    search_root = granule / "IMG_DATA" / spec["folder"]
    pattern = f"*_{band_id}_{spec['suffix']}.jp2"
    matches = sorted(search_root.glob(pattern))
    if not matches:
        raise FileNotFoundError(f"Band {band_id} not found in {safe_dir}")
    return matches[0]


def find_quality_mask(safe_dir: Path, filename: str) -> Path:
    granule = get_granule_dir(safe_dir)
    candidate = granule / "QI_DATA" / filename
    if not candidate.exists():
        raise FileNotFoundError(f"Quality mask {filename} missing in {safe_dir}")
    return candidate


def merge_datasets(paths, dst_profile=None, resampling=Resampling.bilinear):
    with ExitStack() as stack:
        datasets = [stack.enter_context(rasterio.open(p)) for p in paths]
        merge_kwargs = {}
        if dst_profile is not None:
            merge_kwargs.update(
                dst_transform=dst_profile["transform"],
                dst_crs=dst_profile["crs"],
                dst_width=dst_profile["width"],
                dst_height=dst_profile["height"],
            )
        mosaic, out_transform = merge(datasets, resampling=resampling, **merge_kwargs)
        base_profile = datasets[0].profile.copy()
    base_profile.update(
        height=mosaic.shape[1],
        width=mosaic.shape[2],
        transform=out_transform,
        crs=dst_profile["crs"] if dst_profile else base_profile["crs"],
        count=1,
        nodata=0,
        dtype=mosaic.dtype,
    )
    return mosaic[0], base_profile


def load_band(band_id: str, dst_profile=None, resampling=Resampling.bilinear):
    paths = [find_band_path(safe, band_id) for safe in SAFE_ROOTS]
    return merge_datasets(paths, dst_profile=dst_profile, resampling=resampling)


def to_reflectance(array: np.ndarray) -> np.ndarray:
    return (array.astype(np.float32) / SCALING_FACTOR).clip(0, 1.2)



In [None]:
reference_red, reference_profile = load_band("B04")
REFERENCE_PROFILE = {
    "transform": reference_profile["transform"],
    "crs": reference_profile["crs"],
    "width": reference_profile["width"],
    "height": reference_profile["height"],
}

BAND_DATA: Dict[str, np.ndarray] = {"red": to_reflectance(reference_red)}
for band_id, spec in BAND_SPECS.items():
    if spec["label"] == "red":
        continue
    arr, _ = load_band(band_id, dst_profile=reference_profile)
    BAND_DATA[spec["label"]] = to_reflectance(arr)

cloud_probability_paths = [find_quality_mask(safe, "MSK_CLDPRB_20m.jp2") for safe in SAFE_ROOTS]
cloud_prob_raw, _ = merge_datasets(
    cloud_probability_paths,
    dst_profile=reference_profile,
    resampling=Resampling.bilinear,
)
cloud_probability = cloud_prob_raw.astype(np.float32)

scl_paths = [find_quality_mask(safe, "MSK_CLASSI_B00.jp2") for safe in SAFE_ROOTS]
scl_raw, _ = merge_datasets(
    scl_paths,
    dst_profile=reference_profile,
    resampling=Resampling.nearest,
)
SCL = scl_raw.astype(np.uint8)

stack_for_mask = np.stack(list(BAND_DATA.values()), axis=0)
nodata_mask = np.any(stack_for_mask <= 0.0, axis=0)
cloud_mask = cloud_probability >= 40  # % probability threshold
scl_cloud = np.isin(SCL, [3, 8, 9, 10, 11])
valid_mask = ~(nodata_mask | cloud_mask | scl_cloud)

MASKS = {
    "nodata": nodata_mask,
    "cloud_prob": cloud_mask,
    "scl_cloud": scl_cloud,
    "valid": valid_mask,
    "cloud_probability": cloud_probability,
    "scl": SCL,
}

print(
    "Loaded bands:", ", ".join(sorted(BAND_DATA.keys())),
    "| valid pixels:", int(valid_mask.sum()),
)



In [None]:
def load_and_align_raster(path: Path, reference_profile: dict | None = None):
    if not path.exists():
        raise FileNotFoundError(path)
    with rasterio.open(path) as src:
        data = src.read(1)
        profile = src.profile
        if reference_profile is None:
            return data, profile
        reprojected, new_profile = merge_datasets([path], dst_profile=reference_profile, resampling=Resampling.bilinear)
        return reprojected, new_profile


def clip_raster_to_aoi(array: np.ndarray, profile: dict, aoi_gdf: gpd.GeoDataFrame):
    if aoi_gdf is None or aoi_gdf.empty:
        return array, profile
    geoms = [geom.__geo_interface__ for geom in aoi_gdf.to_crs(profile["crs"]).geometry]
    with rasterio.Env():
        with MemoryFile() as memfile:
            with memfile.open(**profile) as dataset:
                dataset.write(array, 1)
                clipped, clipped_transform = mask(dataset, geoms, crop=True)
    new_profile = profile.copy()
    new_profile.update(
        height=clipped.shape[1],
        width=clipped.shape[2],
        transform=clipped_transform,
    )
    return clipped[0], new_profile



In [None]:
ancillary_arrays = {}
for name, path in ANCILLARY_RASTERS.items():
    if not path.exists():
        continue
    arr, profile = load_and_align_raster(path, reference_profile=reference_profile)
    if AOI_GDF is not None:
        arr, profile = clip_raster_to_aoi(arr, profile, AOI_GDF)
    ancillary_arrays[name] = {"array": arr, "profile": profile}
    print(f"Aligned ancillary raster '{name}' -> shape {arr.shape}")

if not ancillary_arrays:
    print("No ancillary rasters have been aligned yet.")


## Phase 2: Vegetation Indices
- Implement NDVI, EVI, and optional SAVI with epsilon protection on denominators.
- Clip outputs to the [-1, 1] range and set invalid pixels to `NaN` using `MASKS`.
- Summarize each index with descriptive stats (min, max, mean, median, std, percentiles).
- Plot map panels and histograms to confirm distributions and detect anomalies.
- Flag out-of-range values before feeding indices into classification or biomass models.

## Phase 3: Land Cover Classification
- Define NDVI-based thresholds for `non-forest`, `sparse`, `moderate`, and `dense` classes.
- Create a categorical raster using the thresholds and `MASKS["valid"]`.
- Calculate area (ha) per class using pixel area conversions.
- Plot a classification map with an intuitive color palette.
- Summarize counts/areas in a table for reporting.


## Phase 4: Biomass Estimation
- Select or cite an allometric equation suitable for the study region.
- Apply the equation pixel-wise to generate a biomass raster (tons/ha).
- Mask low-vegetation pixels (e.g., NDVI < 0.2) before aggregating.
- Enforce realistic min/max biomass bounds to avoid outliers.
- Compute total biomass (tons) using pixel area (0.01 ha at 10 m).
- Visualize biomass distribution with a legend and summary statistics.


## Phase 5: Analysis & Validation
- Derive descriptive stats (mean, median, std, min, max) for biomass and key indices.
- Build histograms/boxplots to inspect distributions and potential skew.
- Compare aggregated values to literature benchmarks for similar ecosystems.
- Conduct sensitivity checks on main parameters (e.g., NDVI thresholds, coefficients).
- Document assumptions, uncertainties, and data quality caveats.


## Phase 6: Water Detection
- Compute NDWI and MNDWI using the reflectance bands already loaded.
- Apply tuned thresholds to delineate water bodies.
- Separate water classes by permanence/intensity if possible.
- Calculate total water area and percentage of valid pixels.
- Map the water mask(s) with transparent overlays on RGB for QA.


## Phase 7: Water Quality
- Calculate a turbidity proxy such as the Normalized Difference Turbidity Index (NDTI).
- Restrict calculations to water pixels only.
- Create quality classes (e.g., healthy, moderate, degraded).
- Plot maps showing turbidity gradients.
- Note potential confounders (suspended sediment, sensor noise).


## Phase 8: Hydrological Analysis
- Build riparian buffer zones (30 m, 100 m, 300 m) using AOI geometry.
- Summarize NDVI or biomass statistics within each buffer to assess vegetation quality.
- Evaluate wetland connectivity metrics (distance to water, corridor quality, area).
- Flag priority conservation corridors based on connectivity + condition.
- Document formulas/assumptions for reproducibility.


## Phase 9: Ecosystem Service Quantification
- Translate biophysical metrics to per-pixel service scores for the five services.
- **Water Flow Regulation:** estimate water storage capacity using NDVI-based vegetation factors.
- **Water Purification:** compute pollutant removal capacity using NDVI and wetland area fractions.
- **Sediment Control:** approximate sediment retention via simplified USLE proxies (NDVI, buffers).
- **Aquifer Recharge:** estimate recharge potential with precipitation + NDVI-derived infiltration.
- **Flood Protection:** approximate flood storage using floodplain extent, storage depth, roughness.
- Produce maps and tables for each service, highlight hotspots.


## Phase 10: Dynamic Ecosystem Service Valuation
- Assign base value coefficients ($/ha/year) to each service (per README guidance).
- Derive Quality, Scarcity, and Benefit multipliers from indices and contextual data.
- Compute per-pixel dynamic value = base × quality × scarcity × benefit.
- Aggregate to total annual value and unit value (per ha) for the AOI.
- Generate valuation maps and concise summary tables.


## Phase 11: Documentation & Presentation
- Write a clear methodology narrative covering input data, preprocessing, models, and assumptions.
- Export publication-ready figures (maps, histograms, tables) with consistent styling.
- Compile a concise PDF report (3–5 pages) summarizing objectives, methods, results, discussion.
- Prepare a short slide deck (≈5 minutes) highlighting key insights and visuals.
- Ensure notebook cells are clean, commented, and reproducible for final submission.
