# Workflow Runner and Session Poller

This notebook shows a generic pattern for triggering a remote workflow and polling its session until the final agent outputs are available. The code is split into small, self-contained cells so that each helper can be read and reused on its own.

## High-level steps
- Fetch workflow configuration from the gateway
- Identify sink agents (agents whose outputs are final)
- Start a workflow run with the provided input payload
- Poll the session for new events until each sink agent has produced an assistant message, or until the polling timeout elapses
- Return and print the final text outputs for each sink agent

## Prerequisites
- A valid API key with permission to run the target workflow

## How to use
1. Run the Install cell to ensure dependencies are available.
2. In the Config cell, replace the placeholders with your gateway base URL, workflow name, API key, and input payload.
3. Run the remaining cells in order.
4. The last cell triggers the run and prints final results.



In [None]:
# Install the third-party HTTP client (`requests`) that the cells below import.
# `%pip` is an IPython magic, so this line only works inside a notebook/IPython
# session; `-q` keeps pip's output quiet.
%pip install requests -q


In [None]:
import json
import time
from typing import Any, Dict, Optional, List

import requests


In [None]:
# Replace every <...> placeholder below before running the remaining cells.
FLOTORCH_GATEWAY_BASE_URL = "<FLOTORCH_GATEWAY_BASE_URL>"  # e.g. "https://qa-gateway.flotorch.cloud"
WORKFLOW_NAME = "<WORKFLOW_NAME_CREATED_IN_CONSOLE>"  # e.g. "support-ticket-auto-resolution"
FLOTORCH_API_KEY = "<FLOTORCH_API_KEY>"
# Workflow input: a plain string or any JSON-serializable value.
INPUT_DATA: Any = "<Input to your workflow (string/JSON)>"

# Example INPUT_DATA (kept as comments so the cell does not echo it as output):
# {
#     "ticket": {
#       "subject": "Refund not received for order #A1234",
#       "body": "I cancelled 10 days ago, portal shows 'Processed' but I still haven’t received the refund.",
#       "channel": "email",
#       "locale": "en-US"
#     }
#   }


In [None]:
def get_run_endpoint(base_url: str = FLOTORCH_GATEWAY_BASE_URL, workflow_name: str = WORKFLOW_NAME) -> str:
    """Build the URL that starts a run of ``workflow_name`` on the gateway.

    Trailing slashes on ``base_url`` are stripped before the path is appended.
    """
    gateway_root = base_url.rstrip("/")
    return f"{gateway_root}/v1/workflows/{workflow_name}/run"


def run_workflow(
    input_data: Any,
    base_url: str = FLOTORCH_GATEWAY_BASE_URL,
    workflow_name: str = WORKFLOW_NAME,
    api_key: str = FLOTORCH_API_KEY,
) -> requests.Response:
    """POST ``input_data`` to the workflow's run endpoint and return the response.

    The request body is the JSON object ``{"input": input_data}``, sent with a
    bearer-token ``Authorization`` header and ``timeout=30``.

    Raises:
        requests.HTTPError: if the gateway answers with an error status
            (via ``raise_for_status``).
    """
    endpoint = get_run_endpoint(base_url, workflow_name)
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = json.dumps({"input": input_data})
    run_response = requests.post(endpoint, headers=request_headers, data=payload, timeout=30)
    run_response.raise_for_status()
    return run_response


In [None]:
def fetch_workflow_config(
    workflow_name: str,
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
) -> dict:
    """
    Fetch the workflow configuration from the API.
    Example: /v1/workflows/{workflow_name}

    Args:
        workflow_name: Name of the workflow whose configuration is requested.
        base_url: Gateway base URL; when omitted, FLOTORCH_GATEWAY_BASE_URL is used.
        api_key: Bearer token; when omitted, FLOTORCH_API_KEY is used.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the gateway answers with an error status.
    """
    # Fall back to the notebook-level config at call time (not at definition
    # time) so that re-running the Config cell takes effect without having to
    # re-run this cell.
    if base_url is None:
        base_url = FLOTORCH_GATEWAY_BASE_URL
    if api_key is None:
        api_key = FLOTORCH_API_KEY

    url = f"{base_url.rstrip('/')}/v1/workflows/{workflow_name}"
    headers = {"Authorization": f"Bearer {api_key}"}
    resp = requests.get(url, headers=headers, timeout=30)
    resp.raise_for_status()
    return resp.json()


In [None]:
def get_sink_agents(workflow_json: dict) -> List[str]:
    """
    From the workflow JSON, find the agent(s) that are at the end of the flow.

    An AGENT node counts as a sink when none of its direct outgoing edges
    targets another AGENT node (so it leads to END, to a non-agent node, or
    nowhere). Each sink is returned as its ``callableName`` truncated at the
    first ":" with "-" replaced by "_".

    NOTE(review): only direct edges are inspected; an agent that reaches
    another agent through an intermediate non-agent node is still reported as
    a sink — confirm this matches the workflow schema.

    Args:
        workflow_json: Workflow configuration with optional "nodes" and
            "edges" lists.

    Returns:
        Sink agent author names, in node order.

    Raises:
        KeyError: if a node lacks "id", an edge lacks "sourceNodeId" /
            "targetNodeId", or a sink agent lacks "callableName".
    """
    # `or []` also tolerates an explicit null for either list.
    nodes = {n["id"]: n for n in workflow_json.get("nodes") or []}

    out_adj: Dict[str, List[str]] = {}
    for e in workflow_json.get("edges") or []:
        out_adj.setdefault(e["sourceNodeId"], []).append(e["targetNodeId"])

    def is_agent(n: Dict[str, Any]) -> bool:
        # .get() so that nodes without a "type" are simply "not an agent".
        return n.get("type") == "AGENT"

    def to_author(callable_name: str) -> str:
        # Presumably the form in which the agent appears as an event
        # "author" — verify against the session API.
        base = callable_name.split(":")[0]
        return base.replace("-", "_")

    sink_agents: List[str] = []
    for nid, n in nodes.items():
        if not is_agent(n):
            continue
        targets = out_adj.get(nid, [])
        # Edges pointing at unknown node ids are ignored.
        if any(t in nodes and is_agent(nodes[t]) for t in targets):
            continue
        sink_agents.append(to_author(n["callableName"]))

    return sink_agents


In [None]:
def get_session_endpoint(
    session_id: str,
    num_recent_events: int = 10,
    base_url: str = FLOTORCH_GATEWAY_BASE_URL,
) -> str:
    """Build the session URL for ``session_id``.

    Args:
        session_id: Identifier placed in the ``/v1/sessions/{session_id}`` path.
        num_recent_events: Value of the ``numRecentEvents`` query parameter.
        base_url: Gateway base URL; trailing slashes are stripped. Defaults to
            the configured gateway URL (previously this defaulted to the API
            key by mistake, which produced an unusable URL).

    Returns:
        The full session endpoint URL including the query string.
    """
    return f"{base_url.rstrip('/')}/v1/sessions/{session_id}?numRecentEvents={num_recent_events}"


def fetch_session(
    session_id: str,
    num_recent_events: int = 10,
    base_url: str = FLOTORCH_GATEWAY_BASE_URL,
    api_key: str = FLOTORCH_API_KEY,
) -> requests.Response:
    """GET the session identified by ``session_id`` and return the response.

    The URL comes from ``get_session_endpoint``; the request carries a
    bearer-token ``Authorization`` header and ``timeout=30``.

    Raises:
        requests.HTTPError: if the gateway answers with an error status
            (via ``raise_for_status``).
    """
    session_url = get_session_endpoint(session_id, num_recent_events, base_url)
    auth_headers = {"Authorization": f"Bearer {api_key}"}
    session_response = requests.get(session_url, headers=auth_headers, timeout=30)
    session_response.raise_for_status()
    return session_response


In [None]:
def _latest_assistant_event_by_author(events: List[Dict[str, Any]], author: str) -> Optional[Dict[str, Any]]:
    """Return the latest assistant event from a specific author, or None.

    An event matches when its "author" equals ``author`` and its
    ``content.role`` is "assistant". "Latest" means the greatest "timestamp";
    events with a missing (or None) timestamp rank below every event that has
    one, and among equal timestamps the event appearing last in ``events``
    wins.
    """

    def _recency(evt: Dict[str, Any]) -> tuple:
        # A (has_timestamp, timestamp) pair keeps events without a timestamp
        # comparable with numeric or string timestamps; substituting "" for a
        # missing value would raise TypeError against numeric timestamps.
        ts = evt.get("timestamp")
        return (0, 0) if ts is None else (1, ts)

    latest: Optional[Dict[str, Any]] = None
    latest_key: Optional[tuple] = None
    for evt in events:
        if evt.get("author") != author:
            continue
        if (evt.get("content") or {}).get("role") != "assistant":
            continue
        key = _recency(evt)
        # ">=" so that a later event with an equal timestamp replaces an
        # earlier one (same outcome as a stable sort followed by [-1]).
        if latest is None or key >= latest_key:
            latest, latest_key = evt, key
    return latest


def _text_from_event(evt: Optional[Dict[str, Any]]) -> Optional[str]:
    """Return ``content.parts[0].text`` of an event, or None when unavailable.

    None is returned for a falsy event, for missing/empty content or parts,
    and when the first part is not a dict.
    """
    if not evt:
        return None
    content = evt.get("content") or {}
    parts = content.get("parts") or []
    if not parts:
        return None
    first_part = parts[0]
    if not isinstance(first_part, dict):
        return None
    return first_part.get("text")


In [None]:
def poll_session_until_final(
    session_id: str,
    final_authors: List[str],
    *,
    interval_sec: float = 2.0,   # how often to poll
    timeout_sec: float = 600.0,  # max wait
    window: int = 200            # numRecentEvents window
) -> Dict[str, Any]:
    """
    Keep polling the session until all final_authors have at least one assistant message.

    Args:
        session_id: Session to poll.
        final_authors: Event authors that must each have produced an assistant message.
        interval_sec: Pause between polls; the last pause is shortened so it
            never runs past the deadline.
        timeout_sec: Maximum total time to keep polling.
        window: Passed as ``num_recent_events`` to ``fetch_session``.

    Returns:
      {
        "status": "ok" | "timeout",
        "final_events": { author: event or None, ... },
        "final_text": { author: text or None, ... },
        "raw": <last session json, or None if no poll was made>
      }

    Raises:
        Whatever ``fetch_session`` raises (e.g. on an HTTP error status).
    """
    # Monotonic clock: the deadline is unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout_sec
    final_events: Dict[str, Any] = {a: None for a in final_authors}
    data: Any = None  # last session payload; stays None if the loop never runs

    def _result(status: str) -> Dict[str, Any]:
        # Single place that shapes the return value for both outcomes.
        return {
            "status": status,
            "final_events": final_events,
            "final_text": {a: _text_from_event(final_events[a]) for a in final_authors},
            "raw": data,
        }

    while time.monotonic() < deadline:
        data = fetch_session(session_id, num_recent_events=window).json()
        events = data.get("events", []) or []

        # An author's event, once seen, is kept even if it later drops out of
        # the recent-events window; a newer one replaces it when present.
        for a in final_authors:
            evt = _latest_assistant_event_by_author(events, a)
            if evt:
                final_events[a] = evt

        if all(final_events[a] is not None for a in final_authors):
            return _result("ok")

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(interval_sec, remaining))

    return _result("timeout")


In [None]:
# Run the workflow and poll until final
config = fetch_workflow_config(WORKFLOW_NAME)
sinks = get_sink_agents(config)
resp = run_workflow(INPUT_DATA)
data = resp.json()
session_id = data.get('invocationId') or data.get('invocationid')
if not session_id:
    # Without an id the poller would request /v1/sessions/None; fail early instead.
    raise RuntimeError(f"Run response did not include an invocation id: {data!r}")
print(f"InvocationId: {session_id}")
print("Workflow started… waiting for agents to respond.\n")
result = poll_session_until_final(session_id, sinks)
# Only claim completion when every sink agent actually responded.
if result["status"] == "ok":
    print("Workflow finished.\n")
else:
    print("Timed out before every sink agent responded.\n")
print("Status: ", result["status"])
print("Final result: ", result["final_text"])
