## Setup

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import geopandas as gpd
import json
import matplotlib.pyplot as plt
import numpy as np
import semantique as sq
import xarray as xr


import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In the next release, GeoPandas will switch to using Shapely by default, even if PyGEOS is installed. If you only have PyGEOS installed to get speed-ups, this switch should be smooth. However, if you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  import geopandas as gpd


In [3]:
# Load a mapping.
with open("files/mapping.json", "r") as file:
    mapping = sq.mapping.Semantique(json.load(file))

# Represent an EO data cube.
with open("files/layout.json", "r") as file:
    dc = sq.datacube.GeotiffArchive(json.load(file), src = "files/layers.zip")

# Set the spatio-temporal extent.
space = sq.SpatialExtent(gpd.read_file("files/footprint.geojson"))
time = sq.TemporalExtent("2019-01-01", "2020-12-31")

## How the cache works

RAM memory requirements are proportional to the number of data layers that are stored as intermediate results. Caching data layers in RAM should only be done for those that are needed again when evaluating downstream parts of the recipe. This requires foresight about the evaluation order of the recipe, which accordingly requires a preview run preceding the actual evaluation. This preview run is performed by loading the data with drastically reduced spatial resolution (5x5 pixel grid). It resolves the data references and fills a cache by creating a list of the data references in the order in which they are evaluated. This list is then used dynamically during the actual evaluation of the recipe as a basis for keeping data layers in the cache and reading them from there if they are needed again.

Below the result of the preview run is shown first to demonstrate what the resolved data references look like. The resulting initialised cache can then be fed as a context element to the QueryProcessor in a second step for the actual recipe execution.

In [4]:
from semantique.processor.core import QueryProcessor

# define a simple recipe for a cloudfree composite
recipe = sq.QueryRecipe()
red_band = sq.reflectance("s2_band04")
green_band = sq.reflectance("s2_band03")
blue_band = sq.reflectance("s2_band02")
recipe["composite"] = sq.collection(red_band, green_band, blue_band).\
    filter(sq.entity("cloud").evaluate("not")).\
    reduce("median", "time").\
    concatenate("band")

# define context 
context = {
    "datacube": dc, 
    "mapping": mapping,
    "space": space,
    "time": time,
    "crs": 3035, 
    "tz": "UTC", 
    "spatial_resolution": [-10, 10],
}

In [5]:
# step I: preview run
qp = QueryProcessor.parse(recipe, **{**context, "preview": True})
qp.optimize().execute()
qp.cache.seq

[('reflectance', 's2_band04'),
 ('reflectance', 's2_band03'),
 ('reflectance', 's2_band02'),
 ['atmosphere', 'colortype']]

In [6]:
# step II: query processor execution
qp = QueryProcessor.parse(recipe, **{**context, "cache": qp.cache})
result = qp.optimize().execute()
result["composite"].shape

(3, 563, 576)

As you can see the preview run resolves the references to the data layers as they are provided by looking up the entities' references in the mapping.json. Note, that in the current case the result is not that interesting, though, since four different data layers are to be loaded. Therefore, there is nothing to be cached during recipe execution. Therefore the QueryProcessor will load all data layers from the referenced sources without storing any of them in the cache. 

As a user, however, you can directly initiate the entire caching workflow (preview & full resolution recipe execution) by setting the context parameter when calling `recipe.execute(..., caching=True)`. 

In [7]:
# same as above in a single step 
result = recipe.execute(**{**context, "caching": True})

## Assessment of cache performance

Now let's analyse some timing differences in executing a recipe with/without caching. Most importantly, the timing difference depends on...
* the redundancy of the data references in the recipe, i.e. if layers are called multiple times loading them from cache will reduce the overall time significantly
* the data source (EO data cube) from which they are loaded

Especially for the later it should be noted that in this demo only data loaded from a locally stored geotiff (i.e. the GeoTiffArchive layout) are analysed. This is sort of the worst case for demonstrating the benefits of caching since the data is stored locally and is therfore quickly accessible. Also geotiffs that are not stored in cloud-optimised format (CoGs) require to load the whole data into memory even when running in preview mode just to evaluate the sequence of data layers.

Consequently, you will observe that in almost all of the following cases, caching actually adds a small computational overhead. Keep in mind, however, that caching is designed for and particularly beneficial in case of STACCubes when loading data over the internet.

In [8]:
# function to compare timing for given recipe 
def eval_timing(recipe, caching=False):
    context = {
        "datacube": dc, 
        "mapping": mapping,
        "space": space,
        "time": time,
        "crs": 3035, 
        "tz": "UTC", 
        "spatial_resolution": [-10, 10],
        "caching": caching
    }
    res = recipe.execute(**context)

In [9]:
# recipe I
recipe_I = sq.QueryRecipe()
red_band = sq.reflectance("s2_band04")
green_band = sq.reflectance("s2_band03")
blue_band = sq.reflectance("s2_band02")
recipe_I["composite"] = sq.collection(red_band, green_band, blue_band).\
    filter(sq.entity("cloud").evaluate("not")).\
    reduce("median", "time").\
    concatenate("band")

In [10]:
%%timeit
# without caching
_ = eval_timing(recipe_I, False)

649 ms ± 14.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
%%timeit
# with caching
_ = eval_timing(recipe_I, True)

998 ms ± 5.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
# recipe II
recipe_II = sq.QueryRecipe()
recipe_II["dates"] = sq.entity("vegetation").\
    filter(sq.self()).\
    assign_time().\
    reduce("first", "time")

In [13]:
%%timeit
# without caching
_ = eval_timing(recipe_II, False)

5.09 s ± 61.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
%%timeit
# with caching
_ = eval_timing(recipe_II, True)

5.27 s ± 51.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [15]:
# recipe III
recipe_III = sq.QueryRecipe()
recipe_III["water_count_time"] = sq.entity("water").reduce("count", "time")
recipe_III["vegetation_count_time"] = sq.entity("vegetation").reduce("count", "time")
recipe_III["water_count_space"] = sq.entity("water").reduce("count", "space")
recipe_III["vegetation_count_space"] = sq.entity("vegetation").reduce("count", "space")

In [16]:
%%timeit
# without caching
_ = eval_timing(recipe_III, False)

499 ms ± 5.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [17]:
%%timeit
# with caching
_ = eval_timing(recipe_III, True)

547 ms ± 4.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


The more expressive examples for the STACCube are provided below. Note that they can't be executed for now (as STACCube in currently still under dev and not yet merged in the main branch). The question if caching brings significant advantages when loading data from a well-indexed OpenDataCube stored on a quickly accessible hot storage, remains to be assessed. 

In [10]:
from pystac_client import Client
from shapely.geometry import box
from semantique.processor.core import QueryProcessor
import warnings

# define temporal & spatial range to perform STAC query
xmin, ymin, xmax, ymax = 13.25,54.25,13.75,54.75
aoi = box(xmin, ymin, xmax, ymax)
t_range = ["2020-07-15", "2020-09-01"]

# STAC-based metadata retrieval
import planetary_computer as pc
platform = "Planet"
catalog = Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=pc.sign_inplace,
)
query = catalog.search(
    collections="sentinel-2-l2a", 
    datetime=t_range, 
    limit=100, 
    intersects=aoi
)
item_coll = query.item_collection()

# define datacube
with open("layout_planet.json", "r") as file:
    dc = sq.datacube.STACCube(
        json.load(file), 
        src = item_coll,
        dtype="int8",
        na_value=0,
        )
        
# define spatio-temporal context vars 
res = 20
epsg = 3035
space = sq.SpatialExtent(gpd.GeoDataFrame(geometry=[aoi], crs = 4326))
time = sq.TemporalExtent(*t_range)

# load mapping
with open("mapping.json", "r") as file:
    rules = json.load(file)
mapping = sq.mapping.Semantique(rules)

# define recipe
recipe = sq.QueryRecipe()
recipe["green_map"] = (
    sq.entity("vegetation")
    .filter(sq.entity("cloud").evaluate("not"))
    .reduce("percentage", "time")
)
recipe["all_count"] = (
    sq.entity("all")
    .reduce("count", "time")
)

In [11]:
# normal execution (no caching/no preview)
context = {
    "datacube": dc,
    "mapping": mapping,
    "space": space,
    "time": time,
    "crs": epsg,
    "tz": "UTC",
    "spatial_resolution": [-res, res]
}

with warnings.catch_warnings():
    warnings.simplefilter("ignore", UserWarning)
    response = recipe.execute(**context)

In [12]:
# preview mode
context = {
    "datacube": dc,
    "mapping": mapping,
    "space": space,
    "time": time,
    "crs": epsg,
    "tz": "UTC",
    "spatial_resolution": [-res, res],
    "preview": True
}

with warnings.catch_warnings():
    warnings.simplefilter("ignore", UserWarning)
    response = recipe.execute(**context)

In [13]:
# caching mode
context = {
    "datacube": dc,
    "mapping": mapping,
    "space": space,
    "time": time,
    "crs": epsg,
    "tz": "UTC",
    "spatial_resolution": [-res, res],
    "caching": True
}

with warnings.catch_warnings():
    warnings.simplefilter("ignore", UserWarning)
    response = recipe.execute(**context)