# Intermediate Tutorial 1: WebSocket Real-Time Events

**Level:** Intermediate  
**Time:** 25-30 minutes  
**Prerequisites:** Basic tutorials 1-3

## Overview

Learn real-time event streaming with WebSockets:
- Connecting to WebSocket server
- Channel subscriptions (tokens, grid, cdna)
- Receiving real-time events
- Heartbeat mechanism
- Error handling and reconnection

## Setup

In [None]:
import asyncio
import websockets
import json
import requests
from datetime import datetime

# Configuration
WS_URL = "ws://localhost:8000/ws"
REST_URL = "http://localhost:8000/api/v1"

# Get auth token
response = requests.post(f"{REST_URL}/auth/login", 
                        json={"username": "admin", "password": "admin"})
TOKEN = response.json()["access_token"]
headers = {"Authorization": f"Bearer {TOKEN}"}

print("‚úì Setup complete")

## Step 1: Simple WebSocket Connection

In [None]:
async def simple_connection():
    """Connect and receive a few messages."""
    async with websockets.connect(WS_URL) as websocket:
        print("‚úì Connected to WebSocket")
        
        # Subscribe to tokens channel
        subscribe_msg = {
            "type": "subscribe",
            "channel": "tokens"
        }
        await websocket.send(json.dumps(subscribe_msg))
        print("‚úì Subscribed to tokens channel")
        
        # Listen for 10 seconds
        try:
            async with asyncio.timeout(10):
                async for message in websocket:
                    data = json.loads(message)
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] {data}")
        except asyncio.TimeoutError:
            print("\n‚úì Timeout reached")

# Run (will timeout after 10 seconds)
await simple_connection()

## Step 2: Subscribe to Multiple Channels

In [None]:
async def multi_channel_subscription():
    """Subscribe to multiple channels."""
    async with websockets.connect(WS_URL) as websocket:
        print("‚úì Connected")
        
        # Subscribe to all channels
        channels = ["tokens", "grid", "cdna"]
        for channel in channels:
            msg = {"type": "subscribe", "channel": channel}
            await websocket.send(json.dumps(msg))
            print(f"‚úì Subscribed to {channel}")
        
        # Listen for events
        try:
            async with asyncio.timeout(15):
                async for message in websocket:
                    data = json.loads(message)
                    channel = data.get('channel', 'unknown')
                    event_type = data.get('type', 'unknown')
                    print(f"üì® [{channel}] {event_type}")
        except asyncio.TimeoutError:
            print("\n‚úì Done")

await multi_channel_subscription()

## Step 3: Trigger Events from REST API

In [None]:
async def listen_and_trigger():
    """Listen to WebSocket while triggering REST API changes."""
    
    async def websocket_listener(ws):
        """Background task to listen for events."""
        try:
            async for message in ws:
                data = json.loads(message)
                if data.get('type') != 'pong':  # Ignore heartbeat
                    print(f"  üîî Event: {data.get('type')} on {data.get('channel')}")
                    if 'data' in data:
                        print(f"     Data: {data['data']}")
        except websockets.exceptions.ConnectionClosed:
            pass
    
    async with websockets.connect(WS_URL) as websocket:
        # Subscribe
        await websocket.send(json.dumps({"type": "subscribe", "channel": "tokens"}))
        print("‚úì Listening for token events...\n")
        
        # Start listener task
        listener_task = asyncio.create_task(websocket_listener(websocket))
        
        # Trigger events via REST API
        print("Creating token...")
        response = requests.post(
            f"{REST_URL}/tokens",
            json={"position": [1.0]*8, "radius": 1.0, "weight": 1.0},
            headers=headers
        )
        token_id = response.json()['token_id']
        await asyncio.sleep(0.5)
        
        print(f"\nUpdating token {token_id}...")
        requests.put(
            f"{REST_URL}/tokens/{token_id}",
            json={"weight": 5.0},
            headers=headers
        )
        await asyncio.sleep(0.5)
        
        print(f"\nDeleting token {token_id}...")
        requests.delete(f"{REST_URL}/tokens/{token_id}", headers=headers)
        await asyncio.sleep(0.5)
        
        # Stop listener
        listener_task.cancel()
        print("\n‚úì Demo complete")

await listen_and_trigger()

## Step 4: Heartbeat (Ping/Pong)

In [None]:
async def heartbeat_demo():
    """Demonstrate heartbeat mechanism."""
    async with websockets.connect(WS_URL) as websocket:
        print("‚úì Connected\n")
        
        # Send pings and measure latency
        for i in range(3):
            start = asyncio.get_event_loop().time()
            
            await websocket.send(json.dumps({"type": "ping"}))
            
            response = await websocket.recv()
            data = json.loads(response)
            
            latency = (asyncio.get_event_loop().time() - start) * 1000
            print(f"Ping #{i+1}: {data['type']} (latency: {latency:.2f}ms)")
            
            await asyncio.sleep(1)
        
        print("\n‚úì Heartbeat working")

await heartbeat_demo()

## Step 5: Error Handling & Reconnection

In [None]:
async def robust_connection():
    """Connection with error handling and auto-reconnect."""
    max_retries = 3
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            async with websockets.connect(WS_URL) as websocket:
                print(f"‚úì Connected (attempt {retry_count + 1})")
                retry_count = 0  # Reset on successful connection
                
                # Subscribe
                await websocket.send(json.dumps({"type": "subscribe", "channel": "tokens"}))
                
                # Listen
                async for message in websocket:
                    data = json.loads(message)
                    print(f"Received: {data.get('type')}")
                    
        except websockets.exceptions.ConnectionClosed:
            retry_count += 1
            print(f"‚ùå Connection closed. Retrying... ({retry_count}/{max_retries})")
            await asyncio.sleep(2 ** retry_count)  # Exponential backoff
            
        except Exception as e:
            print(f"‚ùå Error: {e}")
            break
    
    if retry_count >= max_retries:
        print("‚ùå Max retries reached. Giving up.")

# Run with timeout
try:
    await asyncio.wait_for(robust_connection(), timeout=10)
except asyncio.TimeoutError:
    print("\n‚úì Demo timeout")

## Step 6: Event Filtering

In [None]:
async def filtered_events():
    """Filter events by type."""
    async with websockets.connect(WS_URL) as websocket:
        await websocket.send(json.dumps({"type": "subscribe", "channel": "tokens"}))
        print("‚úì Listening for token_created events only...\n")
        
        created_count = 0
        
        async def listen():
            nonlocal created_count
            async for message in websocket:
                data = json.loads(message)
                if data.get('type') == 'token_created':
                    created_count += 1
                    print(f"  ‚úì Token created: {data.get('data', {})}")
        
        listener = asyncio.create_task(listen())
        
        # Create some tokens
        for i in range(3):
            requests.post(
                f"{REST_URL}/tokens",
                json={"position": [float(i)]*8, "radius": 1.0, "weight": 1.0},
                headers=headers
            )
            await asyncio.sleep(0.5)
        
        await asyncio.sleep(1)
        listener.cancel()
        
        print(f"\n‚úì Captured {created_count} creation events")

await filtered_events()

## Step 7: Unsubscribe from Channel

In [None]:
async def subscribe_unsubscribe():
    """Dynamic subscription management."""
    async with websockets.connect(WS_URL) as websocket:
        # Subscribe
        await websocket.send(json.dumps({"type": "subscribe", "channel": "tokens"}))
        print("‚úì Subscribed to tokens")
        await asyncio.sleep(2)
        
        # Unsubscribe
        await websocket.send(json.dumps({"type": "unsubscribe", "channel": "tokens"}))
        print("‚úì Unsubscribed from tokens")
        
        # Verify no more events
        print("\nCreating token (should not receive event)...")
        requests.post(
            f"{REST_URL}/tokens",
            json={"position": [0.0]*8, "radius": 1.0, "weight": 1.0},
            headers=headers
        )
        
        await asyncio.sleep(2)
        print("‚úì No events received (as expected)")

await subscribe_unsubscribe()

## Summary

In this tutorial, you learned:

‚úÖ **WebSocket connections** - Real-time bidirectional communication  
‚úÖ **Channel subscriptions** - tokens, grid, cdna  
‚úÖ **Event handling** - Receive and process real-time events  
‚úÖ **Heartbeat** - Ping/pong for connection monitoring  
‚úÖ **Error handling** - Reconnection with exponential backoff  
‚úÖ **Event filtering** - Process specific event types  
‚úÖ **Subscription management** - Subscribe/unsubscribe dynamically  

## Next Steps

- **Tutorial 2**: REST API Integration - Authentication, rate limiting
- **Advanced Tutorial 1**: Performance optimization
- **Advanced Tutorial 2**: Production deployment

## Key Takeaways

1. **WebSockets provide real-time updates** (~5ms latency)
2. **Multiple channels** for different event types
3. **Heartbeat mechanism** ensures connection health
4. **Error handling critical** for production use
5. **Event filtering** reduces client-side processing

---

**Need help?** Check the [WebSocket API docs](https://your-docs-url/api/websocket.html)