In [1]:
from pathlib import Path

In [2]:
import openeo

In [3]:
from efast_openeo.define_udp import create_efast_udp

### Connect to openeo backend

In [4]:
# Step 1: open a session with the Copernicus Data Space Ecosystem backend.
connection = openeo.connect("https://openeo.dataspace.copernicus.eu/")
# Step 2: log in via OpenID Connect and keep the handle the call returns.
connection = connection.authenticate_oidc()

2025-10-05 09:01:42,008 [INFO] [config.py:193]	Loaded openEO client config from sources: []
2025-10-05 09:01:42,305 [INFO] [connection.py:255]	Found OIDC providers: ['CDSE']
2025-10-05 09:01:42,306 [INFO] [connection.py:274]	No OIDC provider given, but only one available: 'CDSE'. Using that one.
2025-10-05 09:01:42,450 [INFO] [connection.py:329]	Using default client_id 'sh-b1c3a958-52d4-40fe-a333-153595d1c71e' from OIDC provider 'CDSE' info.
2025-10-05 09:01:42,450 [INFO] [connection.py:601]	Found refresh token: trying refresh token based authentication.
2025-10-05 09:01:42,451 [INFO] [oidc.py:404]	Doing 'refresh_token' token request 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token' with post data fields ['grant_type', 'client_id', 'refresh_token'] (client_id 'sh-b1c3a958-52d4-40fe-a333-153595d1c71e')
2025-10-05 09:01:42,604 [INFO] [connection.py:352]	Obtained tokens: ['access_token', 'id_token', 'refresh_token']
2025-10-05 09:01:42,605 [INFO] [c

Authenticated using refresh token.


### Create the UDP and save it on the backend

`create_efast_udp` returns the list of UDP parameters (UDP parameters are described here: https://open-eo.github.io/openeo-python-client/udp.html) together with the process graph that is saved on the backend below.



In [5]:
# Identifier under which the process is stored; reused when building the cube.
process_id = "efast"

# Build the parameter declarations and the process graph for EFAST.
params, process_graph = create_efast_udp(connection)

# Store the process on the backend under `process_id`.
connection.save_user_defined_process(
    process_id,
    process_graph=process_graph,
    parameters=params,
)

2025-10-05 09:01:42,897 [INFO] [efast.py:36]	Skipping intermediate 's2_bands'
2025-10-05 09:01:42,898 [INFO] [efast.py:154]	Setting s3_dtc_patch_length_px=36 and s3_dtc_overlap_length_px=18
2025-10-05 09:01:42,899 [INFO] [efast.py:36]	Skipping intermediate 's3_cloud_mask'
2025-10-05 09:01:42,901 [INFO] [efast.py:36]	Skipping intermediate 's3_distance_to_cloud'
2025-10-05 09:01:42,901 [INFO] [efast.py:36]	Skipping intermediate 's3_distance_score'
2025-10-05 09:01:42,901 [INFO] [efast.py:36]	Skipping intermediate 's3_bands_and_distance_score'
2025-10-05 09:01:42,902 [INFO] [efast.py:36]	Skipping intermediate 's3_composite_data_bands'
2025-10-05 09:01:42,902 [INFO] [efast.py:36]	Skipping intermediate 's3_composite_data_bands_smoothed'
2025-10-05 09:01:42,903 [INFO] [efast.py:36]	Skipping intermediate 's2_cloud_mask'
2025-10-05 09:01:42,903 [INFO] [efast.py:36]	Skipping intermediate 's2_cloud_mask_mean'
2025-10-05 09:01:42,903 [INFO] [efast.py:36]	Skipping intermediate 's2_cloud_mask_coars

### Create a datacube from the saved UDP

In [6]:
# Instantiate the stored process with concrete arguments.
# A smaller test extent is east=-15.425491, north=15.687501.
cube = connection.datacube_from_process(
    process_id=process_id,
    spatial_extent=dict(
        west=-15.456047,
        south=15.665024,
        east=-15.325491,
        north=15.787501,
    ),
    temporal_extent=["2022-09-07", "2022-09-27"],
    # Every second day from 2022-09-07 up to and including 2022-09-27.
    target_time_series=[f"2022-09-{day:02d}" for day in range(7, 28, 2)],
    s2_data_bands=["B02", "B03", "B04", "B8A"],
    fused_band_names=["B02_fused", "B03_fused", "B04_fused", "B8A_fused"],
)

### Send a request to the backend

OpenEO supports synchronous and asynchronous execution. When executing synchronously, the results are provided in the HTTP response. For synchronous execution, backend logs can't be accessed. This is only useful for quick tests but has the advantage that one does not need to wait for the job to be queued. Synchronous execution only supports small jobs and little data, as it will time out or refuse otherwise.

Asynchronous execution creates a job, starts it and fetches results asynchronously. Here, the session can be interrupted and results downloaded days later than the request submission.

For more on OpenEO execution and execution modes, see https://openeo.org/documentation/1.0/python/#execution, the Glossary linked from that document, and https://open-eo.github.io/openeo-python-client/batch_jobs.html#batch-jobs

In [7]:
# Directory (relative to the current working directory) that receives every
# download made in this notebook. `Path(".").parent` is just `Path(".")`, so
# the path is built directly instead of via a misleading `.parent`.
out_path = Path("test_outputs") / "efast_execution_modes"
# Create the directory and any missing parents; do nothing if it exists.
out_path.mkdir(parents=True, exist_ok=True)

#### Synchronous

In [8]:
# Disabled by default: uncomment to run the cube synchronously and write the
# response straight to a NetCDF file (only suitable for small requests).
# cube.download(out_path / "fused_udp_sync.nc")

#### Asynchronous but blocking
Looks just like the synchronous version but supports larger jobs, can be interrupted and tracked from a different client, e.g. the web interface at https://openeo.dataspace.copernicus.eu/

In [9]:
# Disabled by default: uncomment to run the cube as a batch job, block until
# it has finished, and write the result to a NetCDF file.
# cube.execute_batch(outputfile=(out_path / "fused_udp_exec_batch.nc"))

#### Asynchronous, non-blocking
Create the job and start it as two separate steps:

```python
job = cube.create_job()
job.start()
```

In [10]:
# Register a batch job for the cube on the backend; it is started in a
# separate step further down.
job = cube.create_job()



#### Start execution

In [15]:
# Start the job without waiting for it to finish; poll job.status() afterwards.
job.start()
# alternatively, blocking:
# job.start_and_wait()

##### Query the status

In [18]:
# Current status of the batch job as a string, e.g. 'running'.
job.status()

'running'

The job id can be used later to reconnect to the job

In [19]:
# Backend-assigned identifier of the job; note it down to re-attach to the job
# from another session (presumably via `connection.job(...)` — see the openEO
# batch job documentation).
job.job_id

'j-2510050701444c30ac7463e2331f2ceb'

In [20]:
# Log entries the backend has recorded for this job.
job.logs()

#### When the job is done

In [25]:
# see results
# Fetch the result handle of the job; the bare name on the last line makes the
# notebook display it.
results = job.get_results()
results

In [22]:
# download files
# the file format can be set with the save_result process

In [27]:
# Download every result asset into out_path; the returned list of written
# paths is shown as the cell output.
results.download_files(out_path)

2025-10-05 09:10:33,492 [INFO] [job.py:436]	Downloading Job result asset 'openEO_2022-09-07Z.tif' from https://s3.waw3-1.openeo.v1.dataspace.copernicus.eu/openeo-data-prod-waw4-1/batch_jobs/j-2510050701444c30ac7463e2331f2ceb/openEO_2022-09-07Z.tif?[signed query parameters, including the security token, redacted]

[PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-07Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-09Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-11Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-13Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-15Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-17Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-19Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-21Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-23Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-25Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/openEO_2022-09-27Z.tif'),
 PosixPath('test_outputs/efast_execution_modes/job-results.json')]

### Python script (not as UDP)

The same code as the UDP can also be executed via the Python interface. In particular, you can download intermediate results by setting the `--save-intermediates` option. This is implemented in `main.py`.
`--skip-intermediates` takes a list of intermediate results (for names, see the code) that should not be downloaded, to save time. You can run this either synchronously or asynchronously (by setting `--synchronous` or omitting it). Only synchronous intermediate downloads have been tested so far.

An example configuration is the following:



```
python main.py --max-distance-to-cloud-m
5000
--temporal-score-stddev
5
--t-start
2022-09-07
--t-end-excl
2022-09-27
--bbox
-15.456047,15.665024,-15.425491,15.687501
--output-dir
./test_outputs/full_chain_new_cloud_more_time_steps
--synchronous
--save-intermediates
--fused-band-names
B02fused,B03fused
--skip-intermediates
s2_bands,s2_cloud_mask,s2_cloud_mask_mean,s2_cloud_mask_coarse,s2_distance_to_cloud,s2_distance_score,s2_bands_masked,s3_cloud_mask,s3_distance_to_cloud,s3_distance_score,s3_composite_target_interp,s2_bands_dtc_merge,s2_s3_pre_aggregate_merge,s3_bands_and_distance_score,s2_s3_aggregate,s3_composite_data_bands
--target-interval
2D
--s3-composite-interval
2D
```

# Binning

**This is only relevant for the S3 bands, not S2 or cloud flags!**

The S3 collection is currently loaded (in `efast.py`, `efast_openeo`) as shown below, using nearest neighbor resampling to resample the satellite coordinates of the S3 input files to a geographical grid with similar cell size: 

In [32]:
# Small test area (degrees lon/lat) and a one-day window for the binning demo.
west = -15.456047
south = 15.665024
east = -15.425491
north = 15.687501
bbox = dict(west=west, south=south, east=east, north=north)
temporal_extent = ["2022-09-01", "2022-09-02"]

In [35]:
# Baseline: load one S3 SYN band with no feature flags, i.e. with the
# nearest-neighbour resampling described above, and download it for comparison.
s3_nn = connection.load_collection(
    "SENTINEL3_SYN_L2_SYN",
    bands=["Syn_Oa17_reflectance"],
    spatial_extent=bbox,
    temporal_extent=temporal_extent,
)
s3_nn.download(out_path / "s3_nn.tif")

To do the same thing but apply binning, add the `featureflags` argument to the `load_collection` node and set `reprojection_type` and `supersampling`:

In [43]:
# Same request as the nearest-neighbour cube, loaded a second time so the
# binning flags can be attached to its own load_collection node.
s3_lc = connection.load_collection(
    "SENTINEL3_SYN_L2_SYN",
    bands=["Syn_Oa17_reflectance"],
    spatial_extent=bbox,
    temporal_extent=temporal_extent,
)
# NOTE(review): flag names are backend-specific; confirm the spelling of
# `reprojection_type` and `supersampling` against the backend documentation.
binning_flags = {"reprojection_type": "binning", "supersampling": 2}
s3_lc.result_node().update_arguments(featureflags=binning_flags)
s3_lc.download(out_path / "s3_binned.tif")

## Important:

This must still be added to the UDP definition! Right now, nearest neighbor resampling is used.