# EO Notebook: Ingestion, Access Patterns, Dataset Engineering

Questo notebook mostra una pipeline EO completa, orientata a dimostrare competenze pratiche di **data engineering geospaziale** e preparazione dati per AI.
Il flusso segue una logica di produzione: discovery -> accesso efficiente -> preprocessing -> packaging -> quality checks -> data loading.

## Obiettivi tecnici

1. **Data discovery** via STAC, con filtri riproducibili su AOI, tempo e cloud cover.
2. **Data access** da COG con pattern efficienti (windowed read e range request), evitando letture full-raster non necessarie.
3. **Dataset engineering** per creare artefatti AI-ready scalabili e riusabili.

## Perché questo approccio

- Separa la logica configurabile (`config.yaml`) dal codice.
- Produce output standard nel dominio EO/ML (`Zarr`, `GeoParquet`, `JSON metadata`).
- Rende il workflow ripetibile e verificabile tramite report QC.

## Deliverable esportati

- `config.yaml`
- `src/ingest_stac.py`
- `data/tiles.parquet` (GeoParquet tile index)
- `artifacts/statistics.json` (mean/std per banda)
- `data/data.zarr` (packaging scalabile)
- `artifacts/metadata.json`
- `reports/qc_report.md` + `reports/qc_report.html`
- `src/dataloader.py` + snippet d'uso PyTorch

In [19]:
from google.colab import drive
drive.mount('/content/drive')
import os

# Change to the 'notebook' directory in Google Drive
notebook_path = '/content/drive/My Drive/Notebooks/EO'
if not os.path.exists(notebook_path):
    os.makedirs(notebook_path) # Create the directory if it doesn't exist
os.chdir(notebook_path)
print(f"Current working directory changed to: {os.getcwd()}")

!pip install rioxarray pystac-client planetary-computer stackstac zarr

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Current working directory changed to: /content/drive/My Drive/Notebooks/EO


## 0) Setup ambiente

In questa sezione inizializziamo l'ambiente, carichiamo la configurazione e prepariamo le cartelle output.
L'obiettivo e rendere la pipeline **parametrica** e facilmente rieseguibile su AOI o periodi diversi.

### Librerie chiave

- `pystac-client`: interrogazione cataloghi STAC.
- `rasterio` / `rioxarray` / `xarray`: I/O raster, georeferenziazione, manipolazione array.
- `stackstac`: costruzione stack spazio-temporali consistenti a partire da STAC Items.
- `geopandas` / `shapely`: gestione geometrie AOI e tile index.

### Output atteso

- Config caricata da `config.yaml`.
- Directory `data/`, `artifacts/`, `reports/` create (se mancanti).
- Config risalvata per tracciare con precisione i parametri di run.

In [20]:
from pathlib import Path
import json
import warnings
import logging
import sys # Import sys

import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
import yaml
from shapely.geometry import box
sys.path.insert(0, str(Path().resolve()))

from src.ingest_stac import (
    get_asset_href,
    items_to_frame,
    load_config,
    read_cog_window,
    save_config,
    search_stac_items,
    to_geodataframe,
)

warnings.filterwarnings("ignore", category=FutureWarning)
logging.getLogger("rasterio._env").setLevel(logging.ERROR)

CONFIG_PATH = Path("config.yaml")
cfg = load_config(CONFIG_PATH)

DATA_DIR = Path(cfg["io"]["data_dir"])
ARTIFACT_DIR = Path(cfg["io"]["artifact_dir"])
REPORT_DIR = Path(cfg["io"]["report_dir"])

for d in [DATA_DIR, ARTIFACT_DIR, REPORT_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Re-export della configurazione come deliverable riproducibile
save_config(cfg, CONFIG_PATH)
cfg

{'aoi': {'name': 'milan_test_site',
  'crs': 'EPSG:4326',
  'geometry': {'type': 'Polygon',
   'coordinates': [[[9.107, 45.43],
     [9.257, 45.43],
     [9.257, 45.52],
     [9.107, 45.52],
     [9.107, 45.43]]]}},
 'stac': {'provider': 'planetary-computer',
  'collections': ['sentinel-2-l2a'],
  'date_range': {'start': '2025-04-01', 'end': '2025-09-30'},
  'cloud_cover': {'field': 'eo:cloud_cover', 'max': 20},
  'assets': ['B02', 'B03', 'B04', 'B08', 'SCL']},
 'preprocessing': {'target_resolution': 10,
  'tile_size': 256,
  'train_fraction': 0.8,
  'cloud_mask': {'scl_invalid_classes': [0, 1, 3, 8, 9, 10, 11]},
  'normalization': {'reflectance_scale': 10000.0}},
 'io': {'data_dir': 'data',
  'artifact_dir': 'artifacts',
  'report_dir': 'reports',
  'zarr_path': 'data/data.zarr',
  'tiles_index_path': 'data/tiles.parquet',
  'stats_path': 'artifacts/statistics.json',
  'metadata_path': 'artifacts/metadata.json',
  'qc_report_md': 'reports/qc_report.md',
  'qc_report_html': 'reports/qc

## 1) Data Discovery (STAC)

Qui interroghiamo il catalogo STAC con tre vincoli principali:

- **AOI**: area geografica di interesse (GeoJSON in `config.yaml`).
- **Intervallo temporale**: finestra di osservazione.
- **Cloud cover**: filtro qualità iniziale per Sentinel-2.

### Cosa validiamo subito

- Numero di scene trovate.
- Distribuzione temporale (`datetime`) per verificare copertura del periodo.
- CRS/EPSG disponibili negli item, utile per decidere il sistema target dello stack.

Questa fase e critica: se i filtri sono troppo stretti, il dataset diventa povero; se troppo larghi, aumentano costi di I/O e rumore nei dati.

### Nota accesso Planetary Computer

Gli asset su Azure Blob richiedono URL firmate (SAS).
In questo notebook la firma viene applicata automaticamente agli item STAC; senza firma puoi ricevere errori HTTP `409` / `403` in `rasterio` o `stackstac`.

Per uso base di norma **non serve un token manuale**. Una subscription key e opzionale per workload intensivi (rate limit più stabili).

In [21]:
items = search_stac_items(cfg)

# Sign URLs for Planetary Computer assets (SAS) to avoid HTTP 409/403 on blob access
if cfg["stac"]["provider"].lower() in {"planetary-computer", "pc", "microsoft"}:
    try:
        import planetary_computer as pc
        items = [pc.sign(item) for item in items]
    except Exception as e:
        print("Warning: non sono riuscito a firmare gli item:", e)

items_df = items_to_frame(items)

print(f"Items trovati: {len(items)}")
if len(items_df):
    display(items_df.head(10))
    print("Range temporale:", items_df["datetime"].min(), "->", items_df["datetime"].max())
    print("Cloud cover medio:", round(items_df["cloud_cover"].dropna().mean(), 2))
else:
    raise RuntimeError("Nessun item trovato: allarga date/AOI o aumenta cloud cover max.")


Items trovati: 35


Unnamed: 0,id,collection,datetime,cloud_cover,epsg,bbox
0,S2C_MSIL2A_20250405T102041_R065_T32TMR_2025040...,sentinel-2-l2a,2025-04-05 10:20:41.025000+00:00,0.008494,,"[7.7069511, 45.0581917, 9.1261672, 46.0535047]"
1,S2C_MSIL2A_20250405T102041_R065_T32TNR_2025040...,sentinel-2-l2a,2025-04-05 10:20:41.025000+00:00,0.048613,,"[8.9997415, 45.0567486, 10.4189041, 46.0535744]"
2,S2C_MSIL2A_20250425T102041_R065_T32TMR_2025042...,sentinel-2-l2a,2025-04-25 10:20:41.025000+00:00,11.213718,,"[7.7069511, 45.0581917, 9.1261672, 46.0535047]"
3,S2B_MSIL2A_20250430T101559_R065_T32TMR_2025043...,sentinel-2-l2a,2025-04-30 10:15:59.024000+00:00,4.347169,,"[7.7069511, 45.0581917, 9.1261672, 46.0535047]"
4,S2B_MSIL2A_20250430T101559_R065_T32TNR_2025043...,sentinel-2-l2a,2025-04-30 10:15:59.024000+00:00,10.980903,,"[8.9997415, 45.0567486, 10.4189041, 46.0535744]"
5,S2C_MSIL2A_20250515T101611_R065_T32TMR_2025051...,sentinel-2-l2a,2025-05-15 10:16:11.025000+00:00,4.390318,,"[7.7069511, 45.0581917, 9.1261672, 46.0535047]"
6,S2C_MSIL2A_20250515T101611_R065_T32TNR_2025051...,sentinel-2-l2a,2025-05-15 10:16:11.025000+00:00,14.308147,,"[8.9997415, 45.0567486, 10.4189041, 46.0535744]"
7,S2A_MSIL2A_20250517T101701_R065_T32TNR_2025051...,sentinel-2-l2a,2025-05-17 10:17:01.024000+00:00,15.150085,,"[8.9997415, 45.0567486, 10.4189041, 46.0535744]"
8,S2B_MSIL2A_20250530T101559_R065_T32TMR_2025053...,sentinel-2-l2a,2025-05-30 10:15:59.024000+00:00,11.815874,,"[7.7069511, 45.0581917, 9.1261672, 46.0535047]"
9,S2B_MSIL2A_20250530T101559_R065_T32TNR_2025053...,sentinel-2-l2a,2025-05-30 10:15:59.024000+00:00,18.834792,,"[8.9997415, 45.0567486, 10.4189041, 46.0535744]"


Range temporale: 2025-04-05 10:20:41.025000+00:00 -> 2025-08-18 10:15:59.024000+00:00
Cloud cover medio: 7.89


### Opzionale: estensione a Sentinel-1

Per una pipeline multi-sensore puoi aggiungere `sentinel-1-rtc` alle collection.
In quel caso conviene definire chiaramente una strategia di fusione (ottico + radar):

- allineamento spaziale/temporale tra sensori,
- normalizzazione per domini radiometrici diversi,
- definizione feature finali per il modello.

In questa demo manteniamo solo Sentinel-2 L2A per focalizzarci su cloud masking con banda `SCL` e tenere il percorso didattico lineare.

## 2) Data Access Pattern (COG + HTTP range)

L'obiettivo non e leggere l'intera immagine, ma mostrare accesso efficiente a blocchi utili:

- **Windowed read**: estraiamo solo la porzione che interseca l'AOI.
- **Cloud-native access**: i COG supportano letture parziali via HTTP range request.

### Perché e importante

Su dataset EO grandi, il collo di bottiglia e spesso I/O remoto.
Leggere solo i byte necessari riduce latenza, banda e costo operativo.
Questo pattern e la base per pipeline scalabili in cloud e training distribuito.

In [22]:
first_item = items[0]
provider = cfg["stac"]["provider"]
example_band = "B04"
asset_href = get_asset_href(first_item, example_band, provider)

aoi_gdf = to_geodataframe(cfg)
aoi_bounds = tuple(aoi_gdf.total_bounds)

window_array, window_profile = read_cog_window(
    asset_href=asset_href,
    bounds=aoi_bounds,
    bounds_crs="EPSG:4326",
    band=1,
)

print("Asset:", asset_href[:120] + "...")
print("Shape window letta:", window_array.shape)
print("Profilo sintetico:", window_profile)


Asset: https://sentinel2l2a01.blob.core.windows.net/sentinel2-l2/32/T/NR/2025/08/18/S2B_MSIL2A_20250818T101559_N0511_R065_T32TN...
Shape window letta: (1003, 1170)
Profilo sintetico: {'crs': 'EPSG:32632', 'dtype': 'uint16', 'shape': (1003, 1170), 'nodata': 0.0, 'transform': (10.0, 0.0, np.float64(508369.9199700141), 0.0, -10.0, np.float64(5040750.568459621), 0.0, 0.0, 1.0)}


### Verifica pratica della range request (opzionale)

Questa cella invia una richiesta HTTP con header `Range` per scaricare solo un piccolo intervallo di byte.
Su endpoint COG compatibili, la risposta attesa e in genere `206 Partial Content`.

Nota: alcuni gateway/CDN possono comportarsi diversamente pur mantenendo la lettura efficiente lato GDAL/rasterio.
Per questo la verifica e utile come check didattico, ma non sostituisce il profiling end-to-end del workflow.

In [23]:
import requests

try:
    r = requests.get(asset_href, headers={"Range": "bytes=0-1023"}, timeout=30)
    print("HTTP status:", r.status_code)
    print("Content-Range:", r.headers.get("Content-Range"))
    print("Bytes ricevuti:", len(r.content))
except Exception as e:
    print("Range request demo non eseguita:", e)


HTTP status: 206
Content-Range: bytes 0-1023/278969866
Bytes ricevuti: 1024


## 3) Preprocessing: align, cloud mask, normalization

In questa fase trasformiamo scene STAC eterogenee in un cubo coerente e utilizzabile per ML.

### Step

1. Costruiamo uno stack spazio-temporale allineato con `stackstac` (stessa griglia/resolution/CRS).
2. Selezioniamo bande ottiche (`B02/B03/B04/B08`) e la banda `SCL` per quality mask.
3. Applichiamo masking di cloud/shadow/classi non valide (configurabili in `config.yaml`).
4. Normalizziamo i DN in riflettanza (`/10000`).
5. Creiamo un composito temporale robusto (mediana) per ridurre rumore residuo.

### Controlli robustezza inclusi

- Se `proj:epsg` manca negli item, il notebook deriva EPSG UTM dal centroide AOI.
- Se lo stack risultante ha shape troppo piccola (es. 1x1), il notebook si ferma con errore esplicito (non prosegue con tile non sensati).
- La cloud mask usa `xarray.isin`, evitando problemi di shape con `apply_ufunc`.


In [24]:
import stackstac

assets = cfg["stac"]["assets"]
optical_bands = [b for b in assets if b != "SCL"]
invalid_scl = cfg["preprocessing"]["cloud_mask"]["scl_invalid_classes"]

# Robust EPSG selection: prefer STAC metadata, fallback to UTM zone from AOI centroid
epsg_series = (
    pd.to_numeric(items_df["epsg"], errors="coerce").dropna().astype(int)
    if "epsg" in items_df.columns
    else pd.Series(dtype="int64")
)

if not epsg_series.empty:
    target_epsg = int(epsg_series.mode().iloc[0])
    epsg_source = "items_df mode(proj:epsg)"
else:
    centroid = aoi_gdf.to_crs("EPSG:4326").geometry.iloc[0].centroid
    zone = int((centroid.x + 180) // 6) + 1
    target_epsg = (32600 if centroid.y >= 0 else 32700) + zone
    epsg_source = "AOI centroid UTM fallback"

print(f"target_epsg={target_epsg} ({epsg_source})")

# Refresh signed URLs before stacking (useful if notebook stays open for long)
if cfg["stac"]["provider"].lower() in {"planetary-computer", "pc", "microsoft"}:
    try:
        import planetary_computer as pc
        items = [pc.sign(item) for item in items]
    except Exception as e:
        print("Warning: refresh signing skipped:", e)

stack = stackstac.stack(
    items,
    assets=assets,
    epsg=target_epsg,
    resolution=cfg["preprocessing"]["target_resolution"],
    bounds_latlon=tuple(aoi_gdf.total_bounds),
    chunksize=1024,
    dtype="float32",
    fill_value=np.float32(np.nan),
    rescale=False,
)

print("Stack sizes:", dict(stack.sizes))
if int(stack.sizes.get("x", 0)) < 2 or int(stack.sizes.get("y", 0)) < 2:
    raise RuntimeError(
        "Stack degenerato (x o y < 2). Verifica EPSG/resolution, AOI e disponibilita asset."
    )

reflectance = stack.sel(band=optical_bands)
scl = stack.sel(band="SCL")
if "band" in scl.dims:
    scl = scl.squeeze("band", drop=True)

# Robust cloud/shadow mask without shape changes on dask-backed arrays
valid_mask = (~scl.isin(invalid_scl)) & scl.notnull()

reflectance = reflectance.where(valid_mask)
scale = float(cfg["preprocessing"]["normalization"]["reflectance_scale"])
reflectance = reflectance / scale

# Composito temporale robusto
composite = reflectance.median(dim="time", skipna=True).expand_dims(
    time=[np.datetime64(cfg["stac"]["date_range"]["end"])]
)
composite = composite.astype("float32")

print("Composite sizes:", dict(composite.sizes))
print(composite)


target_epsg=32632 (AOI centroid UTM fallback)
Stack sizes: {'time': 35, 'band': 5, 'y': 1004, 'x': 1176}
Composite sizes: {'time': 1, 'band': 4, 'y': 1004, 'x': 1176}
<xarray.DataArray 'stackstac-2cb9663e77b73eb6a282e3137b0909c5' (time: 1,
                                                                band: 4,
                                                                y: 1004, x: 1176)> Size: 19MB
dask.array<getitem, shape=(1, 4, 1004, 1176), dtype=float32, chunksize=(1, 1, 1004, 749), chunktype=numpy.ndarray>
Coordinates: (12/13)
  * time                                     (time) datetime64[s] 8B 2025-09-30
  * band                                     (band) <U3 48B 'B02' ... 'B08'
  * y                                        (y) float64 8kB 5.041e+06 ... 5....
  * x                                        (x) float64 9kB 5.084e+05 ... 5....
    s2:saturated_defective_pixel_percentage  float64 8B 0.0
    s2:product_type                          <U7 28B 'S2MSI2A'
    ...         

## 4) Tiling + split train/val

Convertiamo il composito in tile regolari (es. 256x256) e costruiamo un indice geospaziale.

### Contenuto del tile index

- coordinate pixel (`x0`, `y0`) per slicing nel cubo,
- `tile_id` univoco,
- `geometry` reale del tile (utile per join con label o analisi GIS),
- split `train` / `val` deterministico.

### Nota robustezza

Se il composito e troppo piccolo, la pipeline si ferma con errore esplicito.
E preferibile fallire presto piuttosto che produrre tile 1x1 privi di significato operativo.


In [25]:
import hashlib

tile_size_requested = int(cfg["preprocessing"]["tile_size"])
train_fraction = float(cfg["preprocessing"]["train_fraction"])

height = int(composite.sizes["y"])
width = int(composite.sizes["x"])

x_coords = composite["x"].values
y_coords = composite["y"].values

dx = float(np.abs(np.diff(x_coords[:2])).mean()) if len(x_coords) > 1 else float(cfg["preprocessing"]["target_resolution"])
dy = float(np.abs(np.diff(y_coords[:2])).mean()) if len(y_coords) > 1 else float(cfg["preprocessing"]["target_resolution"])

min_tile_size = 32
if min(height, width) < min_tile_size:
    raise RuntimeError(
        f"Composito troppo piccolo per tiling utile (y={height}, x={width}). "
        "Controlla EPSG/resolution/AOI prima di procedere."
    )

tile_size = min(tile_size_requested, height, width)
if tile_size < tile_size_requested:
    print(f"Tile size adattato da {tile_size_requested} a {tile_size} per compatibilità con shape ({height}, {width}).")

records = []

def split_from_id(tile_id: str, train_frac: float) -> str:
    bucket = int(hashlib.md5(tile_id.encode("utf-8")).hexdigest()[:8], 16) % 100
    return "train" if bucket < int(train_frac * 100) else "val"

for y0 in range(0, height - tile_size + 1, tile_size):
    for x0 in range(0, width - tile_size + 1, tile_size):
        tile_id = f"t0_y{y0}_x{x0}"
        split = split_from_id(tile_id, train_fraction)

        x_min = float(x_coords[x0] - dx / 2)
        x_max = float(x_coords[x0 + tile_size - 1] + dx / 2)
        y_top = float(y_coords[y0] + dy / 2)
        y_bottom = float(y_coords[y0 + tile_size - 1] - dy / 2)

        geom = box(min(x_min, x_max), min(y_bottom, y_top), max(x_min, x_max), max(y_bottom, y_top))

        records.append(
            {
                "tile_id": tile_id,
                "time_idx": 0,
                "x0": x0,
                "y0": y0,
                "split": split,
                "geometry": geom,
            }
        )

if not records:
    raise RuntimeError(
        "Nessun tile generato. Riduci preprocessing.tile_size, aumenta AOI o modifica la risoluzione target."
    )

tiles_gdf = gpd.GeoDataFrame(records, geometry="geometry", crs=f"EPSG:{target_epsg}")
print("Numero tile:", len(tiles_gdf))
display(tiles_gdf.head())


Numero tile: 12


Unnamed: 0,tile_id,time_idx,x0,y0,split,geometry
0,t0_y0_x0,0,0,0,val,"POLYGON ((510905 5038205, 510905 5040765, 5083..."
1,t0_y0_x256,0,256,0,train,"POLYGON ((513465 5038205, 513465 5040765, 5109..."
2,t0_y0_x512,0,512,0,val,"POLYGON ((516025 5038205, 516025 5040765, 5134..."
3,t0_y0_x768,0,768,0,train,"POLYGON ((518585 5038205, 518585 5040765, 5160..."
4,t0_y256_x0,0,0,256,val,"POLYGON ((510905 5035645, 510905 5038205, 5083..."


## 5) Packaging AI-ready: Zarr + GeoParquet + metadata

Qui serializziamo il dataset in formati adatti a workload ML su larga scala.

### Artefatti prodotti

- `data.zarr`: cubo multidimensionale chunked, ottimo per accesso lazy/parallelo.
- `tiles.parquet`: indice tile in formato tabellare geospaziale.
- `statistics.json`: statistiche per banda (mean/std) per normalizzazione consistente.
- `metadata.json`: contesto del run (AOI, date, collection, CRS, path output, cardinalita).

### Nota robustezza Zarr

Prima del salvataggio, gli attributi xarray vengono ripuliti in formato JSON-safe.
Questo evita errori come `Object of type RasterSpec is not JSON serializable`.


In [26]:
zarr_path = Path(cfg["io"]["zarr_path"])
tiles_path = Path(cfg["io"]["tiles_index_path"])
stats_path = Path(cfg["io"]["stats_path"])
metadata_path = Path(cfg["io"]["metadata_path"])

zarr_path.parent.mkdir(parents=True, exist_ok=True)
tiles_path.parent.mkdir(parents=True, exist_ok=True)
stats_path.parent.mkdir(parents=True, exist_ok=True)
metadata_path.parent.mkdir(parents=True, exist_ok=True)


def _json_safe(value):
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, tuple):
        return [_json_safe(v) for v in value]
    if isinstance(value, list):
        return [_json_safe(v) for v in value]
    if isinstance(value, dict):
        return {str(k): _json_safe(v) for k, v in value.items()}
    return value


def _clean_attrs(attrs: dict) -> dict:
    clean = {}
    for k, v in attrs.items():
        vv = _json_safe(v)
        try:
            json.dumps(vv)
        except TypeError:
            continue
        clean[str(k)] = vv
    return clean


# Dataset Xarray per export Zarr
cube = composite.chunk({"time": 1, "band": len(optical_bands), "y": tile_size, "x": tile_size})
cube.attrs = _clean_attrs(cube.attrs)
for coord in cube.coords:
    cube.coords[coord].attrs = _clean_attrs(cube.coords[coord].attrs)

ds = xr.Dataset({"cube": cube})
ds.attrs = _clean_attrs(ds.attrs)
for var_name in ds.variables:
    ds[var_name].attrs = _clean_attrs(ds[var_name].attrs)

ds.to_zarr(zarr_path, mode="w")

# GeoParquet tile index
tiles_gdf.to_parquet(tiles_path, index=False)

# Statistiche per banda
mean = cube.mean(dim=("time", "y", "x"), skipna=True).compute().values.tolist()
std = cube.std(dim=("time", "y", "x"), skipna=True).compute().values.tolist()

stats_payload = {
    "bands": optical_bands,
    "mean": [float(x) for x in mean],
    "std": [float(x) if float(x) > 1e-6 else 1.0 for x in std],
    "tile_size": tile_size,
}
with stats_path.open("w", encoding="utf-8") as f:
    json.dump(stats_payload, f, indent=2)

metadata = {
    "aoi": cfg["aoi"]["name"],
    "collections": cfg["stac"]["collections"],
    "date_range": cfg["stac"]["date_range"],
    "provider": cfg["stac"]["provider"],
    "num_items": int(len(items)),
    "num_tiles": int(len(tiles_gdf)),
    "bands": optical_bands,
    "tile_size": int(tile_size),
    "zarr_path": str(zarr_path),
    "tiles_index_path": str(tiles_path),
    "stats_path": str(stats_path),
    "crs": f"EPSG:{target_epsg}",
}
with metadata_path.open("w", encoding="utf-8") as f:
    json.dump(metadata, f, indent=2)

print("Export completato:")
print("-", zarr_path)
print("-", tiles_path)
print("-", stats_path)
print("-", metadata_path)




Export completato:
- data/data.zarr
- data/tiles.parquet
- artifacts/statistics.json
- artifacts/metadata.json


## 6) Quality checks e report

Eseguiamo controlli base ma operativi per stimare qualita e affidabilita del dataset:

- **Nodata ratio**: quantifica dati mancanti post-masking.
- **Band completeness**: copertura informativa per ciascuna banda.
- **CRS consistency**: verifica coerenza dei sistemi di riferimento tra item.
- **Outlier ratio**: stima valori anomali dopo standardizzazione.
- **Tile coverage**: rapporto tra tile generati e tile attesi sulla griglia.

Le metriche vengono esportate in report Markdown/HTML per audit tecnico, handoff team e confronto tra run.

In [27]:
qc_md_path = Path(cfg["io"]["qc_report_md"])
qc_html_path = Path(cfg["io"]["qc_report_html"])
qc_md_path.parent.mkdir(parents=True, exist_ok=True)
qc_html_path.parent.mkdir(parents=True, exist_ok=True)

# Metriche principali
missing_ratio = float(cube.isnull().mean().compute())
band_completeness = (1.0 - cube.isnull().mean(dim=("time", "y", "x")).compute()).values

crs_unique = int(items_df["epsg"].dropna().astype(str).nunique())
crs_status = "OK" if crs_unique <= 1 else "CHECK"

mean_arr = np.array(stats_payload["mean"])
std_arr = np.array(stats_payload["std"])
z = (cube - xr.DataArray(mean_arr, dims=["band"])) / xr.DataArray(std_arr, dims=["band"])
outlier_ratio = float((np.abs(z) > 6).mean().compute())

expected_tiles = (height // tile_size) * (width // tile_size)
coverage_ratio = float(len(tiles_gdf) / expected_tiles) if expected_tiles else 0.0

summary_df = pd.DataFrame(
    [
        {"check": "nodata_ratio", "value": missing_ratio, "status": "OK" if missing_ratio < 0.4 else "CHECK"},
        {"check": "crs_consistency", "value": crs_unique, "status": crs_status},
        {"check": "outlier_ratio_abs_z_gt_6", "value": outlier_ratio, "status": "OK" if outlier_ratio < 0.01 else "CHECK"},
        {"check": "tile_coverage_ratio", "value": coverage_ratio, "status": "OK" if coverage_ratio > 0.95 else "CHECK"},
    ]
)

bands_df = pd.DataFrame(
    {
        "band": optical_bands,
        "completeness": band_completeness,
        "mean": stats_payload["mean"],
        "std": stats_payload["std"],
    }
)

report_md = """# QC Report - EO Dataset\n\n## Summary\n""" + summary_df.to_markdown(index=False) + """\n\n## Band Statistics\n""" + bands_df.to_markdown(index=False)

qc_md_path.write_text(report_md, encoding="utf-8")

report_html = (
    "<html><head><meta charset='utf-8'><title>EO QC Report</title></head><body>"
    "<h1>QC Report - EO Dataset</h1>"
    "<h2>Summary</h2>" + summary_df.to_html(index=False) +
    "<h2>Band Statistics</h2>" + bands_df.to_html(index=False) +
    "</body></html>"
)
qc_html_path.write_text(report_html, encoding="utf-8")

display(summary_df)
display(bands_df)
print("Report salvato in:", qc_md_path, "e", qc_html_path)


Unnamed: 0,check,value,status
0,nodata_ratio,0.0,OK
1,crs_consistency,0.0,OK
2,outlier_ratio_abs_z_gt_6,0.001463,OK
3,tile_coverage_ratio,1.0,OK


Unnamed: 0,band,completeness,mean,std
0,B02,1.0,0.182251,0.048993
1,B03,1.0,0.202959,0.051099
2,B04,1.0,0.211384,0.060711
3,B08,1.0,0.33725,0.084617


Report salvato in: reports/qc_report.md e reports/qc_report.html


## 7) Access pattern per training: DataLoader PyTorch

Questa sezione dimostra un pattern pratico di integrazione con training loop.

### Come funziona

- Il dataset legge l'indice tile da `tiles.parquet`.
- Ogni `__getitem__` apre/slicea lazy il blocco corrispondente in `data.zarr`.
- La normalizzazione usa `statistics.json`, garantendo coerenza tra train/val/inferenza.

### Perché scala

Non serve materializzare tutto in RAM: i batch vengono costruiti on-demand.
Questo approccio e adatto a dataset EO molto grandi, multi-temporali o multi-area.

In [28]:
from src.dataloader import EOTileDataset, make_dataloader

train_dataset = EOTileDataset(
    tile_index_path=tiles_path,
    zarr_path=zarr_path,
    stats_path=stats_path,
    split="train",
    tile_size=tile_size,
    feature_name="cube",
)

print("Numero sample train:", len(train_dataset))
if len(train_dataset):
    sample = train_dataset[0]
    print("Shape sample image [C,H,W]:", tuple(sample["image"].shape))

train_loader = make_dataloader(
    tile_index_path=tiles_path,
    zarr_path=zarr_path,
    stats_path=stats_path,
    split="train",
    tile_size=tile_size,
    batch_size=4,
    num_workers=0,
    feature_name="cube",
)

batch = next(iter(train_loader))
print("Batch image shape [B,C,H,W]:", tuple(batch["image"].shape))


Numero sample train: 8
Shape sample image [C,H,W]: (4, 256, 256)
Batch image shape [B,C,H,W]: (4, 4, 256, 256)


## 8) Recap finale

Con questo notebook hai una demo completa di EO data engineering, centrata sui pattern richiesti in ruolo:

- discovery STAC configurabile,
- accesso COG cloud-native,
- preprocessing robusto e tracciabile,
- packaging AI-ready standard,
- quality checks esportabili,
- data loader PyTorch lazy.

### Evoluzioni naturali verso produzione

1. Orchestrazione batch (Prefect/Airflow) con scheduling.
2. Versionamento dataset/metadata (es. DVC o catalog interno).
3. Integrazione label pipeline e monitoraggio quality drift nel tempo.