In [None]:
# Selects which deployment this notebook targets. The value is used as the key
# into the STACK_SECRETS JSON when the per-stack settings are unpacked.
# Leave exactly one of the assignments below uncommented.
stack = "TEST"
# stack = "STAGING"
# stack = "PRODUCTION"


# %load_ext autoreload
# %autoreload 2

import datetime
import subprocess
import math
import httpx
import json
import boto3
import tqdm
import os
import operator
import pandas as pd
import tqdm
from google.cloud import tasks_v2
from dotenv import load_dotenv

# Pull variables from the local .env file into the process environment so that
# STACK_SECRETS can be read with os.getenv below.
load_dotenv(".env")

# STACK_SECRETS is parsed as a JSON object keyed by stack name; the entry for
# the selected stack must provide every setting unpacked below. Fail with an
# actionable message when the variable is absent: json.loads(None) would
# otherwise raise an opaque TypeError.
_stack_secrets_json = os.getenv("STACK_SECRETS")
if _stack_secrets_json is None:
    raise RuntimeError(
        "STACK_SECRETS is not set; define it in .env or in the environment "
        "before running this notebook."
    )

(
    NAME,
    TITILER_URL,
    TITILER_API_KEY,
    API_KEY,
    INFERENCE_URL,
    DB_URL,
    ASA_QUEUE,
    ASA_URL,
    SR_URL,
    ORCHESTRATOR_URL,
) = operator.itemgetter(
    "NAME",
    "TITILER_URL",
    "TITILER_API_KEY",
    "API_KEY",
    "INFERENCE_URL",
    "DB_URL",
    "ASA_QUEUE",
    "ASA_URL",
    "SR_URL",
    "ORCHESTRATOR_URL",
)(
    json.loads(_stack_secrets_json)[stack]
)

# Repeat the stack name ten times so the active target is hard to overlook.
print((NAME + "\n") * 10)

In [None]:
# FROM CSV
# Load the scene IDs to process from the "s1_scene_id" column of a CSV export.
# The path is resolved against the current user's home directory instead of a
# single hard-coded account, so the cell runs for anyone who has the file in
# ~/Downloads.
scenes_csv = os.path.expanduser("~/Downloads/planet_backlog_scene_ids.csv")
scenes_pd = pd.read_csv(scenes_csv)
scenes = scenes_pd["s1_scene_id"].tolist()
print(len(scenes))

In [None]:
# FROM LIST
# Hand-picked scene IDs; running this cell replaces any `scenes` list that was
# built before it.
scenes = ["S1A_IW_GRDH_1SDV_20241006T172800_20241006T172825_055985_06D8A6_318D"]
print(len(scenes))

# ORCHESTRATOR --SLOW-- 
## (via Scene Relevancy)

In [None]:
# One S3 client is shared by every productInfo.json lookup in this cell.
s3_client = boto3.client("s3")

# True -> get_coordinates skips S3 and returns a placeholder unit square.
USE_FAKE_FOOTPRINT = False
# Number of scenes bundled into a single POST.
BATCH_SIZE = 100

# Round up so a final, partially filled batch is still sent.
total_records = len(scenes)
num_batches = math.ceil(total_records / BATCH_SIZE)


def get_coordinates(scene_id, USE_FAKE_FOOTPRINT):
    """
    Return the footprint polygon coordinates for a Sentinel-1 scene.

    Args:
        scene_id: Product identifier such as
            "S1A_IW_GRDH_1SDV_20241006T172800_20241006T172825_055985_06D8A6_318D".
            The acquisition mode, polarization and start date are parsed from
            its underscore-separated fields to locate the scene in S3.
        USE_FAKE_FOOTPRINT: When truthy, skip S3 entirely and return a fixed
            unit square (scene_id is not inspected in that case).

    Returns:
        The value stored under ["footprint"]["coordinates"] in the scene's
        productInfo.json, or the placeholder unit square.

    Raises:
        IndexError / ValueError: if scene_id does not have the expected fields.
        Any exception raised by the module-level `s3_client` when the object
        cannot be fetched.
    """
    if USE_FAKE_FOOTPRINT:
        return [
            [
                [0.0, 0.0],
                [0.0, 1.0],
                [1.0, 1.0],
                [1.0, 0.0],
                [0.0, 0.0],
            ]
        ]

    # The bucket is laid out as GRD/<year>/<month>/<day>/<mode>/<polarization>/<id>/.
    # Derive mode and polarization from the id instead of assuming IW/DV, the
    # same way the request payload derives them, so other products resolve too.
    fields = scene_id.split("_")
    mode = fields[1]
    polarization = fields[3][-2:]
    start = fields[4]  # acquisition start, formatted YYYYMMDDThhmmss
    # Month and day folders carry no leading zero, hence the int() round-trip.
    year, month, day = start[:4], int(start[4:6]), int(start[6:8])

    file_key = (
        f"GRD/{year}/{month}/{day}/{mode}/{polarization}/{scene_id}/productInfo.json"
    )
    response = s3_client.get_object(
        Bucket="sentinel-s1-l1c",
        Key=file_key,
        RequestPayer="requester",  # the request is billed to the caller
    )
    data = json.loads(response["Body"].read())
    return data["footprint"]["coordinates"]


# Scenes for which no SNS-style record could be built (for example because the
# footprint lookup failed). They are reported at the end instead of aborting
# the whole run part-way through.
failed_scenes = []

for batch_num in range(num_batches):
    print(f"batch_num: {batch_num} of {num_batches}")
    batch = scenes[batch_num * BATCH_SIZE : (batch_num + 1) * BATCH_SIZE]

    records = []
    for scene_id in tqdm.tqdm(batch):
        try:
            # Mode, resolution and polarization are encoded in the scene id.
            id_fields = scene_id.split("_")
            message = {
                "id": scene_id,
                "mode": id_fields[1],
                "resolution": id_fields[2][-1],
                "polarization": id_fields[3][-2:],
                "footprint": {
                    "type": "Polygon",
                    "coordinates": get_coordinates(scene_id, USE_FAKE_FOOTPRINT),
                },
            }
        except Exception as e:
            # One bad scene must not take the other scenes down with it.
            print(f"Skipping {scene_id}: could not build record: {e}")
            failed_scenes.append(scene_id)
            continue
        # Each record wraps the JSON-encoded message as {"Sns": {"Message": ...}}.
        records.append({"Sns": {"Message": json.dumps(message)}})

    if not records:
        print(f"Batch {batch_num + 1}/{num_batches}: nothing to send")
        continue

    try:
        orchestrator_result = httpx.post(
            SR_URL,
            json={"Records": records},
            timeout=20.0,  # Adjusted timeout
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
        print(f"Batch {batch_num + 1}/{num_batches}:", orchestrator_result)
    except Exception as e:
        print(f"Error processing batch {batch_num + 1}: {e}")

if failed_scenes:
    print(f"{len(failed_scenes)} scene(s) skipped: {failed_scenes}")

# ORCHESTRATOR --FAST--

In [None]:
# Fire-and-forget submission of the first 40 scenes. The 0.1 s timeout appears
# intentional: the loop does not wait for the orchestrator's response.
for scene_id in tqdm.tqdm(scenes[:40]):
    try:
        orchestrator_result = httpx.post(
            ORCHESTRATOR_URL + "/orchestrate",
            json={"scene_id": str(scene_id)},
            timeout=0.1,
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
        print(scene_id, orchestrator_result)
    except httpx.ReadTimeout:
        # Expected outcome: the request was written and we simply stopped
        # waiting for the reply.
        pass
    except Exception as e:
        # Anything else (connect/write timeout, DNS, TLS, ...) means the
        # request may never have reached the service, so make it visible.
        print(f"{scene_id}: request not confirmed sent: {e!r}")

# ASA --SLOW--

In [None]:
import concurrent.futures


def create_task(scene_id, parent):
    """Enqueue one Cloud Tasks HTTP task that POSTs `scene_id` to ASA_URL.

    Args:
        scene_id: Scene identifier placed in the JSON body of the task.
        parent: Queue path passed to the Cloud Tasks client as "parent".

    Returns:
        None. A failure to enqueue is printed, not raised.

    Relies on the module-level `client`, `ASA_URL` and `API_KEY`.
    """
    # JSON body delivered to the target URL, encoded to bytes.
    body = json.dumps(
        {"scene_id": scene_id, "dry_run": False, "overwrite_previous": True}
    ).encode()

    http_request = {
        "http_method": tasks_v2.HttpMethod.POST,
        "url": ASA_URL,
        "headers": {
            "Content-type": "application/json",
            "Authorization": f"Bearer {API_KEY}",
        },
        "body": body,
    }

    try:
        client.create_task(
            request={"parent": parent, "task": {"http_request": http_request}}
        )
    except Exception as e:
        print(f"Error creating task for scene_id {scene_id}: {e}")


# Remove any GOOGLE_APPLICATION_CREDENTIALS entry before the client is built.
# NOTE(review): presumably so the client uses other default credentials - confirm.
os.environ.pop("GOOGLE_APPLICATION_CREDENTIALS", None)
client = tasks_v2.CloudTasksClient()
parent = client.queue_path("cerulean-338116", "europe-west1", ASA_QUEUE)

max_workers = 100  # deliberately aggressive concurrency
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Pair every scene id with the same queue path; map yields in input order.
    enqueue_iter = executor.map(create_task, scenes, [parent] * len(scenes))
    results = list(tqdm.tqdm(enqueue_iter, total=len(scenes)))

# Sequential alternative, kept for reference:
# for scene_id in tqdm.tqdm(scenes):
#     create_task(scene_id, parent)

# ASA --FAST--

In [None]:
# Fire-and-forget submission straight to the ASA endpoint. The 0.1 s timeout
# appears intentional: the loop does not wait for the service's response.
for scene_id in tqdm.tqdm(scenes):
    try:
        asa_result = httpx.post(
            ASA_URL,
            json={
                "scene_id": str(scene_id),
                "dry_run": False,
                "overwrite_previous": True,
                # "run_flags": [2], # 1: ais, 2: infra
            },
            timeout=0.1,
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
    except httpx.ReadTimeout:
        # Expected outcome: the request was written and we simply stopped
        # waiting for the reply.
        pass
    except Exception as e:
        # Anything else (connect/write timeout, DNS, TLS, ...) means the
        # request may never have reached the service, so make it visible.
        print(f"{scene_id}: request not confirmed sent: {e!r}")

Reference: [Sentinel Hub Catalog API examples (Copernicus Data Space documentation)](https://documentation.dataspace.copernicus.eu/APIs/SentinelHub/Catalog/Examples.html)

# AWS OpenDataRegistry ls and productInfo.json

## Download GeoTiffs from AWS

In [38]:
# Download GeoTiffs directly from S3 Open Data Registry using request payer and a list of Scene IDs

import boto3
import subprocess
import tqdm

s3_client = boto3.client("s3")
bucket_name = "sentinel-s1-l1c"

# Rasters are written below the current user's home directory rather than one
# specific account's; the folder is created if it does not exist yet.
raster_dir = os.path.expanduser("~/git/ceruleanserver/local/temp/outputs/_rasters")
os.makedirs(raster_dir, exist_ok=True)

# Read size used while streaming an object body to disk.
DOWNLOAD_CHUNK_BYTES = 1024 * 1024

for scene_id in tqdm.tqdm(scenes):
    # Month and day folders carry no leading zero, hence the int() round-trip.
    measurement = f"GRD/{scene_id[17:21]}/{int(scene_id[21:23])}/{int(scene_id[23:25])}/IW/DV/{scene_id}/measurement/"
    file_path = os.path.join(raster_dir, f"{scene_id}.tiff")

    try:
        obj = s3_client.get_object(
            Bucket=bucket_name, Key=measurement + "iw-vv.tiff", RequestPayer="requester"
        )
    except Exception as e:
        # `Exception`, not a bare except, so Ctrl-C still interrupts the loop.
        print(f"{scene_id}: could not fetch iw-vv.tiff: {e}")
        # Show what the measurement folder does contain. `run` without
        # check=True cannot raise on a non-zero exit code from inside this handler.
        listing = subprocess.run(
            [
                "aws",
                "s3",
                "ls",
                f"s3://{bucket_name}/{measurement}",
                "--request-payer",
                "requester",
            ],
            capture_output=True,
        )
        print(listing.stdout or listing.stderr)
        print(f"{scene_id} is missing the VV file. Perhaps one of the above instead?")
        continue

    try:
        with open(file_path, "wb") as file:
            # Write each chunk as it arrives instead of buffering the whole
            # object in memory first.
            while data := obj["Body"].read(DOWNLOAD_CHUNK_BYTES):
                file.write(data)
    except Exception as e:
        # A local write or stream failure is reported as such, not as a
        # missing VV file.
        print(f"{scene_id}: download to {file_path} failed: {e}")
        continue

    print(f"{scene_id} downloaded successfully!")


print("wrapped")

In [None]:
import asyncio
import httpx
import boto3
import json
import subprocess
import datetime
import tqdm

# Date window to replay (the loop below treats end_date as inclusive) and the
# cap on simultaneously running requests.
start_date, end_date = datetime.date(2023, 1, 10), datetime.date(2023, 1, 11)
concurrency = 25

bucket_name = "sentinel-s1-l1c"
s3_client = boto3.client("s3")

# Shared by every send_request coroutine to enforce the concurrency cap.
semaphore = asyncio.Semaphore(concurrency)
current_date = start_date


def extract_folders(output):
    """Return the folder names from the "PRE <name>/" rows of `aws s3 ls` output.

    Args:
        output: Raw bytes captured from the command (decoded as UTF-8).

    Returns:
        A list with the second whitespace-separated token of each row that
        starts with "PRE", stripped of surrounding slashes, in listing order.
    """
    return [
        row.split()[1].strip("/")
        for row in output.decode("utf-8").split("\n")
        if row.strip().startswith("PRE")
    ]


def append_to_file(folders, filename="output.txt"):
    """Append every entry of `folders` to `filename`, one entry per line.

    The file is opened in append mode, so existing content is kept.
    """
    with open(filename, "a") as sink:
        sink.writelines(entry + "\n" for entry in folders)


async def send_request(semaphore, session, stack, scene_id, coordinates, API_KEY):
    """POST one scene, wrapped as a single SNS-style record, to the stack's endpoint.

    Args:
        semaphore: Async context manager limiting how many requests run at once.
        session: Object exposing an awaitable `post(url, json=, headers=, timeout=)`
            (an httpx.AsyncClient in this notebook).
        stack: Interpolated into the Cloud Function URL.
        scene_id: Scene identifier; mode, resolution and polarization are parsed
            from its underscore-separated fields.
        coordinates: Polygon coordinates sent as the scene footprint.
        API_KEY: Bearer token for the Authorization header.

    Returns:
        None. Transport errors and HTTP error statuses are printed, not raised.
    """
    async with semaphore:
        # NOTE(review): `stack` is inserted verbatim, so its casing must match
        # the deployed function name; the config cell also defines SR_URL,
        # which may be the intended endpoint - confirm.
        url = f"https://europe-west1-cerulean-338116.cloudfunctions.net/cerulean-cloud-{stack}-cf-sr"
        id_fields = scene_id.split("_")
        message = {
            "id": scene_id,
            "mode": id_fields[1],
            "resolution": id_fields[2][-1],
            "polarization": id_fields[3][-2:],
            "footprint": {
                "type": "Polygon",
                "coordinates": coordinates,
            },
        }
        payload = {"Records": [{"Sns": {"Message": json.dumps(message)}}]}
        headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        try:
            response = await session.post(
                url, json=payload, headers=headers, timeout=60.0
            )
            # A 4xx/5xx reply does not raise by itself; turn it into an error
            # so a rejected scene is reported rather than silently dropped.
            response.raise_for_status()
        except Exception as e:
            print(f"Error for scene_id: {scene_id}")
            print(e)


print(f"RUNNING ON: {stack}")
while current_date <= end_date:
    prefix = f"GRD/{current_date.year}/{current_date.month}/{current_date.day}/IW/DV/"
    command = f"aws s3 ls s3://{bucket_name}/{prefix} --request-payer requester"
    output = subprocess.check_output(command, shell=True)
    folders = extract_folders(output)
    id_footprint_dict = {}
    for folder in tqdm.tqdm(folders):
        file_key = f"{prefix}{folder}/productInfo.json"  # Ensure the correct path
        try:
            obj = s3_client.get_object(
                Bucket=bucket_name, Key=file_key, RequestPayer="requester"
            )
            content = obj["Body"].read()
            data = json.loads(content)
            # Extract 'id' and 'footprint' and add to dictionary
            # • store to GCP manifest
            id_footprint_dict[data["id"]] = data["footprint"]["coordinates"]
        except s3_client.exceptions.NoSuchKey:
            print(f"File not found: {file_key}")
        except Exception as e:
            print(f"Error accessing file {file_key}: {e}")

    async with httpx.AsyncClient() as session:
        tasks = [
            send_request(semaphore, session, stack, k, v, API_KEY)
            for k, v in id_footprint_dict.items()
        ]
        await asyncio.gather(*tasks)
    print("shipped")

    current_date += datetime.timedelta(days=1)
print("wrapped")

In [21]:
"""
Iterates over the dates from start_date to end_date (inclusive), lists each day's S3 prefix, collects the scene IDs found there, and writes them to a CSV file.
"""

# First and last day of the window to scan.
start_date = datetime.date(year=2025, month=1, day=16)
end_date = datetime.date(year=2025, month=1, day=31)


def extract_folders(output):
    """Extract folder names from the bytes printed by an 'aws s3 ls' command.

    Only rows starting with "PRE" are considered; the name is the row's second
    whitespace-separated token with surrounding slashes removed.

    NOTE: this notebook defines a helper with the same name and logic in
    another cell; whichever cell ran last provides the active definition.
    """
    text = output.decode("utf-8")
    prefix_rows = (row for row in text.split("\n") if row.strip().startswith("PRE"))
    return [row.split()[1].strip("/") for row in prefix_rows]


# Collect the scene folder names for every day in [start_date, end_date].
scene_ids = []
current_date = start_date
while current_date <= end_date:
    prefix = f"GRD/{current_date.year}/{current_date.month}/{current_date.day}/IW/DV/"
    # Advance before the try so the `continue` below cannot loop forever.
    current_date += datetime.timedelta(days=1)
    try:
        # Argument list instead of a shell string: no shell parsing involved.
        output = subprocess.check_output(
            [
                "aws",
                "s3",
                "ls",
                f"s3://sentinel-s1-l1c/{prefix}",
                "--request-payer",
                "requester",
            ]
        )
    except Exception as e:
        print(f"Error listing S3 objects for prefix {prefix}: {e}")
        continue

    folders = extract_folders(output)
    scene_ids.extend(folders)

# Written below the current user's home directory rather than one specific
# account's, so the cell runs unchanged on another machine.
scene_ids_csv = os.path.expanduser("~/Downloads/scene_ids.csv")
pd.DataFrame({"s1_scene_id": scene_ids}).to_csv(scene_ids_csv, index=False)
print(f"len(scene_ids): {len(scene_ids)}")