<table align='right'><tr>
<td style="padding:10px"><img src="resources/img/logos/EC_POS.png" style="max-height:50px;width:auto;"/></td>
<td style="padding:10px"><img src="resources/img/logos/ESA_logo_2020_Deep.png" style="max-height:40px;width:auto;"/></td>
<td style="padding:10px"><img src="resources/img/logos/Copernicus_blue.png" style="max-height:60px;width:auto;"/></td>
<td style="padding:10px"><img src="resources/img/logos/AIRBUS_Blue.png" style="max-height:30px;width:auto;"/></td>
<td style="padding:10px"><img src="resources/img/logos/CS-GROUP.png" style="max-height:50px;width:auto;"/></td>
</tr></table>

<a href="./ESA_checkpoint_v0.1_03_stac_catalog.ipynb" target="_blank"><< Part 3: use the STAC catalog</a>

<font color="#138D75">**Copernicus Reference System Python**</font> <br>
**Copyright:** Copyright 2024 ESA <br>
**License:** Apache License, Version 2.0 <br>
**Authors:** Airbus, CS Group

<div class="alert alert-block alert-success">
<h3>Copernicus Reference System Python tutorial for the ESA checkpoint 0.1</h3></div>

<div class="alert alert-block alert-warning">

<h4>Part 4: Prefect workflows</h4>

Prerequisites:
* <a href="./ESA_checkpoint_v0.1_01_initialisation.ipynb" target="_blank">Part 1: initialisation</a>

</div>
<hr>

# Introduction

## Links

* GitHub: https://github.com/RS-PYTHON
* Documentation: https://home.rs-python.eu/rs-documentation/

## Data used

In this notebook, we use simulated Auxip and Cadip data.

## Learning outcomes

At the end of this notebook you will know how to:
* Use the RS-Client Python library.
* Call Prefect flows to run parallel tasks to:
    * Stage multiple Auxip and Cadip files at once.
    * Run a simulated DPR processing on the staged files, and save results in the catalog.

<div class="alert alert-info" role="alert">

## Contents

</div>
    
1. [Check your installation](#Check-your-installation) 
1. [RsClient initialisation](#RsClient-initialisation)
1. [Prefect workflows](#Prefect-workflows)
     1. [Initialisation](#Workflow:-initialisation)
     1. [Stage Cadip chunk files](#Workflow:-stage-Cadip-chunk-files)
     1. [Stage Auxip files](#Workflow:-stage-Auxip-files)
     1. [DPR simulator](#Workflow:-DPR-simulator)
     1. [Exercises](#Exercises)

<hr>

<div class="alert alert-info" role="alert">

## Check your installation

In this section, we will check that your Jupyter Notebook environment is correctly set up.

[Back to top](#Contents)

</div>

### `rs-client-libraries` installation

The `rs-client-libraries` Python library is the preferred way to access the RS-Server services from your environment. It is automatically installed in this notebook.

**Note**: you can ignore these OpenTelemetry messages for now; they will be fixed in a later version:
```
Overriding of current TracerProvider is not allowed
Attempting to instrument while already instrumented
Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to ..., retrying in ...s
Failed to export metrics to ..., error code: StatusCode.UNIMPLEMENTED
```

In [None]:
import rs_client
import rs_common
import rs_workflows

# Set logger level to info
import logging
rs_common.logging.Logging.level = logging.INFO

### Environment

In [None]:
import os

# In local mode, all your services are running locally.
# In hybrid or cluster mode, we use the services deployed on the RS-Server website.
# This configuration is set in an environment variable.
local_mode = (os.getenv("RSPY_LOCAL_MODE") == "1")

# In local mode, the service URLs are hardcoded in the docker-compose file
if local_mode:
    rs_server_href = None # not used
    RSPY_HOST_AUXIP = "http://localhost:8001/docs"
    RSPY_HOST_CADIP = "http://localhost:8002/docs"
    RSPY_HOST_CATALOG = "http://localhost:8003/api.html"
    RSPY_PREFECT_URL = "http://localhost:4200"
    RSPY_DPR_SIMU_URL = "http://dpr-simulator:8000"

# In hybrid or cluster mode, they are set in environment variables
else:
    rs_server_href = os.environ["RSPY_WEBSITE"]
    RSPY_PREFECT_URL = os.environ['RSPY_PREFECT_URL']
    RSPY_DPR_SIMU_URL = os.environ["RSPY_DPR_SIMU_URL"]
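
In hybrid or cluster mode, a variable that is not set surfaces as a bare `KeyError` on the first lookup. A minimal sketch of a friendlier check follows, assuming a hypothetical helper `require_env` (it is not part of `rs-client-libraries`) that reports every missing variable in one message. The mapping `fake_env` is a stand-in used for illustration only.

```python
import os

def require_env(names, environ=os.environ):
    """Return {name: value} for the requested variables.

    Hypothetical helper (not part of rs-client-libraries): raises a single
    error listing every variable that is unset or empty.
    """
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in names}

# Example with a stand-in mapping instead of the real environment
fake_env = {"RSPY_WEBSITE": "https://example.org", "RSPY_PREFECT_URL": ""}
try:
    require_env(["RSPY_WEBSITE", "RSPY_PREFECT_URL", "RSPY_DPR_SIMU_URL"], fake_env)
except RuntimeError as error:
    print(error)
```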

### API key

In [None]:
apikey = os.getenv("RSPY_APIKEY")
if (not local_mode) and (not apikey):
    import getpass
    apikey = getpass.getpass("Enter your API key:")
    os.environ["RSPY_APIKEY"] = apikey

<div class="alert alert-info" role="alert">

## RsClient initialisation

Initialise Python RsClient class instances to access the RS-Server services.

[Back to top](#Contents)

</div>

In [None]:
import json
from rs_client.rs_client import RsClient
from rs_common.config import ECadipStation

# Init a generic RS-Client instance. Pass the:
#   - RS-Server website URL
#   - API key. If not set, we try to read it from the RSPY_APIKEY environment variable.
#   - ID of the owner of the STAC catalog collections.
#     By default, this is the user login from the keycloak account associated with the API key.
#     Or, in local mode, this is the local system username.
#     Else, your API Key must give you the rights to read/write on this catalog owner (see next cell).
#   - Logger (optional, a default one can be used)
generic_client = RsClient(rs_server_href, rs_server_api_key=None, owner_id=None, logger=None)
print(f"STAC catalog owner: {generic_client.owner_id!r}")

# From this generic instance, get an Auxip client instance
auxip_client = generic_client.get_auxip_client()

# Or get a Cadip client instance. Pass the cadip station.
cadip_station = ECadipStation.CADIP # you can also have: INS, MPS, MTI, NSG, SGS
cadip_client = generic_client.get_cadip_client(cadip_station)

# Or get a Stac client to access the catalog
stac_client = generic_client.get_stac_client()

print("\nCheck that our catalog conforms to the STAC format...")
stac_client.validate_all()

print("\nDisplay the Stac catalog as a treeview in notebook:")
display(stac_client)

print("\nOr just display all its contents at once:")
print(json.dumps(stac_client.to_dict(), indent=2))

In [None]:
# In hybrid or cluster mode, show information from the keycloak account associated with the API key
if not local_mode:

    # = keycloak account user login
    print(f"API key user login: {generic_client.apikey_user_login!r}")

    # Print the IAM (Identity and Access Management) roles
    # For this tutorial, you must have: 
    #   - read/download access for Adgs (=Auxip) = "rs_adgs_<read|download>"
    #   - read/download access to the Cadip station you passed in the cell above = "rs_cadip_<station>_<read|download>"
    #   - (optional) read/write/download access to STAC catalog collections from other owners = "rs_catalog_<owner_id>:<collection|*>_<read|write|download>"
    #     (you always have all access to your own collections with owner_id=apikey_user_login as printed above)
    iam_roles = "\n".join (sorted (generic_client.apikey_iam_roles))
    print(f"\nAPI key IAM roles: \n{iam_roles}")
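
The role patterns listed in the comments above can be turned into a quick self-check. The sketch below is illustrative only: `missing_roles` is a hypothetical helper, the role list is made up, and the exact casing of the station part of the role name is an assumption.

```python
def missing_roles(granted, station):
    """Return the required role names that are absent from `granted`.

    Role names follow the patterns quoted in the comments above; the exact
    casing of the station part is an assumption.
    """
    required = [
        "rs_adgs_read",
        "rs_adgs_download",
        f"rs_cadip_{station}_read",
        f"rs_cadip_{station}_download",
    ]
    granted = set(granted)
    return [role for role in required if role not in granted]

# Hypothetical role list, for illustration only
example_roles = ["rs_adgs_read", "rs_adgs_download", "rs_cadip_cadip_read"]
print(missing_roles(example_roles, "cadip"))
```

In the notebook, the same function could be called with `generic_client.apikey_iam_roles` as the first argument.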

<div class="alert alert-info" role="alert">

## Prefect workflows

Prefect is a workflow orchestration tool. We will use it to:
* Stage multiple Auxip and Cadip files at once.
* Run a simulated DPR processing on the staged files, and save results in the catalog.

[Back to top](#Contents)
</div>
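
To give an intuition for "multiple files at once", here is a minimal sketch of the fan-out pattern using only the Python standard library. It does not use Prefect or the RS-Server services: `stage_file` is a stand-in that pretends to transfer one file, and the file names are hypothetical. An orchestrator such as Prefect layers scheduling, monitoring and a web UI on top of this basic idea.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def stage_file(filename):
    """Stand-in for a staging task: pretend to transfer one file."""
    time.sleep(0.01)  # simulate I/O
    return f"staged:{filename}"

def stage_all(filenames, max_workers=5):
    """Run stage_file on every filename, at most max_workers at a time."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns the results in the same order as the inputs
        return list(pool.map(stage_file, filenames))

staged = stage_all([f"chunk_{i}.raw" for i in range(8)], max_workers=5)
print(staged[0], len(staged))
```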

In [None]:
# Do some initialisation
from datetime import datetime
import json

# We use this bucket, which is deployed on the cluster.
# RS-Server has read/write access to this bucket, but as an end-user, you won't manipulate it directly.
RSPY_TEMP_BUCKET = os.environ["RSPY_TEMP_BUCKET"]

<div class="alert alert-info" role="alert">

### Workflow: initialisation

[Back to top](#Contents)

</div>

In [None]:
from datetime import timedelta
from rs_common.config import EPlatform

# The workflow will stage all files within a date interval.
# In this example, we will stage all the Cadip chunk files from the first Cadip session.

# Get the first Cadip session ID for this date interval and mission
start_date = datetime(2010, 1, 1, 12, 0, 0)
stop_date = datetime(2024, 1, 1, 12, 0, 0)
platforms = [EPlatform.S1A]
session_id = cadip_client.search_sessions(
    start_date=start_date, stop_date=stop_date, platforms=platforms)[0]["id"]
print(f"First Cadip session ID: {session_id!r}")

# We extract the mission and date from the session ID: <mission>_YYYYmmdd<other_info>
mission, date_and_other = session_id.split("_", 1) # split on the first underscore only
date = datetime.strptime (date_and_other[:8], "%Y%m%d")
start_date = date # start from midnight
stop_date = date + timedelta(days=1) # midnight the day after

print(f"Mission: {mission!r}")
print(f"Date interval: '{start_date} -> {stop_date}'")

# Note: we will miss files if the session overlaps two days. 
# We could also get the time interval from the session information 
# but the simulated data used in this notebook is not relevant.
session_info = cadip_client.search_sessions(session_ids=[session_id])
print(f"Date interval from the session information (not used): "
      f"'{session_info[0]['properties']['start_datetime']} -> {session_info[0]['properties']['end_datetime']}'")
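
The note about sessions that overlap two days can be made concrete with a small, self-contained sketch. The property values below are hypothetical, and the timestamp format is an assumption (it matches the format used for the Auxip `created` field later in this notebook). Different variable names are used so that the notebook's `start_date` and `stop_date` are not overwritten.

```python
from datetime import datetime, timedelta

# Hypothetical session properties, shaped like the fields printed above
properties = {
    "start_datetime": "2020-01-05T23:50:00.000Z",
    "end_datetime": "2020-01-06T00:10:00.000Z",
}

# Assumed timestamp format
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
session_start = datetime.strptime(properties["start_datetime"], TIMESTAMP_FORMAT)
session_stop = datetime.strptime(properties["end_datetime"], TIMESTAMP_FORMAT)

# Single-day interval derived from the session start date only
day_start = session_start.replace(hour=0, minute=0, second=0, microsecond=0)
day_stop = day_start + timedelta(days=1)

# True when the session ends after the single-day interval
print(session_stop > day_stop)
```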

In [None]:
# OPTIONAL: if you re-run this tutorial, you can remove your old collections to start afresh
# for suffix in ["_aux", "_chunk", "_dpr"]:
#     stac_client.remove_collection (f"{mission}{suffix}")

In [None]:
# As a prerequisite, we must manually create the Auxip and Cadip 
# collections in the catalog, if they don't already exist.
from pystac import Collection, Extent, SpatialExtent, TemporalExtent
from pystac_client.exceptions import APIError
from rs_workflows import staging

for client in [auxip_client, cadip_client]:
    collection_name = staging.create_collection_name(mission, client.station_name)

    # Save the collection name for later
    if client == auxip_client:
        auxip_collection = collection_name
    else:
        cadip_collection = collection_name

    # Try to get collection information
    try:
        stac_client.get_collection(collection_id=collection_name)
        print(f"Collection already exists: {collection_name!r}")

    # If it fails, this means that the collection doesn't exist, so create it
    except APIError:

        print(f"Create collection: {collection_name!r}")
        response = stac_client.add_collection(
            Collection(
                id=collection_name,
                description=None, # rs-client will provide a default description for us
                extent=Extent(
                    spatial=SpatialExtent(bboxes=[-180.0, -90.0, 180.0, 90.0]),
                    temporal=TemporalExtent([start_date, stop_date])
                )
            ))
        response.raise_for_status()
        stac_client.validate_all()
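
For reference, the STAC Collection specification stores the extent as nested lists: `spatial.bbox` is a list of bounding boxes and `temporal.interval` is a list of `[start, end]` pairs. The sketch below builds that JSON shape with the standard library only. The helper name `stac_extent` and the dates are hypothetical, and the output is an illustration of the specification, not necessarily byte-for-byte what the catalog stores.

```python
import json
from datetime import datetime, timezone

def stac_extent(start, stop):
    """Build a STAC 'extent' object covering the whole globe."""
    return {
        "spatial": {"bbox": [[-180.0, -90.0, 180.0, 90.0]]},
        "temporal": {"interval": [[start.isoformat(), stop.isoformat()]]},
    }

# Hypothetical one-day interval, in UTC
extent = stac_extent(
    datetime(2020, 1, 5, tzinfo=timezone.utc),
    datetime(2020, 1, 6, tzinfo=timezone.utc),
)
print(json.dumps(extent, indent=2))
```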

<div class="alert alert-info" role="alert">

### Workflow: stage Cadip chunk files

Here we stage all the Cadip chunk files from the date interval corresponding to the Cadip session calculated above.

[Back to top](#Contents)

</div>

![Workflow: staging](resources/img/v0.1/workflow_staging.drawio.png)

In [None]:
print(f"\nView Prefect flow runs from: {RSPY_PREFECT_URL}\n")

In [None]:
# Number of tasks to be run in parallel
MAX_WORKERS = 5

# Staging workflow configuration
config = staging.PrefectFlowConfig(
    cadip_client, 
    mission, 
    s3_path = f"s3://{RSPY_TEMP_BUCKET}/{cadip_client.owner_id}/{cadip_client.station_name}",
    tmp_download_path=None, # no local download
    max_workers=MAX_WORKERS,
    start_datetime=start_date,
    stop_datetime=stop_date,
    limit=None) # no limit on the number of files

# Start the prefect flow
staging.staging_flow(config)

In [None]:
# Check that our catalog conforms to the STAC format.
stac_client.validate_all()

In [None]:
# Find the staged Cadip files in the STAC catalog

# For searching, we need to prefix our collection name with <owner_id>_
owner_collection = f"{stac_client.owner_id}_{cadip_collection}"

# Use a cql2 filter to search by session ID
filter_on_session = {
    "op": "and",
    "args": [
      {"op": "=", "args": [{"property": "collection"}, owner_collection]},
      {"op": "=", "args": [{"property": "cadip:session_id"}, session_id]}
    ]
}

search = stac_client.search(filter=filter_on_session)
results = list(search.items_as_dicts())
assert len(results) > 0, f"At least one Cadip file should be staged for session ID: {session_id!r}"
print (f"\n{len(results)} Cadip files are staged for session ID: {session_id!r}.")
print (f"First one:\n{json.dumps (results[0], indent=2)}")
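
The CQL2 JSON filter used above is a plain nested dictionary, so it can be composed from small helpers when more conditions are needed. A minimal sketch follows; the helper names `equals` and `all_of` and the example values are hypothetical.

```python
import json

def equals(property_name, value):
    """CQL2 JSON clause: property = value."""
    return {"op": "=", "args": [{"property": property_name}, value]}

def all_of(*clauses):
    """CQL2 JSON clause: logical AND of the given clauses."""
    return {"op": "and", "args": list(clauses)}

# Hypothetical collection and session values, for illustration only
example_filter = all_of(
    equals("collection", "owner_S1A_chunk"),
    equals("cadip:session_id", "S1A_20200105072204051312"),
)
print(json.dumps(example_filter, indent=2))
```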

<div class="alert alert-info" role="alert">

### Workflow: stage Auxip files

We need to pass Auxip files to the DPR processing. They must be staged into the catalog.

As, for now, the DPR processing is only a simulation, we can pass any Auxip files.

[Back to top](#Contents)

</div>

In [None]:
# Let's use the 3 most recent files between today and any old date.
# NOTE: the "created" field corresponds to the publication date.
files = auxip_client.search_stations(
    datetime(year=1970, month=1, day=1), 
    datetime.today(), 
    sortby="-created",
    limit=None) # NOTE: for now "limit" is not working well with "sortby" so don't use it

# Only keep the first 3 files
files = files[:3]

# Save the IDs = the filenames
auxip_files = [f["id"] for f in files]
print_ids = "\n".join(auxip_files)
print(f"Auxip files: \n{print_ids}")

# Save the min and max dates for these 3 files
dates = [datetime.strptime (f["properties"]["created"], "%Y-%m-%dT%H:%M:%S.%fZ") for f in files]
start_date = min(dates) - timedelta(seconds=1) # subtract 1 second because the interval is exclusive
stop_date = max(dates) + timedelta(seconds=1) # add 1 second
print(f"\nDate interval: '{start_date} -> {stop_date}'")
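
The one-second margin matters because, as the comment above states, the interval bounds are exclusive. The sketch below illustrates the effect on made-up publication dates; `within_exclusive` is a hypothetical stand-in for the server-side filter, not the actual implementation.

```python
from datetime import datetime, timedelta

def within_exclusive(value, start, stop):
    """True when start < value < stop (both bounds excluded)."""
    return start < value < stop

# Hypothetical publication dates
example_dates = [datetime(2024, 1, 1, 8, 0, 0), datetime(2024, 1, 3, 9, 30, 0)]
earliest, latest = min(example_dates), max(example_dates)

# Without a margin, the earliest and latest files sit on the bounds and are excluded
unpadded = [d for d in example_dates if within_exclusive(d, earliest, latest)]

# With a one-second margin on each side, every file is included
margin = timedelta(seconds=1)
padded = [d for d in example_dates if within_exclusive(d, earliest - margin, latest + margin)]

print(len(unpadded), len(padded))
```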

In [None]:
# Staging workflow configuration
config = staging.PrefectFlowConfig(
    auxip_client, 
    mission, 
    s3_path = f"s3://{RSPY_TEMP_BUCKET}/{auxip_client.owner_id}/{auxip_client.station_name}",
    tmp_download_path=None, # no local download
    max_workers=MAX_WORKERS,
    start_datetime=start_date,
    stop_datetime=stop_date,
    limit=None) # no limit on the number of files

# Start the prefect flow
staging.staging_flow(config)

In [None]:
# Check that our catalog conforms to the STAC format.
stac_client.validate_all()

In [None]:
# These 3 items should be in the STAC catalog, in the Auxip collection

# Be sure that we don't have any duplicate filenames
auxip_files = list(set(auxip_files))

# Search by ID and collection
owner_collection = f"{stac_client.owner_id}_{auxip_collection}"
search = stac_client.search(ids=auxip_files, collections=[owner_collection])
results = list(search.items_as_dicts())
assert len(results) == len(auxip_files), f"{len(results)} Auxip files were staged, we expected {len(auxip_files)}"
print("Staged Auxip files:\n" + "\n".join(auxip_files))
print (f"\nFirst one:\n{json.dumps (results[0], indent=2)}")
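
The assertion above compares counts only. When it fails, it helps to know which files are missing. A minimal sketch, assuming a hypothetical helper `compare_ids` and made-up filenames:

```python
def compare_ids(expected_ids, items):
    """Return (missing, unexpected) IDs when comparing search results to the expected IDs."""
    found = {item["id"] for item in items}
    expected = set(expected_ids)
    return sorted(expected - found), sorted(found - expected)

# Hypothetical filenames and search results
expected_files = ["aux_a.zip", "aux_b.zip", "aux_c.zip"]
found_items = [{"id": "aux_a.zip"}, {"id": "aux_c.zip"}]
missing, unexpected = compare_ids(expected_files, found_items)
print(missing, unexpected)
```

In the notebook, the same function could be called as `compare_ids(auxip_files, results)`.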

<div class="alert alert-info" role="alert">

### Workflow: DPR simulator

For now, this simulated DPR processor takes any input and writes any output.

We use it to simulate a L0 processing that takes staged Cadip chunk files and Auxip files as input, and writes raw L0 products as output.

[Back to top](#Contents)

</div>

![Workflow: dpr](resources/img/v0.1/workflow_dpr.drawio.png)

In [None]:
from rs_workflows import s1_l0

# The product types to process can be any of these 4 values
product_types = ["S1SEWRAW", "S1SIWRAW", "S1SSMRAW", "S1SWVRAW"]

# DPR workflow configuration
config = s1_l0.PrefectS1L0FlowConfig(
    stac_client,
    RSPY_DPR_SIMU_URL,
    mission,
    session_id,
    product_types,
    auxip_files,
    s3_path = f"s3://{RSPY_TEMP_BUCKET}/{stac_client.owner_id}/DPR_S1L0",
    temp_s3_path = f"s3://{RSPY_TEMP_BUCKET}/{stac_client.owner_id}/DPR_S1L0",
)

# Start the prefect flow
s1_l0.s1_l0_flow(config)

In [None]:
# Check that our catalog conforms to the STAC format.
stac_client.validate_all()

In [None]:
# Check output products in the STAC catalog

# The DPR collection name is hardcoded in the workflow source code
dpr_collection = f"{mission}_dpr"
dpr_products = list(stac_client.get_collection(dpr_collection).get_items())

assert len(dpr_products) > 0, "At least one DPR product should be saved in the catalog."
print_ids = "\n".join([product.id for product in dpr_products])
print (f"\n{len(dpr_products)} DPR products are saved in the catalog:\n{print_ids}")
print (f"\nFirst one:\n{json.dumps (dpr_products[0].to_dict(), indent=2)}")

<div class="alert alert-danger" role="alert">

### Exercises

</div>

Run the previous cells again, but this time: 
* Use Cadip chunk files from a different Cadip session.
* Use any other Auxip files.
* Check that these Cadip and Auxip files are staged in the STAC catalog.
* Check that the resulting simulated L0 products are saved in the STAC catalog.

[Back to top](#Contents)

<a href="./ESA_checkpoint_v0.1_03_stac_catalog.ipynb" target="_blank"><< Part 3: use the STAC catalog</a>

<hr>
<a href="https://github.com/RS-PYTHON" target="_blank">View on GitHub</a>