# Create MF ground truth dataset

Set up the config files to run our EMIT MF satellite data pipeline over the same "ground truth" sites used by the EMIT CV training script.

In [1]:
import datetime
import json

import pandas as pd
import shapely

In [2]:
# Ground-truth plume sites; the columns are lat, lon, date, site, quantification_kg_h and source
# (plus the unnamed index column that was saved with the CSV).
gt_sites = pd.read_csv(filepath_or_buffer="../../src/data/ancillary/EMIT_ground_truth_plumes.csv")
gt_sites.head(n=5)

Unnamed: 0,lat,lon,date,site,quantification_kg_h,source
0,32.821792,-111.786123,2024-12-02,"Casa Grande, AZ",755.0369,SBR 2024
1,32.821792,-111.786123,2024-11-28,"Casa Grande, AZ",943.2397,SBR 2024
2,31.34677,-101.79872,2024-02-12,,18163.0,IMEO Notified Plumes
3,38.30074,-96.13034,2024-04-05,,92358.0,IMEO Notified Plumes
4,32.07549,-103.2788,2024-04-25,,1553.0,IMEO Notified Plumes


## Part 1: Create the MF configs

Create the configs for running the Matched Filter (MF) over our ground truth sites.
We should only have to do this once unless ground truth sites have changed.
The MF results for these sites are stored locally in `emit_mf_gt_retrievals.nc`, which can be used for comparison in Part 2 below.

In [8]:
# Each ground-truth site is dressed up as an "operator asset" so that it fits the config schema.

# Half-width of the square drawn around each site, in the same units as the lat/lon columns
# (presumably degrees — TODO confirm). Also read by the local run-config cell further down.
extent_buffer = 0.00005

mock_assets = [
    {
        # Offset the row index by 1,000,000 to form the asset id
        "asset_id": 1000000 + site_idx,
        "name": site.source,
        "lat": site.lat,
        "lon": site.lon,
        # Small axis-aligned box centred on the site, serialised as WKT
        "geometry": shapely.geometry.box(
            site.lon - extent_buffer,
            site.lat - extent_buffer,
            site.lon + extent_buffer,
            site.lat + extent_buffer,
        ).wkt,
        "tile_id": "",  # TODO do we need this?
    }
    for site_idx, site in gt_sites.iterrows()
]

In [9]:
# End-to-end pipeline config: all mock assets are listed as the runs of a single "pilot" operator.
# NOTE(review): "0.2.18" is presumably the EMIT pipeline version to run — confirm it is still current.
config = dict(
    operator_name="emit_mf_for_unet_comparison",
    operator_type="pilot",
    satellite_config=dict(EMIT=dict(tag="0.2.18")),
    runs=mock_assets,
)

In [11]:
# with open("emit_mf_for_unet_comparison_mwaa.json", "w") as fs:
#     json.dump(config, fs)

We now have a config for running our pipelines end-to-end, but doing so produces a lot more than we need:
- many writes to the database
- many artifacts in S3

This is more problematic if our staging env is not fully up-and-running.

We could alternatively run `satellite_data_product.emit.run` locally. For that we need to have identified the unique EMIT granules we want to process.

In [57]:
from src.data.emit_data import query_emit_catalog

In [58]:
%%time

# Look up the EMIT granules for every ground-truth site, searching a one-day window that
# starts on the site's observation date. Sites with no granule contribute no rows.
_all_emit_ids = []
_indices = []

search_window = datetime.timedelta(days=1)

for site_idx, site in gt_sites.iterrows():
    window_start = datetime.datetime.fromisoformat(site.date)
    granule_ids = query_emit_catalog(site.lat, site.lon, window_start, window_start + search_window)

    # Keep the two lists aligned: one site index entry per granule returned
    _all_emit_ids.extend(granule_ids)
    _indices.extend([site_idx] * len(granule_ids))

# Long-format mapping: one row per (site, granule) pair
all_emit_ids = pd.DataFrame({"site_id": _indices, "emit_id": _all_emit_ids})

2025-03-10 10:24:10,834 - INFO - Granules found: 1
2025-03-10 10:24:11,389 - INFO - Granules found: 0
2025-03-10 10:24:11,915 - INFO - Granules found: 1
2025-03-10 10:24:12,745 - INFO - Granules found: 1
2025-03-10 10:24:13,525 - INFO - Granules found: 1
2025-03-10 10:24:14,266 - INFO - Granules found: 1
2025-03-10 10:24:14,778 - INFO - Granules found: 1
2025-03-10 10:24:15,842 - INFO - Granules found: 1
2025-03-10 10:24:16,584 - INFO - Granules found: 1
2025-03-10 10:24:17,101 - INFO - Granules found: 1
2025-03-10 10:24:17,580 - INFO - Granules found: 2
2025-03-10 10:24:17,838 - INFO - Choosing EMIT_L1B_RAD_001_20240627T160707_2417911_021 out of ['EMIT_L1B_RAD_001_20240627T160655_2417911_020', 'EMIT_L1B_RAD_001_20240627T160707_2417911_021'] options
2025-03-10 10:24:18,383 - INFO - Granules found: 1
2025-03-10 10:24:19,060 - INFO - Granules found: 1
2025-03-10 10:24:19,863 - INFO - Granules found: 1
2025-03-10 10:24:20,585 - INFO - Granules found: 1
2025-03-10 10:24:21,385 - INFO - Gra

CPU times: user 535 ms, sys: 42.4 ms, total: 578 ms
Wall time: 43.6 s


In [60]:
# Attach the matched granule ids to the site table and key every (site, granule) pair by a
# combined "siteid_<n>-emitid_<id>" label. The inner join drops sites without any granule.
indexed_gt_sites = (
    gt_sites.merge(all_emit_ids, how="inner", left_index=True, right_on="site_id")
    .assign(dual_index=lambda pairs: "siteid_" + pairs.site_id.astype(str) + "-" + "emitid_" + pairs.emit_id)
    .set_index("dual_index")
)

# Save a copy of the mapping between sites and EMIT granules (Part 2 reads this file back in)
# indexed_gt_sites.to_csv("emit_gt_granule_map.csv")

In [25]:
# Each granule only needs processing once: keep a single row (the first site listed) per granule
unique_emit_ids = all_emit_ids.drop_duplicates(subset=["emit_id"], keep="first")
unique_emit_ids.shape

(41, 2)

In [29]:
# One local run config per unique granule, described by the first ground-truth site matched to it.
# Relies on `extent_buffer` from the mock-asset cell above.
individual_run_configs = []

for _, granule in unique_emit_ids.iterrows():
    site = gt_sites.loc[granule.site_id]

    # One-day window starting on the site's observation date
    window_start = datetime.datetime.fromisoformat(site.date)
    window_end = window_start + datetime.timedelta(days=1)

    # Small box centred on the site (same construction as for the mock assets)
    west, south = site.lon - extent_buffer, site.lat - extent_buffer
    east, north = site.lon + extent_buffer, site.lat + extent_buffer

    individual_run_configs.append(
        {
            "run_area_name": site.source,
            "run_area_geometry": shapely.geometry.box(west, south, east, north).wkt,
            "tile_id": granule.emit_id,
            "start_date": window_start.isoformat(),
            "end_date": window_end.isoformat(),
        }
    )

In [31]:
# Write the per-granule run configs for a local run (left disabled).
# NOTE(review): "unit" in the filename below looks like a typo for "unet" (the operator config
# above is named "emit_mf_for_unet_comparison") — confirm before re-enabling this write.
# with open("emit_mf_for_unit_comparison_local.json", "w") as fs:
#     json.dump(individual_run_configs, fs)

## Part 2: Crop to common extent

**NOTE** run this on AWS *after* the satellite data pipelines have been run using the above configs.

We want matching crops around the points of interest for both the U-Net and the MF results.
Note that the MF results will have been orthorectified, but the U-Net ones will not!

This should also only need to be run once.

In [None]:
import datetime

import numpy as np
import pandas as pd
import rioxarray
import xarray as xr
from rasterio.errors import RasterioIOError

In [None]:
# Site <-> granule mapping produced in Part 1, indexed by its combined "dual_index" label
site_granule_map = pd.read_csv(filepath_or_buffer="emit_gt_granule_map.csv", index_col="dual_index")

In [None]:
%%time

# Concatenate together crops of all ground truth locations.
#
# For every (site, granule) pair we open the MF retrieval raster, cut a fixed-size window centred
# on the pixel nearest to the site, and stack all windows along a new "dual_index" dimension.
# Pairs whose raster cannot be opened, or whose window would run off the raster, are skipped.

_all_crops = []

crop_size = 128
crop_buffer = 8  # add a small buffer to account for rotations during orthorectification
buffered_crop_size = crop_size + crop_buffer
half_crop = buffered_crop_size // 2

for dual_index, site in site_granule_map.iterrows():
    granule_id = site["emit_id"]
    date = datetime.datetime.fromisoformat(site["date"])

    # NOTE(review): month and day are not zero-padded here — confirm this matches the S3 layout
    mf_retrieval_uri = (
        f"s3://orbio-scratch/emit_data/asset_data/{granule_id}/{date.year}/{date.month}/{date.day}/retrieval.tif"
    )

    try:
        da = rioxarray.open_rasterio(mf_retrieval_uri)
    except RasterioIOError:
        print(f"Cannot find granule {granule_id}. Skipping")
        continue

    # Use the raster as a context manager so its file handle is released on every path
    # (including the `continue` below) instead of staying open for the rest of the session.
    with da:
        # Pixel whose coordinates are nearest to the site location
        center_x = int(np.argmin(np.abs(da.x.values - site["lon"])))
        center_y = int(np.argmin(np.abs(da.y.values - site["lat"])))

        x_start, y_start = center_x - half_crop, center_y - half_crop
        x_stop, y_stop = x_start + buffered_crop_size, y_start + buffered_crop_size

        # Check the window explicitly: a negative start would otherwise wrap around as a
        # negative index, and an overlong stop would be silently truncated by the slice.
        if x_start < 0 or y_start < 0 or x_stop > da.sizes["x"] or y_stop > da.sizes["y"]:
            # For now just skip sites whose crop is less than complete
            print(f"Incomplete crop for granule {granule_id}. Skipping")
            continue

        # Read the window into memory before the handle is closed
        crop = da.isel(x=slice(x_start, x_stop), y=slice(y_start, y_stop)).load()

    # remove attributes and coords that might cause a conflict when concatenating
    crop = crop.drop_attrs().drop_vars("spatial_ref")

    # reindex our spatial dimensions to a base-0 index rather than spatial coords (also to avoid
    # conflicts when concatenating)
    crop = crop.assign_coords(x=np.arange(buffered_crop_size), y=np.arange(buffered_crop_size))

    crop = crop.expand_dims(dual_index=[dual_index])

    _all_crops.append(crop)

if not _all_crops:
    # xr.concat would fail on an empty list anyway; fail with a message that says why
    raise ValueError("No complete MF crops were produced; check the retrieval URIs and the crop size")

all_crops = xr.concat(_all_crops, dim="dual_index")
del _all_crops

In [None]:
# # NOTE only datasets can be written to NetCDF, hence why we convert it below
# # (it would be converted automatically but with a less clear name)
# # https://docs.xarray.dev/en/stable/generated/xarray.DataArray.to_netcdf.html#xarray.DataArray.to_netcdf
# all_crops.to_dataset(name="mf_retrievals").to_netcdf("emit_mf_gt_retrievals.nc", encoding={"mf_retrievals": {"zlib": True, "complevel": 5}})