Server-Sent Events (SSE) is a standard for pushing real-time updates from a server to a client over HTTP. It's commonly used by LLM APIs (such as those from OpenAI and Anthropic) to stream responses token-by-token.

This tutorial shows how to consume SSE streams using Python's `requests` library.

## What is SSE?

SSE is a one-way communication channel where the server sends events to the client. Each event follows this format:

```
event: message
data: {"key": "value"}

event: message
data: {"key": "another_value"}
```

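Events are separated by a blank line (`\n\n`). Each line starts with a field name such as `data:`, `event:`, `id:`, or `retry:`; the space after the colon is optional, and an event may contain several `data:` lines, which the client joins with newlines.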
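
To make the format concrete, here is a minimal sketch of a parser that groups text lines into events. The function name `iter_sse_events` and the dictionary layout are illustrative choices, not part of any library, and the sketch is not a complete implementation of the SSE specification (it only stores `id:` and `retry:` values, for instance).

```python
def iter_sse_events(lines):
    """Group an iterable of text lines into SSE events (illustrative sketch)."""
    event = {"event": "message", "data": []}
    for line in lines:
        line = line.rstrip("\r\n")
        if line == "":
            # A blank line ends the current event; emit it only if it has data
            if event["data"]:
                yield {**event, "data": "\n".join(event["data"])}
            event = {"event": "message", "data": []}
            continue
        if line.startswith(":"):
            continue  # Lines starting with a colon are comments
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # A single space after the colon is optional
        if field == "data":
            event["data"].append(value)
        elif field in ("event", "id", "retry"):
            event[field] = value
    # Note: a final event without a trailing blank line is dropped


sample = 'event: message\ndata: {"key": "value"}\n\ndata: line one\ndata:line two\n\n'
events = list(iter_sse_events(sample.split("\n")))
print(events)
```

The second sample event shows two `data:` lines (one without the optional space) being joined into a single payload.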

## Basic SSE Consumption with Requests

The key is to pass `stream=True`, so that `requests` does not download the whole body before returning, and then iterate over the response line by line.

In [None]:
import requests

def consume_sse(url, headers=None, payload=None):
    """Basic SSE consumer using requests."""
    
    response = requests.post(
        url,
        headers=headers,
        json=payload,
        stream=True  # This is crucial for SSE
    )
    response.raise_for_status()
    
    # Iterate over the response line by line
    for line in response.iter_lines():
        if line:
            decoded_line = line.decode('utf-8')
            
            # SSE data lines start with 'data:'; the space after the colon is optional
            if decoded_line.startswith('data:'):
                data = decoded_line[5:].removeprefix(' ')  # Drop the prefix (Python 3.9+)
                
                # Many APIs use '[DONE]' to signal end of stream
                if data == '[DONE]':
                    break
                    
                yield data
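
One caveat with the generator above: it never closes the response. With `stream=True`, the connection may not be released back to the pool unless the body is fully consumed or `Response.close()` is called, and this loop stops early on `[DONE]`. Below is a minimal sketch of one way to handle that. The helper name `iter_data_lines` is hypothetical, and `FakeResponse` is only a stand-in so the example runs without a network.

```python
def iter_data_lines(response):
    """Yield SSE data payloads from a streaming response, then close it.

    `response` is assumed to behave like a `requests.Response` opened with
    `stream=True`: it needs `iter_lines()` and `close()`.
    """
    try:
        for line in response.iter_lines():
            if not line:
                continue
            decoded_line = line.decode("utf-8")
            if decoded_line.startswith("data:"):
                data = decoded_line[5:]
                if data.startswith(" "):
                    data = data[1:]  # The space after the colon is optional
                if data == "[DONE]":
                    break
                yield data
    finally:
        # Runs on normal exit, on `break`, on errors, and when the caller
        # closes or abandons the generator
        response.close()


class FakeResponse:
    """Stand-in for requests.Response (assumption for this demo only)."""

    def __init__(self, lines):
        self._lines = lines
        self.closed = False

    def iter_lines(self):
        return iter(self._lines)

    def close(self):
        self.closed = True


fake = FakeResponse([b"data: hello", b"", b"data: [DONE]", b"data: ignored"])
received = list(iter_data_lines(fake))
print(received, fake.closed)
```

With a real request, a similar effect can be had by opening the response in a `with requests.post(..., stream=True) as response:` block.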

## Parsing JSON from SSE Events

Most APIs send JSON data in their SSE events. Here's how to parse it:

In [None]:
import json
import requests

def consume_sse_json(url, headers=None, payload=None):
    """SSE consumer that parses JSON data."""
    
    response = requests.post(
        url,
        headers=headers,
        json=payload,
        stream=True
    )
    response.raise_for_status()
    
    for line in response.iter_lines():
        if line:
            decoded_line = line.decode('utf-8')
            
            # The space after 'data:' is optional in SSE
            if decoded_line.startswith('data:'):
                data = decoded_line[5:].removeprefix(' ')  # Python 3.9+
                
                if data == '[DONE]':
                    break
                
                try:
                    yield json.loads(data)
                except json.JSONDecodeError:
                    # Handle non-JSON data if needed
                    yield data

## Real-World Example: OpenAI-Compatible API

Here's a practical example for streaming from an OpenAI-compatible API:

In [None]:
import json
import requests

def stream_chat_completion(api_url, api_key, messages, model="gpt-4"):
    """Stream chat completions from an OpenAI-compatible API."""
    
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream",  # Explicitly request SSE
    }
    
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,  # Enable streaming
    }
    
    response = requests.post(
        api_url,
        headers=headers,
        json=payload,
        stream=True
    )
    response.raise_for_status()
    
    full_response = ""
    
    for line in response.iter_lines():
        if line:
            decoded_line = line.decode('utf-8')
            
            # The space after 'data:' is optional in SSE
            if decoded_line.startswith('data:'):
                data = decoded_line[5:].removeprefix(' ')  # Python 3.9+
                
                if data == '[DONE]':
                    break
                
                try:
                    chunk = json.loads(data)
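                    # Extract the content delta; tolerate an empty 'choices' list
                    # or a null delta/content, which would otherwise raise here
                    choices = chunk.get('choices') or [{}]
                    delta = choices[0].get('delta') or {}
                    content = delta.get('content') or ''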
                    
                    if content:
                        full_response += content
                        print(content, end='', flush=True)  # Print as we receive
                        
                except json.JSONDecodeError:
                    # Skip payloads that are not valid JSON
                    pass
    
    print()  # New line at the end
    return full_response

In [None]:
# Example usage (replace with your actual API details)
# response = stream_chat_completion(
#     api_url="https://api.openai.com/v1/chat/completions",
#     api_key="your-api-key",
#     messages=[{"role": "user", "content": "Hello!"}]
# )
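
The delta-extraction step is easier to check when it is pulled out into its own function. The sketch below uses a hypothetical helper, `extract_delta_text`, and hand-written sample chunks that mimic the shape read by the function above (`choices[0].delta.content`). The chunk with an empty `choices` list and the one with a `null` content are assumed edge cases that some servers may emit; verify them against your provider's actual output.

```python
import json


def extract_delta_text(chunk):
    """Return the text carried by one streamed chunk, or '' if there is none."""
    choices = chunk.get("choices") or [{}]  # Tolerate a missing or empty list
    delta = choices[0].get("delta") or {}   # Tolerate a missing or null delta
    return delta.get("content") or ""       # Tolerate a missing or null content


# Hand-written sample payloads (assumed shapes, not captured from a real API)
raw_chunks = [
    '{"choices": [{"delta": {"role": "assistant", "content": null}}]}',
    '{"choices": [{"delta": {"content": "Hel"}}]}',
    '{"choices": [{"delta": {"content": "lo!"}}]}',
    '{"choices": []}',
]

full_response = "".join(extract_delta_text(json.loads(raw)) for raw in raw_chunks)
print(full_response)  # Hello!
```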

## Handling Timeouts and Errors

SSE streams can be long-running. Here's how to handle timeouts gracefully:

In [None]:
import json
import requests
from requests.exceptions import ChunkedEncodingError, ConnectionError, Timeout

def robust_sse_consumer(url, headers=None, payload=None, timeout=60):
    """SSE consumer with error handling."""
    
    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            stream=True,
            timeout=(5, timeout)  # (connect timeout, read timeout)
        )
        response.raise_for_status()
        
        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                
                # The space after 'data:' is optional in SSE
                if decoded_line.startswith('data:'):
                    data = decoded_line[5:].removeprefix(' ')  # Python 3.9+
                    
                    if data == '[DONE]':
                        break
                    
                    try:
                        yield json.loads(data)
                    except json.JSONDecodeError:
                        yield {"raw": data}
                        
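    except Timeout:
        # Connecting, or waiting for the start of the response, took too long
        yield {"error": "Request timed out"}
    except ConnectionError:
        # Also raised when a read times out mid-stream, because requests
        # re-raises that case as ConnectionError while iterating
        yield {"error": "Connection failed or stalled"}
    except ChunkedEncodingError:
        yield {"error": "Stream interrupted"}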
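
The read timeout passed to `requests` limits how long the client waits for the next bytes, not the total duration of the stream, so a server that keeps sending data can hold the connection open indefinitely. If an overall limit is needed, one option is to wrap the event iterator in a deadline check. The helper `with_deadline` below is a hypothetical name and a minimal sketch: it only looks at the clock when an item arrives, so it complements the read timeout rather than replacing it. The `clock` parameter exists only so the example can run with a fake clock.

```python
import time


def with_deadline(items, max_seconds, clock=time.monotonic):
    """Yield from `items` until `max_seconds` have elapsed overall."""
    deadline = clock() + max_seconds
    for item in items:
        if clock() > deadline:
            # Same error shape as robust_sse_consumer, so callers handle one format
            yield {"error": "Stream exceeded overall deadline"}
            return
        yield item


# Demonstration with a fake clock that advances 10 "seconds" per call
ticks = iter(range(0, 1000, 10))


def fake_clock():
    return next(ticks)


events = [{"n": 1}, {"n": 2}, {"n": 3}, {"n": 4}]
result = list(with_deadline(events, max_seconds=25, clock=fake_clock))
print(result)
```

In real use this would wrap the consumer, for example `with_deadline(robust_sse_consumer(url), max_seconds=300)`.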

## Alternative: Using sseclient-py

For more complex SSE handling, consider using the `sseclient-py` library, which parses the event fields and multi-line data for you:

In [None]:
# pip install sseclient-py

import requests
import sseclient

def consume_sse_with_library(url, headers=None, payload=None):
    """Using sseclient-py for cleaner SSE handling."""
    
    response = requests.post(
        url,
        headers=headers,
        json=payload,
        stream=True
    )
    response.raise_for_status()
    
    client = sseclient.SSEClient(response)
    
    for event in client.events():
        if event.data == '[DONE]':
            break
        yield event.data

## Key Takeaways

1. **Always use `stream=True`** - This prevents requests from buffering the entire response
2. **Use `iter_lines()`** - This gives you the response line by line as it arrives
3. **Strip the `data:` prefix** - SSE data lines carry this field name before the actual payload
4. **Handle `[DONE]`** - Many APIs use this to signal the end of the stream
5. **Set appropriate timeouts** - SSE streams can be long-running, so plan accordingly
6. **Consider using `sseclient-py`** - For complex SSE handling with proper event parsing