# Real-Time Streaming

This notebook demonstrates SurrealDB's real-time capabilities through the ORM: **Live Queries** (WebSocket) and **Change Feeds** (HTTP). These features enable reactive applications like live dashboards, chat systems, and multiplayer games.

## Use Case: Live Dashboard / Chat Notifications

We'll explore two real-time patterns:
1. **Live Queries** -- WebSocket-based subscriptions that push changes instantly
2. **Change Feeds** -- HTTP-based polling for event-driven architectures (microservices, CDC)

## Prerequisites

- SurrealDB running locally (`docker run --rm -p 8000:8000 surrealdb/surrealdb:latest start --user root --pass root`)
- Project dependencies installed (`uv sync`)
- A `.env` file in the project root (optional, falls back to defaults)

In [None]:
# Setup: add project root to path and configure SurrealDB connection
import os, sys
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(project_root)
from dotenv import load_dotenv
load_dotenv()

from src.surreal_orm import SurrealDBConnectionManager

SurrealDBConnectionManager.set_connection(
    os.getenv("SURREALDB_URL", "ws://localhost:8000"),
    os.getenv("SURREALDB_USER", "root"),
    os.getenv("SURREALDB_PASS", "root"),
    os.getenv("SURREALDB_NAMESPACE", "ns"),
    os.getenv("SURREALDB_DATABASE", "db"),
)

## IMPORTANT: Real-Time APIs Are Blocking

Live queries and change feeds run as **async iterators** -- they block the current task waiting for events. In a Jupyter notebook, this means the cell will not complete until the stream is closed.

The examples below show the **API patterns** with commented-out blocking code. To actually see live updates, you would run the stream in a background `asyncio.Task` while triggering changes from another cell or terminal.

In production, these typically run as separate async tasks in your application's event loop.

In [None]:
# Define a simple model for our real-time examples
from src.surreal_orm import BaseSurrealModel, SurrealConfigDict

class ChatMessage(BaseSurrealModel):
    model_config = SurrealConfigDict(table_name="chat_message")

    id: str | None = None
    text: str
    sender: str
    room: str = "general"

In [None]:
# Create some initial messages to work with
messages = [
    ChatMessage(text="Hello everyone!", sender="Alice", room="general"),
    ChatMessage(text="Hey Alice!", sender="Bob", room="general"),
    ChatMessage(text="Working on the API", sender="Charlie", room="dev"),
]

for msg in messages:
    await msg.save()
    print(f"Saved: [{msg.room}] {msg.sender}: {msg.text}")

## 1. Live Queries (WebSocket)

Live queries use a WebSocket connection to subscribe to changes on a table. When any record matching your filter is created, updated, or deleted, the event is pushed instantly to your application.

**When to use Live Queries:**
- Real-time UI updates (chat, dashboards, collaborative editing)
- Multiplayer game state synchronization
- Any scenario where sub-second latency matters

**How it works:**
1. `QuerySet.live()` opens a WebSocket (created lazily on first call)
2. Returns a `LiveModelStream` -- an async context manager + iterator
3. Each event is a `ModelChangeEvent[T]` with full Pydantic model instances
4. The stream auto-resubscribes on WebSocket disconnect by default

In [None]:
# Live Query API pattern -- subscribes to changes in the "general" room
# NOTE: This is commented out because it blocks the cell. Uncomment to run.
from src.surreal_orm import LiveAction

# async with ChatMessage.objects().filter(room="general").live() as stream:
#     async for event in stream:
#         match event.action:
#             case LiveAction.CREATE:
#                 print(f"New message from {event.instance.sender}: {event.instance.text}")
#             case LiveAction.UPDATE:
#                 print(f"Message edited: {event.instance.text}")
#             case LiveAction.DELETE:
#                 print(f"Message deleted: {event.record_id}")

print("Live query API shown above (commented out -- it blocks the cell).")
print("In production, run this in an asyncio.Task alongside your main app.")

In [None]:
# Live Query with DIFF mode -- receive only changed fields (bandwidth-efficient)
# Useful when records have large fields but you only care about what changed

# async with ChatMessage.objects().live(diff=True) as stream:
#     async for event in stream:
#         print(f"Changed fields: {event.changed_fields}")
#         print(f"Full record: {event.instance}")

print("DIFF mode sends only the changed fields, reducing payload size.")

In [None]:
# Auto-resubscribe: seamless recovery from WebSocket disconnects
# This is critical for production -- K8s pod restarts, network blips, etc.

# async with ChatMessage.objects().filter(room="general").live(
#     auto_resubscribe=True  # Default is True
# ) as stream:
#     async for event in stream:
#         # This keeps working even after WebSocket reconnection
#         print(f"{event.action}: {event.instance.text}")

print("auto_resubscribe=True (default) automatically re-subscribes after disconnect.")

## 2. Change Feeds (HTTP)

Change feeds provide **HTTP-based** change data capture. Unlike live queries, they don't require a persistent WebSocket connection -- they use polling instead.

**When to use Change Feeds:**
- Event-driven microservices (publish DB changes to a message queue)
- Data replication and ETL pipelines
- When WebSocket connections aren't available or desired
- When you need resumable streaming with cursor tracking

In [None]:
# Change Feed API pattern -- HTTP-based event streaming
# NOTE: Also blocks, shown as API reference

# async for event in ChatMessage.objects().changes(since="2026-01-01"):
#     # Publish to message queue for downstream services
#     await publish_to_queue({
#         "type": f"chat.{event.action.value.lower()}",
#         "data": event.raw,
#     })

print("Change feeds use HTTP polling -- no WebSocket required.")
print("Ideal for microservices and event-driven architectures.")

In [None]:
# Resumable streaming with cursor tracking
# Save the cursor position to resume after restarts

# stream = ChatMessage.objects().changes(since="2026-01-01")
# async for event in stream:
#     process(event)
#     # Save cursor for resumability
#     save_checkpoint(stream.cursor)

# On restart:
# cursor = load_checkpoint()
# stream = ChatMessage.objects().changes(since=cursor)

print("The .cursor property tracks your position for resumable streaming.")

## 3. ModelChangeEvent Structure

Both live queries and change feeds yield `ModelChangeEvent[T]` objects with these attributes:

| Attribute | Type | Description |
|---|---|---|
| `action` | `LiveAction` | CREATE, UPDATE, or DELETE |
| `instance` | `T` | Full Pydantic model instance (the changed record) |
| `record_id` | `str` | The record ID (e.g., `chat_messages:abc123`) |
| `changed_fields` | `dict \| None` | Changed fields only (when using DIFF mode) |
| `raw` | `dict` | Raw response data from SurrealDB |

## 4. Signals: React to Live Changes

The `post_live_change` signal lets you register handlers that fire whenever a live query event is received. This is separate from local CRUD signals (`post_save`, etc.) and is useful for broadcasting changes to WebSocket clients or triggering side effects.

In [None]:
# Signal-based handler for live query events
from src.surreal_orm import post_live_change, LiveAction

# Register a handler that fires when any ChatMessage live event arrives
# @post_live_change.connect(ChatMessage)
# async def on_message_change(sender, instance, action, record_id, **kwargs):
#     if action == LiveAction.CREATE:
#         # Broadcast to connected WebSocket clients
#         await ws_manager.broadcast({
#             "type": "new_message",
#             "sender": instance.sender,
#             "text": instance.text,
#         })

print("post_live_change signal fires for external DB changes received via live queries.")
print("Use it to bridge SurrealDB events to your WebSocket/SSE layer.")

## 5. Live Queries vs Change Feeds: Choosing the Right Tool

| Feature | Live Queries | Change Feeds |
|---|---|---|
| Protocol | WebSocket | HTTP (polling) |
| Latency | Sub-second (push) | Configurable poll interval |
| Connection | Persistent, stateful | Stateless (per-request) |
| Filter support | WHERE clause | Table-level |
| Resumability | Auto-resubscribe | Cursor-based |
| Use case | Real-time UIs, games | ETL, microservices, CDC |
| Scaling | Per-client connection | Shared HTTP endpoint |

**General rule:** Use live queries for user-facing real-time features. Use change feeds for backend event processing and data pipelines.

## Real-World Architecture Pattern

A common pattern for multiplayer games or collaborative apps:

```python
import asyncio

async def main():
    # Task 1: Listen for player changes
    async def watch_players():
        async with Player.objects().filter(table_id=my_table).live() as stream:
            async for event in stream:
                await broadcast_to_clients(event)

    # Task 2: Handle HTTP requests (FastAPI, etc.)
    async def serve_api():
        ...

    # Run both concurrently
    await asyncio.gather(watch_players(), serve_api())
```

This is how you'd integrate live queries with a web framework like FastAPI or Starlette.

In [None]:
# Cleanup: remove all test data
await ChatMessage.objects().delete_table()
print("Cleanup complete.")