# SSE Connection Monitor

> Pattern for monitoring Server-Sent Events (SSE) connections with visual status indicators and automatic reconnection

In [None]:
#| default_exp patterns.sse_connection_monitor

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from typing import Optional, Dict, Any
from dataclasses import dataclass
from fasthtml.common import *

from cjm_fasthtml_interactions.core.html_ids import InteractionHtmlIds
from cjm_fasthtml_daisyui.components.data_display.status import status, status_colors, status_sizes
from cjm_fasthtml_daisyui.utilities.semantic_colors import text_dui
from cjm_fasthtml_tailwind.utilities.spacing import m
from cjm_fasthtml_tailwind.utilities.typography import font_size
from cjm_fasthtml_tailwind.utilities.layout import display_tw
from cjm_fasthtml_tailwind.core.base import combine_classes

## Connection Status Configuration

The `SSEConnectionConfig` dataclass defines configuration options for the SSE connection monitor.

In [None]:
#| export
@dataclass
class SSEConnectionConfig:
    """Configuration for SSE connection monitoring."""
    
    max_reconnect_attempts: int = 10  # Maximum number of reconnection attempts
    reconnect_delay: int = 1000  # Initial reconnect delay in milliseconds
    max_backoff_multiplier: int = 5  # Maximum backoff multiplier for reconnect delay
    monitor_visibility: bool = True  # Monitor tab visibility and reconnect when visible
    log_to_console: bool = True  # Enable console logging for debugging

## Status Indicator Creation

Helper function to create status indicators for different connection states.

In [None]:
#| export
def create_connection_status_indicators(
    status_size: str = "sm",  # Size of status indicator dot (xs, sm, md, lg)
    show_text: bool = True,  # Whether to show status text
    text_size: str = "text-sm",  # Text size class
    hide_text_on_mobile: bool = True  # Hide text on small screens
) -> Dict[str, FT]:  # Dictionary of status state to indicator element
    """Create status indicator elements for different connection states."""
    # Map size string to size class
    size_map = {
        "xs": status_sizes.xs,
        "sm": status_sizes.sm,
        "md": status_sizes.md,
        "lg": status_sizes.lg,
    }
    size_cls = size_map.get(status_size, status_sizes.sm)
    
    # Build text classes
    text_classes = [text_size] if text_size else [str(font_size.sm)]
    if hide_text_on_mobile:
        text_classes.extend([display_tw.hidden, display_tw.inline.sm])
    
    def _create_indicator(color_cls, text_cls, label: str) -> FT:
        """Helper to create a status indicator."""
        parts = [
            Span(cls=combine_classes(
                status,
                color_cls,
                size_cls,
                m.r(1),
                m.r(2).sm
            ))
        ]
        
        if show_text:
            parts.append(
                Span(label, cls=combine_classes(text_cls, *text_classes))
            )
        
        return Span(*parts)
    
    return {
        'active': _create_indicator(status_colors.success, text_dui.success, "Live"),
        'disconnected': _create_indicator(status_colors.warning, text_dui.warning, "Disconnected"),
        'error': _create_indicator(status_colors.error, text_dui.error, "Error"),
        'reconnecting': _create_indicator(status_colors.info, text_dui.info, "Reconnecting..."),
    }

## SSE Connection Monitor Script

The `SSEConnectionMonitorScript` function creates a JavaScript script that monitors SSE connection status.

Key features:
- Monitors HTMX SSE events (open, error, close, message)
- Updates visual status indicators
- Automatic reconnection with exponential backoff
- Handles server shutdown gracefully
- Tab visibility awareness (reconnects when tab becomes visible)
- Detects OOB swaps that remove SSE element

In [None]:
#| export
def SSEConnectionMonitorScript(
    connection_id: str,  # Unique identifier for this SSE connection
    status_indicators: Dict[str, FT],  # Status indicator elements for each state
    config: Optional[SSEConnectionConfig] = None  # Configuration options
) -> FT:  # Script element with monitoring code
    """Create a script that monitors SSE connection status and manages reconnection."""
    if config is None:
        config = SSEConnectionConfig()
    
    # Generate HTML IDs
    status_id = InteractionHtmlIds.sse_status(connection_id)
    element_id = InteractionHtmlIds.sse_element(connection_id)
    
    # Convert indicators to HTML strings for JavaScript
    status_html = {
        key: str(indicator)
        for key, indicator in status_indicators.items()
    }
    
    # Build monitoring script
    log_prefix = f"[SSE Monitor: {connection_id}]"
    
    def log(message: str) -> str:
        """Generate console.log statement if logging is enabled."""
        if config.log_to_console:
            return f"console.log('{log_prefix} {message}');"
        return ""
    
    # For log messages with JavaScript variables, build them differently
    def log_with_var(message_template: str) -> str:
        """Generate console.log with JavaScript variable interpolation."""
        if config.log_to_console:
            return f"console.log('{log_prefix} ' + {message_template});"
        return ""
    
    monitor_script = f"""
    (function() {{
        let reconnectAttempts = 0;
        const maxReconnectAttempts = {config.max_reconnect_attempts};
        const reconnectDelay = {config.reconnect_delay};
        const maxBackoffMultiplier = {config.max_backoff_multiplier};
        let isShuttingDown = false;
        const statusElement = document.getElementById('{status_id}');
        const sseElement = document.getElementById('{element_id}');

        const statusIndicators = {{
            active: `{status_html['active']}`,
            disconnected: `{status_html['disconnected']}`,
            error: `{status_html['error']}`,
            reconnecting: `{status_html['reconnecting']}`
        }};

        function updateStatus(state) {{
            if (statusElement && statusIndicators[state]) {{
                statusElement.innerHTML = statusIndicators[state];
            }}
        }}

        // Monitor HTMX SSE events
        document.body.addEventListener('htmx:sseOpen', function(evt) {{
            if (evt.detail.elt === sseElement) {{
                {log('Connection opened')};
                updateStatus('active');
                reconnectAttempts = 0;
            }}
        }});

        document.body.addEventListener('htmx:sseError', function(evt) {{
            if (evt.detail.elt === sseElement) {{
                {log('Connection error')};

                if (isShuttingDown) {{
                    {log('Server is shutting down, not attempting reconnection')};
                    updateStatus('disconnected');
                    return;
                }}

                updateStatus('error');

                if (reconnectAttempts < maxReconnectAttempts) {{
                    const backoff = Math.min(reconnectAttempts + 1, maxBackoffMultiplier);
                    const delay = reconnectDelay * backoff;
                    setTimeout(function() {{
                        reconnectAttempts++;
                        {log_with_var("'Attempting to reconnect... (attempt ' + reconnectAttempts + ')'")};
                        updateStatus('reconnecting');
                        htmx.trigger(sseElement, 'htmx:sseReconnect');
                    }}, delay);
                }} else {{
                    {log('Max reconnection attempts reached')};
                    updateStatus('disconnected');
                }}
            }}
        }});

        document.body.addEventListener('htmx:sseClose', function(evt) {{
            if (evt.detail.elt === sseElement) {{
                {log('Connection closed')};
                updateStatus('disconnected');
            }}
        }});

        document.body.addEventListener('htmx:sseMessage', function(evt) {{
            if (evt.detail.elt === sseElement && evt.detail.event === 'close') {{
                {log_with_var("'Server requested connection close: ' + evt.detail.data")};
                isShuttingDown = true;
                updateStatus('disconnected');
                reconnectAttempts = maxReconnectAttempts;
                if (sseElement._sseEventSource) {{
                    sseElement._sseEventSource.close();
                    delete sseElement._sseEventSource;
                }}
            }}
        }});

        document.body.addEventListener('htmx:oobAfterSwap', function(evt) {{
            if (evt.detail.target && evt.detail.target.id === '{element_id}') {{
                {log('SSE element removed via OOB swap - server shutting down')};
                isShuttingDown = true;
                updateStatus('disconnected');
            }}
        }});

        {'document.addEventListener("visibilitychange", function() {' if config.monitor_visibility else ''}
            {'''if (!document.hidden && sseElement && !isShuttingDown) {''' if config.monitor_visibility else ''}
                {'''const evtSource = sseElement._sseEventSource;''' if config.monitor_visibility else ''}
                {'''if (!evtSource || evtSource.readyState === EventSource.CLOSED) {''' if config.monitor_visibility else ''}
                    {log('Page became visible, reconnecting SSE...') if config.monitor_visibility else ''};
                    {'''updateStatus('reconnecting');''' if config.monitor_visibility else ''}
                    {'''htmx.trigger(sseElement, 'htmx:sseReconnect');''' if config.monitor_visibility else ''}
                {'''}''' if config.monitor_visibility else ''}
            {'''}''' if config.monitor_visibility else ''}
        {'''});''' if config.monitor_visibility else ''}

        // Initial status
        updateStatus('reconnecting');
    }})();
    """
    
    return Script(monitor_script)

## Complete SSE Connection Monitor

The `SSEConnectionMonitor` function creates a complete connection monitoring system with status indicator and monitoring script.

In [None]:
#| export
def SSEConnectionMonitor(
    connection_id: str,  # Unique identifier for this SSE connection
    status_size: str = "sm",  # Size of status indicator
    show_text: bool = True,  # Whether to show status text
    hide_text_on_mobile: bool = True,  # Hide text on small screens
    config: Optional[SSEConnectionConfig] = None,  # Configuration options
    container_cls: Optional[str] = None  # Additional CSS classes for status container
) -> tuple[FT, FT]:  # Tuple of (status_container, monitor_script)
    """Create a complete SSE connection monitoring system."""
    # Create status indicators
    indicators = create_connection_status_indicators(
        status_size=status_size,
        show_text=show_text,
        hide_text_on_mobile=hide_text_on_mobile
    )
    
    # Create status container
    status_id = InteractionHtmlIds.sse_status(connection_id)
    status_container = Div(
        # Initial state - will be updated by script
        indicators['reconnecting'],
        id=status_id,
        cls=container_cls
    )
    
    # Create monitor script
    monitor_script = SSEConnectionMonitorScript(
        connection_id=connection_id,
        status_indicators=indicators,
        config=config
    )
    
    return status_container, monitor_script

## Usage Examples

Here are complete examples showing different SSE monitoring use cases:

In [None]:
# Example 1: Simple connection monitor
status_container, monitor_script = SSEConnectionMonitor(
    connection_id="simple-stream"
)

print("Example 1: Simple connection monitor")
print(f"Status container ID: {InteractionHtmlIds.sse_status('simple-stream')}")
print(f"SSE element ID: {InteractionHtmlIds.sse_element('simple-stream')}")

Example 1: Simple connection monitor
Status container ID: sse-status-simple-stream
SSE element ID: sse-element-simple-stream


In [None]:
# Example 2: Custom configuration
config = SSEConnectionConfig(
    max_reconnect_attempts=5,
    reconnect_delay=2000,
    max_backoff_multiplier=3,
    log_to_console=False  # Disable logging in production
)

status_container, monitor_script = SSEConnectionMonitor(
    connection_id="dashboard",
    status_size="md",
    config=config
)

print("Example 2: Custom configuration")
print(f"Max reconnect attempts: {config.max_reconnect_attempts}")
print(f"Reconnect delay: {config.reconnect_delay}ms")

Example 2: Custom configuration
Max reconnect attempts: 5
Reconnect delay: 2000ms


In [None]:
# Example 3: Custom status indicators
from cjm_fasthtml_daisyui.components.data_display.badge import badge, badge_colors

# Create custom indicators using badges instead of status dots
custom_indicators = {
    'active': Span("Connected", cls=str(combine_classes(badge, badge_colors.success))),
    'disconnected': Span("Offline", cls=str(combine_classes(badge, badge_colors.warning))),
    'error': Span("Error", cls=str(combine_classes(badge, badge_colors.error))),
    'reconnecting': Span("Connecting", cls=str(combine_classes(badge, badge_colors.info))),
}

# Create monitor script with custom indicators
monitor_script = SSEConnectionMonitorScript(
    connection_id="custom",
    status_indicators=custom_indicators
)

status_container = Div(
    custom_indicators['reconnecting'],
    id=InteractionHtmlIds.sse_status("custom")
)

print("Example 3: Custom status indicators using badges")

Example 3: Custom status indicators using badges


## Complete Page Example

Here's a complete example showing how to use the SSE connection monitor in a FastHTML application:

In [None]:
from fasthtml.common import Div, H1, P
from cjm_fasthtml_daisyui.components.data_display.card import card, card_body
from cjm_fasthtml_daisyui.utilities.semantic_colors import bg_dui
from cjm_fasthtml_tailwind.utilities.flexbox_and_grid import flex_display, items, justify, gap
from cjm_fasthtml_tailwind.utilities.spacing import p as padding

def create_dashboard_page():
    """Example dashboard page with SSE monitoring."""
    connection_id = "dashboard"
    
    # Create connection monitor
    status_container, monitor_script = SSEConnectionMonitor(
        connection_id=connection_id,
        status_size="sm"
    )
    
    return Div(
        # Header with connection status
        Div(
            Div(
                H1("Dashboard"),
                status_container,  # Connection status indicator
                cls=combine_classes(
                    flex_display,
                    items.center,
                    justify.between,
                    padding.b(4)
                )
            )
        ),
        
        # SSE connection element (receives updates)
        Div(
            # Content will be updated via SSE
            P("Waiting for updates..."),
            hx_ext="sse",
            sse_connect="/stream/dashboard",
            sse_swap="update",
            id=InteractionHtmlIds.sse_element(connection_id),
            cls=str(combine_classes(card, card_body, bg_dui.base_100))
        ),
        
        # Monitor script (must be included)
        monitor_script
    )

# Example route handler
print("Example: Complete dashboard page with SSE monitoring")
print("Route: /dashboard")
print("SSE Endpoint: /stream/dashboard")

Example: Complete dashboard page with SSE monitoring
Route: /dashboard
SSE Endpoint: /stream/dashboard


## Server-Side SSE Implementation

Here's how to implement the server-side SSE endpoint:

```python
from fasthtml.common import *
from starlette.responses import StreamingResponse
import asyncio

@app.get("/stream/dashboard")
async def stream_dashboard():
    """SSE endpoint for dashboard updates."""
    
    async def event_generator():
        try:
            while True:
                # Generate your data
                data = get_dashboard_data()
                
                # Send SSE event
                yield f"event: update\n"
                yield f"data: {data}\n\n"
                
                # Wait before next update
                await asyncio.sleep(1)
                
        except asyncio.CancelledError:
            # Send close message before shutting down
            yield f"event: close\n"
            yield f"data: Server shutting down\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
```

### Graceful Shutdown

To signal shutdown to clients, send a 'close' event:

```python
# In your shutdown handler
async def shutdown_sse_connections():
    """Send close event to all SSE clients."""
    for client in active_connections:
        await client.send("event: close\n")
        await client.send("data: Server shutting down\n\n")
```

### OOB Swap for Shutdown

Alternatively, use OOB swap to remove the SSE element:

```python
# Send before shutdown
Div(
    id=InteractionHtmlIds.sse_element(connection_id),
    hx_swap_oob="outerHTML"
)
```

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()