# Geospatial Job Management and Visualization with openEO

When executing algorithms across large spatial areas, it is often necessary to divide the area of interest into smaller regions and run the algorithm on each region independently. To streamline this process and manage multiple jobs simultaneously, the `MultiBackendJobManager` was developed.

In this example, we demonstrate how to run an algorithm across a grid of smaller tiles and visualize the job statuses on interactive maps. Our use case is the calculation of Best Available Pixel (BAP) composites, using an openEO process hosted in the [APEX repository](https://github.com/ESA-APEx).

We will go through the following steps:
1. **Authentication and Backend Initialization**
2. **Generating a Spatial Grid for the Antwerp Region**
3. **Visualizing the Spatial Grid**
4. **Preparing Jobs for Geospatial Processing**
5. **Running the Jobs with MultiBackendJobManager**
6. **Visualizing Job Statuses with a Custom Color Mapping**

### 1. Authentication and Backend Initialization

Before we start, we install the third-party packages needed for this notebook example.

In [14]:
!pip install shapely geopandas plotly nbformat kaleido



In [None]:
# 1. Importing Required Packages
import time
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import box
import copy
from shapely import wkt

import openeo
from openeo.extra.job_management import (
    create_job_db,
    ProcessBasedJobCreator,
    MultiBackendJobManager,
)

import plotly.express as px
from plotly import offline
from IPython.display import clear_output



### 2. Generating a Spatial Grid for the Antwerp Region

To manage our large-scale task efficiently, we will execute the workflow across smaller tiles that together cover the entire area of interest. This spatial grid subdivides the region into manageable sections, making data processing and analysis more scalable.

In [None]:
def create_tiling_grid(grid_size_m=5000, minx=590000, miny=5660000, output_path="./antwerp_grid_5km.geojson"):
    """
    Creates a square tiling grid with specified dimensions and saves it as a GeoJSON file.
    
    Parameters:
        grid_size_m (int): Size of each grid cell in meters (default: 5000).
        minx (int): Minimum x-coordinate in UTM for grid bounding box.
        miny (int): Minimum y-coordinate in UTM for grid bounding box.
        output_path (str): File path to save the output GeoJSON file.

    Returns:
        None
    """
    # Fixed CRS codes
    crs_utm = "EPSG:32631"
    crs_latlon = "EPSG:4326"
    
    # Calculate max coordinates for a 2 x 2 grid of cells
    maxx = minx + 2 * grid_size_m
    maxy = miny + 2 * grid_size_m

    # Generate coordinates for grid
    x_coords = np.arange(minx, maxx, grid_size_m)
    y_coords = np.arange(miny, maxy, grid_size_m)
    
    # Create polygons for the grid cells
    polygons = [box(x, y, x + grid_size_m, y + grid_size_m) for x in x_coords for y in y_coords]

    # Create GeoDataFrame in UTM CRS
    grid_gdf_utm = gpd.GeoDataFrame({'geometry': polygons}, crs=crs_utm)
    
    # Convert to latitude/longitude CRS
    grid_gdf_latlon = grid_gdf_utm.to_crs(crs_latlon)
    grid_gdf_latlon['id'] = range(len(grid_gdf_latlon))

    # Save to GeoJSON file
    grid_gdf_latlon.to_file(output_path, driver="GeoJSON")

# Example usage
create_tiling_grid()
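
The function above hard-codes a 2 x 2 grid. As a minimal sketch of the same tiling logic without any geospatial dependencies, the hypothetical helper `tile_bounds` below returns the bounding box of each tile for an arbitrary number of columns and rows. The helper name and the `nx`/`ny` parameters are illustrative assumptions and are not part of this notebook or of openEO.

```python
def tile_bounds(minx, miny, grid_size_m, nx=2, ny=2):
    """Return (minx, miny, maxx, maxy) tuples for an nx-by-ny grid of square tiles.

    Hypothetical helper for illustration; coordinates are in the same
    projected CRS (e.g. UTM metres) as the inputs.
    """
    bounds = []
    for i in range(nx):          # columns, west to east
        for j in range(ny):      # rows, south to north
            x0 = minx + i * grid_size_m
            y0 = miny + j * grid_size_m
            bounds.append((x0, y0, x0 + grid_size_m, y0 + grid_size_m))
    return bounds


# Same origin and cell size as the defaults of create_tiling_grid
tiles = tile_bounds(590000, 5660000, 5000)
print(len(tiles))   # 4
print(tiles[0])     # (590000, 5660000, 595000, 5665000)
```

Each tuple could be passed to `shapely.geometry.box(*bounds)` to obtain the same polygons that `create_tiling_grid` builds.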


### 3. Visualizing the Spatial Grid

We can use Plotly to create an interactive visualization of the spatial grid. This allows us to examine the layout of tiles across the area of interest and ensure that the grid aligns correctly with our region.


In [None]:
# Read the grid written by create_tiling_grid() (same path as its default output_path)
bboxes = gpd.read_file("./antwerp_grid_5km.geojson")

fig = px.choropleth_mapbox(
    bboxes,
    geojson=bboxes.geometry,
    locations=bboxes.index,
    mapbox_style="carto-positron",
    center={"lat": 51.15, "lon": 4.4},
    zoom=8,
    title="Spatial Grid for Antwerp Region"
)

fig.update_geos(fitbounds="locations")
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.show()


Rendered example of the tiling grid:

![Rendered example of the tiling grid](./resources/tiling_antwerp_example.png)


### 4. Preparing Jobs for Geospatial Processing

In this example, we will create Best Available Pixel composites for every tile. To implement this compositing workflow, we directly use the standardized implementation hosted in the [APEX repository](https://github.com/ESA-APEx).

To configure the individual openEO jobs, we first create a DataFrame with the necessary input parameters. This DataFrame, `jobs_df`, will later be used as the basis for organizing all jobs across the tiling grid.

In [16]:
def prepare_jobs_df(temporal_range, grid_df) -> pd.DataFrame:
    jobs = []
    for _, row in grid_df.iterrows():
        jobs.append({
            "geometry": row.geometry,
            "temporal_extent": temporal_range
        })
    return pd.DataFrame(jobs)

# Create the jobs DataFrame
jobs_df = prepare_jobs_df(["2024-05-01", "2024-08-01"], bboxes)
job_db = create_job_db("jobs.csv", jobs_df)


### 5. Running the Jobs with MultiBackendJobManager

Next, we run the jobs using `MultiBackendJobManager`, which manages the execution of multiple jobs across one or more backends. As a standard user, you can run 2 jobs in parallel at any time.
To initiate this, we use the `ProcessBasedJobCreator`, which simplifies the definition of the required `start_job` functionality for algorithms defined within an openEO process graph.

In [17]:
process_graph_url = "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/main/openeo_udp/bap_composite.json"

# Use the Best Available Pixel openEO process to obtain Sentinel-2 composites
start_job = ProcessBasedJobCreator(
        namespace=process_graph_url,
        parameter_defaults={},
    )

# Initiate MultiBackendJobManager 
job_manager = MultiBackendJobManager()  
connection = openeo.connect(url="openeo.dataspace.copernicus.eu").authenticate_oidc()
job_manager.add_backend("cdse", connection=connection, parallel_jobs=2)


Authenticated using refresh token.
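
`ProcessBasedJobCreator` builds the `start_job` callable for us. For comparison, a hand-written equivalent might look like the sketch below. It follows the `start_job(row, connection, **kwargs)` calling convention expected by `MultiBackendJobManager` and uses `datacube_from_process` and `create_job` from the openEO Python client. The factory name `make_start_job`, the process id you pass in, and the choice of columns to forward are assumptions for illustration. The sketch also assumes that the forwarded row values are already JSON-compatible.

```python
def make_start_job(process_id, namespace, parameter_columns):
    """Return a start_job callable that forwards selected row columns as process arguments.

    Hypothetical helper: `process_id` and `parameter_columns` must match
    the remote process definition, which is not verified here.
    """
    def start_job(row, connection, **kwargs):
        # Collect the process arguments from the job table row
        arguments = {name: row[name] for name in parameter_columns}
        # Build a data cube from the remote process definition
        cube = connection.datacube_from_process(
            process_id, namespace=namespace, **arguments
        )
        # Create the batch job and hand it back to the job manager
        return cube.create_job()

    return start_job
```

The returned function could then be passed as `start_job=` in place of the `ProcessBasedJobCreator` instance.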


### 6. Visualizing Job Statuses with a Custom Color Mapping

To effectively monitor the progress of geospatial processing tasks, we define a function to visualize job statuses on an interactive map. This visualization uses Plotly, with custom color mappings for each job status, providing a clear overview of the current state of all jobs.

In [18]:
colors = {
    "not_started": 'lightgrey', 
    "created": 'gold', 
    "queued": 'lightsteelblue', 
    "running": 'navy', 
    "finished": 'lime',
    "error": 'darkred',
    "skipped": 'darkorange',
    None: 'grey'  # Default color for any undefined status
}

This color scheme makes it easy to visually distinguish between different statuses.
The `plot_job_status` function generates an interactive map.

In [None]:
def plot_job_status(status_df, color_dict):
    """
    Visualizes job statuses on a map using Plotly.

    Parameters:
        status_df (pd.DataFrame): Job table with a `status` column and a `geometry` column holding WKT strings.
        color_dict (dict): Mapping from job status to color; the `None` key holds the fallback color.

    Returns:
        plotly.graph_objects.Figure: The choropleth map of job statuses.
    """
    status_plot = copy.deepcopy(status_df)
    
    # Parse spatial extent into geometries
    status_plot['geometry'] = status_plot['geometry'].apply(wkt.loads)

    status_plot = gpd.GeoDataFrame(status_plot, geometry='geometry', crs='EPSG:4326')
    
    # Apply color mapping based on job status
    status_plot['color'] = status_plot['status'].map(color_dict).fillna(color_dict[None])
    
    # Calculate map center based on geometry bounds
    minx, miny, maxx, maxy = status_plot.total_bounds
    center_lat = (miny + maxy) / 2
    center_lon = (minx + maxx) / 2
    
    # Plot the map with job status updates
    fig = px.choropleth_mapbox(
        status_plot,
        geojson=status_plot.geometry.__geo_interface__,
        locations=status_plot.index,
        color='status',
        color_discrete_map=color_dict,
        mapbox_style="carto-positron",
        center={"lat": center_lat, "lon": center_lon},
        zoom=8,
        title="Job Status Overview",
        labels={'status': 'Job Status'}
    )
    fig.update_geos(fitbounds="locations")
    fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})
    
    # Return the figure so the caller can display it
    return fig


To enable the visualization of job statuses while concurrently running the jobs, threading is used. This approach allows the jobs to execute in parallel with the status updates, ensuring that the map is refreshed regularly without blocking job execution.

Threading in the Workflow:

- Job Execution: The jobs are started by the job manager (`job_manager`), which runs in its own thread. This allows the jobs to be executed asynchronously.
- Visualization: At the same time, we repeatedly read the job statuses from a CSV file (`jobs.csv`) and update the visualization using the `plot_job_status` function.


In [None]:
# Start a threaded job manager
job_manager.start_job_thread(start_job=start_job, job_db=job_db)

while not job_manager._stop_thread:
    try:
        status_df = pd.read_csv('jobs.csv')
        fig = plot_job_status(status_df=status_df, color_dict=colors)
        clear_output()
        offline.iplot(fig)

        # Check if all jobs are done
        if status_df['status'].isin(["not_started", "created", "queued", "running"]).sum() == 0:
            job_manager.stop_job_thread()

        time.sleep(30)  # Wait before the next update

    except KeyboardInterrupt:
        break
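
The loop above reads `_stop_thread`, a private attribute that by Python convention is not part of the public interface. The general pattern (a worker thread doing the work while the main thread polls shared state and decides when to stop) can be sketched with the standard library alone. Everything in this sketch, including the `worker` function and the `progress` dictionary, is a hypothetical stand-in for the job manager and `jobs.csv`.

```python
import threading
import time

progress = {"done": 0, "total": 5}   # stand-in for the job table in jobs.csv
stop_event = threading.Event()       # public stop flag instead of a private attribute


def worker():
    """Simulate jobs finishing one by one until done or asked to stop."""
    while progress["done"] < progress["total"] and not stop_event.is_set():
        time.sleep(0.01)             # stand-in for waiting on a batch job
        progress["done"] += 1


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# Main thread: poll the shared state, report it, and stop once everything is finished
while not stop_event.is_set():
    print(f"{progress['done']}/{progress['total']} jobs finished")
    if progress["done"] >= progress["total"]:
        stop_event.set()
    time.sleep(0.01)

thread.join()
```

Using a `threading.Event` keeps the stop condition explicit and lets either side end the loop cleanly.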


Rendered example of dynamic output:

![Rendered example of dynamic output](./resources/output_antwerp_example.png)
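
Alongside the map, a plain-text summary can be handy, for example in a log. The sketch below counts jobs per status using only the standard library. It assumes the job table has a `status` column, as used in the monitoring loop, and the helper name `count_statuses` is hypothetical.

```python
import csv
import io
from collections import Counter


def count_statuses(csv_file):
    """Count rows per value of the `status` column in an open CSV file."""
    reader = csv.DictReader(csv_file)
    return Counter(row["status"] for row in reader)


# Small in-memory example standing in for jobs.csv
example = io.StringIO("id,status\nj-1,finished\nj-2,running\nj-3,finished\n")
summary = count_statuses(example)
print(dict(summary))  # {'finished': 2, 'running': 1}
```

With the real job table, the same function could be called as `count_statuses(open("jobs.csv", newline=""))`.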