# Geocube Ingester Demo

-------

**Short description**

This notebook introduces you to the Geocube Ingester. You will learn how to populate a Geocube using an automatic ingester.


The Geocube Ingester is an example of a complete, parallelizable service that feeds the Geocube. The user posts an AOI, a time interval and a set of parameters (used to compute the output layers). The ingester takes care of everything: downloading the products, processing them in parallel and ingesting the results into the Geocube.

It is composed of three services: workflow, downloader and processor. It is connected to a Geocube and exposes a few interfaces for integration into the user's environment. Some implementations of these interfaces are available, and users are free to implement others to suit their environment.


<img src="data/IngesterArchitecture.png" width=800>

-------

**Requirements**

-------

- The Geocube Ingester (github.com/airbusgeo/geocube-ingester.git)
- A Scihub account (SCIHUB_USERNAME and SCIHUB_PASSWORD environment variables)
- A Geocube server and the parameters to connect to it (for the purpose of this notebook, GEOCUBE_SERVER and GEOCUBE_CLIENTAPIKEY environment variables)

If the Geocube ingester is run in a local environment:
- ESA SNAP >= 8.0 (https://step.esa.int/main/download/snap-download/)

-------

**Installation**

-------

Follow the [Geocube Ingester Installation](https://github.com/airbusgeo/geocube-ingester/blob/main/INSTALL.MD) guide.

-------

**Start services**

-------

NB: The Geocube server must be started before the workflow server.
If you don't have any Geocube Server running, follow the [Geocube Installation Guide](https://github.com/airbusgeo/geocube/blob/main/INSTALL.MD).
You can run a [local geocube server](https://github.com/airbusgeo/geocube/blob/main/INSTALL.MD#docker-compose) using [docker-compose](https://docs.docker.com/compose/).

Start services [using docker-compose](https://github.com/airbusgeo/geocube-ingester/blob/main/INSTALL.MD#docker-compose) or one by one:
- [Pubsub emulator](https://github.com/airbusgeo/geocube-ingester/blob/main/INSTALL.MD#pubsub-emulator)
- [Downloader service](https://github.com/airbusgeo/geocube-ingester/blob/main/INSTALL.MD#downloader)
- [Processor service](https://github.com/airbusgeo/geocube-ingester/blob/main/INSTALL.MD#processor)
- [Workflow service](https://github.com/airbusgeo/geocube-ingester/blob/main/INSTALL.MD#workflow)


## 1 - Ingester pipeline & payload

The ingestion is done in five steps:

<img src="data/IngesterPipeline.png" width=800>

The input of the ingester is a payload called **Area**. It contains an AOI, a date interval, parameters defining the raw products, parameters defining the processing and parameters defining the products to be ingested in the Geocube.

The payload is a GeoJSON (all fields are mandatory unless otherwise stated):
- AOI according to GeoJSON standards (`type`, `geometry`, `coordinates`...)
- `name`: Unique name used to identify the Area in the workflow. After a first ingestion, new scenes can be added to the same area, benefiting from automatic scenes reference picking (useful for S1-bursts).
- `start_time`, `end_time`: date interval
- `scene_type`: describing the type of the products to be downloaded
    - `constellation`: name of the satellite constellation (currently supported: sentinel1, sentinel2)
    - `parameters`: (optional) specific parameters to filter the results (see Scihub API guide)
- `scene_graph_name`: name of the graph that will be used just after downloading the scene (or "CopyToStorage")
- `tile_graph_name`: name of the graph that will be used to process each tile (or "Pass")
- `graph_config` (optional): specific configuration of the graphs
- `layers`: mapping between layers to be indexed in the Geocube and the corresponding variable.instance from the Geocube (see Geocube Documentation).
    - `layername: {"variable":"variable_name", "instance":"instance_name"}`
- `record_tags` (optional): user-defined tags for identifying/creating the record in the Geocube.
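
As a minimal sketch of the structure described above, the following cell builds an Area payload as a Python dictionary and checks that the mandatory fields are present. Placing the fields at the top level of the GeoJSON feature, as well as the coordinates, dates, date format, area name and the layer name `sigma0_VV`, are assumptions made for illustration; refer to `data/DenmarkDemo.json` for a real payload.

```python
import json

# Mandatory fields listed above (graph_config, record_tags and
# scene_type.parameters are optional).
MANDATORY_FIELDS = (
    "type", "geometry", "name", "start_time", "end_time",
    "scene_type", "scene_graph_name", "tile_graph_name", "layers",
)


def missing_fields(payload):
    """Return the mandatory fields that are absent from the payload."""
    missing = [field for field in MANDATORY_FIELDS if field not in payload]
    if "constellation" not in payload.get("scene_type", {}):
        missing.append("scene_type.constellation")
    return missing


# Hypothetical Area: every value below is illustrative only.
area = {
    "type": "Feature",
    "geometry": {
        "type": "Polygon",
        "coordinates": [[[10.0, 55.0], [10.1, 55.0], [10.1, 55.1],
                         [10.0, 55.1], [10.0, 55.0]]],
    },
    "name": "MyArea",
    "start_time": "2019-01-01T00:00:00.000Z",
    "end_time": "2019-01-10T00:00:00.000Z",
    "scene_type": {"constellation": "sentinel1"},
    "scene_graph_name": "CopyToStorage",
    "tile_graph_name": "Pass",
    "layers": {
        "sigma0_VV": {"variable": "BackscatterSigma0VV", "instance": "RNKell"},
    },
}

print("Missing fields:", missing_fields(area))
print(json.dumps(area, indent=4))
```

Such a check only covers the presence of the fields; the workflow service remains the reference for what it accepts.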

An example of a payload:

In [None]:
import json
payloadFile = "data/DenmarkDemo.json"
with open(payloadFile, "r") as f:
    j = json.load(f)
    print(json.dumps(j, indent=4))
payloadName = j["name"]
    
# Display AOI
import geopandas as gpd
import matplotlib.pyplot as plt
from geocube import utils
aoi = utils.read_aoi(payloadFile)
# Note: geopandas.datasets was removed in GeoPandas 1.0, so this line requires geopandas < 1.0
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
base = world.plot(color='lightgrey', edgecolor='white')
gpd.GeoSeries(aoi, crs='epsg:4326').plot(ax=base, edgecolor='black')
plt.xlim([aoi.bounds[0]-0.2, aoi.bounds[2]+0.2])
plt.ylim([aoi.bounds[1]-0.2, aoi.bounds[3]+0.2])

### Variable & Instance dependencies

The processor service will index images referenced by variables and instances.

For the purpose of this tutorial, these variables have to be created in the Geocube (the Geocube server URI is read from the `GEOCUBE_SERVER` environment variable):

In [None]:
import geocube
import os
from geocube import entities, utils

# Define the connection to the server
secure = False  # False for local use; set to True to use TLS
geocube_client_server  = os.environ['GEOCUBE_SERVER']        # e.g. 127.0.0.1:8080 for local use
geocube_client_api_key = os.environ['GEOCUBE_CLIENTAPIKEY']  # Usually empty for local use

client = geocube.Client(geocube_client_server, secure, geocube_client_api_key)

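def create(variable, instance, metadata, profile):
    """Create a variable and one of its instances, ignoring Geocube errors."""
    # A GeocubeError is ignored so that the cell can be re-run
    # (assumption: it is typically raised when the variable already exists)
    try:
        client.create_variable(variable, **profile)
    except utils.GeocubeError:
        pass
    # Same for the instance
    try:
        client.variable(variable).instantiate(instance, metadata)
    except utils.GeocubeError:
        pass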


profile = {'dformat': ('float32', 0, 0, 1), 'bands': [''], 'resampling_alg': entities.Resampling.cubic}
profile['description'] = "Coherence VH - Terrain corrected (SRTM3sec)"
create("CoherenceVH", "master", {"processor": "snap8"}, profile)

profile['description'] = "Coherence VV - Terrain corrected (SRTM3sec)"
create("CoherenceVV", "master", {"processor": "snap8"}, profile)

profile['description'] = "Backscatter VV - Terrain corrected (SRTM3sec)"
create("BackscatterSigma0VV", "RNKell", {"method": "Kellndorfer", "processor": "snap8"}, profile)

profile['description'] = "Backscatter VH - Terrain corrected (SRTM3sec)"
create("BackscatterSigma0VH", "RNKell", {"method": "Kellndorfer", "processor": "snap8"}, profile)

### Notebook init

Set the URI (including the port) of the workflow server and initialize the notebook.

In [None]:
import json
import os
from shutil import copyfile
import warnings

# Create the output directory if it does not exist yet
os.makedirs('outputs', exist_ok=True)

# Change workflow URI to your workflow (e.g. 127.0.0.1:8082)
workflow_server = os.environ.get('GEOCUBE_INGESTER_WORKFLOW')

def json_pretty_print(file):
    with open(file, "r") as f:
        j = json.load(f)
    print(json.dumps(j, indent=4, sort_keys=True))
    
def is_wrong_json(jsonData):
    # A valid response is expected to be a JSON object, i.e. to start with "{"
    content = jsonData.read()
    if not content.startswith("{"):
        warnings.warn("Wrong json file: " + content)
        return True
    return False

## 2 - List scenes

The first step of the ingestion is to list the scenes available on the AOI at the given dates.
The ingester will query the scenes from a catalogue provider (by default Scihub).

In [None]:
!curl -X GET -s -F "area=@{payloadFile}" {workflow_server}/catalog/scenes > outputs/scenes.json

with open("outputs/scenes.json") as json_file:
    if is_wrong_json(json_file):
        print("Error retrieving scenes. Use backup instead")
        os.remove("outputs/scenes.json")
        copyfile("data/scenes.json", "outputs/scenes.json")

json_pretty_print("outputs/scenes.json")
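
The `is_wrong_json` helper only inspects the first character of the response. As a hedged alternative, the sketch below parses the whole file with the standard `json` module, so truncated or otherwise malformed responses are also detected. The function name `load_json_or_backup` and its fallback behaviour are assumptions for illustration, not part of the ingester.

```python
import json
import os
import tempfile


def load_json_or_backup(path, backup_path):
    """Load JSON from path; fall back to backup_path if it cannot be parsed."""
    try:
        with open(path, "r") as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError) as err:
        print(f"Cannot read {path} ({err}); using {backup_path} instead")
        with open(backup_path, "r") as f:
            return json.load(f)


# Self-contained demonstration with temporary files
with tempfile.TemporaryDirectory() as tmp:
    bad = os.path.join(tmp, "response.json")
    backup = os.path.join(tmp, "backup.json")
    with open(bad, "w") as f:
        f.write("404 page not found")  # e.g. an error message instead of JSON
    with open(backup, "w") as f:
        json.dump({"ok": True}, f)
    result = load_json_or_backup(bad, backup)

print(result)
```

In this notebook, it could be called as `load_json_or_backup("outputs/scenes.json", "data/scenes.json")`.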

## 3 - List tiles
Then, the scenes are divided into tiles. By default, for Sentinel-2 the tile is the whole image, and for Sentinel-1 the scenes are divided into bursts. The burst inventory is built from the annotations available in the SAFE file. Creodias provides a service to download these annotation files without downloading the whole file.

In [None]:
!curl -X GET -s -F "area=@{payloadFile}" -F "scenes=@outputs/scenes.json" {workflow_server}/catalog/tiles > outputs/tiles.json

with open("outputs/tiles.json") as json_file:
    if is_wrong_json(json_file):
        print("Error retrieving tiles. Use backup instead")
        os.remove("outputs/tiles.json")
        copyfile("data/tiles.json", "outputs/tiles.json")

json_pretty_print("outputs/tiles.json")

## 4 - Post Area
Then the Area, with scenes and tiles, is posted to the workflow service that is in charge of creating and running the processing flow.

Using tiles.json:

In [None]:
!curl -F "area=@{payloadFile}" -F "tiles=@outputs/tiles.json" {workflow_server}/catalog/aoi

Using scenes.json:

In [None]:
!curl -F "area=@{payloadFile}" -F "scenes=@outputs/scenes.json" {workflow_server}/catalog/aoi

From scratch:

In [None]:
!curl -F "area=@{payloadFile}" {workflow_server}/catalog/aoi

## 5 - Monitoring
The scenes to be downloaded are sent to the Downloader Service, then the tiles to be processed are sent to the Processor Service. If an autoscaler is configured, the downloading and the processing are done in parallel using all available machines.

Some endpoints are available to monitor this processing flow.

### AOI Info
- Overview of the workload for an AOI: `GET /aoi/{aoi}`
- Pretty display of the workflow: `GET /aoi/{aoi}/dot`

In [None]:
!curl {workflow_server}/aoi/{payloadName}
!curl -s {workflow_server}/aoi/{payloadName}/dot > outputs/{payloadName}.dot

import graphviz
dot = graphviz.Source.from_file(payloadName + '.dot', directory="outputs")
filename=dot.render(format='png')
from IPython.display import Image
with open(os.path.join(os.getcwd(), filename),'rb') as f:
    display(Image(data=f.read(), format='png', width=1024, height=1024))

### Scene Info
- List Scenes of an AOI: `GET /aoi/{aoi}/scenes`


In [None]:
!curl -s {workflow_server}/aoi/{payloadName}/scenes  > outputs/listScenesFromAOI.json
json_pretty_print("outputs/listScenesFromAOI.json")

- Get Scenes of an AOI filtered by Status: `GET /aoi/{aoi}/scenes/{status}` (status in \[NEW, PENDING, DONE, RETRY, FAILED\])

In [None]:
!curl -s {workflow_server}/aoi/{payloadName}/scenes/PENDING  > outputs/pendingScenes.json
pending_scene_id = None  # id of the first pending scene, used in the next cells
with open('outputs/pendingScenes.json') as json_file:
    data = json.load(json_file)
    pending_scene_id = data[0]['id']
json_pretty_print("outputs/pendingScenes.json")

- Get Scene using its id: `GET /scene/{scene}`

In [None]:
!curl -s {workflow_server}/scene/{pending_scene_id}  > outputs/scene.json
json_pretty_print("outputs/scene.json")

### Tile Info
- Get Tiles of a Scene: `GET /scene/{scene}/tiles`

In [None]:
!curl -s {workflow_server}/scene/{pending_scene_id}/tiles  > outputs/tilesFromScene.json
tile_id = None  # id of the first tile of the scene, used in the next cell
with open('outputs/tilesFromScene.json') as json_file:
    data = json.load(json_file)
    tile_id = data[0]['id']
json_pretty_print("outputs/tilesFromScene.json")

- Get Tile using its id: `GET /tile/{tile}`

In [None]:
!curl -s {workflow_server}/tile/{tile_id}  > outputs/getTiles.json
json_pretty_print("outputs/getTiles.json")

- Get Tiles of an AOI filtered by Status: `GET /aoi/{aoi}/tiles/{status}` (status in \[NEW, PENDING, DONE, RETRY, FAILED\])


In [None]:
!curl -s {workflow_server}/aoi/{payloadName}/tiles/NEW  > outputs/tilesFromStatusAOI.json
json_pretty_print("outputs/tilesFromStatusAOI.json")
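
The status-filtered endpoints above share the same URL pattern. The following sketch builds such URLs and rejects statuses outside the documented list. The helper name `status_url` and the use of plain `http://` are assumptions for illustration.

```python
from urllib.parse import quote

# Statuses documented for the scenes and tiles endpoints
VALID_STATUSES = ("NEW", "PENDING", "DONE", "RETRY", "FAILED")


def status_url(server, aoi, kind, status):
    """Build the URL of GET /aoi/{aoi}/{scenes|tiles}/{status}."""
    if kind not in ("scenes", "tiles"):
        raise ValueError(f"kind must be 'scenes' or 'tiles', got {kind!r}")
    status = status.upper()
    if status not in VALID_STATUSES:
        raise ValueError(f"status must be one of {VALID_STATUSES}, got {status!r}")
    # Assumption: the workflow server is reachable over plain HTTP
    return f"http://{server}/aoi/{quote(aoi, safe='')}/{kind}/{status}"


# 127.0.0.1:8082 is the example workflow URI used earlier in this notebook
print(status_url("127.0.0.1:8082", "DenmarkDemo", "tiles", "new"))
```

Validating the status before sending the request gives an explicit error message for a typo instead of an unexpected server response.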

### Other available endpoints
- `PUT /scene/{scene}/retry` > retry the scene (only if scene.Status=RETRY)
- `PUT /scene/{scene}/fail` > tag the scene and all its tiles as failed and update the graph of dependencies (only if scene.Status=RETRY, unless `/force` is specified)


- `PUT /tile/{tile}/retry` > retry the tile (only if tile.Status=RETRY)
- `PUT /tile/{tile}/fail` > tag the tile as failed and update the graph of dependencies (only if tile.Status=RETRY, unless `/force` is specified)

- `POST /aoi/{aoi}` > create a new AOI
- `POST /aoi/{aoi}/scene` > add a new scene and its tiles to the graph of dependencies
- `PUT /aoi/{aoi}/retry` > retry all the scenes and tiles of the AOI (only if Status=RETRY)
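
The `retry` and `fail` endpoints for scenes and tiles listed above are `PUT` requests, which can also be prepared from Python with the standard library. The sketch below only builds the request without sending it. The helper name `state_change_request`, the scene id `42` and the use of plain `http://` are assumptions for illustration.

```python
from urllib.request import Request


def state_change_request(server, kind, identifier, action):
    """Prepare (without sending) PUT /{scene|tile}/{id}/{retry|fail}."""
    if kind not in ("scene", "tile"):
        raise ValueError(f"unknown kind: {kind!r}")
    if action not in ("retry", "fail"):
        raise ValueError(f"unknown action: {action!r}")
    # Assumption: the workflow server is reachable over plain HTTP
    url = f"http://{server}/{kind}/{identifier}/{action}"
    return Request(url, method="PUT")


# Hypothetical scene id, for illustration only
req = state_change_request("127.0.0.1:8082", "scene", 42, "retry")
print(req.get_method(), req.full_url)
```

To send the request, pass it to `urllib.request.urlopen(req)`; the status conditions listed above still apply on the server side.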





## Check the results of the ingestion
As the AOI is quite small, all the images in the AOI can be retrieved at once.

Otherwise, it has to be tiled with, for example, `client.tile_aoi()`. See [Client-Python DataAccess notebook](https://github.com/airbusgeo/geocube-client-python/Jupyter/Geocube-Client-DataAccess.ipynb) for more details.

In [None]:
import geopandas as gpd
plt.rcParams['figure.figsize'] = [60, 15]

records = client.list_records(aoi=aoi, tags={"source":"tutorial", "constellation":"SENTINEL1"})
variables = {
    "BackscatterSigma0VV": "RNKell",
    "BackscatterSigma0VH": "RNKell",
    "CoherenceVV": "master",
    "CoherenceVH": "master"
}

tile = entities.Tile.from_bbox(gpd.GeoSeries(aoi, crs=4326).to_crs(32632).total_bounds, "epsg:32632", resolution=20)

for variable, instance in variables.items():
    v = client.variable(variable).instance(instance)
    cp = entities.CubeParams.from_tile(tile, records=records, instance=v)
    # squeeze=False keeps axs two-dimensional, even when there is a single record
    fig, axs = plt.subplots(1, len(records), constrained_layout=True, squeeze=False)
    fig.suptitle(variable, fontsize=60)
    for i, (image, metadata, err) in enumerate(client.get_cube_it(cp)):
        if err is None:
            axs[0, i].imshow(image[...,0], cmap="gray", vmin=v.dformat.min_value, vmax=v.dformat.max_value)
            axs[0, i].set_title(metadata.min_date, fontsize=40)