Install the needed libraries

In [1]:
!pip install boto3
!pip install numpy
!pip install prefect
!pip install prefect_dask



In [2]:
# We'll use boto3 to monitor the s3 bucket. 
# Note: the S3_ACCESSKEY, S3_SECRETKEY and S3_ENDPOINT are given in the docker-compose.yml file.
import boto3
import os

# S3 client configured from the environment variables of the demo deployment.
s3_session = boto3.session.Session()
s3_client = s3_session.client(
    service_name="s3",
    aws_access_key_id=os.environ["S3_ACCESSKEY"],
    aws_secret_access_key=os.environ["S3_SECRETKEY"],
    endpoint_url=os.environ["S3_ENDPOINT"],
    region_name=os.environ["S3_REGION"],
)
bucket_name = "tmp-download"
bucket_dir = "stations"
bucket_url = f"s3://{bucket_name}/{bucket_dir}"

# If bucket is already created, clear all files in order to start fresh for each demo.
existing_buckets = {bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]}
if bucket_name in existing_buckets:
    # A single list call returns at most one page of keys (1000), so walk every
    # page to make sure the whole bucket is emptied.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name):
        # An empty bucket yields a page without the 'Contents' key.
        for obj in page.get("Contents", []):
            s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
else:
    s3_client.create_bucket(Bucket=bucket_name)

print("Is bucket empty now ?: ", 'Contents' not in s3_client.list_objects(Bucket=bucket_name))

Is bucket empty now ?:  True


A bucket "tmp-download" is created for the purpose of this demo. The CADIP and ADGS Prefect flows call the rs-server endpoints to download the files from the CADIP and ADGS stations and to upload them to s3://tmp-download/stations/<station_name>.
After a successful upload to the s3 bucket, the STAC catalog is updated with the information related to that file.

In [3]:

import threading
from datetime import datetime
from rs_workflows.common import (
    PrefectFlowConfig,
    download_flow,
)

def run_flow(user, url, station, mission, tmp_local_download, bucket_url, no_of_tasks, start_date, stop_date):
    """Build a PrefectFlowConfig for one station and run download_flow with it.

    The per-station values are derived here, so callers pass the un-formatted
    template and the base bucket URL.

    Args:
        user: forwarded unchanged to PrefectFlowConfig.
        url: forwarded unchanged to PrefectFlowConfig.
        station: station name; also used to build the local path and bucket URL.
        mission: forwarded unchanged to PrefectFlowConfig.
        tmp_local_download: string template; `.format(station)` is applied to it.
        bucket_url: base URL; "/<station>" is appended to it.
        no_of_tasks: forwarded unchanged to PrefectFlowConfig.
        start_date: string in "%Y-%m-%dT%H:%M:%SZ" format, parsed to a datetime.
        stop_date: string in "%Y-%m-%dT%H:%M:%SZ" format, parsed to a datetime.

    Raises:
        ValueError: if start_date or stop_date does not match the expected format.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"

    # Per-station values derived from the template / base URL.
    station_local_dir = tmp_local_download.format(station)
    station_bucket_url = bucket_url + f"/{station}"
    interval_start = datetime.strptime(start_date, timestamp_format)
    interval_stop = datetime.strptime(stop_date, timestamp_format)

    flow_config = PrefectFlowConfig(
        user,
        url,
        station,
        mission,
        station_local_dir,
        station_bucket_url,
        no_of_tasks,
        interval_start,
        interval_stop,
    )
    # start the prefect flow
    download_flow(flow_config)

# Demo parameters shared by every station flow.
user = "DemoUser"
mission = "s1"
stations = ["CADIP", "ADGS"]
url = "http://rs-server-{}:8000"  # "{}" is replaced by the lower-cased station name
tmp_local_download = "/tmp/{}_tmp"  # "{}" is replaced by the station name inside run_flow
no_of_tasks = 1
threads = []  # not used by the sequential loop below

# Run the download flow for each station, one after the other.
for station in stations:
    # Pass the raw template and the base bucket URL: run_flow itself applies
    # `.format(station)` and appends "/<station>". Pre-building them here made
    # the upload target ".../stations/<station>/<station>".
    run_flow(user,
             url.format(station.lower()),
             station,
             mission,
             tmp_local_download,
             bucket_url,
             no_of_tasks,
             "2014-01-01T12:00:00Z",
             "2024-02-20T12:00:00Z",
             )
    

15:13:42.622 | ERROR   | MainThread   | prefect._internal.concurrency - Detected unsafe call to `from_sync` from thread with event loop. Use `await greenback.ensure_portal()` to allow call to run without blocking the event loop.


response.link = http://rs-server-cadip:8000/cadip/CADIP/cadu/search?datetime=2014-01-01T12%3A00%3A00Z%2F2024-02-20T12%3A00%3A00Z


15:13:43.978 | ERROR   | MainThread   | prefect._internal.concurrency - Detected unsafe call to `from_sync` from thread with event loop. Use `await greenback.ensure_portal()` to allow call to run without blocking the event loop.


15:13:45.692 | DEBUG   | Task run 'ingest_files-0' - Task 0: Files to be downloaded:


{   'assets': {'file': {'file:size': 36}},
    'geometry': None,
    'id': 'DCS_04_S1A_20231121072204051312_ch1_DSDB_00001.raw',
    'links': [],
    'properties': {   'cadip:block_number': 1,
                      'cadip:channel': 1,
                      'cadip:final_block': False,
                      'cadip:id': '2b17b57d-fff4-4645-b539-91f305c27c69',
                      'cadip:retransfer': False,
                      'cadip:session_id': 'some_session_id',
                      'datetime': '2023-11-26T17:01:39.528Z',
                      'eviction_datetime': '2023-12-03T17:01:00Z'},
    'stac_extensions': [   'https://stac-extensions.github.io/file/v2.1.0/schema.json'],
    'stac_version': '1.0.0',
    'type': 'Feature'}
{   'assets': {'file': {'file:size': 36}},
    'geometry': None,
    'id': 'DCS_04_S1A_20231121072204051312_ch1_DSDB_00002.raw',
    'links': [],
    'properties': {   'cadip:block_number': 1,
                      'cadip:channel': 1,
                      'ca

15:13:45.915 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: The download progress for file DCS_04_S1A_20231121072204051312_ch1_DSDB_00001.raw is IN_PROGRESS
15:13:46.935 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: File DCS_04_S1A_20231121072204051312_ch1_DSDB_00001.raw has been properly downloaded...

15:13:47.107 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: The download progress for file DCS_04_S1A_20231121072204051312_ch1_DSDB_00002.raw is IN_PROGRESS
15:13:48.123 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: File DCS_04_S1A_20231121072204051312_ch1_DSDB_00002.raw has been properly downloaded...

15:13:48.285 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: The download progress for file DCS_04_S1A_20231121072204051312_ch1_DSDB_00003.raw is IN_PROGRESS
15:13:49.305 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: File DCS_04_S1A_20231121072204051312_ch1_DSDB_00003.raw has been properly downloaded...

15:13:49.482 | [36mIN

Endpoint to be used to insert the item info  within the catalog: http://rs-server-cadip:8000/catalog/DemoUser/collections/s1/items/{'stac_version': '1.0.0', 'stac_extensions': ['https://stac-extensions.github.io/file/v2.1.0/schema.json'], 'type': 'Feature', 'id': 'DCS_04_S1A_20231121072204051312_ch1_DSDB_00001.raw', 'geometry': None, 'properties': {'datetime': '2023-11-26T17:01:39.528Z', 'eviction_datetime': '2023-12-03T17:01:00Z', 'cadip:id': '2b17b57d-fff4-4645-b539-91f305c27c69', 'cadip:retransfer': False, 'cadip:final_block': False, 'cadip:block_number': 1, 'cadip:channel': 1, 'cadip:session_id': 'some_session_id'}, 'links': [], 'assets': {'file': {'file:size': 36}}}
Endpoint to be used to insert the item info  within the catalog: http://rs-server-cadip:8000/catalog/DemoUser/collections/s1/items/{'stac_version': '1.0.0', 'stac_extensions': ['https://stac-extensions.github.io/file/v2.1.0/schema.json'], 'type': 'Feature', 'id': 'DCS_04_S1A_20231121072204051312_ch1_DSDB_00002.raw', 'g

eometry': None, 'properties': {'datetime': '2023-11-26T17:01:39.528Z', 'eviction_datetime': '2023-12-03T17:01:00Z', 'cadip:id': '2b17b57d-fff4-4645-b539-91f305c27c60', 'cadip:retransfer': False, 'cadip:final_block': False, 'cadip:block_number': 1, 'cadip:channel': 1, 'cadip:session_id': 'some_session_id'}, 'links': [], 'assets': {'file': {'file:size': 36}}}
Endpoint to be used to insert the item info  within the catalog: http://rs-server-cadip:8000/catalog/DemoUser/collections/s1/items/{'stac_version': '1.0.0', 'stac_extensions': ['https://stac-extensions.github.io/file/v2.1.0/schema.json'], 'type': 'Feature', 'id': 'DCS_04_S1A_20231121072204051312_ch1_DSDB_00003.raw', 'geometry': None, 'properties': {'datetime': '2023-11-26T17:01:39.528Z', 'eviction_datetime': '2023-12-03T17:01:00Z', 'cadip:id': '2b17b57d-fff4-4645-b539-91f305c27c61', 'cadip:retransfer': False, 'cadip:final_block': False, 'cadip:block_number': 1, 'cadip:channel': 1, 'cadip:session_id': 'some_session_id'}, 'links': [],

15:13:58.165 | ERROR   | MainThread   | prefect._internal.concurrency - Detected unsafe call to `from_sync` from thread with event loop. Use `await greenback.ensure_portal()` to allow call to run without blocking the event loop.


response.link = http://rs-server-adgs:8000/adgs/aux/search?datetime=2014-01-01T12%3A00%3A00Z%2F2024-02-20T12%3A00%3A00Z


15:13:59.472 | ERROR   | MainThread   | prefect._internal.concurrency - Detected unsafe call to `from_sync` from thread with event loop. Use `await greenback.ensure_portal()` to allow call to run without blocking the event loop.


15:14:01.114 | DEBUG   | Task run 'ingest_files-0' - Task 0: Files to be downloaded:


{   'assets': {'file': {'file:size': 8326253}},
    'geometry': None,
    'id': 'S2__OPER_AUX_ECMWFD_PDMC_20190216T120000_V20190217T090000_20190217T210000.TGZ',
    'links': [],
    'properties': {   'adgs:id': '2b17b57d-fff4-4645-b539-91f305c27c69',
                      'datetime': '2019-02-16T12:00:00.000Z',
                      'end_datetime': '2019-02-17T21:00:00.000Z',
                      'start_datetime': '2019-02-17T09:00:00.000Z'},
    'stac_extensions': [   'https://stac-extensions.github.io/file/v2.1.0/schema.json'],
    'stac_version': '1.0.0',
    'type': 'Feature'}
{   'assets': {'file': {'file:size': 8326253}},
    'geometry': None,
    'id': 'S2__OPER_AUX_ECMWFD_PDMC_20200216T120000_V20190217T090000_20190217T210000.TGZ',
    'links': [],
    'properties': {   'adgs:id': 'id2',
                      'datetime': '2020-02-16T12:00:00.000Z',
                      'end_datetime': '2020-02-17T21:00:00.000Z',
                      'start_datetime': '2020-02-17T09:00:00.000Z

15:14:01.335 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: The download progress for file S2__OPER_AUX_ECMWFD_PDMC_20190216T120000_V20190217T090000_20190217T210000.TGZ is IN_PROGRESS
15:14:02.356 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: File S2__OPER_AUX_ECMWFD_PDMC_20190216T120000_V20190217T090000_20190217T210000.TGZ has been properly downloaded...

15:14:02.520 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: The download progress for file S2__OPER_AUX_ECMWFD_PDMC_20200216T120000_V20190217T090000_20190217T210000.TGZ is IN_PROGRESS
15:14:03.541 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: File S2__OPER_AUX_ECMWFD_PDMC_20200216T120000_V20190217T090000_20190217T210000.TGZ has been properly downloaded...

15:14:03.707 | [36mINFO[0m    | Task run 'ingest_files-0' - Task 0: The download progress for file S2__OPER_AUX_ECMWFD_PDMC_20230216T120000_V20190217T090000_20190217T210000.TGZ is IN_PROGRESS
15:14:04.728 | [36mINFO[0m    | Task run 'in

Endpoint to be used to insert the item info  within the catalog: http://rs-server-adgs:8000/catalog/DemoUser/collections/s1/items/{'stac_version': '1.0.0', 'stac_extensions': ['https://stac-extensions.github.io/file/v2.1.0/schema.json'], 'type': 'Feature', 'id': 'S2__OPER_AUX_ECMWFD_PDMC_20190216T120000_V20190217T090000_20190217T210000.TGZ', 'geometry': None, 'properties': {'adgs:id': '2b17b57d-fff4-4645-b539-91f305c27c69', 'datetime': '2019-02-16T12:00:00.000Z', 'start_datetime': '2019-02-17T09:00:00.000Z', 'end_datetime': '2019-02-17T21:00:00.000Z'}, 'links': [], 'assets': {'file': {'file:size': 8326253}}}
Endpoint to be used to insert the item info  within the catalog: http://rs-server-adgs:8000/catalog/DemoUser/collections/s1/items/{'stac_version': '1.0.0', 'stac_extensions': ['https://stac-extensions.github.io/file/v2.1.0/schema.json'], 'type': 'Feature', 'id': 'S2__OPER_AUX_ECMWFD_PDMC_20200216T120000_V20190217T090000_20190217T210000.TGZ', 'geometry': None, 'properties': {'adgs:i

RSPY DPR Processor mockup demo:


The DPRProcessor is a class that simulates the processing part performed by the eopf-cpm triggering module. The input of the processor is a yaml config file with all the input files and the expected output locations (local or s3).

The implemented mockup performs the following actions:
1. Checks the validity of the input yaml file (chunks/aux existence / naming convention)
2. Downloads the zarr input from public s3 ovh based on product type required in payload yaml.
3. Updates the .zattrs with our processor name (RSPY_DprMockupProcessor) and timestamp (if product is zipped, our processor updates zattrs inside .zip without extracting files)
4. Computes the CRC of updated .zattrs
5. Updates the product name VVV (as per EOPF-CPM PSD) with the computed CRC, so that the processor can be called multiple times with the same input and generate different outputs.
6. Uploads the products to s3 server (minio for this demo).
7. Removes the local downloaded products (if a flag is set).
8. Retrieves the .zattrs into a serialisable format (dict) in order to upload catalog in the future step of our processing chain.

In [None]:
# YAML payload describing one DPR mockup processor run: the workflow step, its
# input products (two CADU chunks and two AUX files) and the output location.
# A later cell parses it with yaml.safe_load and POSTs it to the simulator's
# /run endpoint.
# NOTE(review): the product_types comment inside the payload mentions 4 products,
# but only one (S1SSMOCN) is listed - confirm which is intended.
yaml_payload = """
general_configuration:
  logging:
    level: DEBUG
  triggering__validate_run: true
  triggering__use_default_filename: true
  triggering__use_basic_logging: true
  triggering__load_default_logging: false
breakpoints:
workflow:
- step: 1
  active: true
  module: rs.dpr.mockup # string corresponding to the python path of the module
  processing_unit: DprMockupProcessor # EOProcessingUnit class name
  name: DprMockupProcessor # identifier for the processing unit
  inputs:
    in1: CADU1
    in2: CADU2 # One CADU{N} entry by CADU chunk we want to pass as input. In this example we consider 2 chunks
    in3: AUX1
    in4: AUX2 # One AUX{N} entry by ADGS file we want to pass as input. In this example we consider 2 aux files
  outputs:
    out: outputs
  parameters:
    product_types: # List of EOPF product types we want to generate. In this example we simulate S1L0 processor that generates 4 products
      - S1SSMOCN
I/O:
  inputs_products:
  - id: CADU1
    path: chunk/S1/S1A_20231121072204051312/DCS_04_S1A_20231121072204051312_ch1_DSDB_00023.raw
    store_type: zarr
    store_params: {}
  - id: CADU2
    path: chunk/S1/S1A_20231121072204051312/DCS_04_S1A_20231121072204051312_ch1_DSDB_00022.raw
    store_type: zarr
    store_params: {}
  - id: AUX1
    path: AUX/S1/S1A_OPER_AMV_ERRMAT_MPC__20201124T040009_V20000101T000000_20201123T131345.EOF.zip
    store_type: zarr
    store_params: {}
  - id: AUX2
    path: AUX/S1/S1A_AUX_PP2_V20190228T092500_G20220228T120000.SAFE.zip
    store_type: zarr
    store_params: {}
  output_products:
  - id: outputs
    path: s3://test-data/zarr/dpr_processor_output/ # output folder or S3 bucket
    type: folder
    store_type: zarr
    store_params: {}
dask_context: {}
logging: {}
config: {}
"""

---
**NOTE**

You can also monitor the s3 bucket using the minio console: http://127.0.0.1:9001/browser with:

  * Username: _minio_
  * Password: _Strong#Pass#1234_

---

In [None]:
# We'll use boto3 to monitor the s3 bucket.
# Note: the S3_ACCESSKEY, S3_SECRETKEY and S3_ENDPOINT are given in the docker-compose.yml file.
import boto3
import os

# S3 client configured from the environment variables of the demo deployment.
s3_session = boto3.session.Session()
s3_client = s3_session.client(
    service_name="s3",
    aws_access_key_id=os.environ["S3_ACCESSKEY"],
    aws_secret_access_key=os.environ["S3_SECRETKEY"],
    endpoint_url=os.environ["S3_ENDPOINT"],
    region_name=os.environ["S3_REGION"],
)
bucket_name = "test-data"
bucket_dir = "zarr/dpr_processor_output"
bucket_url = f"s3://{bucket_name}/{bucket_dir}"

# If bucket is already created, clear all files in order to start fresh for each demo.
existing_buckets = {bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]}
if bucket_name in existing_buckets:
    # A single list call returns at most one page of keys (1000), so walk every
    # page to make sure the whole bucket is emptied.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name):
        # An empty bucket yields a page without the 'Contents' key.
        for obj in page.get("Contents", []):
            s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
else:
    s3_client.create_bucket(Bucket=bucket_name)

print("Is bucket empty now ?: ", 'Contents' not in s3_client.list_objects(Bucket=bucket_name))

Convert the yaml to json in order to post it over HTTP and call the simulator webserver endpoint.
The output of the run() method is a list of all STAC-compatible .zattrs.

In [None]:
import requests
import json
import yaml
import pprint

# Parse the YAML payload into plain Python objects.
yaml_data = yaml.safe_load(yaml_payload)
# JSON text form of the payload. It is not sent below: requests serializes
# `yaml_data` itself through the `json=` argument.
json_data = json.dumps(yaml_data)

dpr_simulator_endpoint = "http://dpr-simulator:8000/run" # rs-server host = the container name
response = requests.post(dpr_simulator_endpoint, json=yaml_data)
# Fail loudly on an HTTP error status instead of decoding and iterating over
# an error body as if it were the processor results.
response.raise_for_status()

# Pretty-print each element of the JSON response.
pp = pprint.PrettyPrinter(indent=4)
for attr in response.json():
    pp.pprint(attr)

In [None]:
# Show the objects currently in the bucket. The response has no 'Contents' key
# when the bucket is empty, so fall back to an empty list instead of raising KeyError.
s3_client.list_objects(Bucket=bucket_name).get("Contents", [])