# Example of a GFMAP full-extraction pipeline

Designing a full-extraction pipeline using the openeo-gfmap GFMAPJobManager.

The pipeline consits of the following elements:
* An input dataframe where each row corresponds to each executed job.
* An user function defined to create OpenEO BatchJob from input rows of the beforementioned dataframe.
* An user function defined to generate an output path of each of the Job products
* An user function executed after the assets are downloaded and saved from a finished job (optional). This **post job action** can do anything and will be executed locally inside the GFMAPJobManager.


### Setting up the logging module 

In [1]:
# Configuring the logging for the openeo_gfmap package
from openeo_gfmap.manager import _log
import logging

_log.setLevel(logging.DEBUG)

stream_handler = logging.StreamHandler()
_log.addHandler(stream_handler)

formatter = logging.Formatter('%(asctime)s|%(name)s|%(levelname)s:  %(message)s')
stream_handler.setFormatter(formatter)

# Exclude the other loggers from other libraries
class MyLoggerFilter(logging.Filter):
    def filter(self, record):
        return record.name == _log.name

stream_handler.addFilter(MyLoggerFilter())


### First step: splitting the job

We load the initial crop-type dataset that will be the base of our extractions.

Splitting the dataset of extraction in multiple job based on position is necessary to respect OpenEO limitations.

This script performs a split with the H3 hexagonal grid, yielding a list of sub-geodataframes.

A subtility here is that some polygons are not directly extracted (field with `extract=False`), but should be kept for post-job actions. This requirement is filled by removing sub-dataframes that do not contain any extractable polyons.

In [2]:
from pathlib import Path
import geopandas as gpd
from openeo_gfmap.manager.job_splitters import split_job_hex

base_df_path = "https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/DEMO_CROPTYPE.gpkg"
base_df = gpd.read_file(base_df_path)
# Splits the job using GFMAP
split_jobs = split_job_hex(
    base_df, max_points=60, grid_resolution=4
)

print(f'{len(split_jobs)} sub-datasets.')

# Remove the geometry where there are no points with the "extract" flag
split_jobs = [
    job for job in split_jobs if job.extract.any()
]
print(f'{len(split_jobs)} sub-datasets after filtering sub-datasets with no point to extract.')


  polygons["h3index"] = polygons.geometry.centroid.apply(


334 sub-datasets.
236 sub-datasets after filtering sub-datasets with no point to extract.


### Second step: creating a dataframe for the GFMAP Job Manager

Implementing a function that yields a `pandas.DataFrame` where each row correponds to a job.

The dataframe should contain the informations required by the GFMAP Job Manager, as well as additional information used by the datacube creation function and the post-job action function.

The output dataframe should be savable as a .csv file.

Note: the full information of a sub-geodataframe of polygons can be saved into a row of a `pandas.DataFrame` by storing it in a row as string implementing the `geojson.FeatureCollection` interface. To convert the `geopandas.GeoDataFrame` into a stirng, simply use the `.to_json()` function.

In [3]:
from openeo_gfmap import Backend
from typing import List
import pandas as pd

def create_job_dataframe_s2(backend: Backend, split_jobs: List[gpd.GeoDataFrame]) -> pd.DataFrame:
    """Create a dataframe from the split jobs, containg all the necessary information to run the job."""
    columns = ['backend_name', 'out_prefix', 'out_extension', 'start_date', 'end_date', 'geometry']
    rows = []
    for job in split_jobs:
        # Compute the average in the valid date and make a buffer of 1.5 year around
        median_time = pd.to_datetime(job.valid_date).mean()
        start_date = median_time - pd.Timedelta(days=275)  # A bit more than 9 months
        end_date = median_time + pd.Timedelta(days=275)  # A bit more than 9 months
        
        rows.append(
            pd.Series(
                dict(zip(columns, [backend.value, 'S2_10m', '.nc',  start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'), job.to_json()]))
            )
        )

    return pd.DataFrame(rows)

job_df = create_job_dataframe_s2(Backend.CDSE, split_jobs)

job_df

Unnamed: 0,backend_name,out_prefix,out_extension,start_date,end_date,geometry
0,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
1,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
2,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
3,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
4,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
...,...,...,...,...,...,...
231,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
232,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
233,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."
234,cdse,S2_10m,.nc,2020-08-30,2022-03-03,"{""type"": ""FeatureCollection"", ""features"": [{""i..."


### Sub-sampling job dataframe to reduce execution time

In [4]:
# Run a subset of the jobs to test the manager, the selected jobs have a fair amount of geometries to extract
job_df = job_df.iloc[[0, 2, 3, -6]].reset_index(drop=True)

In [5]:
import geojson

def get_job_nb_polygons(row: pd.Series) -> int:
    """Get the number of polygons in the geometry."""
    return len(list(filter(lambda feat: feat.properties.get("extract"), geojson.loads(row.geometry)['features'])))

job_df['nb_polygons'] = job_df.apply(get_job_nb_polygons, axis=1)
job_df.nb_polygons.values

array([3])

### Third step: implement the datacube creator function.

Implement a function to create, from the additional rows provided before, an `openeo.BatchJob` that will be used to run the job.

In this case we extract Sentinel-2 data around a 64x64 pixel square of polygons which have the field `extract=True` (although we keep them in the row for the post-job action.)

Note:
Because the polygons to extract are specified in UTM dimensions (required to have a specific size), the dataset of polygon cannot be send directly through the openeo process graph (GeoJSON only support lat/lon coordinates). The sub-datasets of polygons are therefore uploaded to a publicly accessible URL so they can be used later by openeo during the execution of the job.

In [6]:
import openeo

import requests
from tempfile import NamedTemporaryFile
import os
import pandas as pd
import geojson
from shapely.geometry import Point

from openeo_gfmap import TemporalContext, Backend, BackendContext, FetchType, SpatialContext
from openeo_gfmap.fetching import build_sentinel2_l2a_extractor

def upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, row_id: int) -> str:
    # Save the dataframe as geoparquet to upload it to artifactory
    temporary_file = NamedTemporaryFile()
    gdf.to_parquet(temporary_file.name)
    
    artifactory_username = os.getenv('ARTIFACTORY_USERNAME')
    artifactory_password = os.getenv('ARTIFACTORY_PASSWORD')

    headers = {
        "Content-Type": "application/octet-stream"
    }

    upload_url = f"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap-temp/openeogfmap_dataframe_{row_id}.parquet"

    with open(temporary_file.name, 'rb') as f:
        response = requests.put(upload_url, headers=headers, data=f, auth=(artifactory_username, artifactory_password))

    assert response.status_code == 201, f"Error uploading the dataframe to artifactory: {response.text}"

    return upload_url


def create_datacube_s2(row: pd.Series, connection: openeo.DataCube, provider=None, connection_provider=None) -> openeo.BatchJob:

    def buffer_geometry(geometry: geojson.FeatureCollection, buffer: int) -> gpd.GeoDataFrame:
        gdf = gpd.GeoDataFrame.from_features(geometry).set_crs(epsg=4326)
        utm = gdf.estimate_utm_crs()
        gdf = gdf.to_crs(utm)

        gdf['geometry'] = gdf.centroid.apply(
            # Clips the point to the closest 20m from the S2 grid
            lambda point: Point(round(point.x / 20.0) * 20.0, round(point.y / 20.0) * 20.0)
        ).buffer(distance=buffer, cap_style=3)

        return gdf

    def filter_extractonly_geometries(collection: geojson.FeatureCollection):
        # Filter out geometries that do not have the field extract=True
        features = [f for f in collection.features if f.properties.get('extract', False)]
        return geojson.FeatureCollection(features)

    start_date = row.start_date
    end_date = row.end_date
    temporal_context = TemporalContext(start_date, end_date)

    # Get the feature collection containing the geometry to the job
    geometry = geojson.loads(row.geometry)
    assert isinstance(geometry, geojson.FeatureCollection)

    # Filter the geometry to the rows with the extract only flag
    geometry = filter_extractonly_geometries(geometry)
    assert len(geometry.features) > 0, "No geometries with the extract flag found"

    # Performs a buffer of 64 px around the geometry
    geometry_df = buffer_geometry(geometry, 320)
    spatial_extent_url = upload_geoparquet_artifactory(geometry_df, row.name)

    # Backend name and fetching type
    backend = Backend(row.backend_name)
    backend_context = BackendContext(backend)

    fetch_type = FetchType.POLYGON
    bands_to_download = ['S2-L2A-B01', 'S2-L2A-B02', 'S2-L2A-B03', 'S2-L2A-B04', 'S2-L2A-B05', 'S2-L2A-B06', 'S2-L2A-B07', 'S2-L2A-B08', 'S2-L2A-B8A', 'S2-L2A-B09', 'S2-L2A-B11', 'S2-L2A-B12', 'S2-L2A-SCL']

    # Create the job to extract S2
    extraction_parameters = {
        "target_resolution": 10,
        "load_collection": {
            "eo:cloud_cover": lambda val: val <= 95.0,
        },
    }
    extractor = build_sentinel2_l2a_extractor(
        backend_context, bands=bands_to_download, fetch_type=fetch_type.POLYGON, **extraction_parameters 
    )

    cube = extractor.get_cube(connection, spatial_extent_url, temporal_context)

    # Compute the SCL dilation and add it to the cube
    scl_dilated_mask = cube.process(
        "to_scl_dilation_mask",
        data=cube,
        scl_band_name="S2-L2A-SCL",
        kernel1_size=17,  # 17px dilation on a 20m layer
        kernel2_size=77,   # 77px dilation on a 20m layer
        mask1_values=[2, 4, 5, 6, 7],
        mask2_values=[3, 8, 9, 10, 11],
        erosion_kernel_size=3
    ).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"])

    cube = cube.merge_cubes(scl_dilated_mask)

    # Compute the distance to cloud and add it to the cube
    scl = cube.filter_bands(['S2-L2A-SCL'])
    distance_to_cloud = scl.apply_neighborhood(
        process=openeo.UDF.from_file('udf_distance_to_cloud.py'),
        size=[{'dimension': 'x', 'unit': 'px', 'value': 256}, {'dimension': 'y', 'unit': 'px', 'value': 256}],
        overlap=[{'dimension': 'x', 'unit': 'px', 'value': 16}, {'dimension': 'y', 'unit': 'px', 'value': 16}]
    ).rename_labels('bands', ['S2-L2A-DISTANCE-TO-CLOUD'])

    cube = cube.merge_cubes(distance_to_cloud)
    cube = cube.linear_scale_range(0, 65534, 0, 65534)

    # Get the h3index to use in the tile
    h3index = geometry.features[0].properties['h3index']
    valid_date = geometry.features[0].properties['valid_date']

    # Increase the memory of the jobs depending on the number of polygons to extract
    number_polygons = get_job_nb_polygons(row)
    _log.debug(f"Number of polygons to extract: {number_polygons}")

    job_options = {
        "executor-memory": "5G",
        "executor-memoryOverhead": "2G",
    }

    return cube.create_job(
        out_format="NetCDF",
        title=f"GFMAP_Extraction_S2_{h3index}_{valid_date}",
        sample_by_feature=True,
        job_options=job_options
    )


### Fourth step: create output paths

Implement a function that from a temporary path containing a job result, from the job dataframe row and the root folder will choose the output path where to save that job result.

In [7]:
%%time

# Load the S2 grid
s2_grid = gpd.read_file("https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds.gpkg")

CPU times: user 4.62 s, sys: 85.2 ms, total: 4.71 s
Wall time: 5.03 s


In [8]:
from pathlib import Path
import xarray as xr
from pyproj import Transformer, CRS
from shapely.geometry import box, Point

def generate_output_path_s2(root_folder: Path, tmp_path: Path, geometry_index: int, row: pd.Series):
    features = geojson.loads(row.geometry)
    sample_id = features[geometry_index].properties['sample_id']
    ref_id = features[geometry_index].properties['ref_id']
    
    # Loads the array lazily in-memory
    try:
        inds = xr.open_dataset(tmp_path, chunks='auto')
        
        source_crs = CRS.from_wkt(inds.crs.attrs['crs_wkt'])
        dst_crs = CRS.from_epsg(4326)
        
        transformer = Transformer.from_crs(source_crs, dst_crs, always_xy=True)

        # Get the center point of the tile
        centroid_utm = box(
            inds.x.min().item(), inds.y.min().item(), inds.x.max().item(), inds.y.max().item()
        ).centroid
        centroid_latlon = Point(*transformer.transform(centroid_utm.x, centroid_utm.y))

        # Intersecting with the s2 grid
        intersecting = s2_grid.geometry.intersects(centroid_latlon)

        # Select the intersecting cell that has a centroid the closest from the point
        intersecting_cells = s2_grid[intersecting]
        intersecting_cells['distance'] = intersecting_cells.distance(centroid_latlon)
        intersecting_cells.sort_values('distance', inplace=True)
        s2_tile = intersecting_cells.iloc[0]

        s2_tile_id = s2_tile.tile

        subfolder = root_folder / ref_id / str(source_crs.to_epsg()) / s2_tile_id / sample_id
    except Exception:
        subfolder = root_folder / 'unsortable'

    return subfolder / f'{row.out_prefix}_{sample_id}_{source_crs.to_epsg()}_{row.start_date}_{row.end_date}{row.out_extension}'
    

### Fifth step: Define the post-job action

The post-job action will be called once the job resut was downloaded and saved to a specific path.

A post-job action function must receive 3 parameters:
* `job_items`: STAC items containing the currently extracted data for the job.
* `row`: The current job dataframe row.
* `parameters`: User-defined parameters set in the `GFMAPJobManager` constructor.

The post-job action must return back the list of job items. The user is responsible for updating,
adding and removing the items. For example, in this case the user creates a raster file with
ground truth data, the user then adds an asset using the predefined `openeo_gfmap.stac.AUXILIARY`
AssetDefintion to the related item, pointing to the generated NetCDF file.

In [9]:
from importlib.metadata import version
from datetime import datetime

from openeo_gfmap.stac import AUXILIARY
from rasterio.features import rasterize
from rasterio.transform import from_bounds
import pystac
import json

def add_item_asset(related_item: pystac.Item, path: Path):
    asset = AUXILIARY.create_asset(
        href=path.as_posix()
    )
    related_item.add_asset('auxiliary', asset)

def post_job_action(job_items: List[pystac.Item], row: pd.Series, parameters: dict = {}) -> list:
    base_gpd = gpd.GeoDataFrame.from_features(json.loads(row.geometry)).set_crs(epsg=4326)
    assert (
        len(base_gpd[base_gpd.extract == True]) == len(job_items),
        "The number of result paths should be the same as the number of geometries"
    )
    extracted_gpd = base_gpd[base_gpd.extract == True].reset_index(drop=True)
    # In this case we want to burn the metadata in a new file in the same folder as the S2 product
    for idx, item in enumerate(job_items):
        sample_id = extracted_gpd.iloc[idx].sample_id
        ref_id = extracted_gpd.iloc[idx].ref_id
        confidence = extracted_gpd.iloc[idx].confidence
        valid_date = extracted_gpd.iloc[idx].valid_date

        item_asset_path = Path(
            list(item.assets.values())[0].href
        )
        # Read information from the item file (could also read it from the item object metadata)
        result_ds = xr.open_dataset(item_asset_path, chunks='auto')

        # Add some metadata to the result_df netcdf file
        result_ds.attrs.update({
            'start_date': row.start_date,
            'end_date': row.end_date,
            'valid_date': valid_date,
            'GFMAP_version': version('openeo_gfmap'),
            'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'description': f'Sentinel2 L2A observations for sample: {sample_id}, unprocessed.',
            'title': f'Sentinel2 L2A - {sample_id}',
            'sample_id': sample_id,
            'spatial_resolution': '10m'
        })
        result_ds.to_netcdf(item_asset_path, format='NETCDF4', engine='h5netcdf')

        target_crs = CRS.from_wkt(result_ds.crs.attrs['crs_wkt'])

        # Get the surrounding polygons around our extracted center geometry to rastetize them
        bounds = (
            result_ds.x.min().item(),
            result_ds.y.min().item(),
            result_ds.x.max().item(),
            result_ds.y.max().item()
        )
        bbox = box(*bounds)
        surround_gpd = base_gpd.to_crs(target_crs).clip(bbox)

        # Burn the polygon croptypes
        transform = from_bounds(*bounds, result_ds.x.size, result_ds.y.size)
        croptype_shapes = list(zip(surround_gpd.geometry, surround_gpd.croptype_label))

        fill_value = 0
        croptype = rasterize(
            croptype_shapes,
            out_shape=(result_ds.y.size, result_ds.x.size),
            transform=transform,
            all_touched=False,
            fill=fill_value,
            default_value=0,
            dtype='int64'
        )

        # Create the attributes to add to the metadata
        attributes = {
            'ref_id': ref_id,
            'sample_id': sample_id,
            'confidence': str(confidence),
            'valid_date': valid_date,
            '_FillValue': fill_value,
            'Conventions': 'CF-1.9',
            'GFMAP_version': version('openeo_gfmap'),
            'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'description': f'Contains rasterized WorldCereal labels for sample: {sample_id}.',
            'title': f'WORLDCEREAL Auxiliary file for sample: {sample_id}',
            'spatial_resolution': '10m'
        }

        aux_dataset = xr.Dataset(
            {'LABEL': (('y', 'x'), croptype), 'crs': result_ds['crs']},
            coords={'y': result_ds.y, 'x': result_ds.x},
            attrs=attributes
        )
        # Required to map the 'crs' layer as geo-reference for the 'LABEL' layer.
        aux_dataset['LABEL'].attrs['grid_mapping'] = 'crs'

        # Save the metadata in the same folder as the S2 product
        metadata_path = (
            item_asset_path.parent / 
            f'WORLDCEREAL_10m_{sample_id}_{target_crs.to_epsg()}_{valid_date}.nc'
        )
        aux_dataset.to_netcdf(metadata_path, format='NETCDF4', engine='h5netcdf')
        aux_dataset.close()

        # Adds this metadata as a new asset
        add_item_asset(item, metadata_path)
        
    return job_items


  assert (


### Sixth and last step: Running the manager

Let's initialize and execute the Job Manager as defined the GFMAP, and then run it using the functions defined previously

STAC related parameters such as `collection_id` and `collection_description` are also required.

In [10]:
from openeo_gfmap.manager.job_manager import GFMAPJobManager
from openeo_gfmap.backend import cdse_staging_connection


base_output_dir = Path('/data/users/Public/couchard/world_cereal/extractions_5/')
tracking_job_csv = base_output_dir / 'job_tracker.csv'

manager = GFMAPJobManager(
    output_dir=base_output_dir,
    output_path_generator=generate_output_path_s2,
    collection_id='SENTINEL-EXTRACTION',
    collection_description=(
        "Sentinel-2 and Auxiliary data extraction example."
    ),
    post_job_action=post_job_action,
    poll_sleep=60,
    n_threads=2,
    post_job_params={}
)

manager.add_backend(
    Backend.CDSE_STAGING.value, cdse_staging_connection, parallel_jobs=6
)

In [11]:
# Run the jobs and create the STAC catalogue
manager.run_jobs(job_df, create_datacube_s2, tracking_job_csv)
manager.create_stac()  # By default the STAC catalogue will be saved in the stac subfolder of the base_output_dir folder.

2024-03-07 14:32:38,392|openeo_gfmap.manager|INFO:  Starting job manager using 2 worker threads.
2024-03-07 14:32:38,395|openeo_gfmap.manager|INFO:  Workers started, creating and running jobs.
2024-03-07 14:32:38,592|openeo_gfmap.manager|DEBUG:  Normalizing dataframe. Columns: Index(['backend_name', 'out_prefix', 'out_extension', 'start_date', 'end_date',
       'geometry', 'nb_polygons', 'status', 'id', 'start_time', 'cpu',
       'memory', 'duration', 'description', 'costs'],
      dtype='object')


Authenticated using refresh token.


2024-03-07 14:32:40,375|openeo_gfmap.manager|DEBUG:  Number of polygons to extract: 3


DataCube(<PGNode 'dimension_labels' at 0x7fb2dd7cfa40>)


2024-03-07 14:34:05,989|openeo_gfmap.manager|DEBUG:  Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).
2024-03-07 14:35:06,713|openeo_gfmap.manager|DEBUG:  Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).
2024-03-07 14:36:07,149|openeo_gfmap.manager|DEBUG:  Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).
2024-03-07 14:37:08,393|openeo_gfmap.manager|DEBUG:  Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).
2024-03-07 14:38:08,991|openeo_gfmap.manager|DEBUG:  Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).
2024-03-07 14:39:09,497|openeo_gfmap.manager|DEBUG:  Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).
2024-03-07 14:40:11,296|openeo_gfmap.manager|DEBUG:  Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).
2024-03-07 14:41:12,