In [105]:
from pystac_client import Client
import json
import glob
import requests
import boto3
import base64
import time

In [34]:
discover_items_file_paths = glob.glob("../ingestion-data/discovery-items/*.json")
# discover_items_ids = [i.split("/")[-1].split(".")[0] for i in discover_items_file_paths]

In [52]:
API = "https://staging-stac.delta-backend.com/"
client = Client.open(API)
staging_collections = [i.id for i in list(client.get_collections())]

## Create transfer configs

In [119]:
for filename in discover_items_file_paths:
    with open(filename, "r") as f:
        dataset = json.load(f)
        if not isinstance(dataset, list):
            dataset = [dataset]

        id = dataset[0]["collection"]
        transfer_config = []
        for i in dataset:
            if (
                i["collection"] in staging_collections
            ):  # only create config for collections in staging
                config = dict()
                config["origin_bucket"] = i.get("bucket", "")
                config["origin_prefix"] = i.get("prefix", "")
                config["target_bucket"] = "veda-data-store"
                config["collection"] = i.get("collection")

                if "assets" in i:  # conditional for multi asset discovery items
                    for k, v in i["assets"].items():
                        config["filename_regex"] = v.get("regex")
                        transfer_config.append(config.copy())
                else:
                    config["filename_regex"] = i.get("filename_regex", "")
                    transfer_config.append(config)

            with open(
                f"../ingestion-data/transfer-config/{id}.json", "w", encoding="utf-8"
            ) as file:
                print(f"writing file: ingestion_data/transfer-config/{id}")
                json.dump(transfer_config, file, ensure_ascii=False, indent=4)

writing file: ingestion_data/transfer-config/campfire-lst-night-diff
writing file: ingestion_data/transfer-config/OMI_trno2-COG
writing file: ingestion_data/transfer-config/lis-global-da-tws
writing file: ingestion_data/transfer-config/conus-reach
writing file: ingestion_data/transfer-config/ndvi_diff_Ian_2022-09-30_2022-09-05
writing file: ingestion_data/transfer-config/entropy_difference_2022-09-05_2022-09-30
writing file: ingestion_data/transfer-config/snow-projections-median-585
writing file: ingestion_data/transfer-config/geoglam
writing file: ingestion_data/transfer-config/OMSO2PCA-COG
writing file: ingestion_data/transfer-config/lis-global-da-tws-trend
writing file: ingestion_data/transfer-config/lis-global-da-tws-trend
writing file: ingestion_data/transfer-config/lis-global-da-tws-trend
writing file: ingestion_data/transfer-config/grdi-v1-raster
writing file: ingestion_data/transfer-config/grdi-v1-raster
writing file: ingestion_data/transfer-config/grdi-v1-raster
writing file: 

## Run DAG in Airflow

In [106]:
mwaa_name = "veda-pipeline-dev-mwaa"
dry_run = False

In [107]:
def get_mwaa_cli_token(mwaa_name, profile):
    session = boto3.session.Session(profile_name=profile)
    airflow_client = session.client("mwaa")
    return airflow_client.create_cli_token(Name=mwaa_name)

In [118]:
transfer_items_file_paths = glob.glob("../ingestion-data/transfer-config/*.json")

for transfer_item_file in transfer_items_file_paths:
    print(f"Transferring using config: {transfer_item_file}")
    with open(transfer_item_file, "r") as f:
        transfer_configs = json.load(f)
        for transfer_config in transfer_configs:
            mwaa_cli_token = get_mwaa_cli_token(
                mwaa_name, "veda"
            )  # get token in inside loop or else token will expire in long lists
            mwaa_auth_token = "Bearer " + mwaa_cli_token["CliToken"]
            if (
                transfer_config["origin_prefix"] == ""
                or transfer_config["filename_regex"] == ""
            ):
                print(
                    f"{transfer_item_file} transfer skipped, no origin prefix or regex"
                )
                pass
            else:
                if not dry_run:
                    raw_data = f"dags trigger veda_transfer --conf '{json.dumps(transfer_config)}'"
                    mwaa_response = requests.post(
                        f"https://{mwaa_cli_token['WebServerHostname']}/aws_mwaa/cli",
                        headers={
                            "Authorization": mwaa_auth_token,
                            "Content-Type": "application/json",
                        },
                        data=raw_data,
                    )
                    try:
                        mwaa_std_err_message = base64.b64decode(
                            mwaa_response.json()["stderr"]
                        ).decode("utf8")
                        mwaa_std_out_message = base64.b64decode(
                            mwaa_response.json()["stdout"]
                        ).decode("utf8")
                        print(mwaa_response.status_code)
                        print(f"stderr: {mwaa_std_err_message}")
                        print(f"stdout: {mwaa_std_out_message}")
                    except:
                        print(mwaa_response)

Transferring using config: ../ingestion-data/transfer-config/campfire-lst-night-diff.json
200
stderr: 
stdout: Created <DagRun veda_transfer @ 2024-03-15T21:09:11+00:00: manual__2024-03-15T21:09:11+00:00, state:queued, queued_at: 2024-03-15 21:09:11.246305+00:00. externally triggered: True>

Transferring using config: ../ingestion-data/transfer-config/OMI_trno2-COG.json
200
stderr: 
stdout: Created <DagRun veda_transfer @ 2024-03-15T21:09:14+00:00: manual__2024-03-15T21:09:14+00:00, state:queued, queued_at: 2024-03-15 21:09:14.333873+00:00. externally triggered: True>

Transferring using config: ../ingestion-data/transfer-config/lis-global-da-tws.json
200
stderr: 
stdout: Created <DagRun veda_transfer @ 2024-03-15T21:09:17+00:00: manual__2024-03-15T21:09:17+00:00, state:queued, queued_at: 2024-03-15 21:09:17.497107+00:00. externally triggered: True>

Transferring using config: ../ingestion-data/transfer-config/HLSS30.json
Transferring using config: ../ingestion-data/transfer-config/con