In [20]:
import os
import json
import requests
import boto3
from tqdm import tqdm
import time
import argparse

In [21]:
# Account name sent to the authentication server.
username = "illia.yakushevskyi@uzh.ch"
# The password is never written into the notebook: it is read from the
# MY_API_PASSWORD environment variable and is None when that variable is unset.
password = os.environ.get("MY_API_PASSWORD")
# Name of the product that is looked up in the catalogue.
product_name = "S1A_IW_GRDH_1SDV_20250404T172343_20250404T172408_058610_07414D_31B0.SAFE"

In [22]:
# Service endpoints used by the helper functions below:
#   auth_server_url - OpenID Connect token endpoint (password grant)
#   odata_base_url  - OData "Products" collection of the catalogue
#   s3_endpoint_url - S3-compatible endpoint that serves the product files
config = dict(
    auth_server_url="https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token",
    odata_base_url="https://catalogue.dataspace.copernicus.eu/odata/v1/Products",
    s3_endpoint_url="https://eodata.dataspace.copernicus.eu",
)

def get_access_token(config, username, password, timeout=30):
    """
    Retrieve an access token from the authentication server.
    This token is used for subsequent API calls.

    Args:
        config: Mapping that provides the token endpoint under "auth_server_url".
        username: Account name sent in the password-grant request.
        password: Account password sent in the password-grant request.
        timeout: Seconds to wait for the server before the request is abandoned
            (default 30). Without a timeout a stalled connection blocks forever.

    Returns:
        The value of "access_token" from the JSON response body.

    Raises:
        SystemExit: With code 1 when the server does not answer with HTTP 200.
    """
    auth_data = {
        "client_id": "cdse-public",
        "grant_type": "password",
        "username": username,
        "password": password,
    }
    response = requests.post(
        config["auth_server_url"],
        data=auth_data,
        verify=True,
        allow_redirects=False,
        timeout=timeout,
    )
    if response.status_code != 200:
        print(f"Failed to retrieve access token. Status code: {response.status_code}")
        # Raise SystemExit explicitly instead of calling `exit(1)`: the bare
        # `exit` name is an interactive-shell helper injected by `site`, it is
        # not guaranteed to exist, and interactive front ends may override it.
        raise SystemExit(1)
    return response.json()["access_token"]

def get_eo_product_details(config, headers, eo_product_name, timeout=30):
    """
    Retrieve EO product details using the OData API to determine the S3 path.

    Args:
        config: Mapping that provides the catalogue endpoint under "odata_base_url".
        headers: HTTP headers sent with the request.
        eo_product_name: Exact product name matched with an OData `Name eq '...'` filter.
        timeout: Seconds to wait for the catalogue before the request is
            abandoned (default 30).

    Returns:
        A tuple `(Id, S3Path)` taken from the first entry of the response's "value" list.

    Raises:
        SystemExit: With code 1 when the catalogue does not answer with HTTP 200
            or when no product matches the given name.
    """
    odata_url = f"{config['odata_base_url']}?$filter=Name eq '{eo_product_name}'"
    response = requests.get(odata_url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to retrieve EO product details. Status code: {response.status_code}")
        # Explicit SystemExit rather than the interactive-only `exit` helper.
        raise SystemExit(1)

    # A successful query can still return no entries (e.g. a misspelled name);
    # report that clearly instead of failing with a bare IndexError on [0].
    matches = response.json().get("value") or []
    if not matches:
        print(f"No EO product named '{eo_product_name}' was found in the catalogue.")
        raise SystemExit(1)

    eo_product_data = matches[0]
    return eo_product_data["Id"], eo_product_data["S3Path"]

def get_temporary_s3_credentials(headers, timeout=30):
    """
    Create temporary S3 credentials by calling the S3 keys manager API.

    Args:
        headers: HTTP headers sent with the request.
        timeout: Seconds to wait for the keys manager before the request is
            abandoned (default 30).

    Returns:
        The decoded JSON response; the code reads its "access_id" and "secret" entries.

    Raises:
        SystemExit: With code 1 when the API does not answer with HTTP 200.
    """
    credentials_response = requests.post(
        "https://s3-keys-manager.cloudferro.com/api/user/credentials",
        headers=headers,
        timeout=timeout,
    )
    if credentials_response.status_code != 200:
        print(f"Failed to create temporary S3 credentials. Status code: {credentials_response.status_code}")
        print("Product download aborted.")
        # Explicit SystemExit rather than the interactive-only `exit` helper.
        raise SystemExit(1)

    s3_credentials = credentials_response.json()
    print("Temporary S3 credentials created successfully.")
    print(f"access: {s3_credentials['access_id']}")
    # Never print the secret key: anything printed here is stored in the saved
    # notebook output. Callers still receive it in the returned mapping.
    print("secret: [hidden]")
    return s3_credentials

def format_filename(filename, length=40):
    """
    Return `filename` adjusted to a fixed display width.

    Names that fit are padded on the right with spaces up to `length`;
    longer names are cut and finished with '...' so that the result is
    `length` characters wide.
    """
    if len(filename) <= length:
        return filename.ljust(length)

    ellipsis = '...'
    kept = filename[:length - len(ellipsis)]
    return kept + ellipsis

def download_file_s3(s3, bucket_name, s3_key, local_path, failed_downloads):
    """
    Fetch one object from S3 into `local_path` while showing a progress bar.

    The object's size is read with `head_object` so the bar has a total.
    Any exception raised along the way is reported on stdout and the key is
    appended to `failed_downloads`; nothing is re-raised.
    """
    try:
        head = s3.head_object(Bucket=bucket_name, Key=s3_key)
        total_bytes = head['ContentLength']
        bar_label = format_filename(os.path.basename(local_path))

        progress = tqdm(
            total=total_bytes,
            unit='B',
            unit_scale=True,
            desc=bar_label,
            ncols=80,
            bar_format='{desc:.40}|{bar:20}| {percentage:3.0f}% {n_fmt}/{total_fmt}B',
        )
        with progress as pbar:
            # Called by the transfer with the number of bytes moved since the last call.
            def advance(chunk_size):
                pbar.update(chunk_size)

            s3.download_file(bucket_name, s3_key, local_path, Callback=advance)
    except Exception as err:
        print(f"Failed to download {s3_key}. Error: {err}")
        failed_downloads.append(s3_key)

def traverse_and_download_s3(s3_resource, bucket_name, base_s3_path, local_path, failed_downloads):
    """
    Download every object whose key starts with `base_s3_path`.

    Each key is made relative to `base_s3_path` and recreated under
    `local_path`; missing local directories are created before the file is
    handed to `download_file_s3`, which records failures in `failed_downloads`.
    """
    matching_objects = s3_resource.Bucket(bucket_name).objects.filter(Prefix=base_s3_path)

    for summary in matching_objects:
        key = summary.key
        # Mirror the key's position below the prefix on the local file system.
        destination = os.path.join(local_path, os.path.relpath(key, base_s3_path))
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        download_file_s3(s3_resource.meta.client, bucket_name, key, destination, failed_downloads)

In [23]:
access_token = get_access_token(config, username, password)
# Deliberately do not end the cell with a bare `access_token` expression:
# that would render the bearer token as the cell output, and the token would
# then be stored in the saved notebook and leak to anyone the file is shared with.
print("Access token retrieved.")

'<REDACTED: access token removed from saved cell output>'

In [24]:
# Request an access token for the configured account.
access_token = get_access_token(config, username, password)

# Headers for the authenticated API calls: the token is sent as a Bearer
# credential and a JSON response is requested.
headers = {
    "Authorization": f"Bearer {access_token}",
    "Accept": "application/json"
}

# Get EO product details (including S3 path)
eo_product_id, s3_path = get_eo_product_details(config, headers, product_name)
# Drop any leading '/' and split once on the first '/': the first segment is
# used as the bucket name and the remainder as the key prefix of the product.
bucket_name, base_s3_path = s3_path.lstrip('/').split('/', 1)