# Upscaling Service
This notebooks showcases a demo of the APEx Upscaling Service by demonstrating the capabilities of the [APEx Dispatch API](https://github.com/ESA-APEx/apex_dispatch_api). In this notebook we will perform a small upscaling exercise for one of the services in the [APEx Algoritm Services Catalogue](https://algorithm-catalogue.apex.esa.int/), specfically the [PV Farm Detection](https://algorithm-catalogue.apex.esa.int/apps/eurac_pv_farm_detection#description). We will split up an area of interest in a 20x20km grid and execute this  through this upscaling task through the APEx Dispatch API.

In [None]:
%pip install esa-apex-algorithms rasterio ipyleaflet pillow authlib

In [2]:
import requests
import rasterio
import numpy as np
import tempfile
import asyncio
import json
import websockets
import httpx
import io
import base64
import time
from ipyleaflet import ImageOverlay
from PIL import Image
from ipyleaflet import Map, GeoJSON, TileLayer
from rasterio.warp import transform_bounds
from shapely.geometry import shape
from pyproj import Transformer
from authlib.integrations.requests_client import OAuth2Session
from urllib.parse import urlparse, parse_qs
from esa_apex_toolbox.algorithms import GithubAlgorithmRepository

## Look up the algorithm to execute

In [3]:
repo = GithubAlgorithmRepository(
            owner="ESA-APEx",
            repo="apex_algorithms",
            folder="algorithm_catalog",
        )

In [4]:
repo.list_algorithms()

['wind_turbine',
 'eurac_pv_farm_detection',
 'gep_bas',
 'gep_ost',
 'sar_coin',
 'snap_insar_sentinel1_iw_slc',
 'bap_composite',
 'biopar',
 'fusets_mogpr',
 'max_ndvi',
 'max_ndvi_composite',
 'parcel_delineation',
 'random_forest_firemapping',
 'sentinel1_stats',
 'variabilitymap',
 'worldcereal_crop_extent',
 'worldcereal_crop_type',
 'worldcover_statistics',
 'worldagrocommodities']

In [5]:
service = repo.get_algorithm('eurac_pv_farm_detection')

In [6]:
service

Algorithm(id='eurac_pv_farm_detection', title='Photovoltaic farms mapping', description='Demonstrator service for the detection of photovoltaic farms. Photovoltaic farms (PV farms) mapping is essential for establishing valid policies regarding natural resources management and clean energy. ', udp_link=UdpLink(href='https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/algorithm_catalog/eurac/eurac_pv_farm_detection/openeo_udp/eurac_pv_farm_detection.json', title='openEO Process Definition'), service_links=[ServiceLink(href='https://openeofed.dataspace.copernicus.eu', title='CDSE openEO federation')], license=None, organization='Eurac Research')

## Definition of parameters

In [7]:
dispatch_api = "dispatch-api.dev.apex.esa.int"

In [21]:
spatial_extent ={
        "coordinates": [
          [
            [
              16.14820803974601,
              48.3081456959695
            ],
            [
              16.14820803974601,
              48.0326396134746
            ],
            [
              16.70922281740272,
              48.0326396134746
            ],
            [
              16.70922281740272,
              48.3081456959695
            ],
            [
              16.14820803974601,
              48.3081456959695
            ]
          ]
        ],
        "type": "Polygon"
      }
temporal_extent = ["2023-05-01", "2023-09-30"]
output_format = "gtiff"

In [22]:
# Map related settings
center = shape(spatial_extent).centroid
zoom = 10

## Authentication with the API
To access the different endpoints of the Dispatcher API it is important to first authenticate yourself with the APEx environment.

In [23]:
KEYCLOAK_HOST = "auth.dev.apex.esa.int"
CLIENT_ID = "apex-dispatcher-api-dev"

In [24]:
# Endpoints
authorization_endpoint = f"https://{KEYCLOAK_HOST}/realms/apex/protocol/openid-connect/auth"
token_endpoint = f"https://{KEYCLOAK_HOST}/realms/apex/protocol/openid-connect/token"

# Global token store
_token_data = None

def get_access_token():
    """
    Returns a valid access token. Refreshes it automatically if expired.
    """
    global _token_data

    # If we have a token and it hasn't expired yet, return it
    if _token_data and _token_data.get("expires_at", 0) > time.time() + 10:
        return _token_data["access_token"]

    # If token exists but is expired and has a refresh_token, refresh it
    if _token_data and "refresh_token" in _token_data:
        session = OAuth2Session(CLIENT_ID, token=_token_data)
        _token_data = session.refresh_token(token_endpoint)
        return _token_data["access_token"]

    # Otherwise, start a new OAuth2 flow
    session = OAuth2Session(
        client_id=CLIENT_ID,
        redirect_uri="http://localhost:8000/callback"
    )
    uri, state = session.create_authorization_url(authorization_endpoint)
    print("Open this URL in your browser:", uri)
    redirect_url = input("Paste the redirect URL here: ")
    parsed = urlparse(redirect_url)
    code = parse_qs(parsed.query).get("code")[0]

    _token_data = session.fetch_token(
        token_endpoint,
        code=code,
        client_secret=None,  # only if your client is confidential
        include_client_id=True
    )

    return _token_data["access_token"]

## Retrieval of the tiles
The first step in our upscaling exercise is to determine the different tiles to be processed based on the given `area_of_interest`. In this example we ask the dispatcher to split up the area in a `20x20km` grid. This results in a list of tiles that are visualised on the map.

In [25]:
tiles = requests.post(f"http://{dispatch_api}/tiles", json={
    "grid": "20x20km",
    "aoi": spatial_extent
}).json()
print(f"Processing {len(tiles['geometries'])} tiles for area of interest")

Processing 12 tiles for area of interest


In [26]:
# Create a map centered at the approximate center of the area of interest
m = Map(center=[center.y, center.x], zoom=zoom)
 
# Add the tiles (GeometryCollection) to the map
geo_json = GeoJSON(data=tiles)
m.add_layer(geo_json)

# Display the map
m

Map(center=[48.17039265472205, 16.42871542857436], controls=(ZoomControl(options=['position', 'zoom_in_text', …

## Launching the upscaling task

Next we trigger the upscaling task on the dispatcher. We provide the details of the processing jobs that need to be executed together with a `dimension`. This is an important parameter as this lets the dispatcher know how to scale up. In this case we are asking the dispatcher to scale up using the `spatial_extent`, creating a separate job for each geometry in the `values` section. The dispatcher will take care of all the rest. The result is the information on the created upscaling task.

In [27]:
upscaling_task = requests.post(
    f"http://{dispatch_api}/upscale_tasks", 
    headers={
        "Authorization": f"Bearer {get_access_token()}"        
    },
    json={
        "title": "Upscalinge - PV Detection",
        "label": "openeo",
        "service": {
            "endpoint": service.service_links[0].href,
            "application": service.udp_link.href
        },
        "format": output_format,
        "parameters": {
            "temporal_extent": temporal_extent
        },
        "dimension": {
            "name": "spatial_extent",
            "values": tiles["geometries"]
        }
    }
).json()
upscaling_task_id = upscaling_task['id']
upscaling_task

Open this URL in your browser: https://auth.dev.apex.esa.int/realms/apex/protocol/openid-connect/auth?response_type=code&client_id=apex-dispatcher-api-dev&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fcallback&state=55hzS7XHezhzT1iJ7FfvSsjfHyflzx


Paste the redirect URL here:  http://localhost:8000/callback?state=55hzS7XHezhzT1iJ7FfvSsjfHyflzx&session_state=dcec5829-f03e-4360-9e37-8189ac7fc305&iss=https%3A%2F%2Fauth.dev.apex.esa.int%2Frealms%2Fapex&code=a9fabf9d-d444-4706-9b17-1201d424ec6a.dcec5829-f03e-4360-9e37-8189ac7fc305.ffef7bfc-a27e-4abd-aa7f-9e0925d275a9


{'id': 1,
 'title': 'Upscalinge - PV Detection',
 'label': 'openeo',
 'status': 'created'}

## Retrieve status of the upscaling task
We can now write a continuous monitoring process that fetches the status of the upscaling task and showcase the results on the map.

In [28]:
def add_cog_layer(cog_url, name=None, m=m):
    with rasterio.open(cog_url) as src:
        band = src.read(1).astype(np.float32)

        bounds = transform_bounds(src.crs, "EPSG:4326", *src.bounds)

        # Normalize 0–255
        band = 255 * (band - band.min()) / (band.max() - band.min())
        band = band.astype(np.uint8)

    # Convert to PNG data URI
    buf = io.BytesIO()
    Image.fromarray(band).save(buf, format="PNG")
    data_url = "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode("utf-8")

    bbox = ((bounds[1], bounds[0]), (bounds[3], bounds[2]))
    overlay = ImageOverlay(url=data_url, bounds=bbox, name=name or "Gray COG")
    m.add_layer(overlay)
    return overlay

def add_geojson_layer(url,name=None, m=m):
    data = requests.get(url).json()
    transformer = Transformer.from_crs(data["crs"]["properties"]["name"], "EPSG:4326", always_xy=True)

    for feature in data["features"]:
        geom = feature["geometry"]
        if geom["type"] == "Polygon":
            new_coords = []
            for ring in geom["coordinates"]:
                new_ring = [transformer.transform(x, y) for x, y in ring]
                new_coords.append(new_ring)
            geom["coordinates"] = new_coords
    geo_json = GeoJSON(data=data)
    m.add_layer(geo_json)

In [30]:
# Function to style jobs
def job_style(feature):
    status = feature["properties"]["status"]
    color = {
        "created": "blue",
        "queued": "orange",
        "running": "yellow",
        "finished": "green",
        "canceled": "gray",
        "failed": "red"
    }.get(feature["properties"]["status"], "black")
    return {
        "color": color,
        "fillColor": color,
        "fillOpacity": 0.5 if status != "finished" else 0.0
    }


m = Map(center=[center.y, center.x], zoom=zoom)
geo_json = GeoJSON(
    data={
        "type": "FeatureCollection",
        "features": []
    }
)
geo_json.style_callback = job_style
m.add_layer(geo_json)
m.layout.height = '1000px'
display(m)


# Keep track of processed jobs
processed_jobs = set()

async def show_results(job_id):
    async with httpx.AsyncClient() as client:
        result = await client.get(f"http://{dispatch_api}/unit_jobs/{job_id}/results", headers={
            "Authorization": f"Bearer {get_access_token()}"
        })
        response = result.json()
        if output_format.lower() == "geojson":
            result = response["assets"]["vectorcube.geojson"]["href"]
            add_geojson_layer(result, name=f"Job {job_id}", m=m)
        else:
            cog = response["assets"]["openEO.tif"]["href"]
            add_cog_layer(cog, name=f"Job {job_id}", m=m)
        return response

async def listen_for_updates():
    ws_url = f"ws://{dispatch_api}/ws/upscale_tasks/{upscaling_task_id}?interval=15&token={get_access_token()}"
    async with websockets.connect(ws_url) as websocket:
        while True:
            message = await websocket.recv()
            message = json.loads(message)
            if message.get("data"):
                features = []
                for job in message["data"]["jobs"]:
                    job_id = job["id"]
                    job_status = job["status"]

                    features.append({
                        "type": "Feature",
                        "geometry": job["parameters"]["spatial_extent"],
                        "properties": {
                            "status": job_status,
                        }
                    })
                    
                    # If the job is finished and not yet processed, fetch results
                    if job_status == "finished" and job_id not in processed_jobs:
                        processed_jobs.add(job_id)
                        await show_results(job_id)
         
                    geo_json.data = {
                        "type": "FeatureCollection",
                        "features": features
                    }
                
                if message["data"]["status"] in ["finished", "canceled", "failed"]:
                    print(f"Job finished with status {message['data']['status']}")
                    websocket.close()
                    break

# Run the websocket listener in the notebook
await listen_for_updates()

Map(center=[48.17039265472205, 16.42871542857436], controls=(ZoomControl(options=['position', 'zoom_in_text', …

InvalidURI: https://dispatch-api.dev.apex.esa.int/ws/upscale_tasks/1?interval=15&token=eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJTaEs1c1JIR0lLYlJlRXF6azZlenZxamdiZWdvVFJnQnRqQUxMSmtZdmZBIn0.eyJleHAiOjE3NjM5ODg5NzcsImlhdCI6MTc2Mzk4ODY3NywiYXV0aF90aW1lIjoxNzYzOTg4Njc0LCJqdGkiOiI0NDg0Nzc3Yi1jMjZkLTQ3YmYtOGY5Yi04ZjNiZThhMjZhOTMiLCJpc3MiOiJodHRwczovL2F1dGguZGV2LmFwZXguZXNhLmludC9yZWFsbXMvYXBleCIsInN1YiI6ImFmNDgyNjNkLTRkY2ItNGVkMy04MzA3LTA4MDAwY2I0ODU1MCIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFwZXgtZGlzcGF0Y2hlci1hcGktZGV2Iiwic2lkIjoiZGNlYzU4MjktZjAzZS00MzYwLTllMzctODE4OWFjN2ZjMzA1IiwiYWNyIjoiMSIsImFsbG93ZWQtb3JpZ2lucyI6WyJodHRwOi8vbG9jYWxob3N0Il0sInNjb3BlIjoicHJvZmlsZSBlbWFpbCIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJuYW1lIjoiQnJhbSBKYW5zc2VuIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiYnJhbS5qYW5zc2VuQHZpdG8uYmUiLCJnaXZlbl9uYW1lIjoiQnJhbSIsImZhbWlseV9uYW1lIjoiSmFuc3NlbiIsImVtYWlsIjoiYnJhbS5qYW5zc2VuQHZpdG8uYmUifQ.wRyqxRBWtsoS9-e8Xh8oWD_ommcAVqBwzYsDqF8O7dNCTw5I0KI2UCArSfRs1jfMFISvZVXMlym-TDCLrcd2tiXnkax02L9w1jBiN7tXL99aQ87zzaKsSEDJRp9hygMcp_uyxDGUNWXBiuwwg12Urc1SmEboOPVrEqA_E4mom-1VmXa_eQgiUlube3qQ8HvGtwv6JtKKvhDOXiysksKWRr175ggwXS-Mzu529W1IpXTXBZvhvGn_B5Nflff-pB959hgK7KPf8cSDcZJ0tR__06cFwNSEAVo12NAK1oPdbFHFj9mKuOFXZG0vW_vl__5HKkrtF36VUgvTURgKYarE6Q isn't a valid URI: scheme isn't ws or wss