In [1]:
import orjson, os
with open("local.settings.json") as f:
    os.environ.update(orjson.loads(f.read())["Values"])

# Config

In [2]:
from __future__ import annotations

import os
import json
import time
from collections import deque
from datetime import datetime, timezone
from typing import Any

import pandas as pd
import requests
from azure.data.tables import TableClient
from sqlalchemy import MetaData, Table, create_engine, select

# Polling / concurrency
POLL_SECONDS = 10
MAX_CONCURRENT = 3

# Durable Functions endpoints
DURABLE_BASE = "https://esquire-auto-audience.azurewebsites.net"
TASK_HUB = "production"
CONNECTION = "Storage"
MASTER_CODE = os.environ.get("AZFUNC_MASTER_CODE", "")

# Storage table
TABLE_NAME = "productionInstances"
NAME_FILTER = "orchestrator_esquire_audience"

# Safety switches
DRY_RUN_DELETE = False
DRY_RUN_START = False

pd.set_option("display.max_rows", 200)
pd.set_option("display.max_colwidth", 200)

# Database

In [3]:
engine = create_engine(os.environ["DATABIND_SQL_KEYSTONE"])
schema = "keystone"
audience_table_name = "Audience"

def fetch_audiences(engine) -> pd.DataFrame:
    metadata = MetaData()
    table = Table(audience_table_name, metadata, schema=schema, autoload_with=engine)
    cols = [table.c["id"], table.c["status"]]
    stmt = select(*cols)
    with engine.connect() as conn:
        result = conn.execute(stmt)
        rows = [row for row in result.mappings()]
    return pd.DataFrame(rows)

audiences = fetch_audiences(engine)
audience_ids = set(audiences.loc[audiences["status"].astype(bool), "id"].tolist())
print(f"Active audiences in DB: {len(audience_ids)}")

Active audiences in DB: 624


# Instances table

In [4]:
def _safe_json_loads(x: Any) -> Any:
    if isinstance(x, dict) or x is None:
        return x
    if isinstance(x, (bytes, bytearray)):
        try:
            return json.loads(x)
        except Exception:
            return x
    if isinstance(x, str):
        try:
            return json.loads(x)
        except Exception:
            return x
    return x

def get_instances_from_connection_string(
    conn_str: str,
    name_filter: str = NAME_FILTER,
) -> pd.DataFrame:
    """Query the instances table for orchestrator rows (server-side filtered).

    Columns returned: PartitionKey, RuntimeStatus, CustomStatus, CreatedTime, CompletedTime
    """
    table_client = TableClient.from_connection_string(conn_str, table_name=TABLE_NAME)

    select_fields = [
        "PartitionKey",
        "RuntimeStatus",
        "CustomStatus",
        "CreatedTime",
        "CompletedTime",
    ]
    odata_filter = f"Name eq '{name_filter}'"
    entities = list(table_client.query_entities(select=select_fields, query_filter=odata_filter))
    df = pd.DataFrame(entities)
    if df.empty:
        return df

    if "CustomStatus" in df.columns:
        df["CustomStatus"] = df["CustomStatus"].apply(_safe_json_loads)

    if "CreatedTime" in df.columns:
        df = df.sort_values(by="CreatedTime", ascending=True).reset_index(drop=True)

    return df

def get_instances() -> pd.DataFrame:
    return get_instances_from_connection_string(os.getenv("AzureWebJobsStorage", ""), NAME_FILTER)

instances = get_instances()
print(f"Instances in table: {len(instances)}")

Instances in table: 624


# Helpers: running detection + Durable API

In [5]:
def _state_endswith_ellipsis(custom_status: Any) -> bool:
    if not isinstance(custom_status, dict):
        return False
    state = custom_status.get("state")
    return isinstance(state, str) and state.endswith("...")

def is_running_row(row: pd.Series) -> bool:
    # If it has CompletedTime, it is not running (this includes errored/failed per your note).
    if "CompletedTime" in row and pd.notna(row["CompletedTime"]):
        return False

    runtime = row.get("RuntimeStatus")
    if runtime == "Pending":
        return True

    return _state_endswith_ellipsis(row.get("CustomStatus"))

def running_instance_ids(df: pd.DataFrame) -> set[str]:
    if df is None or df.empty:
        return set()
    tmp = df.copy()
    tmp["_is_running"] = tmp.apply(is_running_row, axis=1)
    return set(tmp.loc[tmp["_is_running"], "PartitionKey"].astype(str).tolist())

def durable_delete(instance_id: str) -> requests.Response | None:
    if DRY_RUN_DELETE:
        print(f"[DRY RUN] Would delete instance {instance_id}")
        return None
    return requests.delete(
        url=f"{DURABLE_BASE}/runtime/webhooks/durabletask/instances/{instance_id}",
        params={
            "taskHub": TASK_HUB,
            "connection": CONNECTION,
            "code": MASTER_CODE,
        },
        timeout=30,
    )

def durable_start(audience_id: str, force: int = 1) -> requests.Response | None:
    if DRY_RUN_START:
        print(f"[DRY RUN] Would start audience {audience_id}")
        return None
    return requests.post(
        url=f"{DURABLE_BASE}/api/audiences/{audience_id}",
        params={
            "force": force,
            "code": MASTER_CODE,
        },
        timeout=30,
    )

# Compare active audiences vs instances

In [6]:
instances = get_instances()
instance_ids = set(instances["PartitionKey"].astype(str).tolist()) if len(instances) else set()

absent_audiences = instance_ids - audience_ids          # instances that should be removed
missing_instance_audiences = audience_ids - instance_ids # active audiences missing an instance row entirely

print(f"Instances in table: {len(instance_ids)}")
print(f"Active audiences:   {len(audience_ids)}")
print(f"Overlap:           {len(instance_ids & audience_ids)}")
print(f"To delete:         {len(absent_audiences)}")
print(f"To start:          {len(missing_instance_audiences)}")
if len(instance_ids):
    print(f"% removed:         {len(absent_audiences)/len(instance_ids):.2%}")

Instances in table: 624
Active audiences:   624
Overlap:           624
To delete:         0
To start:          0
% removed:         0.00%


# Delete instances for inactive audiences

In [7]:
for instance_id in sorted(absent_audiences):
    print(f"Deleting inactive instance: {instance_id}")
    resp = durable_delete(instance_id)
    if resp is not None and resp.status_code >= 400:
        print(f"  ! delete failed: {resp.status_code} {resp.text[:200]}")

# Start missing active audiences (poll table every 10s, max 10 running)

In [8]:
def submit_missing_with_concurrency(
    ids_to_start: list[str],
    max_concurrent: int = MAX_CONCURRENT,
    poll_seconds: int = POLL_SECONDS,
) -> set[str]:
    q = deque(ids_to_start)
    submitted: set[str] = set()

    while q:
        df = get_instances()
        running = running_instance_ids(df)
        known = set(df["PartitionKey"].astype(str).tolist()) if len(df) else set()

        capacity = max_concurrent - len(running)
        now = datetime.now(timezone.utc).isoformat()
        print(f"[{now}] running={len(running)} capacity={capacity} remaining_to_submit={len(q)}")

        if capacity <= 0:
            time.sleep(poll_seconds)
            continue

        started_now = 0
        while started_now < capacity and q:
            audience_id = q.popleft()

            # Skip if it already exists (table caught up) or we already submitted it
            if audience_id in submitted or audience_id in known:
                continue

            resp = durable_start(audience_id, force=1)
            submitted.add(audience_id)
            started_now += 1

            if resp is None:
                print(f"Submitted {audience_id}")
            elif resp.status_code >= 400:
                print(f"  ! start failed for {audience_id}: {resp.status_code} {resp.text[:200]}")
            else:
                print(f"Submitted {audience_id}: {resp.status_code}")

        time.sleep(poll_seconds)

    print(f"Done. Submitted {len(submitted)} audience(s).")
    return submitted

ids_to_start = sorted(missing_instance_audiences)
print(f"Starting {len(ids_to_start)} missing audience(s) with max {MAX_CONCURRENT} running...")
submitted_missing = submit_missing_with_concurrency(ids_to_start)

Starting 0 missing audience(s) with max 3 running...
Done. Submitted 0 audience(s).


# Restart due instances (optional) with the same concurrency guard

In [None]:
def _parse_next_run(custom_status: Any) -> datetime | None:
    if not isinstance(custom_status, dict):
        return None
    nr = custom_status.get("next_run")
    if not isinstance(nr, str):
        return None
    try:
        return datetime.fromisoformat(nr)
    except Exception:
        try:
            return pd.to_datetime(nr, utc=True).to_pydatetime()
        except Exception:
            return None

def restart_with_concurrency(
    ids_to_restart: list[str],
    max_concurrent: int = MAX_CONCURRENT,
    poll_seconds: int = POLL_SECONDS,
) -> set[str]:
    q = deque(ids_to_restart)
    restarted: set[str] = set()

    while q:
        df = get_instances()
        running = running_instance_ids(df)
        capacity = max_concurrent - len(running)
        now = datetime.now(timezone.utc).isoformat()
        print(f"[{now}] running={len(running)} capacity={capacity} remaining_to_restart={len(q)}")

        if capacity <= 0:
            time.sleep(poll_seconds)
            continue

        restarted_now = 0
        while restarted_now < capacity and q:
            instance_id = q.popleft()

            # Don't touch anything currently running
            if instance_id in running:
                continue

            del_resp = durable_delete(instance_id)
            if del_resp is not None and del_resp.status_code >= 400:
                print(f"  ! delete failed for {instance_id}: {del_resp.status_code} {del_resp.text[:200]}")
                continue

            start_resp = durable_start(instance_id, force=1)
            restarted.add(instance_id)
            restarted_now += 1

            if start_resp is None:
                print(f"Restarted {instance_id}")
            elif start_resp.status_code >= 400:
                print(f"  ! restart start failed for {instance_id}: {start_resp.status_code} {start_resp.text[:200]}")
            else:
                print(f"Restarted {instance_id}: {start_resp.status_code}")

        time.sleep(poll_seconds)

    print(f"Done. Restarted {len(restarted)} instance(s).")
    return restarted

# Identify due instances
instances_now = get_instances()
if instances_now.empty:
    print("No instances found.")
else:
    now_utc = datetime.now(timezone.utc)
    instances_now["next_run_dt"] = instances_now["CustomStatus"].apply(_parse_next_run)
    instances_now["state"] = instances_now["CustomStatus"].apply(lambda cs: cs.get("state") if isinstance(cs, dict) else None)
    instances_now["is_running"] = instances_now.apply(is_running_row, axis=1)

    due = instances_now.loc[
        instances_now["next_run_dt"].notna() & (instances_now["next_run_dt"] <= now_utc)
    ].copy().sort_values("next_run_dt")

    print(f"Checked {len(instances_now)} instance(s) at {now_utc.isoformat()}")
    print(f"Found {len(due)} due instance(s)")
    # if not due.empty:
    #     display(due[["PartitionKey", "RuntimeStatus", "state", "CompletedTime", "is_running", "next_run_dt", "CreatedTime"]])

    # Only restart due instances that are NOT running
    due_not_running = due.loc[~due["is_running"], "PartitionKey"].astype(str).tolist()
    print(f"Due + not running => {len(due_not_running)} instance(s) eligible to restart")

    # Uncomment to actually restart
    restarted_due = restart_with_concurrency(due_not_running)


Checked 624 instance(s) at 2026-01-20T13:45:32.015902+00:00
Found 447 due instance(s)
Due + not running => 447 instance(s) eligible to restart
[2026-01-20T13:45:35.202761+00:00] running=0 capacity=3 remaining_to_restart=447
Restarted cmj062pg7000v13so25nqaoq3: 202
Restarted cly3aib3q008y21448clsb9nq: 202
Restarted cm0754r3c005e6499q92th9ek: 202
[2026-01-20T13:46:15.429895+00:00] running=3 capacity=0 remaining_to_restart=444
[2026-01-20T13:46:28.851993+00:00] running=1 capacity=2 remaining_to_restart=444
Restarted clyd2x6bh00hm21442ifgpggy: 202
Restarted clyg9a3fq000zdkjs8pkjdhrw: 202
[2026-01-20T13:46:43.247666+00:00] running=2 capacity=1 remaining_to_restart=442
Restarted cmhc1y6di002nzbo5kalqa1sw: 202
[2026-01-20T13:47:00.855771+00:00] running=5 capacity=-2 remaining_to_restart=441
[2026-01-20T13:47:14.260998+00:00] running=5 capacity=-2 remaining_to_restart=441
[2026-01-20T13:47:27.562449+00:00] running=5 capacity=-2 remaining_to_restart=441
[2026-01-20T13:47:41.049862+00:00] runnin

# Restart errored instances (CustomStatus.state == "Error")


In [None]:
instances_now = get_instances()
if instances_now.empty:
    print("No instances found.")
else:
    instances_now["state"] = instances_now["CustomStatus"].apply(
        lambda cs: cs.get("state") if isinstance(cs, dict) else None
    )
    instances_now["is_running"] = instances_now.apply(is_running_row, axis=1)

    errored = instances_now.loc[(instances_now["state"] == "Error") | instances_now["state"].isna()].copy()

    # Optional safety: only restart errored instances that are still active in the DB
    if "audience_ids" in globals() and len(errored):
        errored = errored.loc[errored["PartitionKey"].astype(str).isin(audience_ids)].copy()

    print(f"Found {len(errored)} errored instance(s) (CustomStatus.state == 'Error')")
    # if not errored.empty:
    #     display(
    #         errored[["PartitionKey", "RuntimeStatus", "state", "CompletedTime", "is_running", "CreatedTime"]]
    #     )

    # Only restart errored instances that are NOT running
    errored_not_running = errored.loc[~errored["is_running"], "PartitionKey"].astype(str).tolist()
    print(f"Error + not running => {len(errored_not_running)} instance(s) eligible to restart")

    # Uncomment to actually restart
    restarted_error = restart_with_concurrency(errored_not_running)
