# Scene Noise Analysis

Evaluate post-retrieval noise components using matched-filter outputs.

In [None]:
import os
import sys
from pathlib import Path

import numpy as np
import pandas as pd
from osgeo import gdal, ogr

NOTEBOOK_ROOT = Path.cwd().resolve()
REPO_ROOT = NOTEBOOK_ROOT
while not (REPO_ROOT / "scripts").exists() and REPO_ROOT.parent != REPO_ROOT:
    REPO_ROOT = REPO_ROOT.parent
if not (REPO_ROOT / "scripts").exists():
    raise RuntimeError("Could not locate repository root containing scripts directory.")

os.environ.setdefault("PYTHONPATH", str(REPO_ROOT))
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

print(f"Notebook root: {NOTEBOOK_ROOT}")
print(f"Repository root: {REPO_ROOT}")


In [None]:
# Configure processed scenes here. Update paths before running the analysis.
SCENES = [
    {
        "name": "Example Scene",
        "concentration_path": Path('path/to/concentration.tif'),
        "uncertainty_path": Path('path/to/uncertainty.tif'),
        # Optional polygon vector file (e.g. GeoJSON, Shapefile) describing plume areas to exclude
        "plume_polygons": None,
        # Optional additional boolean mask as NumPy .npy/.npz file (1==keep, 0==mask out)
        "mask_path": None,
        # Aggregation for Ïƒ_RMN within plume-free pixels: 'mean' or 'median'
        "sigma_rmn_aggregation": 'mean',
    },
]

SUMMARY_OUTPUT = NOTEBOOK_ROOT / 'outputs' / 'uncertainty' / 'scene_noise_summary.csv'


In [None]:
def _read_single_band(path: Path) -> np.ndarray:
    path = Path(path).expanduser()
    if not path.exists():
        raise FileNotFoundError(path)
    ds = gdal.Open(str(path))
    if ds is None:
        raise RuntimeError(f'Unable to open raster: {path}')
    arr = ds.ReadAsArray()
    ds = None
    return np.array(arr, dtype=float)

def _load_optional_mask(mask_path, reference_shape):
    if mask_path is None:
        return None
    mask_path = Path(mask_path).expanduser()
    if not mask_path.exists():
        raise FileNotFoundError(mask_path)
    if mask_path.suffix.lower() in {'.npy', '.npz'}:
        data = np.load(mask_path)
        if isinstance(data, np.lib.npyio.NpzFile):
            key = list(data.keys())[0]
            arr = data[key]
            data.close()
        else:
            arr = data
    else:
        arr = _read_single_band(mask_path)
    mask = np.asarray(arr).astype(bool)
    if mask.shape != reference_shape:
        raise ValueError(f'Mask shape {mask.shape} does not match reference {reference_shape}.')
    return mask

def _rasterize_plumes(vector_path, reference_path):
    if vector_path is None:
        return None
    vector_path = Path(vector_path).expanduser()
    if not vector_path.exists():
        raise FileNotFoundError(vector_path)
    ref_ds = gdal.Open(str(reference_path))
    if ref_ds is None:
        raise RuntimeError(f'Unable to open reference raster: {reference_path}')
    driver = gdal.GetDriverByName('MEM')
    mask_ds = driver.Create('', ref_ds.RasterXSize, ref_ds.RasterYSize, 1, gdal.GDT_Byte)
    mask_ds.SetGeoTransform(ref_ds.GetGeoTransform())
    mask_ds.SetProjection(ref_ds.GetProjection())
    vector_ds = ogr.Open(str(vector_path))
    if vector_ds is None:
        raise RuntimeError(f'Unable to open vector: {vector_path}')
    layer = vector_ds.GetLayer()
    gdal.RasterizeLayer(mask_ds, [1], layer, burn_values=[1])
    mask = mask_ds.ReadAsArray().astype(bool)
    mask_ds = None
    layer = None
    vector_ds = None
    ref_ds = None
    return mask


In [None]:
records = []
for scene_cfg in SCENES:
    name = scene_cfg['name']
    conc_path = scene_cfg['concentration_path']
    unc_path = scene_cfg['uncertainty_path']
    print(f'Processing {name}')
    concentration = _read_single_band(conc_path)
    uncertainty = _read_single_band(unc_path)
    if concentration.shape != uncertainty.shape:
        raise ValueError('Concentration and uncertainty shapes do not match.')

    plume_mask = _rasterize_plumes(scene_cfg.get('plume_polygons'), conc_path)
    additional_mask = _load_optional_mask(scene_cfg.get('mask_path'), concentration.shape)

    valid = np.isfinite(concentration) & np.isfinite(uncertainty)
    if plume_mask is not None:
        valid &= ~plume_mask
    if additional_mask is not None:
        valid &= additional_mask

    if valid.sum() == 0:
        raise ValueError(f'No valid pixels remain for scene {name}.')

    delta_x = concentration[valid]
    sigma_tot = float(np.nanstd(delta_x, ddof=1))

    sigma_rmn_vals = uncertainty[valid]
    agg = scene_cfg.get('sigma_rmn_aggregation', 'mean').lower()
    if agg == 'median':
        sigma_rmn_sq = float(np.nanmedian(sigma_rmn_vals ** 2))
    else:
        sigma_rmn_sq = float(np.nanmean(sigma_rmn_vals ** 2))

    clutter_var = max(0.0, sigma_tot ** 2 - sigma_rmn_sq)
    sigma_surf = float(np.sqrt(clutter_var))

    record = {
        'scene': name,
        'concentration_path': str(conc_path),
        'uncertainty_path': str(unc_path),
        'valid_pixels': int(valid.sum()),
        'plume_pixels': int(plume_mask.sum()) if plume_mask is not None else 0,
        'sigma_tot': sigma_tot,
        'sigma_rmn_sq_mean': sigma_rmn_sq,
        'sigma_surf': sigma_surf,
        'sigma_rmn_aggregation': agg,
    }
    records.append(record)

summary_df = pd.DataFrame(records)
summary_df


In [None]:
if records:
    output_path = SUMMARY_OUTPUT.expanduser()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    summary_df.to_csv(output_path, index=False)
    print(f'Saved summary to {output_path}')
