In [1]:
import xcube
import xarray as xr
import hda
import os
from shapely import geometry
import zipfile

from eocanvas import Credentials
from eocanvas.api import Input, Config, ConfigOption
from eocanvas.processes import SnapProcess
from eocanvas.snap.graph import Graph
from eocanvas.snap import Operator
from eocanvas.snap.binning import aggregators, BinningVariable, Aggregators, BinningVariables

1. test submission of SNAP graphs to eocanvas
2. scale them up and build a cube
3. access the cube using xcube remotely
4. make targetted data extractions for time series analysis 

# Part 1: find the source data

In [2]:
# Set source data: L1B, L2
data_sources = {
               "L1B" : {"dataset_id" : "EO:EUM:DAT:SENTINEL-3:OL_1_EFR___"},
               "L2"  : {"dataset_id" : "EO:EUM:DAT:SENTINEL-3:OL_2_WFR___"},
               "L3"  : {"dataset_id" : "cmems_obs-oc_bal_bgc-plankton_my_l3-multi-1km_P1D", "variables" : "CHL"}
               }

dtstart = "2022-07-03T00:00:00.000Z"
dtend = "2022-07-04T23:59:00.000Z"
sat = "Sentinel-3B"
bbox = [17, 55.0, 20.0, 60.0] # W, S, E, N
timeliness = "NT"

download_dir = os.path.join(os.getcwd(), "results")

In [3]:
# Set ROI
polygon = [[bbox[0], bbox[1]], [bbox[2], bbox[1]], [bbox[2], bbox[3]], [bbox[0], bbox[3]], [bbox[0], bbox[1]]]
WKT = geometry.Polygon([[p[0], p[1]] for p in polygon])
print(WKT)

POLYGON ((17 55, 20 55, 20 60, 17 60, 17 55))


In [4]:
# set eocanvas graph templates
graphs = {
         "L1B" : os.path.join(os.getcwd(), "SNAP_graphs", "SNAP_GPT_graph_OLCI_L1B_subset_MphChl_Idepix_reproject_eocanvas_template.xml"),
         "L2"  : os.path.join(os.getcwd(), "SNAP_graphs", "SNAP_GPT_graph_OLCI_L2_subset_flag_reproject_chl_eocanvas_template.xml")
         }

In [5]:
# find HDA URLs
for data_source in data_sources:
    if ":EUM:" not in data_sources[data_source]["dataset_id"]:
        continue
    # need to check on auth here
    c = hda.Client()
    query = {
            "dataset_id": data_sources[data_source]["dataset_id"],
            "dtstart"   : dtstart,
            "dtend"     : dtend,
            "timeliness": timeliness,
            "bbox"      : bbox,
            "sat"       : sat
            }
    
    results = c.search(query)

    data_sources[data_source]["urls"] = results.get_download_urls()
    data_sources[data_source]["product_names"] = [results.results[i]["id"] for i in range(len(results))]    
    data_sources[data_source]["graph"] = graphs[data_source]

In [None]:
binner = Operator('Binning')
binner.sourceProductPaths = '$img'
binner.sourceProductFormat = 'Sen3'
binner.region = str(WKT)
binner.timeFilterMethod = None
binner.numRows = 50094
binner.superSampling = 4
binner.maxDistanceOnEarth = -1
binner.maskExpr = True
binner.metadataAggregatorName = "NAME"
binner.sourceProductPaths = '$img1'
binner.sourceProductFormat = 'Sen3'
binner.outputFile = '$output'
binner.outputFormat = "NetCDF4-BEAM"

b1 = BinningVariable(name="Filtered_CHL_NN", expression="CHL_NN", valid_expression="if WQSF_lsb_LAND or WQSF_lsb_CLOUD or WQSF_lsb_CLOUD_AMBIGUOUS or WQSF_lsb_CLOUD_MARGIN or WQSF_lsb_INVALID or WQSF_lsb_COSMETIC or WQSF_lsb_SATURATED or WQSF_lsb_SUSPECT or WQSF_lsb_HISOLZEN or WQSF_lsb_HIGHGLINT or WQSF_lsb_SNOW_ICE or WQSF_lsb_OCNN_FAIL then False else True")
b2 = BinningVariable(name="Filtered_CHL_OC4ME", expression="CHL_OC4ME", valid_expression="if WQSF_lsb_LAND or WQSF_lsb_CLOUD or WQSF_lsb_CLOUD_AMBIGUOUS or WQSF_lsb_CLOUD_MARGIN or WQSF_lsb_INVALID or WQSF_lsb_COSMETIC or WQSF_lsb_SATURATED or WQSF_lsb_SUSPECT or WQSF_lsb_HISOLZEN or WQSF_lsb_HIGHGLINT or WQSF_lsb_SNOW_ICE or WQSF_lsb_AC_FAIL or WQSF_lsb_WHITECAPS or WQSF_lsb_ADJAC or WQSF_msb_RWNEG_O2 or WQSF_msb_RWNEG_O3 or WQSF_msb_RWNEG_O4 or WQSF_msb_RWNEG_O5 or WQSF_msb_RWNEG_O6 or WQSF_msb_RWNEG_O7 or WQSF_msb_RWNEG_O8 or WQSF_lsb_OC4ME_FAIL then False else True")
binner.variableConfigs = BinningVariables([b1, b2])

ag1 = aggregators.AggregatorAvg(varName='Filtered_CHL_NN', targetName="Filtered_CHL_NN", outputCounts='false', outputSums='false', weightCoeff=1.0)
ag2 = aggregators.AggregatorAvg(varName='Filtered_CHL_OC4ME', targetName="Filtered_CHL_OC4ME", outputCounts='false', outputSums='false', weightCoeff=1.0)
binner.aggregatorConfigs = Aggregators([ag1, ag2])

graph = Graph()
graph.add_node(operator=binner,
           node_id='L3binner')

In [None]:
# write it back to the dictionary
graph = Graph.from_uri("/Users/benloveday/Desktop/test3.xml")
data_sources["L2"]["graph"] = graph

In [None]:
# write it back to the dictionary
data_sources["L1B"]["graph"] = graph

In [6]:
# as above for L2
graph = Graph.from_uri(data_sources["L2"]["graph"])
subset = Operator("Subset")
subset.geoRegion = str(WKT)
subset.fullSwath = "false"
subset.copyMetadata = "false"
subset.sourceBands = "CHL_NN,CHL_OC4ME,latitude,longitude,WQSF_lsb,WQSF_msb"

# add it to the graph
graph.add_node(subset, "Subset", "Read")

# write it back to the dictionary
data_sources["L2"]["graph"] = graph

In [7]:
jobs = []
for data_source in data_sources:
    if "WFR" not in data_sources[data_source]["dataset_id"]:
        continue

    for url, product_name in zip(data_sources[data_source]["urls"], data_sources[data_source]["product_names"]):
        inputs = Input(key="img1", url=url)
        config = Config(key="img1", options=ConfigOption(uncompress=True, sub_path="xfdumanifest.xml"))
        process = SnapProcess(snap_graph=data_sources[data_source]["graph"], eo_config=config, eo_input=inputs)
        process.prepare_inputs()
    
        this_job = process.submit()
        jobs.append(this_job)
        process.run(this_job, download=False)
        adf
        # rename and unzip output and clear up
        downloaded_file = os.path.join(download_dir, os.path.basename(this_job.results[0].title))
        output_file = os.path.join(download_dir, product_name.replace(".SEN3", "_eocanvas_processed.SEN3.zip"))
        os.rename(downloaded_file, output_file)

        with zipfile.ZipFile(output_file, 'r') as zip_ref:
            zip_ref.extractall(output_file.replace(".zip", ""))
        os.remove(output_file)

Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: accepted at 2025-04-03T10:21:34.981742
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:21:45.195487
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:21:56.418153
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:22:08.711774
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:22:22.222091
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:22:37.110576
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:22:53.427620
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:23:11.347086
Job: 80a050f4-8ff2-56f5-9fb6-d885387a792a - Status: running at 2025-04-03T10:23:31.039154


NameError: name 'adf' is not defined

In [8]:
this_job.results[0].full_url

'https://gateway.prod.wekeo2.eu/serverless/download/result/9481'

In [10]:
this_job.results

[Result(api=<eocanvas.api.API object at 0x1644b50c0>, href='/download/result/9481', title='wks3:///80a050f4-8ff2-56f5-9fb6-d885387a792a/output/c6aca9e0-1064-11f0-a168-6e395a4d0343.znap.zip', rel='enclosure')]

In [None]:
import requests

response = requests.get(this_job.results[0].full_url, stream=True)

In [9]:
this_job.logs

[LogEntry(timestamp=datetime.datetime(2025, 4, 3, 8, 21, 56, 345054, tzinfo=datetime.timezone.utc), message='time="2025-04-03T08:21:56.344Z" level=info msg="Starting Workflow Executor" version=v3.5.7'),
 LogEntry(timestamp=datetime.datetime(2025, 4, 3, 8, 21, 56, 349547, tzinfo=datetime.timezone.utc), message='time="2025-04-03T08:21:56.349Z" level=info msg="Using executor retry strategy" Duration=1s Factor=1.6 Jitter=0.5 Steps=5'),
 LogEntry(timestamp=datetime.datetime(2025, 4, 3, 8, 21, 56, 349563, tzinfo=datetime.timezone.utc), message='time="2025-04-03T08:21:56.349Z" level=info msg="Executor initialized" deadline="0001-01-01 00:00:00 +0000 UTC" includeScriptOutput=false namespace=ws-serverless podName=workflow-bx75z-stage-in-131920003 templateName=stage-in version="&Version{Version:v3.5.7,BuildDate:2024-05-27T06:18:59Z,GitCommit:503eef1357ebc9facc3f463708031441072ef7c2,GitTag:v3.5.7,GitTreeState:clean,GoVersion:go1.21.10,Compiler:gc,Platform:linux/amd64,}"'),
 LogEntry(timestamp=dat

In [None]:
# load ZARR products

In [None]:
ds_all = xr.open_mfdataset(os.path.join(download_dir, "*WFR____20220704*"), engine="zarr", consolidated=False, combine="nested", concat_dim="time")

In [None]:
ds_all

In [None]:
ds_all.chl[1,:,:].plot()

In [None]:
ds_l2["Filtered_CHL_NN"].plot()

In [None]:
ds_l1b["chl"].plot()