# FilmDrop Analytics with Sentinel-2
### Created by Element 84: http://element84.com/

This notebook explores Sentinel-2 data on Earth Search using

 - Earth Search STAC API: https://earth-search.aws.element84.com/v1/, catalog of public data
 - pystac-client: https://pystac-client.readthedocs.io/, for searching and accessing data
 - OpenDataCube: https://www.opendatacube.org/ and odc-stac https://odc-stac.readthedocs.io/ for loading STAC assets and representing geospatial data as XArrays
 - XArray: http://xarray.pydata.org/en/stable/, pandas https://pandas.pydata.org/ and geopandas https://geopandas.org/ for manipulating data
 - Dask: https://dask.org/ for performing parallel, distributed computing
 - Folium https://python-visualization.github.io/folium/index.html and hvplot https://hvplot.holoviz.org/ for visualization

This notebook shows how to find data for an area of interest, explore the resulting metadata,
perform calculations, and visualize the results.

In [None]:
# Configure logging for the `pystac_client` logger.
# The level is set to INFO below; change it to `logging.DEBUG` to see API calls.
import logging
logging.basicConfig()
logger = logging.getLogger('pystac_client')
logger.setLevel(logging.INFO)

# ItemMap class for display on a slippy map

import folium
import requests

class ItemMap(object):
    """Folium map wrapper for drawing item footprints and other geometries.

    The map is fitted to the bounding box of the item given to the
    constructor. Overlays are added with ``add_item``, ``add_geom`` and
    ``add_asset``; ``display`` returns the underlying ``folium.Map``.
    """

    # Named outline colours accepted by ``add_item`` and ``add_geom``.
    _colors = {
        'red': '#fc0f03',
        'green': '#27AD0C',
        'blue': '#0f03fc'
    }

    def __init__(self, item, tiles='Stamen Watercolor'):
        """Create a map fitted to ``item.bbox``.

        Args:
            item: Object exposing ``bbox`` (four indexable values) and, when
                ``add_asset`` is used, ``id``.
            tiles: Base tileset, passed straight through to ``folium.Map``.
                NOTE(review): newer folium releases may no longer ship the
                Stamen tilesets as built-ins - confirm the default still
                works in the target environment.
        """
        self.item = item
        self.m = folium.Map(tiles=tiles)
        # The corners swap each pair of bbox values; presumably bbox is
        # [west, south, east, north] and folium wants (lat, lon) - confirm.
        sw = item.bbox[1], item.bbox[0]
        ne = item.bbox[3], item.bbox[2]
        self.m.fit_bounds([sw, ne])
        # Kept as a public attribute; nothing in this class fills it.
        self.legend = {}

    def display(self):
        """Return the underlying ``folium.Map``."""
        return self.m

    @classmethod
    def create_map(cls, item, **kwargs):
        """Build a map from an indexable group of items and draw its footprint.

        Args:
            item: Indexable object; ``item[0]`` is used to fit the map and the
                whole object (which must provide ``to_dict()``) is drawn as a
                red outline.
            **kwargs: Forwarded to the constructor (e.g. ``tiles``).

        Returns:
            The new ``ItemMap`` instance.
        """
        m = cls(item[0], **kwargs)

        # original footprint
        m.add_item(item, name='', color='red', weight=6)
        return m

    def _outline_style(self, color, weight):
        """Return the GeoJson style dict for an unfilled outline."""
        return {
            'fillColor': '#00000000',
            'color': self._colors[color],
            'weight': weight
        }

    def add_item(self, item, name, color='red', weight=2):
        """Draw ``item.to_dict()`` on the map as an unfilled outline.

        Args:
            item: Object with a ``to_dict()`` method whose result
                ``folium.GeoJson`` accepts.
            name: Accepted for interface compatibility; currently unused.
            color: Key into ``_colors`` (``KeyError`` if unknown).
            weight: Outline weight.
        """
        style = self._outline_style(color, weight)
        folium.GeoJson(item.to_dict(), style_function=lambda _feature: style).add_to(self.m)

    def add_geom(self, geom, name, color='blue', weight=2):
        """Draw a geometry on the map as an unfilled outline.

        Args:
            geom: Geometry in a form ``folium.GeoJson`` accepts.
            name: Accepted for interface compatibility; currently unused.
            color: Key into ``_colors`` (``KeyError`` if unknown).
            weight: Outline weight.
        """
        style = self._outline_style(color, weight)
        folium.GeoJson(geom, style_function=lambda _feature: style).add_to(self.m)

    def add_asset(self, href):
        """Add a tile layer for the image at ``href``.

        Statistics for the image are requested first, and the values under
        ``percentile_2`` / ``percentile_98`` of entry ``'1'`` are used as the
        ``rescale`` range of the tile URL.

        NOTE(review): statistics are fetched from ``titiler:8000`` while the
        tiles are addressed at ``127.0.0.1:8000`` - presumably the first is
        resolved by the notebook server and the second by the browser;
        confirm for the target deployment.

        Args:
            href: URL of the image asset.

        Raises:
            requests.HTTPError: If the statistics request returns an error
                status.
        """
        response = requests.get(
            "http://titiler:8000/cog/statistics?url=" + href,
            timeout=60,  # do not hang the notebook if the tile server is unreachable
        )
        # Surface HTTP failures directly instead of an obscure JSON/KeyError below.
        response.raise_for_status()
        stats = response.json()['1']
        tileset = "http://127.0.0.1:8000/cog/tiles/{z}/{x}/{y}?&url=" + href
        tileset = tileset + f"&rescale={stats['percentile_2']},{stats['percentile_98']}"
        tile_layer = folium.TileLayer(
            tiles=tileset,
            # Use the item stored on the instance; a bare `item` is not in
            # scope inside this method.
            attr=self.item.id
        )
        tile_layer.add_to(self.m)


In [None]:
# Use pystac-client to find data in the Earth Search STAC API.
#
# Open the Earth Search STAC API at `URL` and print the resulting client object.
# `api` is reused by the cells below.

from pystac_client import Client
URL = 'https://earth-search.aws.element84.com/v1/'
api = Client.open(URL)
print(api)

In [None]:
# Fetch the `sentinel-2-l2a` collection and tabulate its `item_assets` entry,
# one row per key (`orient='index'`), to show which assets are available.
import pandas as pd

collection = api.get_collection('sentinel-2-l2a')
pd.DataFrame.from_dict(collection.to_dict()['item_assets'], orient='index')

In [None]:
# Read the area of interest (a GeoJSON Feature file) in two forms:
#   geom - the raw `geometry` member of the parsed JSON, used for the search
#   aoi  - the first entry of the `geometry` column as read by geopandas
from json import loads
from pathlib import Path

import geopandas as gpd

filename = "aois/bear-fire.geojson"

feature = loads(Path(filename).read_text())
geom = feature['geometry']
aoi = gpd.read_file(filename)['geometry'][0]

# Search Sentinel-2 L2A items intersecting the AOI within the date range,
# filtered with the `eo:cloud_cover<10` query expression.
search_params = dict(
    collections=["sentinel-2-l2a"],
    intersects=geom,
    datetime="2019-10-01/2021-10-01",
    limit=100,
    query=["eo:cloud_cover<10"],
)
query = api.search(**search_params)
item_collection = query.item_collection()

print(f"Found: {len(item_collection):d} STAC Items")
item_collection

In [None]:
# View footprints: draw the found items in red and the AOI geometry in blue
# on an OpenStreetMap base layer.

# NOTE(review): `asset` is not used by any code visible in this cell; it looks
# like a leftover from the commented-out `add_asset` call in `ItemMap.create_map`.
asset = 'visual'

m = ItemMap.create_map(item_collection, tiles='OpenStreetMap')

m.add_geom(geom, 'aoi', color='blue', weight=4)

# `display` is not defined in this file; presumably the notebook-provided
# (IPython) display function.
display(m.display())

In [None]:
%%time
# Here we load as a DataCube. The STAC Items found by the search are passed in,
# and we specify various parameters, such as bands of interest and chunk size.
# We are requesting to only load pixels within the bounding box of the AOI
# geometry (`bbox=aoi.bounds`), and items are grouped with `groupby='solar_day'`.

from odc.stac import stac_load

dc = stac_load(item_collection,
               measurements=['red', 'green', 'blue', 'nir'],
               chunks={"x": 1024, "y": 1024},
               bbox=aoi.bounds,
               groupby='solar_day',
)
dc

In [None]:
# Calculations
#
# We will create an RGBA datacube representation (`nodata` values have `alpha=0`),
# and generate an NDVI datacube.

# `to_rgba` expects `bands` in red, green, blue order; listing them as
# blue, green, red swaps the red and blue channels of the rendered image.
vis = dc.odc.to_rgba(vmin=1, vmax=2000, bands=['red', 'green', 'blue'])
vis

In [None]:
# NDVI = (nir - red) / (nir + red), clipped to [0, 1].
#
# Cast both bands to float before doing the arithmetic. The bands are
# presumably loaded as unsigned integers (Sentinel-2 reflectance is typically
# uint16 - confirm with `dc.nir.dtype`); in that case `nir - red` wraps around
# to a large positive number wherever red > nir, and the clip would then turn
# those pixels into 1 instead of 0.
nir = dc['nir'].astype('float32')
red = dc['red'].astype('float32')
ndvi = ((nir - red) / (nir + red)).clip(0, 1).rename("ndvi")
ndvi

In [None]:
from dask_gateway import Gateway, GatewayCluster

# Cluster sizing notes:
#  - The per-worker maximums are 1 CPU core and 4GB of memory.
#  - Asking for slightly less than whole numbers (0.8 cores, 3.6 memory) lets
#    Dask worker pods be packed more tightly onto the underlying EC2 instances.
#  - It takes a few minutes for the EC2 instances to be created and for the
#    Dask workers to be scheduled onto them.
#  - Evaluating `client` at the end of the cell provides a dashboard view of
#    how many workers have come online.

gw = Gateway()
clusters = gw.list_clusters()

# Reuse the first existing cluster if there is one; otherwise start a new one.
if clusters:
    cluster = gw.connect(clusters[0].name)
else:
    cluster = GatewayCluster(worker_cores=0.8, worker_memory=3.6)

cluster.scale(25)
client = cluster.get_client()
client

In [None]:
%%time

# Now, we kick off our Dask computation by using the Dask persist function,
# which will keep the data in memory on the cluster for faster access later.
#
# The Dask `compute` function is used when we actually want the data, such as displaying it.

from dask.distributed import wait

# Rebind `ndvi` and `vis` to the persisted versions returned by the client.
ndvi, vis = client.persist([ndvi, vis])
# Wait on both so this cell's `%%time` measurement covers the work
# (presumably `wait` blocks until the persisted results are ready).
_ = wait([ndvi, vis])

In [None]:
%%time

# Importing hvplot.xarray makes the `.hvplot` accessor used below available.
import hvplot.xarray

# Pull the persisted RGBA cube back to the notebook for plotting.
vis_ = vis.compute()

# Plot options shared with the NDVI plot in the next cell; the aspect ratio
# follows the x/y extent of the data.
hvplot_kwargs = dict(
    frame_width=800,
    xaxis=None,
    yaxis=None,
    widget_location="bottom",
    aspect=len(vis.x) / len(vis.y),
)

vis_.hvplot.rgb('x', 'y', bands='band', groupby='time', **hvplot_kwargs)

In [None]:
# Compute the NDVI cube and plot it per time step, reusing the
# `hvplot_kwargs` defined in the RGB plotting cell.
ndvi_ = ndvi.compute()
ndvi_.hvplot('x', 'y', groupby='time', **hvplot_kwargs)

In [None]:
%%time
# Average NDVI over the spatial dimensions (`x`, `y`), leaving the remaining
# dimension(s), then plot the result.
ndvi_mean = ndvi.mean(dim=['x', 'y']).compute()
ndvi_mean.hvplot()

In [None]:
# Stopping Dask cluster and cleaning resources:
# close the client first, then shut down and close the cluster.

client.close()
cluster.shutdown()
cluster.close()