In [None]:
import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone

from dotenv import load_dotenv

# Load PREFECT_API_URL and PREFECT_API_KEY from .env.
# This call is kept ahead of the Prefect imports, matching the original cell
# order. NOTE(review): presumably so Prefect sees these values when it is
# first imported -- confirm before reordering.
load_dotenv()

from prefect.client import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
)
from prefect.client.schemas.sorting import FlowRunSort

# Optional: confirm env settings loaded correctly
print("API URL from env:", os.getenv("PREFECT_API_URL"))

async def find_interesting_running_flows():
    """Print every flow run that is RUNNING and started 1 hour to 30 days ago.

    Opens a client with ``get_client()``, reads all flow runs whose state type
    is RUNNING and whose start time lies between 30 days ago and 1 hour ago
    (both computed from the current UTC time), fetching them in pages of 200.
    For each match it prints the flow name, run ID, run name, start time and
    state type, followed by a separator line.

    Returns:
        None. Results are only printed; if nothing matches, a single
        "no results" message is printed instead.
    """
    now = datetime.now(timezone.utc)
    one_hour_ago = now - timedelta(hours=1)
    one_month_ago = now - timedelta(days=30)

    offset = 0
    page_size = 200
    all_flow_runs = []

    async with get_client() as client:
        # Use the client's public property rather than reaching into the
        # private httpx client (`client._client._base_url`).
        print("Connected to:", client.api_url)

        # Filter: Running, started between 1 month ago and 1 hour ago
        flow_run_filter = FlowRunFilter(
            state=FlowRunFilterState(type={"any_": ["RUNNING"]}),
            start_time=FlowRunFilterStartTime(
                after=one_month_ago,
                before=one_hour_ago
            )
        )

        # Page through the results; an empty page marks the end.
        while True:
            flow_runs = await client.read_flow_runs(
                flow_run_filter=flow_run_filter,
                limit=page_size,
                offset=offset
            )
            if not flow_runs:
                break
            all_flow_runs.extend(flow_runs)
            offset += page_size

        if not all_flow_runs:
            print("No running flow runs between 1 hour and 1 month old.")
            return

        # Cache flow names so each flow is looked up at most once, however
        # many of its runs matched.
        flow_name_cache = {}

        print(f"Found {len(all_flow_runs)} flow run(s) matching criteria:\n")

        for run in all_flow_runs:
            flow_id = run.flow_id
            if flow_id not in flow_name_cache:
                flow = await client.read_flow(flow_id)
                flow_name_cache[flow_id] = flow.name

            print(f"Flow Name: {flow_name_cache[flow_id]}")
            print(f"Run ID: {run.id}")
            print(f"Run Name: {run.name}")
            print(f"Start Time: {run.start_time}")
            print(f"State: {run.state.type}")
            print("-" * 40)


API URL from env: https://flow-prd.als.lbl.gov/api


In [11]:
await find_long_running_flow_runs()




Connected to: https://flow-prd.als.lbl.gov/api/
Using API URL: https://flow-prd.als.lbl.gov/api
Found 254 flow run(s) running longer than 1 hour(s):

Flow Name: dispatcher
Flow Run ID: ffcfdc23-6bb1-456d-a31d-38170f3c7cbf
Name: ivory-lynx
Start Time: 2025-05-20T19:21:32.721940+00:00
State: StateType.RUNNING
----------------------------------------
Flow Name: dispatcher
Flow Run ID: fe9bb01c-77c8-47ad-afb0-9ccd1b39bac8
Name: 20241207_091516_LDR_07_spall_10X_38mm_samp_to_scint_2.h5
Start Time: 2024-12-07T17:23:02.267035+00:00
State: StateType.RUNNING
----------------------------------------
Flow Name: alcf_globus_compute_reconstruction
Flow Run ID: fcbd3516-b8d5-4e67-aa49-1e7121c4bbf1
Name: warm-sheep
Start Time: 2024-12-05T20:47:37.943149+00:00
State: StateType.RUNNING
----------------------------------------
Flow Name: dispatcher
Flow Run ID: fa8518ec-e38d-4f07-96a7-43d0fdb3042e
Name: 20241207_121214_LDR_07_spall_10X_B_x00y04.h5
Start Time: 2024-12-07T20:54:13.481487+00:00
State: State