In [None]:
# pip install httpx websockets

Note: you may need to restart the kernel to use updated packages.


In [5]:
#!/usr/bin/env python3
import yaml
import httpx
import sys
import json
import asyncio
import websockets
import time


In [6]:


# Backend 1 URL
backend1_url = "http://localhost:8000/execute"
# WebSocket URL to receive streaming responses
ws_url = "ws://localhost:8000/ws"

# Load the YAML flow file
yaml_file_path = "./execution_flows/simple-ai-flow.yaml"
print(f"Loading flow from {yaml_file_path}...")

with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)


Loading flow from ./execution_flows/simple-ai-flow.yaml...


In [8]:
flow_id = yaml_data.get("flow_id", "flow-" + yaml_file_path.split("/")[-1].split(".")[0])

In [None]:




# Print streaming responses
async def listen_to_stream():
    # Connect to Backend 1's WebSocket
    stream_url = f"{ws_url}/{flow_id}"
    print(f"Connecting to stream at {stream_url}...")
    
    try:
        async with websockets.connect(stream_url) as websocket:
            print("Connected to WebSocket stream. Waiting for messages...")
            
            while True:
                try:
                    # Receive message
                    message = await websocket.recv()
                    
                    # Parse and print the message
                    try:
                        data = json.loads(message)
                        event_type = data.get("type", "unknown")
                        timestamp = data.get("timestamp", time.time())
                        
                        # Format timestamp for display
                        formatted_time = time.strftime("%H:%M:%S", time.localtime(timestamp))
                        
                        print(f"[{formatted_time}] {event_type}")
                        
                        # For some events, show more detailed information
                        if event_type == "element_started":
                            element_data = data.get("data", {})
                            element_name = element_data.get("element_name", "Unknown")
                            element_type = element_data.get("element_type", "Unknown")
                            print(f"  ▶ Started: {element_name} ({element_type})")
                        
                        elif event_type == "element_completed":
                            element_data = data.get("data", {})
                            element_name = element_data.get("element_name", "Unknown")
                            print(f"  ✓ Completed: {element_name}")
                        
                        elif event_type == "element_error":
                            element_data = data.get("data", {})
                            element_name = element_data.get("element_name", "Unknown")
                            error = element_data.get("error", "Unknown error")
                            print(f"  ✗ Error in {element_name}: {error}")
                        
                        elif event_type == "llm_chunk":
                            chunk = data.get("data", {}).get("content", "")
                            print(f"  📝 LLM: {chunk}", end="", flush=True)
                        
                        elif event_type == "flow_completed":
                            print(f"  ✅ Flow completed")
                            return
                            
                    except json.JSONDecodeError:
                        print(f"Received non-JSON message: {message[:100]}...")
                        
                except websockets.exceptions.ConnectionClosed:
                    print("WebSocket connection closed")
                    break
                    
    except Exception as e:
        print(f"Error in WebSocket connection: {str(e)}")



In [10]:
# Our own WebSocket URL (we'll handle streaming ourselves)
my_ws_url = f"ws://localhost:9000/ws/{flow_id}"

# Prepare the request payload
request_data = {
    "flow_id": flow_id,
    "flow_definition": yaml_data.get("flow_definition"),
    "initial_inputs": yaml_data.get("initial_inputs", {}),
    "backend2_ws_url": my_ws_url  # The URL where Backend 1 should stream to
}

# Send the request to Backend 1
print(f"Sending flow '{flow_id}' to Backend 1...")
async with httpx.AsyncClient() as client:
    response = await client.post(backend1_url, json=request_data, timeout=30.0)

# Display the response
if response.status_code == 200:
    result = response.json()
    print(f"Success! Response: {json.dumps(result, indent=2)}")
    print(f"Flow execution started with ID: {result.get('flow_id')}")
    
    # Start listening for streaming responses
    await listen_to_stream()
else:
    print(f"Error: {response.status_code}")
    print(f"Response: {response.text}")


Sending flow 'simple-ai-flow' to Backend 1...


ReadTimeout: 