# Query Prefect Resources

In [None]:
import os
import sys
import toml
from datetime import datetime

import boto3
import polars as pl
import pytz
import requests

In [None]:
# Widen polars' table rendering so the wide API responses below are shown in
# full: up to 50 columns, 100 rows, 1,000-char strings and 1,000-char tables.
for _setter, _value in (
    (pl.Config.set_tbl_cols, 50),
    (pl.Config.set_fmt_str_lengths, 1_000),
    (pl.Config.set_tbl_width_chars, 1_000),
    (pl.Config.set_tbl_rows, 100),
):
    # bind the return value to `_` so the notebook does not echo it
    _ = _setter(_value)

## About

Inspect various Prefect Cloud resources using the [Prefect Cloud REST API](https://app.prefect.cloud/api/docs).

## User Inputs

In [None]:
# Prefect flow and deployment whose resources are inspected below.
flow_name = "status-flow"
deployment_name = "get-status-deployment"
# S3 bucket name (not referenced by the queries below).
s3_bucket_name = "bikeshare-toronto-dash"

# Timezone into which parsed API timestamps are converted for display.
my_timezone = "America/Toronto"

# Columns kept from each API response. The comment after each list names
# other fields that can be added to it.
work_queues_cols = [
    "id",
    "name",
    "is_paused",
    "last_polled",
    "work_pool_name",
]
# not selected: updated, work_pool_id

flow_run_cols = [
    "name",
    "flow_name",
    "deployment_id",
    "run_count",
    "start_time",
    "total_run_time",
    "auto_scheduled",
    "work_queue_name",
    "work_pool_name",
]
# not selected: id, created, updated, state_id, flow_version, parameters,
# idempotency_key, context, empirical_policy, tags, parent_task_run_id,
# state_type, state_name, expected_start_time, next_scheduled_start_time,
# end_time, estimated_run_time, estimated_start_time_delta,
# infrastructure_document_id, infrastructure_pid, created_by,
# work_queue_id, work_pool_id, state

deployment_cols = [
    "id",
    "updated",
    "name",
    "version",
    "description",
    "flow_id",
    "is_schedule_active",
    "status",
    "work_queue_name",
    "work_pool_name",
    "enforce_parameter_schema",
]
# not selected: created, schedule, infra_overrides, parameters, last_polled,
# pull_steps, tags, parameter_openapi_schema, path, entrypoint,
# manifest_path, storage_document_id, infrastructure_document_id,
# created_by, updated_by

In [None]:
# DIRECTORIES
prefect_dir = os.path.join(os.path.expanduser("~"), ".prefect")

# FILE PATHS
profiles_fpath = os.path.join(prefect_dir, "profiles.toml")

## Pre-Requisites

### Load Prefect Profiles File

In [None]:
# Parse the Prefect profiles TOML file into a nested dict.
with open(profiles_fpath, encoding="utf-8") as profiles_file:
    profiles = toml.load(profiles_file)

### Get the Prefect Cloud REST API URL and API Key

In [None]:
# Settings of the profile named "cloud": the REST API base URL and API key.
prefect_cloud_creds = profiles["profiles"]["cloud"]
api_base_url, api_key = (
    prefect_cloud_creds[setting_name]
    for setting_name in ("PREFECT_API_URL", "PREFECT_API_KEY")
)

### Get REST API Authentication Parameters

In [None]:
# Keyword arguments splatted into every `requests` call in this notebook
# (`requests.post(..., **auth_headers)`): JSON content type, the bearer-token
# API key, and a timeout. Without a timeout `requests` waits indefinitely on
# an unresponsive server, hanging the cell.
auth_headers = dict(
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    },
    # (connect, read) timeouts in seconds
    timeout=(10, 60),
)

## Get Data

### Retrieve Flows

In [None]:
%%time
r = requests.post(
    f"{api_base_url}/flows/filter",
    **auth_headers,
)
assert r.status_code == 200
df_flows = (
    pl.from_records(r.json())
    .filter(pl.col('name') == flow_name)
    .with_columns(
        [
            pl.col(c)
            .str.to_datetime("%Y-%m-%dT%H:%M:%S%.f%Z")
            .dt.replace_time_zone('UTC')
            .dt.convert_time_zone(my_timezone)
            for c in ['created', 'updated']
        ]
    )
)
df_flows

### Retrieve Deployments

In [None]:
%%time
r = requests.post(
    f"{api_base_url}/deployments/filter",
    json={
        "sort": "NAME_ASC",
        'deployment': {'name': deployment_name},
        # 'status': 'READY',
        "limit": 1,
    },
    **auth_headers
)
assert r.status_code == 200
df_deployments = (
    pl.from_records(r.json())
    .select([c for c in deployment_cols])
    .rename(
        {
            "id": "deployment_id",
            'name': "deployment_name",
            'updated': "deployment_updated",
            'version': 'deployment_version',
        }
    )
    .with_columns(
        [
            pl.col(c)
            .str.to_datetime("%Y-%m-%dT%H:%M:%S%.f%Z")
            .dt.replace_time_zone('UTC')
            .dt.convert_time_zone(my_timezone)
            for c in ['deployment_updated']
        ]
    )
)
df_deployments

### Retrieve Work Queues

Get work queues

In [None]:
%%time
r = requests.post(
    f"{api_base_url}/work_queues/filter",
    **auth_headers,
)
assert r.status_code == 200
df_work_queues = (
    pl.from_records(r.json())
    .filter(pl.col('name') != 'default')
    .with_columns(
        [
            pl.col(c)
            .str.to_datetime("%Y-%m-%dT%H:%M:%S%.f%Z")
            .dt.replace_time_zone('UTC')
            .dt.convert_time_zone(my_timezone)
            for c in ['created', 'updated', 'last_polled']
        ]
    )
    .select(work_queues_cols)
    .rename(
        {
            'id': "work_queue_id",
            "name": "work_queue_name",
            'is_paused': 'work_queue_is_paused',
            'last_polled': "work_queue_last_polled",
        }
    )
)
df_work_queues

Get status for each work queue

In [None]:
%%time
dfs_work_queues_status = []
for row in df_work_queues.rows():
    r = requests.get(
        f"{api_base_url}/work_queues/{row[0]}/status",
        **auth_headers,
    )
    assert r.status_code == 200
    df_work_queues_status = (
        pl.from_records([r.json()])
        .with_columns(
        [
            pl.col(c)
            .str.to_datetime("%Y-%m-%dT%H:%M:%S%.f%Z")
            .dt.replace_time_zone('UTC')
            .dt.convert_time_zone(my_timezone)
            for c in ['last_polled']
        ]
    )
        .rename(
            {
                "healthy": "work_queue_healthy",
                "late_runs_count": "work_queue_late_runs_count",
                "last_polled": "work_queue_last_polled",
                "health_check_policy": "work_queue_health_check_policy",
            }
        )
        .with_columns(pl.lit(row[0]).alias('work_queue_id'))
    )
    dfs_work_queues_status.append(df_work_queues_status)
df_work_queues_status = pl.concat(dfs_work_queues_status)
df_work_queues_status

Append status to work queues metadata

In [None]:
# Attach each queue's status by work queue ID only. The status frame also has
# `work_queue_last_polled`, but it comes from a later request, so it can differ
# from the metadata value whenever the queue is polled in between (and a
# never-polled queue presumably has a null timestamp, which does not match in
# a polars join by default). Joining on it would leave the status columns null.
# The metadata's `work_queue_last_polled` is kept.
df_work_queues = (
    df_work_queues
    .join(
        df_work_queues_status.drop('work_queue_last_polled'),
        on='work_queue_id',
        how='left',
    )
)
df_work_queues

Sanity-check that the `LEFT JOIN` found a status for every work queue

In [None]:
# Every work queue must have matched a status row in the LEFT JOIN above.
# Unmatched rows get null in all status columns; `work_queue_healthy` comes
# from the status response's `healthy` field (presumably never null there), so
# a null in it means the join missed that queue.
n_unmatched_queues = df_work_queues.get_column('work_queue_healthy').null_count()
assert n_unmatched_queues == 0, (
    f"{n_unmatched_queues} work queue(s) have no matching status row"
)

### Retrieve Completed Flow Runs

In [None]:
%%time 
r = requests.post(
    f"{api_base_url}/flow_runs/filter",
    json={
        "sort": "START_TIME_DESC",
        'flows': {'name': {'any_': [flow_name]}},
        # 'flow_runs': {'state': {'name': {'any_': ['Completed']}}},
        'deployments': {'name': {'any_': [deployment_name]}},
        "limit": 50,
    },
    **auth_headers
)
assert r.status_code == 200
df_flows_runs_completed = (
    # flows
    df_flows
    .select([pl.col(c) for c in ["id", "name"]])
    .rename({"id": "flow_id", "name": "flow_name"})
    # flow runs
    .join(pl.from_records(r.json()), on="flow_id", how="left")
    .filter(pl.col('state_name') == 'Completed')
    .with_columns(
        [
            pl.col(c)
            .str.to_datetime("%Y-%m-%dT%H:%M:%S%.f%Z")
            .dt.replace_time_zone('UTC')
            .dt.convert_time_zone(my_timezone)
            for c in [
                'start_time',
                'end_time',
                'created',
                'updated',
                'expected_start_time',
                'next_scheduled_start_time',
            ]
        ]
    )
    .with_columns(
        [
            (
                (pl.col('start_time')-pl.col('expected_start_time'))
                .dt.nanoseconds()/10**9
            )
            .alias('start_delay')
        ]
    )
    .select(flow_run_cols+['start_delay'])
    # deployments
    .join(
        (
            df_deployments
            .select(['deployment_id', 'deployment_name', 'deployment_version'])
        ),
        on='deployment_id',
        how='left',
    )
    # work queues
    .join(
        df_work_queues.drop(*['work_queue_id']),
        on=['work_queue_name', 'work_pool_name'],
        how='left',
    )
    .drop(*['deployment_id'])
)
df_flows_runs_completed.head(10)

**Notes**

1. Merging with `/work_queues` is optional since the last polled datetime is also available through the `/deployments` endpoint.

### Retrieve Scheduled Flow Runs

In [None]:
%%time 
r = requests.post(
    f"{api_base_url}/flow_runs/filter",
    json={
        "sort": "NEXT_SCHEDULED_START_TIME_ASC",
        'flows': {'name': {'any_': [flow_name]}},
        'flow_runs': {'state': {'name': {'any_': ['Scheduled']}}},
        'deployments': {'name': {'any_': [deployment_name]}},
        "limit": 3,
    },
    **auth_headers
)
assert r.status_code == 200
df_flows_runs_scheduled = (
    # flows
    df_flows
    .select([pl.col(c) for c in ["id", "name"]])
    .rename({"id": "flow_id", "name": "flow_name"})
    # flow runs
    .join(pl.from_records(r.json()), on="flow_id", how="left")
    .filter(pl.col('state_name') == 'Scheduled')
    .with_columns(
        [
            pl.col(c)
            .str.to_datetime("%Y-%m-%dT%H:%M:%S%.f%Z")
            .dt.replace_time_zone('UTC')
            .dt.convert_time_zone(my_timezone)
            for c in [
                # 'start_time',
                # 'end_time',
                'created',
                'updated',
                'expected_start_time',
                'next_scheduled_start_time',
            ]
        ]
    )
    .with_columns(
        [
            (
                (pl.col('next_scheduled_start_time')-datetime.now(tz=pytz.timezone(my_timezone)))
                .dt.nanoseconds()/10**9/60
            )
            .alias('next_start')
        ]
    )
    .select(flow_run_cols+['next_start', 'next_scheduled_start_time'])
    # deployments
    .join(
        (
            df_deployments
            .select(['deployment_id', 'deployment_name', 'deployment_version'])
        ),
        on='deployment_id',
        how='left',
    )
    # work queues
    .join(
        df_work_queues.drop(*['work_queue_id']),
        on=['work_queue_name', 'work_pool_name'],
        how='left',
    )
    .drop(*['deployment_id'])
)
df_flows_runs_scheduled

**Notes**

1. This could also be retrieved by querying `/deployments/get_scheduled_flow_runs`.
2. Merging with `/work_queues` is optional since the last polled datetime is also available through the `/deployments` endpoint.

---