In [None]:
import os, sys
import spyndex
import rasterio
import rioxarray 

import geopandas as gpd
import pandas as pd
import numpy as np
import xarray as xr

from shapely.geometry import shape, box

import matplotlib.pyplot as plt

In [None]:
import earthdaily.earthone as eo
from earthdaily.earthone.catalog import MaskBand, Image, Product, properties as p
from earthdaily.earthone.compute import Function

Setting global variables, including:
* Input Product ID corresponding to the EarthDaily AI Ready EarthMosaics
* List of bands to input into our NDVI-MNDWI calculation, including **green**, **nir**, **red**, and **swir16**
* Output resolution, native Sentinel-2 res at 10 meters

In [None]:
compute_image = f"python{sys.version_info.major}.{sys.version_info.minor}:latest"

In [None]:
COLLECTION = "earthdaily:ai-ready-mosaics:sentinel-2"
BANDS = ["green", "nir", "red", "swir16"]
RESOLUTION=10.

Input geometry, which is the extent of our EarthMosaic scene:

In [None]:
geom = shape({
    'type': 'Polygon',
    'coordinates': [[
        [-53.44996912333322, -29.066805892187226],
        [-53.44996912333322, -30.244736984602106],
        [-51.124606456600304, -30.244736984602106],
        [-51.124606456600304, -29.066805892187226],
        [-53.44996912333322, -29.066805892187226]
    ]]
})
aoi = eo.geo.AOI(geom, resolution=100., crs="EPSG:3857")

Searching imagery from the EarthMosaics product, sorting by acquired date:

In [None]:
prod = Product.get(COLLECTION)
ic = prod.images().intersects(aoi).sort('acquired').collect()
dates = pd.to_datetime(list(ic.each.acquired))
ic

Retrieving our data as an ndarray, including raster metadata to feed into rioxarray:

In [None]:
ndarr, info = ic.stack(BANDS, raster_info=True)
ndarr.shape

Creating an xarray DataArray from our data, including adding spatial extensions for interoperability with rasterio via rioxarray:

In [None]:
da = xr.DataArray(
    ndarr, 
    dims=("time", "band", "y", "x"),
    coords={"time": dates, "band": BANDS},
    name="data"
)
da = da.rio.set_spatial_dims(x_dim="x", y_dim="y")
da = da.rio.write_crs(aoi.crs, inplace=False)
da = da.rio.write_transform(rasterio.Affine.from_gdal(*info[0]['geoTransform']), inplace=False)
da = da.assign_coords(band=BANDS)
da = da.sortby("time")
# da.isel(time=0).rio.to_raster("fcc.tif")

Accessing a single date to plot imagery:

In [None]:
ds = da.to_dataset(dim="band")
# ds.isel(time=0)[['nir', 'red', 'green']].rio.to_raster("fcc.tif")
ds.isel(time=0)[['nir', 'red', 'green']].to_dataarray().plot.imshow(origin='upper')

Calculating NDVI-MNDWI using Spyndex, applying the calculations to all dates represented in our dataset:

In [None]:
ndvimndwi = spyndex.computeIndex(index="MNDWI", G=ds['green'], N=ds['nir'], R=ds['red'], S1=ds['swir16'])
ndvimndwi = ndvimndwi.rio.write_crs(da.rio.crs, inplace=False)
ndvimndwi = ndvimndwi.rio.write_transform(da.rio.transform(), inplace=False)
ndvimndwi_band = ndvimndwi.expand_dims(band=["ndvimndwi"])

da_with_ndvimndwi = xr.concat([da, ndvimndwi_band], dim="band")
da_with_ndvimndwi = da_with_ndvimndwi.to_dataset(dim="band")

And plotting the results:

In [None]:
fig, ax = plt.subplots(figsize=(10,10),nrows=2, ncols=2)
da_with_ndvimndwi.isel(time=0)[['nir', 'red', 'green']].to_dataarray().plot.imshow(ax=ax[0][0], origin='upper')
ax[0][0].set_title("PRE FCC")
da_with_ndvimndwi.isel(time=1)[['nir', 'red', 'green']].to_dataarray().plot.imshow(ax=ax[1][0], origin='upper')
ax[1][0].set_title("POST FCC")
da_with_ndvimndwi.isel(time=0)['ndvimndwi'].plot.imshow(ax=ax[0][1], origin='upper', add_colorbar=False)
ax[0][1].set_title("PRE NDVI-MNDWI")
da_with_ndvimndwi.isel(time=1)['ndvimndwi'].plot.imshow(ax=ax[1][1], origin='upper', add_colorbar=False)
ax[1][1].set_title("POST NDVI-MNDWI")
for a in ax:
    for x in a:
        x.set_xticklabels([])
        x.set_yticklabels([])
        x.set_ylabel(None)
        x.set_xlabel(None)

Calculating the difference between these indices to arrive at a rough flooded extent:

In [None]:
# Last - first
diff = da_with_ndvimndwi['ndvimndwi'].isel(time=-1) - da_with_ndvimndwi['ndvimndwi'].isel(time=0)
MASK_THRESHOLD = float(diff.quantile(0.9))
diff_binary = diff.where(diff>=MASK_THRESHOLD).astype(np.uint8)

In [None]:
fig, ax = plt.subplots()
diff_binary.plot.imshow(origin='upper', cmap='viridis', add_colorbar=False, ax=ax)
ax.set_title("Flooded Extent")
ax.set_xticklabels([])
ax.set_yticklabels([])
ax.set_xlabel(None)
ax.set_ylabel(None)

## Scaling on Batch Compute

Setting an output Product ID:

In [None]:
auth = eo.auth.Auth().payload
out_pid = f"{auth['org']}:ndvimndwi-rgs-test-v1-{auth['userid']}"
out_pid

Deleting old iteration of this product, if applicable:

In [None]:
out_product = Product.get(out_pid)
if out_product:
    print("Product already exists, deleting old iteration")
    status = out_product.delete_related_objects()
    if status:
        status.wait_for_completion()
    print("Deleted related objects")
    out_product.delete()
    print("Deleted Product")

Creating the output Product:

In [None]:
out_product = Product.get_or_create(out_pid)
out_product.name = "Testing NDVI-MNDWI"
out_product.tags = ["examples"]
out_product.readers = []
out_product.save()
out_product

Creating the output Band **flood-extent**:

In [None]:
band = MaskBand.get_or_create(
    id=f"{out_product.id}:flood-extent",
    band_index=0,
    data_type=eo.catalog.DataType.BYTE,
    data_range=[0,1],
    display_range=[0,1],
    nodata=0,
    resolution=eo.catalog.Resolution(
        value=RESOLUTION, 
        unit=eo.catalog.ResolutionUnit.METERS
    ),
)
band.save()

Wrapping the NDVI-MNDWI methodology into a self-contained function, which accepts the following input parameters:
* DLTile Key
* Input Collection ID
* Bands to retrieve
* Mask Threshold below which is set to 0
* Output Product ID to write these flood masks to

In [None]:
def calculate_ndvimndwi(TILE_KEY, COLLECTION, BANDS, MASK_THRESHOLD, OUT_PID):
    import earthdaily.earthone as eo
    from earthdaily.earthone.catalog import Product, Image, properties as p
    import pandas as pd
    import xarray as xr
    import numpy as np
    import rioxarray
    import rasterio
    import spyndex
    import os
    
    tile = eo.geo.DLTile.from_key(TILE_KEY)
    out_fpath = f"diff_{TILE_KEY.replace(':', '_')}.tif"
    print(f"Processing tile: {TILE_KEY}")
    
    prod = Product.get(COLLECTION)
    ic = prod.images().intersects(tile).sort('acquired').collect()
    dates = pd.to_datetime(list(ic.each.acquired))
    ndarr, info = ic.stack(BANDS, raster_info=True)
    print("Retrieved pixel data")
    da = xr.DataArray(
        ndarr, 
        dims=("time", "band", "y", "x"),
        coords={"time": dates, "band": BANDS},
        name="data"
    )
    
    da = da.rio.set_spatial_dims(x_dim="x", y_dim="y")
    da = da.rio.write_crs(tile.crs, inplace=False)
    da = da.rio.write_transform(rasterio.Affine.from_gdal(*info[0]['geoTransform']), inplace=False)
    da = da.assign_coords(band=BANDS)
    da = da.sortby("time")

    ds = da.to_dataset(dim="band")
    
    ndvimndwi = spyndex.computeIndex(index="MNDWI", G=ds['green'], N=ds['nir'], R=ds['red'], S1=ds['swir16'])
    ndvimndwi = ndvimndwi.rio.write_crs(da.rio.crs, inplace=False)
    ndvimndwi = ndvimndwi.rio.write_transform(da.rio.transform(), inplace=False)
    ndvimndwi_band = ndvimndwi.expand_dims(band=["ndvimndwi"])
    
    da_with_ndvimndwi = xr.concat([da, ndvimndwi_band], dim="band")
    da_with_ndvimndwi = da_with_ndvimndwi.to_dataset(dim="band")

    diff = da_with_ndvimndwi['ndvimndwi'].isel(time=-1) - da_with_ndvimndwi['ndvimndwi'].isel(time=0)
    threshold = diff.quantile(0.8)
    diff_binary = diff.where(diff>=threshold).astype(np.uint8)
    
    print(f"Calculated difference, masking to {MASK_THRESHOLD}")
    
    diff_binary = diff.where(diff>=MASK_THRESHOLD).astype(np.uint8)
    diff_binary.rio.to_raster(
        out_fpath,
        dtype="uint8",
        compress="deflate",
        predictor=2,
        zlevel=6,
        tiled=True,
        blockxsize=256,
        blockysize=256,
        BIGTIFF="IF_SAFER",
    )
    print(f"Saved {out_fpath}")
    image = Image(
        id=f"{OUT_PID}:{TILE_KEY.replace(':', '_')}"
    )
    image.acquired = ic[-1].acquired
    image.extra_properties = {"MASK_THRESHOLD": MASK_THRESHOLD}
    upload = image.upload([out_fpath], overwrite=True)
    upload.wait_for_completion()
    print(f"Upload: {upload.status}ful")
    os.remove(out_fpath)
    return image.id

Tiling up the geometry using [`DLTile`s]():

In [None]:
tiles = eo.geo.DLTile.from_shape(geom, resolution=10., tilesize=2048, pad=0)
len(tiles)

Getting a sample tile to investigate our scene at full resolution:

In [None]:
tile = tiles[20]

In [None]:
fig, ax = plt.subplots()
gpd.GeoDataFrame({"geometry":[geom]}).plot(ax=ax)
gpd.GeoDataFrame({"geometry": list(t.geometry for t in tiles)}).plot(ax=ax, facecolor='none', edgecolor='k')
gpd.GeoDataFrame({"geometry": [tile.geometry]}).plot(ax=ax, facecolor='none', edgecolor='r')

Testing locally:

In [None]:
img_id = calculate_ndvimndwi(tile.key, COLLECTION, BANDS, 0.8, out_pid)
plt.imshow(Image.get(img_id).ndarray('flood-extent')[0])

We _could_ wait forever and iterate in a for loop:

In [None]:
# for tile in tiles:
#     calculate_ndvimndwi(tile.key, COLLECTION, BANDS, out_pid)

Or deploy this using Compute by creating a `Function` which will map asynchronously across each of our tiles:

In [None]:
async_func = Function(
    calculate_ndvimndwi,
    name="NDVI-MNDWI",
    image=compute_image,
    cpus=1,
    memory=2,
    timeout=300,
    maximum_concurrency=20,
    retry_count=0,
    requirements=["rioxarray","spyndex"],
)
async_func.save()
print(f"Saved {async_func.id}")

In [None]:
args = [(t.key, COLLECTION, BANDS, MASK_THRESHOLD, out_pid) for t in tiles]
len(args)

In [None]:
jobs = async_func.map(args)
len(jobs)

Navigate to [earthone.earthdaily.com/compute](https://earthone.earthdaily.com/compute) to track your Function's build and progress.

In [None]:
import earthdaily.earthone.dynamic_compute as dc
from earthdaily.earthone.dynamic_compute import Mosaic

In [None]:
pre_start, pre_end = ('2024-02-20', '2024-02-22')
post_start, post_end = ('2024-05-05', '2024-05-07')

In [None]:
pre_mosaic = Mosaic.from_product_bands(COLLECTION, BANDS, start_datetime=pre_start, end_datetime=pre_end)
post_mosaic = Mosaic.from_product_bands(COLLECTION, BANDS, start_datetime=post_start, end_datetime=post_end)

In [None]:
m = dc.map
m.center = -29.9648, -51.4042
m.zoom = 12

In [None]:
pre_mosaic.pick_bands("nir red green").visualize("PRE FCC", m)
post_mosaic.pick_bands("nir red green").visualize("POST FCC", m)

In [None]:
out_mosaic = Mosaic.from_product_bands(out_pid, "flood-extent")
out_mosaic.pick_bands("flood-extent").visualize("Flood Mask", m, colormap='Blues', scales=[0,1])

In [None]:
m

In [None]:
# https://app.earthone.earthdaily.com/explorer?id=earthdaily:f108d742-528d-4a51-b023-645d87063a35