# Test Local Streaming Setup

In [1]:
import asyncio

import aiohttp
import nest_asyncio
from typing import Any

In [2]:
# Patch asyncio so the asyncio.run(...) calls in later cells can be used from
# inside the notebook (presumably because the kernel already runs an event loop).
nest_asyncio.apply()

# Endpoints on the local service: POST work to `request_ep`, GET results from
# `response_ep/<request_id>`.
request_ep = "http://localhost:8080/python/produce"
response_ep = "http://localhost:8080/python/consume"

# JSON body sent with every POST in this notebook.
sample_payload = {"payload": {"data": ["Apple", "Samsung", "OnePlus", "Google"]}}

## Using async Post and Polling to fetch results as they are processed
This approach is inefficient because the client still has to poll the GET endpoint via the reverse proxy. On the server side, however, the processed output from the response topic is still read by a Kafka consumer, so that part is a non-polling operation.

In [3]:
# Async POST and GET polling using aiohttp
# Tunables read by poll_for_result(): worst case is roughly
# max_polls * poll_interval seconds of waiting per request_id.
poll_interval = 1 # Seconds between GET polls
max_polls = 10 # Maximum number of GET polls per request_id

async def poll_for_result(session: aiohttp.ClientSession, request_id: str) -> dict[str, Any] | None:
    """Poll the response endpoint until the result for ``request_id`` is ready.

    Issues up to ``max_polls`` GET requests to ``{response_ep}/{request_id}``,
    waiting ``poll_interval`` seconds between consecutive attempts.

    Args:
        session: Open aiohttp session used for the GET requests.
        request_id: Identifier appended to the response endpoint URL.

    Returns:
        The decoded JSON body of the first 200 response whose ``"status"`` is
        ``"completed"``, or ``None`` if no such response arrives within
        ``max_polls`` attempts.
    """
    result_url = f"{response_ep}/{request_id}"
    for attempt in range(max_polls):
        async with session.get(result_url) as resp:
            if resp.status == 200:
                data = await resp.json()
                if data.get("status") == "completed":
                    print(f"Result for request_id {request_id}: {data}")
                    return data
                print(f"Response for request_id {request_id} malformed.")
            else:
                print(f"Attempt {attempt + 1}: Result not ready yet for request_id {request_id}. Retrying in {poll_interval} seconds...")
        # Only wait when another attempt follows; sleeping after the final
        # poll would just delay the "max polls reached" report.
        if attempt < max_polls - 1:
            await asyncio.sleep(poll_interval)
    print(f"Max polls reached for request_id {request_id}. Moving to next request.")
    return None


async def post_and_poll(N: int = 2) -> None:
    """POST ``sample_payload`` ``N`` times, then poll for every accepted request.

    The POSTs are sent one after another. Each 202 response that carries a
    ``request_id`` adds one ``poll_for_result`` coroutine; all of them are then
    awaited together so the polling loops run concurrently.

    Args:
        N: Number of POST requests to send to ``request_ep``.
    """
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(N):
            async with session.post(request_ep, json=sample_payload) as resp:
                if resp.status != 202:
                    # A rejected POST used to be dropped without any output,
                    # which made a failing endpoint look like "nothing happened".
                    print(f"Request {i + 1}: POST not accepted. Status code: {resp.status}")
                    continue
                data = await resp.json()
                request_id = data.get("request_id")
                print(f"Request {i + 1}: Sent payload. Received request_id: {request_id}")
                if request_id:
                    tasks.append(poll_for_result(session, request_id))
                else:
                    print(f"No request_id received. Status code: {resp.status}")
        # Running all polling tasks concurrently (skipped when nothing was accepted)
        if tasks:
            await asyncio.gather(*tasks)

In [4]:
# Run the async function
# Sends two POSTs and polls for both results; requires the local service at
# request_ep / response_ep to be up. Calling asyncio.run() here presumably
# depends on the nest_asyncio.apply() call in the setup cell.
asyncio.run(post_and_poll(N=2))

Request 1: Sent payload. Received request_id: 120d0a6e-3ab9-4119-8d2c-42f184e9a34a
Request 2: Sent payload. Received request_id: 3da61acf-c868-4d5c-b625-df36c08ea472
Attempt 1: Result not ready yet for request_id 120d0a6e-3ab9-4119-8d2c-42f184e9a34a. Retrying in 1 seconds...
Attempt 1: Result not ready yet for request_id 3da61acf-c868-4d5c-b625-df36c08ea472. Retrying in 1 seconds...
Result for request_id 120d0a6e-3ab9-4119-8d2c-42f184e9a34a: {'request_id': '120d0a6e-3ab9-4119-8d2c-42f184e9a34a', 'status': 'completed', 'result': {'processed_data': "Processed ['Apple', 'Samsung', 'OnePlus', 'Google']"}}
Attempt 2: Result not ready yet for request_id 3da61acf-c868-4d5c-b625-df36c08ea472. Retrying in 1 seconds...
Attempt 3: Result not ready yet for request_id 3da61acf-c868-4d5c-b625-df36c08ea472. Retrying in 1 seconds...
Result for request_id 3da61acf-c868-4d5c-b625-df36c08ea472: {'request_id': '3da61acf-c868-4d5c-b625-df36c08ea472', 'status': 'completed', 'result': {'processed_data': "Pro

## Using WebSockets for Real-Time Results

The cell below demonstrates a more efficient, push-based approach using WebSockets. Instead of polling a GET endpoint, the client opens a persistent WebSocket connection and simply waits for the server to push the result. This reduces latency and eliminates unnecessary network traffic.

In [5]:
# Using WebSocket to get results
# Base WebSocket URL; listen_for_result_ws() appends "/<request_id>" to it.
ws_ep = "ws://localhost:8080/python/ws/results"

async def listen_for_result_ws(
    session: aiohttp.ClientSession,
    request_id: str,
    receive_timeout: float = 60.0,
) -> None:
    """Open a WebSocket for ``request_id`` and print the first JSON message received.

    Connects to ``{ws_ep}/{request_id}``, waits for a single message, prints it
    and lets the connection close. Failures are reported with ``print`` rather
    than raised, so one bad listener does not abort sibling listeners that are
    awaited together.

    Args:
        session: Open aiohttp session used to create the WebSocket.
        request_id: Identifier appended to the WebSocket endpoint URL.
        receive_timeout: Maximum number of seconds to wait for the result
            message before giving up.
    """
    ws_url = f"{ws_ep}/{request_id}"
    print(f"Connecting to WebSocket: {ws_url}")
    try:
        # NOTE: per the aiohttp docs, a float `timeout` on ws_connect is the
        # close timeout, not a limit on waiting for data, so the wait for the
        # result is bounded separately on receive_json() below.
        async with session.ws_connect(ws_url, timeout=60.0) as ws:
            print(f"WebSocket connection established for request_id: {request_id}")
            # Wait for one message; leaving the `async with` closes the socket.
            msg = await ws.receive_json(timeout=receive_timeout)
            print(f"Result for request_id {request_id}: {msg}")
    except asyncio.TimeoutError:
        print(f"Timed out after {receive_timeout} seconds waiting for result for request_id {request_id}")
    except (TypeError, ValueError) as e:
        # receive_json() raises TypeError for a non-text frame (e.g. the server
        # closed the socket first) and ValueError for a body that is not JSON.
        print(f"Unexpected WebSocket message for {request_id}: {e}")
    except aiohttp.ClientError as e:
        print(f"Error connecting or listening to WebSocket for {request_id}: {e}")

async def post_and_listen_ws(N: int = 2) -> None:
    """POST ``sample_payload`` ``N`` times, then listen on a WebSocket per accepted request.

    The POSTs are sent one after another. Each 202 response that carries a
    ``request_id`` adds one ``listen_for_result_ws`` coroutine; all of them are
    then awaited together so the listeners run concurrently.

    Args:
        N: Number of POST requests to send to ``request_ep``.
    """
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(N):
            async with session.post(request_ep, json=sample_payload) as resp:
                if resp.status != 202:
                    # A rejected POST used to be dropped without any output,
                    # which made a failing endpoint look like "nothing happened".
                    print(f"Request {i + 1}: POST not accepted. Status code: {resp.status}")
                    continue
                data = await resp.json()
                request_id = data.get("request_id")
                print(f"Request {i + 1}: Sent payload. Received request_id: {request_id}")
                if request_id:
                    tasks.append(listen_for_result_ws(session, request_id))
                else:
                    print(f"No request_id received. Status code: {resp.status}")

        # Running all WebSocket listener tasks concurrently
        if tasks:
            await asyncio.gather(*tasks)

In [6]:
# Run the WebSocket-based async function
# Sends two POSTs and waits for each result to be pushed over its own
# WebSocket; requires the local service at request_ep / ws_ep to be up.
asyncio.run(post_and_listen_ws(N=2))

Request 1: Sent payload. Received request_id: c41b678e-1518-48d5-b661-231e711e9c78
Request 2: Sent payload. Received request_id: 93fae7a5-80d9-40e4-ad45-a96a9b5eac1e
Connecting to WebSocket: ws://localhost:8080/python/ws/results/c41b678e-1518-48d5-b661-231e711e9c78
Connecting to WebSocket: ws://localhost:8080/python/ws/results/93fae7a5-80d9-40e4-ad45-a96a9b5eac1e
WebSocket connection established for request_id: c41b678e-1518-48d5-b661-231e711e9c78
WebSocket connection established for request_id: 93fae7a5-80d9-40e4-ad45-a96a9b5eac1e
Result for request_id c41b678e-1518-48d5-b661-231e711e9c78: {'request_id': 'c41b678e-1518-48d5-b661-231e711e9c78', 'status': 'completed', 'result': {'processed_data': "Processed ['Apple', 'Samsung', 'OnePlus', 'Google']"}}
Result for request_id 93fae7a5-80d9-40e4-ad45-a96a9b5eac1e: {'request_id': '93fae7a5-80d9-40e4-ad45-a96a9b5eac1e', 'status': 'completed', 'result': {'processed_data': "Processed ['Apple', 'Samsung', 'OnePlus', 'Google']"}}
