![](https://github.com/destination-earth/DestinE-DataLake-Lab/blob/main/img/DestinE-banner.jpg?raw=true)

**Author**: EUMETSAT <br>
**Copyright**: 2024 EUMETSAT <br>
**Licence**: MIT <br>

# DEDL - Hook Tutorial - Data Harvest (data-harvest)

This notebook demonstrates how to use the Hook service.

The detailed definition of each API endpoint and its parameters is available in the **OnDemand Processing API OData v1** OpenAPI documentation at:
https://odp.data.destination-earth.eu/odata/docs

Further documentation is available at:
    https://destine-data-lake-docs.data.destination-earth.eu/en/latest/dedl-big-data-processing-services/Hook-service/Hook-service.html

## Install python package requirements and import environment variables 

In [None]:
# Note: The destinelab python package (which helps with authentication) is available already if you are using Python DEDL kernel
# Otherwise, the destinelab python package can be installed by uncommenting the following line
#%pip install destinelab
%pip install dedllogin
# For the importing of environment variables using the load_dotenv(...) command 
%pip install python-dotenv
# for example code navigating private S3 compatible storage (PRIVATE bucket storage)
%pip install boto3


In [None]:
import os
import json
import requests
from dotenv import load_dotenv
from getpass import getpass
import destinelab
import dedllogin

# load (optional) environment variables from root .env file.
load_dotenv(override=True)

# Load (optional) notebook-specific environment variables from .env_tutorial
load_dotenv("./.env_tutorial", override=True)
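
As a quick sanity check after loading the `.env` files, the sketch below reports which of the `HOOK_*` variables read by this notebook are set, without ever printing their values. The helper name `report_hook_variables` is illustrative and not part of any DEDL package.

```python
import os

# Names of the environment variables that this notebook reads with os.getenv(...)
HOOK_VARIABLES = [
    "HOOK_DESP_USERNAME",
    "HOOK_DESP_PASSWORD",
    "HOOK_DEDL_USERNAME",
    "HOOK_DEDL_PASSWORD",
    "HOOK_WORKFLOW",
    "HOOK_ORDER_NAME",
    "HOOK_OUTPUT_BUCKET",
    "HOOK_OUTPUT_STORAGE_ACCESS_KEY",
    "HOOK_OUTPUT_STORAGE_SECRET_KEY",
    "HOOK_COLLECTION_ID",
    "HOOK_DATA_ID",
    "HOOK_IS_PRIVATE_STORAGE",
]


def report_hook_variables(environ=None):
    """Return {name: bool} telling whether each variable is set and non-empty."""
    environ = os.environ if environ is None else environ
    return {name: bool(environ.get(name)) for name in HOOK_VARIABLES}


# Print only "set" / "not set", never the value itself (some values are secrets)
for name, is_set in report_hook_variables().items():
    print(f"{name.ljust(32)}: {'set' if is_set else 'not set'}")
```

Any variable reported as "not set" falls back to the interactive prompt or the default used in the corresponding cell.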

## Authentication - Get token

In [None]:
# By default users should use their DESP credentials
is_desp_auth = True
print(f"is_desp_auth:{is_desp_auth}")

if is_desp_auth:

    # Enter DESP credentials.
    DESP_USERNAME = os.getenv("HOOK_DESP_USERNAME") or input("Please input your DESP username or email: ")
    DESP_PASSWORD = os.getenv("HOOK_DESP_PASSWORD") or getpass("Please input your DESP password: ")
    token = destinelab.AuthHandler(DESP_USERNAME, DESP_PASSWORD)

else:

    # Enter DEDL credentials.
    DEDL_USERNAME = os.getenv("HOOK_DEDL_USERNAME") or input("Please input your DEDL username or email: ")
    DEDL_PASSWORD = os.getenv("HOOK_DEDL_PASSWORD") or getpass("Please input your DEDL password: ")
    token = dedllogin.DEDL_auth(DEDL_USERNAME, DEDL_PASSWORD)

access_token = token.get_token()

# Check the status of the request
if access_token is not None:
    print("DEDL/DESP Access Token Obtained Successfully")
    # Save API headers
    api_headers = {"Authorization": "Bearer " + access_token}
else:
    print("Failed to Obtain DEDL/DESP Access Token")
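
If no token is obtained, the cell above only prints a message, and later cells then fail because `api_headers` was never defined. A minimal sketch of a fail-fast alternative is shown below; `build_api_headers` is a hypothetical helper, not part of `destinelab` or `dedllogin`.

```python
def build_api_headers(access_token):
    """Build the Authorization header, or stop immediately if there is no token."""
    if not access_token:
        raise RuntimeError(
            "Failed to obtain DEDL/DESP access token: check your credentials"
        )
    return {"Authorization": "Bearer " + access_token}


# Example with a placeholder token (in the notebook: build_api_headers(access_token))
print(build_api_headers("example-token"))
```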

## Setup static variables

In [4]:
# Hook service url (ending with odata/v1/ - e.g. https://odp.data.destination-earth.eu/odata/v1/)
hook_service_root_url = "https://odp.data.destination-earth.eu/odata/v1/"

## List available workflows

Next, we can check which workflows are available to us by querying the endpoint   
```https://odp.data.destination-earth.eu/odata/v1/Workflows```
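
OData system query options such as `$orderby` contain spaces and other characters that are safer to percent-encode. The sketch below builds such a URL with the standard library; `odata_url` is an illustrative helper, and keeping `$` literal in the option name is a choice made here for readability, not a documented requirement of the Hook service.

```python
from urllib.parse import quote

# Characters left unencoded in option values so that filters stay readable
SAFE_CHARACTERS = "'(),"


def odata_url(root_url, entity, options=None):
    """Join a service root, an entity name and OData query options into one URL."""
    options = options or {}
    query = "&".join(
        f"{key}={quote(str(value), safe=SAFE_CHARACTERS)}"
        for key, value in options.items()
    )
    return f"{root_url}{entity}?{query}" if query else f"{root_url}{entity}"


print(odata_url(
    "https://odp.data.destination-earth.eu/odata/v1/",
    "Workflows",
    {"$orderby": "Id asc"},
))
```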

In [None]:
# Send request and return json object listing all provided workflows, ordered by Id
result = requests.get(
    f"{hook_service_root_url}Workflows?$orderby=Id asc", headers=api_headers   
).json()

print("List of available DEDL provided Hooks")
for item in result["value"]:
    # Print the name (padded to 20 characters) and display name of each workflow
    print(f"Name:{str(item['Name']).ljust(20, ' ')}DisplayName:{item['DisplayName']}")

In [None]:
# Print result JSON object: containing provided workflow list
workflow_details = json.dumps(result, indent=4)
print(workflow_details)

## Select a workflow and see parameters

If we want to see the details of a specific workflow, showing us the parameters that can be set for that workflow, we can add a filter to the query as follows:

```https://odp.data.destination-earth.eu/odata/v1/Workflows?$expand=WorkflowOptions&$filter=(Name eq 'data-harvest')```   

**\\$expand=WorkflowOptions** shows all parameters accepted by the workflow   
**\\$filter=(Name eq 'data-harvest')** narrows the result to the workflow called "data-harvest" (string values in an OData filter are enclosed in single quotes)
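
String values in an OData `$filter` are written in single quotes, and a single quote inside the value is escaped by doubling it. The sketch below shows one way to build the `Name eq ...` filter safely; both helper names are illustrative.

```python
def odata_string(value):
    """Quote a Python string as an OData string literal ('' escapes a single quote)."""
    return "'" + str(value).replace("'", "''") + "'"


def name_filter(workflow_name):
    """Build the $filter expression that selects one workflow by its Name."""
    return f"(Name eq {odata_string(workflow_name)})"


print(name_filter("data-harvest"))
```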

In [None]:
# Select workflow : defaults to data-harvest
workflow = os.getenv("HOOK_WORKFLOW", "data-harvest")
print(f"workflow: {workflow}")

# Send request
result = requests.get(
    f"{hook_service_root_url}Workflows?$expand=WorkflowOptions&$filter=(Name eq '{workflow}')",
    headers=api_headers,
).json()
workflow_details = json.dumps(result, indent=4)
print(workflow_details)  # print formatted workflow_details, a JSON string
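
The full JSON can be long, so a compact list of option names is often easier to read. The sketch below assumes that each entry under `WorkflowOptions` carries a `Name` key, mirroring the `Name`/`Value` pairs used when ordering; the sample response is hypothetical and heavily abbreviated.

```python
def option_names_by_workflow(result):
    """Map each workflow name to the list of option names it accepts."""
    summary = {}
    for workflow_entry in result.get("value", []):
        options = workflow_entry.get("WorkflowOptions") or []
        summary[workflow_entry.get("Name")] = [opt.get("Name") for opt in options]
    return summary


# Hypothetical, abbreviated response used only to illustrate the helper
sample_result = {
    "value": [
        {
            "Name": "data-harvest",
            "WorkflowOptions": [
                {"Name": "output_storage"},
                {"Name": "source_type"},
            ],
        }
    ]
}

for workflow_name, names in option_names_by_workflow(sample_result).items():
    print(f"{workflow_name}: {', '.join(names)}")
```

In the notebook, pass the `result` object returned by the request above instead of `sample_result`.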

## Order selected workflow

The workflow selected above will now be configured and ordered, e.g. workflow = "data-harvest".

- Make an order to 'harvest data' using the Harmonised Data Access API.
- That is, data from an input source is transferred to a private bucket or to a temporary storage bucket.

### Name your order

In [None]:
# Here we set the variable order_name, this will allow us to:
# Easily identify the running process (e.g. when checking the status)
# order_name is added as a suffix to the order 'Name'
order_name = os.getenv("HOOK_ORDER_NAME") or input("Name your order: ")
print(f"order_name:{order_name}")

### Define output storage

Among the other workflow parameters, the storage from which the result can be retrieved must be provided.  
**Two possibilities:**
1. Use your user storage 
2. Use a temporary storage 

#### 1 - Your user storage (provided by DEDL ISLET service)

Example using an S3 bucket created with the ISLET Storage service. The result will be available in this bucket.
> *workflow parameter: {"Name": "output_storage", "Value": "PRIVATE"}*

In [None]:
# Output storage - Islet service

# Note: If you want the output to go to your own PRIVATE bucket rather than TEMPORARY storage (expires after 2 weeks),
#       i) This configuration will need to be updated with your output_bucket, output_storage_access_key, output_storage_secret_key, output_prefix
#       ii) You will need to change the output_storage in the order to PRIVATE and add the necessary output_s3_ parameters (see workflow options and the PRIVATE order body example)

# URL of the S3 endpoint in the Central Site (or lumi etc.)
output_storage_url = "https://s3.central.data.destination-earth.eu"
# output_storage_url = "https://s3.lumi.data.destination-earth.eu"

# Name of the object storage bucket where the results will be stored.
output_bucket = os.getenv("HOOK_OUTPUT_BUCKET", "your-bucket-name")
print(f"output_bucket            : {output_bucket}")

# Islet object storage credentials (openstack ec2 credentials)
output_storage_access_key = os.getenv("HOOK_OUTPUT_STORAGE_ACCESS_KEY", "your-access-key")
output_storage_secret_key = os.getenv("HOOK_OUTPUT_STORAGE_SECRET_KEY", "your-secret-key")
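print(f"output_storage_access_key: {output_storage_access_key}")
# Do not print the secret key itself: notebook outputs are often saved and shared
print(f"output_storage_secret_key: {'********' if output_storage_secret_key else 'not set'}")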


# This is the name of the folder in your output_bucket where the output of the hook will be stored.
# Here we concatenate 'dedl' with the 'workflow' and 'order_name'
output_prefix = f"dedl-{workflow}-" + order_name
print(f"output_prefix            : {output_prefix}")

#### 2 - Use temporary storage

The result of the processing will be stored in shared storage, and a download link will be provided in the output product details.
> *workflow parameter: {"Name": "output_storage", "Value": "TEMPORARY"}*
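
The two storage choices differ only in the `output_*` workflow options that are sent with the order. A minimal sketch of a helper that returns the right options for either choice is shown below; the option names are the ones used in this notebook, while the function `storage_options` itself is illustrative.

```python
def storage_options(is_private, bucket=None, prefix=None,
                    access_key=None, secret_key=None, endpoint_url=None):
    """Return the output-related WorkflowOptions for PRIVATE or TEMPORARY storage."""
    if not is_private:
        return [{"Name": "output_storage", "Value": "TEMPORARY"}]

    # PRIVATE storage needs the full description of the target bucket
    required = {
        "bucket": bucket,
        "prefix": prefix,
        "access_key": access_key,
        "secret_key": secret_key,
        "endpoint_url": endpoint_url,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise ValueError(f"Missing PRIVATE storage settings: {', '.join(missing)}")

    return [
        {"Name": "output_storage", "Value": "PRIVATE"},
        {"Name": "output_s3_access_key", "Value": access_key},
        {"Name": "output_s3_secret_key", "Value": secret_key},
        {"Name": "output_s3_path", "Value": f"s3://{bucket}/{prefix}"},
        {"Name": "output_s3_endpoint_url", "Value": endpoint_url},
    ]


print(storage_options(is_private=False))
```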

### Define parameters and send order

In [10]:
# URL of the STAC server where your collection/item can be downloaded
STAC_HDA_API_URL = "https://hda.data.destination-earth.eu/stac"

In [None]:
# Note: The data (collection_id and data_id) will have been previously discovered and searched for

# Set collection where the item can be found : defaults to example for data-harvest
collection_id = os.getenv("HOOK_COLLECTION_ID", "EO.ESA.DAT.SENTINEL-2.MSI.L1C")
print(f"STAC collection url: {STAC_HDA_API_URL}/collections/{collection_id}")

# Set the Item to Retrieve : defaults to example for data-harvest
data_id = os.getenv("HOOK_DATA_ID", "S2A_MSIL1C_20230910T050701_N0509_R019_T47VLH_20230910T074321.SAFE")
print(f"data_id: {data_id}")


########## BUILD ORDER BODY : CHOOSE PRIVATE or TEMPORARY output_storage ##########

# Get boolean value from String, default (False)
is_private_storage = os.getenv("HOOK_IS_PRIVATE_STORAGE", "False") == "True"
print(f"is_private_storage: {is_private_storage}")
print(f"is_desp_auth: {is_desp_auth}")

# PRIVATE STORAGE

if is_private_storage:

    if is_desp_auth:

        # Build your order body : Using DESP source type for simplified configuration AND transferring the output to your private bucket storage
        # Using DESP credentials is the standard way of executing Hooks.
        print("##### Preparing Order Body for PRIVATE STORAGE with DESP credentials #####")
        order_body_custom_bucket = {
            "Name": "Tutorial " + workflow + " - " + order_name,
            "WorkflowName": workflow,
            "IdentifierList": [data_id],
            "WorkflowOptions": [
                {"Name": "output_storage", "Value": "PRIVATE"},
                {"Name": "output_s3_access_key", "Value": output_storage_access_key},
                {"Name": "output_s3_secret_key", "Value": output_storage_secret_key},
                {"Name": "output_s3_path", "Value": f"s3://{output_bucket}/{output_prefix}"},
                {"Name": "output_s3_endpoint_url", "Value": output_storage_url},
                {"Name": "source_type", "Value": "DESP"},
                {"Name": "desp_source_username", "Value": DESP_USERNAME},
                {"Name": "desp_source_password", "Value": DESP_PASSWORD},
                {"Name": "desp_source_collection", "Value": collection_id},
            ],
        }

    else:


        # Build your order body : Example using EXTERNAL source type and source_catalogue_api_type STAC. 
        # This would allow you to access products directly from a configured STAC server
        # Here we show the configuration that would work on the DEDL HDA STAC server (DEDL credentials required)
        # This is shown for example purposes only. The standard way of configuring is with DESP credentials shown above.
        print("##### Preparing Order Body for PRIVATE STORAGE with DEDL credentials #####")
        order_body_custom_bucket = {
            "Name": "Tutorial " + workflow + " - " + order_name,
            "WorkflowName": workflow,
            "IdentifierList": [data_id],
            "WorkflowOptions": [
                {"Name": "output_storage", "Value": "PRIVATE"},
                {"Name": "output_s3_access_key", "Value": output_storage_access_key},
                {"Name": "output_s3_secret_key", "Value": output_storage_secret_key},
                {"Name": "output_s3_path", "Value": f"s3://{output_bucket}/{output_prefix}"},
                {"Name": "output_s3_endpoint_url", "Value": output_storage_url},
                {"Name": "source_type", "Value": "EXTERNAL"},
                {"Name": "source_catalogue_api_url", "Value": "https://hda.data.destination-earth.eu/stac"},
                {"Name": "source_catalogue_api_type", "Value": "STAC"},
                {"Name": "source_token_url", "Value": "https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/token"},
                {"Name": "source_grant_type", "Value": "PASSWORD"},
                {"Name": "source_auth_header_name", "Value": "Authorization"},
                {"Name": "source_username", "Value": DEDL_USERNAME},
                {"Name": "source_password", "Value": DEDL_PASSWORD},
                {"Name": "source_client_id", "Value": "hda-public"},
                {"Name": "source_client_secret", "Value": ""},
                {"Name": "source_catalogue_collection", "Value": collection_id},
            ],
        }


else:

    # TEMPORARY STORAGE

    if is_desp_auth:

        # Build your order body : Using DESP source type for simplified configuration AND making the output available in temporary S3 bucket storage
        # Using DESP credentials is the standard way of executing Hooks.
        print("##### Preparing Order Body for TEMPORARY storage with DESP credentials #####")
        order_body_custom_bucket = {
            "Name": "Tutorial " + workflow + " - " + order_name,
            "WorkflowName": workflow,
            "IdentifierList": [data_id],
            "WorkflowOptions": [
                {"Name": "output_storage", "Value": "TEMPORARY"},
                {"Name": "source_type", "Value": "DESP"},
                {"Name": "desp_source_username", "Value": DESP_USERNAME},
                {"Name": "desp_source_password", "Value": DESP_PASSWORD},
                {"Name": "desp_source_collection", "Value": collection_id},
            ],
        }

    else:

        # Build your order body : Example using EXTERNAL source type and source_catalogue_api_type STAC. 
        # This would allow you to access products directly from a configured STAC server
        # Here we show the configuration that would work on the DEDL HDA STAC server (DEDL credentials required)
        # This is shown for example purposes only. The standard way of configuring is with DESP credentials shown above.
        print("##### Preparing Order Body for TEMPORARY storage with DEDL credentials #####")
        order_body_custom_bucket = {
            "Name": "Tutorial " + workflow + " - " + order_name,
            "WorkflowName": workflow,
            "IdentifierList": [data_id],
            "WorkflowOptions": [
                {"Name": "output_storage", "Value": "TEMPORARY"},
                {"Name": "source_type", "Value": "EXTERNAL"},
                {"Name": "source_catalogue_api_url", "Value": "https://hda.data.destination-earth.eu/stac"},
                {"Name": "source_catalogue_api_type", "Value": "STAC"},
                {"Name": "source_token_url", "Value": "https://identity.data.destination-earth.eu/auth/realms/dedl/protocol/openid-connect/token"},
                {"Name": "source_grant_type", "Value": "PASSWORD"},
                {"Name": "source_auth_header_name", "Value": "Authorization"},
                {"Name": "source_username", "Value": DEDL_USERNAME},
                {"Name": "source_password", "Value": DEDL_PASSWORD},
                {"Name": "source_client_id", "Value": "hda-public"},
                {"Name": "source_client_secret", "Value": ""},
                {"Name": "source_catalogue_collection", "Value": collection_id},
            ],
        }


########## ADDITIONAL OPTIONS ##########

# additional_options = [
#     {"Name": "12345", "Value": "abcdef"},
# ]

additional_options = []

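# Checks environment variables of the form HOOK_ADDITIONAL1="NAME=12345;VALUE=abcdef;TYPE=str"
# Three ';'-separated fields are required. The field labels are not checked (TYPE is an illustrative label):
# only the text after the first '=' of each field is used, and any type other than 'str' is converted with int().
for env_key, env_value in os.environ.items():
    if env_key.startswith('HOOK_ADDITIONAL'):
        #print(f"{env_key}: {env_value}")        
        parts = env_value.split(';')
        # Extract the name, value and value type (split on the first '=' only, so a value may contain '=')
        name = parts[0].split('=', 1)[1]
        value = parts[1].split('=', 1)[1]
        value_type = parts[2].split('=', 1)[1]
        additional_options.append({"Name": name, "Value": value if value_type == 'str' else int(value)})

print(f"additional_options:{additional_options}")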

if additional_options:
    print("Adding additional_options")
    order_body_custom_bucket["WorkflowOptions"].extend(additional_options)

########## BUILD ORDER BODY : END ##########

# Send order
order_response = requests.post(
    hook_service_root_url + "BatchOrder/OData.CSC.Order",
    json.dumps(order_body_custom_bucket),
    headers=api_headers,
)

# If the status code is 201, the order has been successfully sent
print(f"status code: {order_response.status_code}")
order_request = order_response.json()

# Print order_request JSON object: containing order_request details
order_request_details = json.dumps(order_request, indent=4)
print(order_request_details)
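
The `HOOK_ADDITIONAL*` parsing in the cell above relies on field position. A stricter variant that looks fields up by label is sketched below. The labels `NAME`, `VALUE` and `TYPE`, the default type `str`, and the rejection of unknown types are assumptions made for this sketch, so it behaves slightly differently from the cell above (which converts every non-`str` type with `int()`).

```python
def parse_additional_option(raw):
    """Parse 'NAME=...;VALUE=...;TYPE=str|int' into a WorkflowOptions entry."""
    fields = {}
    for part in raw.split(";"):
        if not part.strip():
            continue  # tolerate a trailing ';'
        label, separator, text = part.partition("=")
        if not separator:
            raise ValueError(f"Field without '=': {part!r}")
        fields[label.strip().upper()] = text

    value = fields["VALUE"]
    value_type = fields.get("TYPE", "str")
    if value_type == "int":
        value = int(value)
    elif value_type != "str":
        raise ValueError(f"Unsupported TYPE: {value_type!r}")
    return {"Name": fields["NAME"], "Value": value}


def collect_additional_options(environ):
    """Collect all HOOK_ADDITIONAL* entries, sorted by variable name."""
    return [
        parse_additional_option(raw)
        for key, raw in sorted(environ.items())
        if key.startswith("HOOK_ADDITIONAL")
    ]


print(collect_additional_options({"HOOK_ADDITIONAL1": "NAME=12345;VALUE=abcdef;TYPE=str"}))
```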

It is possible to order multiple products using the endpoint: 
```https://odp.data.destination-earth.eu/odata/v1/BatchOrder/OData.CSC.Order```   
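
Since `IdentifierList` is a list, one plausible way to order several products is to pass several identifiers in a single order body. The sketch below builds such a body; it assumes that all identifiers belong to the same collection and that the service accepts more than one entry, which should be checked against the API documentation. The second identifier is a made-up placeholder.

```python
import json


def build_order_body(order_name, workflow, identifiers, options):
    """Assemble an order body for one or more product identifiers."""
    identifiers = list(identifiers)
    if not identifiers:
        raise ValueError("At least one product identifier is required")
    return {
        "Name": f"Tutorial {workflow} - {order_name}",
        "WorkflowName": workflow,
        "IdentifierList": identifiers,
        "WorkflowOptions": list(options),
    }


body = build_order_body(
    order_name="my-order",
    workflow="data-harvest",
    identifiers=[
        "S2A_MSIL1C_20230910T050701_N0509_R019_T47VLH_20230910T074321.SAFE",
        "HYPOTHETICAL_SECOND_PRODUCT.SAFE",  # placeholder, not a real product
    ],
    options=[{"Name": "output_storage", "Value": "TEMPORARY"}],
)
print(json.dumps(body, indent=4))
```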

## Check the status of the order

Possible status values
- queued (i.e. queued for treatment but not started)
- in_progress (i.e. order being treated)
- completed (i.e. order is complete and data ready)
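
Rather than re-running the status cell by hand, the order can be polled until it leaves the `queued` and `in_progress` states. The sketch below is one possible polling loop; `wait_for_order`, the 30-second interval and the one-hour timeout are illustrative choices, and any status other than the three listed above is simply returned to the caller.

```python
import time


def wait_for_order(fetch_status, poll_seconds=30, timeout_seconds=3600,
                   sleep=time.sleep):
    """Poll fetch_status() until the order is no longer queued or in progress."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch_status()
        if status not in ("queued", "in_progress"):
            return status  # "completed", or a status not listed in this notebook
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Order still '{status}' after {timeout_seconds} s")
        sleep(poll_seconds)


# Demonstration with a fake status source instead of a real request
fake_statuses = iter(["queued", "in_progress", "completed"])
print(wait_for_order(lambda: next(fake_statuses), sleep=lambda seconds: None))
```

In the notebook, `fetch_status` would be a small function that repeats the `ProductionOrders` request and returns the `Status` field of the order of interest.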

In [None]:
requests_status = requests.get(
    hook_service_root_url
    + "ProductionOrders?$orderby=Id desc&$filter=(endswith(Name,'"
    + order_name
    + "'))",
    headers=api_headers,
).json()


for i in range(len(requests_status["value"])):
    print(f"\norder id: {requests_status['value'][i]['Id']}")
    print(f"Status: {requests_status['value'][i]['Status']}")

requests_status  # see requests status

## Access workflow output

### Private storage
Let us now check our private storage using this boto3 script.
You can also check this in the Islet service using the Horizon user interface.

In [13]:
# PRIVATE STORAGE: Prints contents of Private Bucket
import boto3

if is_private_storage:

    s3 = boto3.client(
        "s3",
        aws_access_key_id=output_storage_access_key,
        aws_secret_access_key=output_storage_secret_key,
        endpoint_url=output_storage_url,
    )

    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=output_bucket, Prefix=output_prefix + "/")

    # A page without a "Contents" key means that no object matched the prefix
    found_any = False
    for page in pages:
        for obj in page.get("Contents", []):
            found_any = True
            print(obj["Key"])

    if not found_any:
        print("No files exist")
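
Once the objects are listed, they can also be copied to the local file system. The sketch below downloads everything under a prefix using the boto3 client methods `get_paginator` and `download_file`; the helper `download_prefix` and the local directory layout are illustrative, and the client is passed in as an argument so that the function itself has no import-time dependency on boto3.

```python
import os


def download_prefix(s3_client, bucket, prefix, target_dir):
    """Download every object under `prefix` into `target_dir`, keeping the key layout."""
    downloaded = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                continue  # skip "folder" placeholder objects
            local_path = os.path.join(target_dir, *key.split("/"))
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            s3_client.download_file(bucket, key, local_path)
            downloaded.append(local_path)
    return downloaded


# Usage in this notebook (with the client created in the cell above):
# files = download_prefix(s3, output_bucket, output_prefix + "/", "./downloads")
```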

### Temporary storage

In [None]:
# List order items within a production order
# When the output_storage is of type TEMPORARY we can get a DownloadLink from the following code (Can also optionally download items here in code with the flag is_download_products)

# If TEMPORARY storage
if not is_private_storage:

    # Set to True to download products at the same level as the notebook file. File name will be in format "output-{workflow}-{order_id}-{product_id}.zip"
    is_download_products = False

    # Check Status again using order_name. This can give multiple order_ids (if hook executed multiple times using same order_name)
    requests_status = requests.get(
        f"{hook_service_root_url}ProductionOrders?$orderby=Id desc&$filter=(endswith(Name,'{order_name}'))",
        headers=api_headers,
    ).json()

    # Iterate over requests_status values
    for i in range(len(requests_status["value"])):
        print(f"\norder id: {requests_status['value'][i]['Id']}")
        print(f"Status: {requests_status['value'][i]['Status']}")

        # order_id = input("Order id: ")

        # set order id
        order_id = requests_status["value"][i]["Id"]
        # set status
        status = requests_status["value"][i]["Status"]
        if status == "completed":
            response = requests.get(
                f"https://odp.data.destination-earth.eu/odata/v1/BatchOrder({order_id})/Products",
                headers=api_headers,
            ).json()
            print(json.dumps(response, indent=4))

            if is_download_products:

                # Iterate over items/products : There can be multiple products per order
                for j in range(len(response["value"])):

                    print(f"j:{j}")
                    # Get Id (The product Id)
                    product_id = response["value"][j]["Id"]
                    # Infer the url of the product
                    url_product = f"https://odp.data.destination-earth.eu/odata/v1/BatchOrder({order_id})/Product({product_id})/$value"
                    print(f"url_product: {url_product}")
                    # Download the product
                    r = requests.get(
                        url_product, headers=api_headers, allow_redirects=True
                    )
                    product_file_name = f"output-{workflow}-{order_id}-{product_id}.zip"
                    # Use a context manager so that the file is always closed
                    with open(product_file_name, "wb") as product_file:
                        product_file.write(r.content)
                    print(f"Download Complete: product_file_name: {product_file_name}")

        else:
            print(f"Status for order:{order_id} is not 'completed'. status:{status}")

    # requests_status
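
The download above keeps the whole product in memory (`r.content`) before writing it, which can be a problem for large products. A minimal sketch of a streamed alternative is shown below; it relies on the `iter_content` and `raise_for_status` methods of a `requests` response, while the helper name `stream_to_file` and the 1 MiB chunk size are illustrative.

```python
def stream_to_file(response, path, chunk_size=1024 * 1024):
    """Write a streamed HTTP response to `path` chunk by chunk; return bytes written."""
    response.raise_for_status()  # stop on HTTP errors instead of saving an error page
    written = 0
    with open(path, "wb") as output_file:
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:  # skip keep-alive chunks
                output_file.write(chunk)
                written += len(chunk)
    return written


# Usage in this notebook (inside the product loop):
# with requests.get(url_product, headers=api_headers, stream=True, allow_redirects=True) as r:
#     size = stream_to_file(r, product_file_name)
```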