# Geocube Data Consolidation Tutorial

-------

**Short description**

This notebook introduces you to the Geocube Python Client. You will learn how to optimize the data format of the images in the Geocube. This process is called Consolidation.

-------

**Requirements**

-------

- Python 3.7
- The Geocube Python Client library: https://github.com/airbusgeo/geocube-client-python.git
- The Geocube Server & Client ApiKey (for the purpose of this notebook, the `GEOCUBE_SERVER` and `GEOCUBE_CLIENTAPIKEY` environment variables)

- Completion of the **Geocube Data Indexation Tutorial**, or access to a Geocube with data.

- **Geocube DataAccess** and **DataIndexation** tutorials are highly recommended

-------

**Installation**

-------

Install Python client:

```shell
pip install --user git+https://github.com/airbusgeo/geocube-client-python.git
```

Run the Docker containers (example):
```shell
export STORAGE=$(pwd)

docker run --rm --network=host -e PUBSUB_EMULATOR_HOST=localhost:8085 -v $STORAGE:$STORAGE geocube -project geocube-emulator -ingestionStorage=$STORAGE/ingested -dbConnection=postgresql://user:password@localhost:5432/geocube -psEventsTopic events -psConsolidationsTopic consolidations -local -cancelledJobs $STORAGE/cancelled-jobs

docker run --rm --network=host -e PUBSUB_EMULATOR_HOST=localhost:8085 -v $STORAGE:$STORAGE geocube-consolidater /consolidater -project geocube-emulator -workdir=/tmp -psEventsTopic events -psConsolidationsSubscription

```

## 0 - Introduction to Consolidation

Consolidation is the process of optimizing the data format, the projection and the tiling of the datasets to fit the needs of the project.
Depending on the depth of the timeseries that is usually needed, the size of the tiles requested, the memory requirements, etc., the datasets can be optimized to improve the speed of access to the data or to reduce the memory footprint.

**Consolidation is not mandatory**, but some applications, especially those requiring massive data and deep timeseries, may suffer from poorly formatted images.

For instance, consolidation may be a game changer in the following cases, all the more so if the images are retrieved more than once (reprocessing, visualisation):
- if the image format is not cloud-optimized (JPEG 2000, GeoTIFF, ...)
- if the images are retrieved as timeseries
- if the storage has a high latency per object
- if the datasets are not in the right projection or resolution
- if the images are retrieved with low resolutions (creation of overviews)
- if the processing requires small tiles (e.g. deep learning)


During the consolidation, the datasets will be tiled, reprojected, cast and merged into files optimized for timeseries.


## 1 - Connect to the Geocube


In [None]:
import math
import os
import uuid
from datetime import datetime
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = [20, 16]

`Consolidater` is derived from `Client`. It adds functionality to consolidate the datasets and handle jobs.

In [None]:
from geocube import Consolidater, utils, entities

# Define the connection to the server
secure = False  # False for local use, True to use TLS
geocube_client_server  = os.environ['GEOCUBE_SERVER']        # e.g. 127.0.0.1:8080 for local use
geocube_client_api_key = os.environ['GEOCUBE_CLIENTAPIKEY']  # Usually empty for local use

# Connect to the server
consolidater = Consolidater(geocube_client_server, secure, geocube_client_api_key)

## 2 - Consolidation parameters
The consolidation parameters that describe the data format of the optimized datasets are linked to a variable.
- Internal `Dataformat` 
- `Exponent` for the mapping between internal dataformat and `variable.dformat` (see formula below)
- Creation of `Overviews`
- `Resampling algorithm` used for reprojection and overviews
- `Compression` of the data
- `Bands interleave` if the variable is multi-bands


For the consolidation process, the external min/max (below: MinOut/MaxOut) are the Min/Max of the variable.
For a complete explanation of the internal dataformat and the exponent, see the [Data Indexation Tutorial # Dataset](./Geocube-Client-DataIndexation.ipynb#5---Dataset).
<img src="images/DataFormatExample.png" width=800>

The consolidation parameters of a variable are configured with `config_consolidation()`. A call to `config_consolidation` updates the consolidation parameters of the variable and affects only future consolidations.

Below, the variable is configured to consolidate datasets using the `int16` data type (half the size of the `float32` data type of the variable). The variable defines a range of values equal to `float32[0, 1]`, so `[0, 1]` is internally mapped to `[0, 255]`, with an internal nodata value of -32768.

In [None]:
variable = consolidater.variable(name="RGB")
variable.config_consolidation(
    dformat=("i2", -32768, 0, 255), 
    resampling_alg=entities.Resampling.cubic
)

print("The consolidation process will cast {}[{},{}] to {}[{},{}]\n".format(
    variable.dformat.dtype,
    variable.dformat.min_value,
    variable.dformat.max_value,
    variable.consolidation_params.dformat.dtype, 
    variable.consolidation_params.dformat.min_value, 
    variable.consolidation_params.dformat.max_value))
print(variable.name, variable.consolidation_params)
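
The mapping described above can be illustrated without a server. The following is a minimal sketch, assuming a linear mapping (exponent equal to 1) and assuming that missing external values are marked with NaN; the function names `to_internal` and `to_external` are hypothetical and are not part of the Geocube client.

```python
import numpy as np

def to_internal(values, min_out=0.0, max_out=1.0,
                min_in=0, max_in=255, nodata_in=-32768):
    """Linearly map external values to the internal int16 range (exponent = 1)."""
    values = np.asarray(values, dtype=np.float64)
    valid = ~np.isnan(values)  # assumption: NaN marks missing data
    # Normalise to [0, 1]; invalid pixels get a placeholder that is overwritten below
    ratio = (np.where(valid, values, min_out) - min_out) / (max_out - min_out)
    scaled = min_in + np.clip(ratio, 0.0, 1.0) * (max_in - min_in)
    internal = np.rint(scaled).astype(np.int16)
    internal[~valid] = nodata_in
    return internal

def to_external(internal, min_out=0.0, max_out=1.0,
                min_in=0, max_in=255, nodata_in=-32768):
    """Inverse mapping: internal int16 values back to external float32 values."""
    internal = np.asarray(internal)
    ratio = (internal.astype(np.float64) - min_in) / (max_in - min_in)
    external = (min_out + ratio * (max_out - min_out)).astype(np.float32)
    external[internal == nodata_in] = np.nan
    return external

pixels = np.array([0.0, 0.2, 1.0, np.nan])
stored = to_internal(pixels)
restored = to_external(stored)
print(stored)
print(restored)
```

With these parameters only 256 internal levels are used, so a round trip is accurate to within half a quantization step (0.5/255) rather than exact.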
    

## 3 - Layout
The datasets will be tiled, reprojected and stacked on a grid defined by a *Layout*.
The layout has *external parameters* that define the grid:
- `grid_parameters` : dict of parameters, containing at least a grid type (currently only `singlecell` and `regular` are available)
- `grid_flags` : list of flags

and *internal parameters* that define the internal tiling and the depth of the stacking:
- `block_shape`
- `max_records` per file

The layout must be carefully defined depending on the performance expected in terms of access.
The number of pixels in a grid cell, multiplied by the maximum number of records and by the size of the data type, gives the maximum size of the final files.
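
As a back-of-the-envelope check, the upper bound can be computed directly. This is a minimal sketch, assuming that the cell size is expressed in pixels and that the data is uncompressed; `max_file_size_bytes` is a hypothetical helper, not a Geocube function.

```python
import numpy as np

def max_file_size_bytes(cell_size, max_records, dtype, bands=1):
    """Upper bound on the uncompressed size of one consolidated file, in bytes."""
    pixels_per_record = cell_size * cell_size * bands
    return pixels_per_record * max_records * np.dtype(dtype).itemsize

# 4096 x 4096 pixel cells, 1000 records per file, int16 storage
size = max_file_size_bytes(cell_size=4096, max_records=1000, dtype="i2")
print(f"{size / 2**30:.2f} GiB per band")  # 31.25 GiB per band
```

Compression and partially filled cells usually make real files smaller, so this figure is only a ceiling to keep in mind when choosing `max_records` and the cell size.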

### Regular layout

The regular layout defines a regular grid in a given CRS. The `Layout.regular()` function is a shortcut to define a regular grid.


In [None]:
layout_name = "32632_20m"
layout = entities.Layout.regular(
    name=layout_name,
    crs="epsg:32632",
    cell_size=4096,
    resolution=20,
    block_size=256,
    max_records=1000
)
try:
    consolidater.create_layout(layout)
    print("Layout created")
except utils.GeocubeError as e:
    print(e)


# Example:
aoi = utils.read_aoi("inputs/Denmark.json")
cells=consolidater.tile_aoi(aoi, layout_name=layout_name)
base = entities.Tile.plot(cells)
import geopandas as gpd
aoi_gpd = gpd.GeoDataFrame({'id': ['1'], 'geometry': gpd.GeoSeries(aoi, crs='epsg:4326')})
aoi_gpd.plot(ax=base, color='None', edgecolor='black')
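
To build an intuition for what a regular grid does, the cells covering a bounding box can be enumerated by hand. This is only a conceptual sketch, assuming that the cell size is in pixels, that cells are aligned on a grid origin of (0, 0) and that the bounding box is already expressed in the layout CRS; the server's actual origin and cell ordering may differ, and `regular_cells` is a hypothetical helper.

```python
import math

def regular_cells(bounds, cell_size=4096, resolution=20.0, origin=(0.0, 0.0)):
    """Return (column, row, cell_bounds) for each grid cell intersecting `bounds`."""
    extent = cell_size * resolution  # cell width/height in CRS units
    min_x, min_y, max_x, max_y = bounds
    ox, oy = origin
    cols = range(math.floor((min_x - ox) / extent), math.ceil((max_x - ox) / extent))
    rows = range(math.floor((min_y - oy) / extent), math.ceil((max_y - oy) / extent))
    return [
        (c, r, (ox + c * extent, oy + r * extent,
                ox + (c + 1) * extent, oy + (r + 1) * extent))
        for r in rows for c in cols
    ]

# Hypothetical bounding box in EPSG:32632 (metres)
cells_sketch = regular_cells((500_000, 6_100_000, 600_000, 6_200_000))
for col, row, cell_bounds in cells_sketch:
    print(col, row, cell_bounds)
```

With a cell size of 4096 pixels at 20 m, each cell spans 81920 m, so this 100 km box is covered by a 2 x 2 block of cells.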

### Single-cell layout

The single-cell layout defines a grid of one cell in a given CRS. At the beginning of the consolidation, the AOIs of all the datasets are projected into the given CRS and merged. The bounds of this merged AOI give the size of the cell.

Single-cell layout can be used to consolidate a bunch of already aligned records, like Sentinel-2 granules.

*Be careful with Single-cell Layout as the merged aoi may be very large and cause memory errors.*
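
To see how quickly the merged cell can grow, the union of the dataset footprints can be estimated beforehand. This is a rough sketch, assuming axis-aligned bounding boxes already projected into the layout CRS; the footprints below are made-up values and `merged_cell_shape` is a hypothetical helper.

```python
import math

def merged_cell_shape(bounds_list, resolution):
    """Union of (min_x, min_y, max_x, max_y) boxes and the resulting cell shape in pixels."""
    min_x = min(b[0] for b in bounds_list)
    min_y = min(b[1] for b in bounds_list)
    max_x = max(b[2] for b in bounds_list)
    max_y = max(b[3] for b in bounds_list)
    width = math.ceil((max_x - min_x) / resolution)
    height = math.ceil((max_y - min_y) / resolution)
    return (min_x, min_y, max_x, max_y), (width, height)

# Two hypothetical, already-projected dataset footprints (metres)
footprints = [(500_000, 6_100_000, 609_800, 6_209_800),
              (600_000, 6_100_000, 709_800, 6_209_800)]
merged, (width, height) = merged_cell_shape(footprints, resolution=20)
print(merged, width, height)
print(f"{width * height * 2 / 2**20:.0f} MiB per int16 band and record")
```

Two neighbouring footprints already double the cell width, which is why this layout is best reserved for records that share the same footprint.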

The `Layout.single_cell()` function is a shortcut to define a single-cell grid.

In [None]:
name = "SingleCellUTM32N_20m"
layout = entities.Layout.single_cell(
    name=name,
    crs="epsg:32632",
    resolution=20,
    block_size=256,
    max_records=1000
)
try:
    consolidater.create_layout(layout)
    print("Layout created")
except utils.GeocubeError as e:
    print("Layout already exists")

# Example:
aoi = utils.read_aoi("inputs/Denmark.json")
cells=consolidater.tile_aoi(aoi, layout_name=name)
base = entities.Tile.plot(cells)
import geopandas as gpd
aoi_gpd = gpd.GeoDataFrame({'id': ['1'], 'geometry': gpd.GeoSeries(aoi, crs='epsg:4326')})
aoi_gpd.plot(ax=base, color='None', edgecolor='black')

### List available layouts
The `list_layouts()` function lists all the layouts already defined in the geocube. With the `name_like` argument, the layouts are filtered by name.

In [None]:
layouts = consolidater.list_layouts("")
for l in layouts:
    print(l)
    

## 4 - Consolidate

Consolidation is an asynchronous process that is defined by a *Job*. A Job is a state machine that can be easily canceled or retried at any state.

<img src="images/ConsolidationProcess.png" width=800>

A consolidation job is defined by a **name**, a **variable**, a **layout** and **records** that can be passed as a list of record ids or as filters (tags, from_time, to_time).


In [None]:
# Get records
jobName = f'MyConsolidation{uuid.uuid1()}'
records = consolidater.list_records(tags={'source':'tutorial','satellite':'SENTINEL2B','constellation':'SENTINEL2'}, with_aoi=True)
layout = consolidater.list_layouts("SingleCellUTM32N_20m")[0]

# Get the variable RGB:master
rgb = consolidater.variable(name="RGB").instance("master")

job = consolidater.consolidate(jobName, rgb, layout, records, execution_level=entities.ExecutionLevel.STEP_BY_STEP_CRITICAL)
print(job)

### Step-by-step jobs
A job can be run step-by-step (`STEP_BY_STEP_CRITICAL`, `STEP_BY_STEP_MAJOR`, `STEP_BY_STEP_ALL`) or all in a row.
When a step is finished, the status of the job changes and the job waits for a user action.
If a job is waiting, start the next step by calling `next()`.

In [None]:
print(job.refresh())
if job.waiting:
    job.next()
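
Running the cell above repeatedly can be automated with a small polling loop. This is a minimal sketch that relies only on the `refresh()`, `waiting`, `next()` and `state` members used in this notebook; `run_step_by_step` and `FakeJob` are hypothetical names, the `"NEW"` state is a placeholder, and failure states are not handled because their names are not given here.

```python
import time

def run_step_by_step(job, poll_secs=1.0, max_polls=600):
    """Advance a step-by-step job until it reports the DONE state."""
    for _ in range(max_polls):
        job.refresh()          # update the local view of the job
        if job.state == "DONE":
            return True
        if job.waiting:
            job.next()         # launch the next step
        time.sleep(poll_secs)
    return False               # gave up: inspect the job manually

class FakeJob:
    """Hypothetical stand-in used only to exercise the loop without a server."""
    def __init__(self, steps):
        self._remaining = steps
        self.state, self.waiting = "NEW", True

    def refresh(self):
        return self

    def next(self):
        self._remaining -= 1
        if self._remaining == 0:
            self.state, self.waiting = "DONE", False

finished = run_step_by_step(FakeJob(steps=3), poll_secs=0)
print(finished)
```

The `max_polls` bound keeps the loop from spinning forever if the job fails or gets stuck; in that case the job should be inspected before retrying or cancelling it.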

### View job tasks
Once the job has finished preparing the consolidation orders (state `CONSOLIDATION_CREATED`), the tasks can be visualized for control.

In [None]:
job.refresh()
base=job.plot_tasks()
aoi=entities.Record.list_to_geodataframe(records)
aoi.plot(ax=base)
bounds=aoi.total_bounds
margin=1
base.set_xlim(bounds[0] - margin, bounds[2] + margin)
base.set_ylim(bounds[1] - margin, bounds[3] + margin)

In [None]:
consolidater.wait_job(job, wait_secs=1)

### Check consolidation results

In [None]:
cube_params = entities.CubeParams.from_records(
    crs           = "epsg:32632",
    transform     = entities.geo_transform(563087,6195234, 200),
    shape         = (128, 128),
    instance      = rgb,
    records       = records)
metadata = consolidater.get_cube_metadata(cube_params)

for s in metadata.slices:
    for m in s.metadata:
        print(f"{os.path.basename(m.container_uri)}:{m.container_subdir} => {s.record.name}")

images, rs = consolidater.get_cube(cube_params, verbose=False)

nbimages=len(images)
plt.rcParams['figure.figsize'] = [20, 16]
for f in range(0, nbimages):
    plt.subplot(math.ceil(nbimages/4), 4, f+1).set_axis_off()
    plt.imshow(images[f])

### Deletion task
If the geocube owns the datasets that are consolidated (`managed=True` when containers are indexed), it will delete the original datasets using a separate, asynchronous job.

In [None]:
deletion_job_name = job.deletion_job_from_logs()
if deletion_job_name == "":
    raise Exception("Unable to find deletion_job_name from logs. " + ("Please wait until the job finishes" if job.state != 'DONE' else "Perhaps, there is nothing to delete"))
deletion_job = consolidater.job(deletion_job_name)
print(deletion_job)

### Retry or cancel a job 
In case of failure, a job can be retried or canceled.
If a job is canceled, a complete rollback is done to restore the original state.

Both functions have a `force` parameter that can be used to retry or cancel a job that is stuck for an unexpected reason. Some steps can take a long time (in particular `CONSOLIDATION_INPROGRESS`), so make sure that the job is really stuck before calling these functions with the `force` parameter.

In [None]:
# job.retry(force=False)
# job.cancel(force=False)

### List jobs

In [None]:
jobs = consolidater.list_jobs()

for job in jobs:
    print(job)

## 5 - Custom grids & Layouts
It's possible to define a custom grid as a set of cells.
A cell is defined by an ID (the couple (gridName, ID) must be unique), a CRS and geographic coordinates.

In [None]:
import geopandas
from geocube.utils import grid
g = geopandas.read_file("./inputs/UTM_grid.geojson")
cells = grid.utm(g.ZONE, g.ROW_, g.geometry)

print("UTM Cell:", cells[0])
try:
    consolidater.create_grid(entities.Grid("UTM", "UTM Grid", cells))
except utils.GeocubeError as e:
    print(e.codename + ": " + e.details)

In [None]:
for g in consolidater.list_grids(""):
    print(g)

Then, a Layout can be created and an AOI can be tiled using this grid.

In [None]:
layout = entities.Layout(
    name="UTM_4096_256_20m",
    grid_parameters={"grid":"UTM", "resolution":"20"},
    grid_flags=[],
    block_shape=(256, 256),
    max_records=1000
)
aoi = utils.read_aoi("inputs/Denmark.json")
cells=consolidater.tile_aoi(aoi, layout=layout)

# Graphical visualization of tiles and AOI
import geopandas as gpd
gpd.GeoSeries(aoi, crs='epsg:4326').plot(ax=entities.Tile.plot(cells), color='None', edgecolor='black')

If necessary, the cells of the grid can be subdivided using another grid (currently, only regular is supported).
In the following example, the UTM grid is subdivided using a regular grid of size 4096x4096.

In [None]:
layout = entities.Layout(
    name="UTM_4096_256_20m",
    grid_parameters={"grid":"UTM", "subgrid":"regular", "resolution":"20", "cell_size": "4096"},
    grid_flags=[],
    block_shape=(256, 256),
    max_records=1000
)
aoi = utils.read_aoi("inputs/Denmark.json")
cells=consolidater.tile_aoi(aoi, layout=layout)

# Graphical visualization of tiles and AOI
import geopandas as gpd
gpd.GeoSeries(aoi, crs='epsg:4326').plot(ax=entities.Tile.plot(cells), color='None', edgecolor='black')

## 6 - Conclusion
In this notebook, you have learnt to consolidate datasets.

## 7 - Benchmark
This benchmark consolidates 100 datasets and times the GetCube requests before and after consolidation.
The following code creates 100 different datasets (it requires 500 MB of memory), then indexes and consolidates them.

It can be used to do benchmarks:
- Copy the fake data to another storage and change the uris when indexing to benchmark different storages.
- Try to add workers or to increase the blockSize of the Server or the Downloader service (args `--workers` and `--gdalBlockSize`) to see the impact on the time of retrieval.
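
When comparing configurations, a single timing can be noisy. The helper below is a small sketch of one way to repeat a measurement and keep the median; `time_calls` is a hypothetical name, and the workload in the example is a placeholder rather than a Geocube request.

```python
import statistics
import time

def time_calls(fn, repeats=3):
    """Call `fn` several times and return (median, all durations) in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return statistics.median(durations), durations

# Placeholder workload standing in for a request to the server
median_s, runs = time_calls(lambda: sum(range(100_000)))
print(f"median {median_s:.4f}s over {len(runs)} runs")
```

In this notebook, `fn` could wrap the `consolidater.get_cube(cube_params, compression=0, verbose=False)` call used later in `get_cube_benchmark`. Repeated requests may benefit from caching on the storage or server side, so the first run and the following ones are worth reading separately.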

### Clean 

In [None]:
import shutil
import os

cwd = os.getcwd()
    
print('Clean consolidation files')
try:
    # `Admin` is derived from `Client`. It adds admin functionalities to be used with caution.
    from geocube import Admin
    admin = Admin(geocube_client_server, secure, geocube_client_api_key, verbose=False)
    records = admin.list_records(tags={'source':'notebook_consolidation'})
    admin.admin_delete_datasets(records=records,instances=[],execution_level=entities.ExecutionLevel.SYNCHRONOUS)
    admin.delete_records(records=records)
except utils.GeocubeError as e:
    if e.codename != "NOT_FOUND":
        raise



### Generate Fake Data

In [None]:
import rasterio
import os

os.makedirs('inputs/data', exist_ok=True)

with rasterio.open('inputs/consolidation.tif') as ds:
    im = ds.read()
    profile = ds.profile
for i in range(100):
    with rasterio.open(f'inputs/data/consolidation{i}.tif', 'w', **profile) as ds:
        ds.write(im+i)
    
print('Done!')

### Create AOI

In [None]:
aoi = utils.read_aoi('inputs/consolidation.json')
aoi_id = consolidater.create_aoi(aoi, exist_ok=True)
print("AOI created with id "+aoi_id)


### Create variable

In [None]:
#Create Variable 
variable_name = "Geocube_benchmark"
variable = consolidater.create_variable(
    name=variable_name,
    dformat={"dtype":"u1", "no_data": 220, "min_value": 0, "max_value": 255},
    bands=['GREY'],
    description="",
    unit="",
    resampling_alg=entities.Resampling.bilinear,
    exist_ok=True)

try:
    instance = variable.instantiate("master", {})
except utils.GeocubeError as e:
    instance = variable.instance("master")
    
print(instance)

### Create records

In [None]:
from datetime import datetime
records = consolidater.create_records(aoi_ids=[aoi_id]*100,names=[f'record_consolidation{i}' for i in range(100)],dates=[datetime.now() for _ in range(100)],tags=[{'source':'notebook_consolidation'}]*100)
print(f'Records Created: {records}')

### Index Datasets

In [None]:
from geocube.entities import DataFormat

records = consolidater.list_records(tags={'source':'notebook_consolidation'})

# Indexation
for i in range(100):
    try:
        consolidater.index([entities.Container(
            uri=f'{cwd}/inputs/data/consolidation{i}.tif',
            managed=True,
            datasets=[entities.Dataset(records[i], instance)]
        )])
    except utils.GeocubeError as e:
        if e.codename == "ALREADY_EXISTS":
            print('Datasets already indexed')
        else:
            raise
    
print('Indexation complete')

### Get Cube benchmark

In [None]:
import affine
import time
from geocube import entities
from matplotlib import pyplot as plt

def get_cube_benchmark(records):
    crs="epsg:2154"
    transform = affine.Affine.translation(526988.10, 6704991.01) * affine.Affine.scale(20,20)
    width, height = 256, 256
    shape = (width, height)
    cube_params = entities.CubeParams.from_records(records=records,transform=transform,instance=instance,crs=crs,shape=shape)

    consolidater.get_cube_metadata(cube_params).info()
    start = time.time()
    images, records = consolidater.get_cube(cube_params, compression=0, verbose=False)
    print(f"{len(images)} images downloaded in {time.time() - start}s")

    from geocube.utils import timeseries_to_animation
    import numpy as np

    imagesu1=[]
    for i in images:   
        imagesu1.append(i.astype(np.uint8))

    os.makedirs('outputs', exist_ok=True)
    timeseries_to_animation(imagesu1, "./outputs/animation.gif", duration=0.2)

    from IPython.display import Image
    with open(os.getcwd() + '/outputs/animation.gif','rb') as f:
        display(Image(data=f.read(), format='png', width=512, height=512))
        
def get_overviews_benchmark(records, resolution=200):
    crs="epsg:2154"
    consolidater.load_aoi(records[0])
    cube_params=entities.CubeParams.from_tile(entities.Tile.from_record(records[0], resolution=resolution, crs=crs), instance=instance, records=records)

    start = time.time()
    imagesOvr, _ = consolidater.get_cube(cube_params, compression=0, verbose=False)
    print(f"{len(imagesOvr)} overviews downloaded in {time.time() - start}s")    
    
    plt.imshow(imagesOvr[0][:,:,0], cmap="gray")
    


### Get Cube before consolidation

In [None]:
records = consolidater.list_records(tags={'source':'notebook_consolidation'})
get_cube_benchmark(records)
get_overviews_benchmark(records[:10])

### Consolidate records

In [None]:
import uuid

name = "SingleCell_L93_20m"
layout = entities.Layout.single_cell(
    name=name,
    crs="epsg:2154",
    resolution=20,
    block_size=256,
    max_records=1000
)
try:
    consolidater.create_layout(layout)
    print("Layout created")
except utils.GeocubeError as e:
    print("Layout already exists")
    
variable = consolidater.variable(name=variable_name)
variable.config_consolidation(
    dformat=("i2", 220, 0, 255),
    compression=entities.Compression.LOSSLESS,
    overviews_min_size=256
)

jobName = f'notebook_job_conso_{uuid.uuid1()}'
print(jobName)
records = consolidater.list_records(tags={'source':'notebook_consolidation'})

job = consolidater.consolidate(jobName, variable.instance("master"), layout, records, execution_level=entities.ExecutionLevel.ASYNCHRONOUS)
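
The `overviews_min_size=256` argument above bounds the size of the smallest overview. As a rough illustration, and assuming that the consolidation follows the same convention as the `-minsize` option of GDAL's `gdaladdo` (power-of-two levels are added until both dimensions of the smallest level are at most the given size), the overview levels of an image can be estimated as follows. This is an assumption about the behaviour, not something stated in this tutorial, and `overview_factors` is a hypothetical helper.

```python
import math

def overview_factors(width, height, min_size=256):
    """Power-of-two decimation factors until the smallest level fits in `min_size`."""
    if min_size <= 0:
        return []  # guard against an endless loop on non-positive sizes
    factors = []
    factor = 1
    while math.ceil(width / factor) > min_size or math.ceil(height / factor) > min_size:
        factor *= 2
        factors.append(factor)
    return factors

print(overview_factors(4096, 4096))  # [2, 4, 8, 16]
```

Under this assumption, an image that is already 256 x 256 pixels or smaller gets no overview at all, which is worth keeping in mind when reading the overview benchmark on small test images.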
    

### Consolidation Status

In [None]:
print(job.refresh())
consolidater.wait_job(job, wait_secs=1)

### GetCube request

In [None]:
records = consolidater.list_records(tags={'source':'notebook_consolidation'})
get_cube_benchmark(records)
get_overviews_benchmark(records[:10], resolution=200)