# Streaming Chat Responses

Learn how to use Storm API's streaming endpoint for real-time chat responses.

## Setup

In [None]:
import requests
import json
import os
import time

# Configuration
API_KEY = os.getenv("STORM_API_KEY", "your-api-key-here")
# Base URL of the Storm API. Endpoint paths such as "/api/v2/answer/stream" are
# appended to it, so any trailing slash is removed. Set STORM_API_URL to point
# the notebook at a different deployment.
API_URL = os.getenv("STORM_API_URL", "https://live-stargate.sionic.im").rstrip("/")

# Every request authenticates with this header.
headers = {"storm-api-key": API_KEY}

print("✅ Setup complete")

## 1. Understanding Streaming

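Streaming provides real-time responses using Server-Sent Events (SSE). Instead of waiting for the complete answer, the client receives a sequence of `data:` events while the answer is being generated. The last event carries `"is_final_event": true`.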

In [None]:
# Each entry: (heading line, bullet lines) printed in order.
_COMPARISON = (
    ("Non-Streaming (/api/v2/answer):", (
        "  • Wait for complete response",
        "  • Get answer all at once",
        "  • Simple to implement",
        "  • Good for short responses",
    )),
    ("\nStreaming (/api/v2/answer/stream):", (
        "  • Get response in real-time",
        "  • Better user experience",
        "  • Show progress as it generates",
        "  • Ideal for long responses",
    )),
)

print("🌊 Streaming vs Non-Streaming:\n")
for _mode_heading, _mode_points in _COMPARISON:
    print(_mode_heading)
    for _point in _mode_points:
        print(_point)

# Show what the raw SSE lines look like on the wire.
print("\n📋 SSE Event Format:")
print("""data: {"status": "success", "data": {...}}
data: {"status": "success", "data": {...}}
data: {"status": "success", "data": {"is_final_event": true}}""")

## 2. Basic Streaming Request

In [None]:
def stream_chat(question, bucket_ids=None, thread_id=None, timeout=None):
    """Stream a chat answer from the Storm API, echoing it as it arrives.

    Sends ``question`` to ``/api/v2/answer/stream`` and prints each content
    chunk from the Server-Sent Events stream as soon as it is received.

    Args:
        question: The question to ask.
        bucket_ids: Optional list of bucket IDs to search; defaults to ``[]``.
        thread_id: Optional thread ID to continue an existing conversation.
        timeout: Optional ``requests`` timeout (seconds, or a
            ``(connect, read)`` tuple). ``None`` waits indefinitely.

    Returns:
        A ``(full_answer, contexts)`` tuple. On an HTTP error the status and
        body are printed and ``(None, [])`` is returned, so the result can
        always be unpacked.
    """
    payload = {
        "question": question,
        "bucketIds": bucket_ids or [],
    }
    if thread_id:
        payload["threadId"] = thread_id

    full_answer = ""
    contexts = []

    # ``with`` closes the streamed response (releasing its connection) even
    # when we stop reading early at the final event.
    with requests.post(
        f"{API_URL}/api/v2/answer/stream",
        headers=headers,
        json=payload,
        stream=True,  # Enable streaming
        timeout=timeout,
    ) as response:
        if response.status_code != 200:
            print(f"❌ Error: {response.status_code}")
            print(response.text)
            return None, []

        print("💬 Streaming response:\n")

        for raw_line in response.iter_lines():
            if not raw_line:
                continue  # blank lines only separate SSE events
            line = raw_line.decode('utf-8')

            # SSE format: "data: {json}" -- the space after the colon is optional.
            if not line.startswith("data:"):
                continue
            json_str = line[len("data:"):]
            if json_str.startswith(" "):
                json_str = json_str[1:]

            try:
                event = json.loads(json_str)
            except json.JSONDecodeError:
                print(f"\n⚠️ Failed to parse: {line}")
                continue

            if not isinstance(event, dict) or event.get("status") != "success":
                continue
            data = event.get("data")
            if not isinstance(data, dict):
                continue

            # Message content chunk
            message = data.get("message")
            if isinstance(message, dict) and "content" in message:
                content = message["content"]
                print(content, end="", flush=True)
                full_answer += content

            # Retrieved contexts (the latest event carrying them wins)
            if "contexts" in data:
                contexts = data["contexts"]

            if data.get("is_final_event"):
                print("\n\n✅ Stream complete")
                break

    return full_answer, contexts

# Test streaming
question = "What are the benefits of using Storm API for document processing?"
print(f"🤔 Question: {question}\n")

# stream_chat may signal an HTTP error with ``None``; normalize before
# unpacking so the demo reports the failure instead of raising a TypeError.
result = stream_chat(question)
answer, contexts = result if result is not None else (None, [])

if contexts:
    print(f"\n📚 Used {len(contexts)} sources")

## 3. Advanced Streaming Handler

In [None]:
class StreamingChatHandler:
    """Advanced handler for streaming chat responses.

    Posts questions to the ``/api/v2/answer/stream`` endpoint and dispatches
    parsed Server-Sent Events to optional callbacks.
    """

    def __init__(self, api_key, api_url="https://live-stargate.sionic.im", timeout=None):
        """Store credentials and connection settings.

        Args:
            api_key: Value sent in the ``storm-api-key`` header.
            api_url: Base URL of the API. A trailing slash is removed so
                endpoint paths can be appended safely.
            timeout: Optional ``requests`` timeout (seconds, or a
                ``(connect, read)`` tuple) for streaming requests. ``None``
                waits indefinitely.
        """
        self.api_key = api_key
        self.api_url = api_url.rstrip("/")
        self.headers = {"storm-api-key": api_key}
        self.timeout = timeout

    def parse_sse_event(self, line):
        """Parse one Server-Sent Events ``data:`` line into a JSON value.

        Args:
            line: A single decoded line from the stream.

        Returns:
            The decoded JSON payload. Returns ``None`` for empty lines,
            non-``data`` fields (``event:``, ``id:``, comments) and malformed
            JSON.
        """
        if not line or not line.startswith("data:"):
            return None

        payload = line[len("data:"):]
        # In SSE a single space after the colon is optional and is not part
        # of the value.
        if payload.startswith(" "):
            payload = payload[1:]

        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            return None

    def stream_with_callbacks(
        self,
        question,
        on_token=None,
        on_context=None,
        on_complete=None,
        bucket_ids=None,
        thread_id=None
    ):
        """Stream an answer, invoking callbacks as events arrive.

        Args:
            question: The question to ask.
            on_token: Called with each message content chunk.
            on_context: Called with the contexts list whenever an event
                carries ``contexts``.
            on_complete: Called once with ``(full_answer, contexts, metadata)``
                when an event with ``is_final_event`` arrives.
            bucket_ids: Optional list of bucket IDs; defaults to ``[]``.
            thread_id: Optional thread ID to continue a conversation.

        Returns:
            ``(full_answer, contexts, metadata)``. If the stream ends without a
            final event, whatever was received so far is returned and
            ``on_complete`` is not called.

        Raises:
            RuntimeError: If the API responds with a non-200 status.
        """
        payload = {
            "question": question,
            "bucketIds": bucket_ids or [],
        }
        if thread_id:
            payload["threadId"] = thread_id

        full_answer = ""
        contexts = []
        metadata = {}

        # ``with`` releases the connection even when we break out early.
        with requests.post(
            f"{self.api_url}/api/v2/answer/stream",
            headers=self.headers,
            json=payload,
            stream=True,
            timeout=self.timeout,
        ) as response:
            if response.status_code != 200:
                raise RuntimeError(f"API Error {response.status_code}: {response.text}")

            for raw_line in response.iter_lines():
                if not raw_line:
                    continue  # blank lines only separate SSE events

                event = self.parse_sse_event(raw_line.decode('utf-8'))
                if not isinstance(event, dict) or event.get("status") != "success":
                    continue

                data = event.get("data")
                if not isinstance(data, dict):
                    continue

                # Handle message content
                message = data.get("message")
                if isinstance(message, dict) and "content" in message:
                    content = message["content"]
                    full_answer += content
                    if on_token:
                        on_token(content)

                # Handle contexts
                if "contexts" in data:
                    contexts = data["contexts"]
                    if on_context:
                        on_context(contexts)

                # Map API field names onto the metadata keys exposed to callers.
                for field, meta_key in (
                    ("id", "chat_id"),
                    ("task_id", "task_id"),
                    ("model", "model"),
                ):
                    if field in data:
                        metadata[meta_key] = data[field]

                # Check for completion
                if data.get("is_final_event"):
                    if on_complete:
                        on_complete(full_answer, contexts, metadata)
                    break

        return full_answer, contexts, metadata

# Create a handler that uses the API key from the setup cell
handler = StreamingChatHandler(API_KEY)


# --- Callbacks -------------------------------------------------------------
def on_token(token):
    """Print each streamed chunk immediately, on the same line."""
    print(token, end="", flush=True)


def on_context(contexts):
    """Announce how many context sources the stream delivered."""
    print(f"\n\n📚 Received {len(contexts)} context sources")


def on_complete(answer, contexts, metadata):
    """Print a completion banner followed by the collected metadata."""
    banner = "\n\n✅ Streaming complete!"
    print(banner)
    print(f"📊 Metadata: {metadata}")


# Run a demo question through the handler with all three callbacks attached.
print("🎯 Streaming with callbacks:\n")

try:
    answer, contexts, metadata = handler.stream_with_callbacks(
        "Explain how Storm API handles document parsing",
        on_complete=on_complete,
        on_context=on_context,
        on_token=on_token,
    )
except Exception as e:
    print(f"\n❌ Error: {e}")

## 4. Building a Streaming UI

In [None]:
# Simulate a streaming UI component
class StreamingUI:
    """Simulated terminal UI for streaming responses.

    Collects streamed chunks in ``buffer``, counts them in
    ``tokens_received`` (each callback invocation counts as one, whatever the
    chunk size) and prints simple timing stats when the response completes.
    """

    # Enough columns to blank out "💭 Thinking..." (the emoji may render two wide).
    _CLEAR_WIDTH = 20

    def __init__(self):
        self.buffer = ""
        self.tokens_received = 0
        self.start_time = None  # perf_counter() value set by start_response()

    def start_response(self):
        """Reset per-response state, start the timer and show a placeholder."""
        # Reset so a single UI instance can be reused across questions.
        self.buffer = ""
        self.tokens_received = 0
        # perf_counter is monotonic, so elapsed time is immune to clock changes.
        self.start_time = time.perf_counter()
        print("💭 Thinking...", end="", flush=True)

    def add_token(self, token):
        """Append a chunk to the buffer and print it."""
        if self.tokens_received == 0:
            # Blank the whole placeholder line, then return to column 0.
            print("\r" + " " * self._CLEAR_WIDTH + "\r💬 ", end="", flush=True)

        self.buffer += token
        self.tokens_received += 1
        print(token, end="", flush=True)

    def show_typing_indicator(self):
        """Show a one-second spinner animation."""
        indicators = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
        for i in range(10):
            print(f"\r{indicators[i % len(indicators)]} Generating...", end="", flush=True)
            time.sleep(0.1)

    def complete_response(self, contexts):
        """Print response statistics.

        Args:
            contexts: The sources used for the answer; only its length is shown.
        """
        if self.start_time is None:
            elapsed = 0.0  # start_response() was never called
        else:
            elapsed = time.perf_counter() - self.start_time

        print("\n\n📊 Response Stats:")
        print(f"  • Tokens: {self.tokens_received}")
        print(f"  • Time: {elapsed:.2f}s")
        if elapsed > 0:
            print(f"  • Speed: {self.tokens_received / elapsed:.1f} tokens/s")
        else:
            print("  • Speed: n/a")  # avoid dividing by zero
        print(f"  • Sources: {len(contexts)}")

# Example: Streaming with UI
ui = StreamingUI()


def stream_with_ui(question):
    """Stream ``question`` through ``handler`` while rendering it with ``ui``.

    Returns the full answer text, or ``None`` if streaming raised (the error
    is printed).
    """
    def finish(_answer, found_contexts, _metadata):
        # Only the contexts are needed for the final stats.
        ui.complete_response(found_contexts)

    ui.start_response()

    try:
        answer, _, _ = handler.stream_with_callbacks(
            question,
            on_token=ui.add_token,
            on_complete=finish,
        )
    except Exception as e:
        print(f"\n❌ Streaming error: {e}")
        return None
    return answer


# Test streaming UI
question = "What are the key features of Storm API's streaming endpoint?"
print("🖥️ Streaming UI Demo:\n")
print(f"Q: {question}\n")

answer = stream_with_ui(question)

## 5. Handling Streaming Errors

In [None]:
def robust_stream_chat(question, max_retries=3, timeout=60):
    """Stream with error handling and recovery."""
    
    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{API_URL}/api/v2/answer/stream",
                headers=headers,
                json={"question": question, "bucketIds": []},
                stream=True,
                timeout=timeout
            )
            
            if response.status_code != 200:
                print(f"❌ HTTP {response.status_code}: {response.text}")
                continue
            
            full_answer = ""
            error_count = 0
            last_event_time = time.time()
            
            for line in response.iter_lines():
                # Check for timeout between events
                if time.time() - last_event_time > 30:
                    raise TimeoutError("No events for 30 seconds")
                
                if not line:
                    continue
                
                last_event_time = time.time()
                
                try:
                    line = line.decode('utf-8')
                    if line.startswith("data: "):
                        event = json.loads(line[6:])
                        
                        if event["status"] == "error":
                            print(f"\n⚠️ Stream error: {event}")
                            error_count += 1
                            if error_count > 3:
                                raise Exception("Too many stream errors")
                        
                        elif event["status"] == "success":
                            data = event["data"]
                            
                            if "message" in data and "content" in data["message"]:
                                content = data["message"]["content"]
                                print(content, end="", flush=True)
                                full_answer += content
                            
                            if data.get("is_final_event"):
                                print("\n✅ Success")
                                return full_answer
                
                except json.JSONDecodeError as e:
                    print(f"\n⚠️ Parse error: {e}")
                    error_count += 1
            
            # If we get here, stream ended without final event
            print("\n⚠️ Stream ended unexpectedly")
            
        except requests.exceptions.Timeout:
            print(f"\n⏱️ Timeout on attempt {attempt + 1}")
        
        except requests.exceptions.ConnectionError:
            print(f"\n🔌 Connection error on attempt {attempt + 1}")
        
        except Exception as e:
            print(f"\n❌ Error on attempt {attempt + 1}: {e}")
        
        if attempt < max_retries - 1:
            wait = 2 ** attempt
            print(f"⏳ Retrying in {wait}s...")
            time.sleep(wait)
    
    return None

# Test error handling
print("🛡️ Testing robust streaming:\n")

test_cases = [
    "Normal question about Storm API",
    # Add more test cases as needed
]

for q in test_cases:
    print(f"\nQ: {q}")
    print("A: ", end="")
    answer = robust_stream_chat(q)
    # robust_stream_chat signals failure with None; an empty string is a
    # completed (blank) answer and must not be reported as a failure.
    if answer is None:
        print("Failed to get response")

## 6. Streaming Best Practices

In [None]:
print("📚 Streaming Best Practices:\n")

# Each entry: (section heading, tip lines), printed in order.
_BEST_PRACTICES = (
    ("1. Connection Management:", (
        "   • Set appropriate timeouts",
        "   • Handle connection drops gracefully",
        "   • Implement reconnection logic",
        "   • Monitor connection health",
    )),
    ("\n2. Error Handling:", (
        "   • Parse SSE events safely",
        "   • Handle malformed events",
        "   • Implement retry logic",
        "   • Provide fallback options",
    )),
    ("\n3. UI/UX:", (
        "   • Show typing indicators",
        "   • Display partial responses",
        "   • Update UI smoothly",
        "   • Handle long responses",
    )),
    ("\n4. Performance:", (
        "   • Buffer tokens for smooth display",
        "   • Avoid blocking the UI thread",
        "   • Monitor memory usage",
        "   • Implement backpressure if needed",
    )),
)

for _section_title, _section_tips in _BEST_PRACTICES:
    print(_section_title)
    print("\n".join(_section_tips))

# Example: Production-ready streaming class
class ProductionStreamingClient:
    """Skeleton of a production-oriented streaming client.

    Only configuration handling is implemented; ``stream`` is a placeholder
    that lists the features a real implementation would need.
    """

    DEFAULT_CONFIG = {
        "api_url": "https://live-stargate.sionic.im",
        "timeout": 60,
        "max_retries": 3,
        "buffer_size": 100,
        "event_timeout": 30,
    }

    def __init__(self, api_key, config=None):
        """Store the API key and the effective configuration.

        Args:
            api_key: The Storm API key.
            config: Optional overrides. Keys not supplied fall back to
                ``DEFAULT_CONFIG`` instead of being dropped.
        """
        self.api_key = api_key
        # Build a fresh dict so instances never share or mutate the defaults.
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}

    def stream(self, question, callbacks=None):
        """Placeholder for a production streaming call; currently returns None."""
        # Implementation would include:
        # - Connection pooling
        # - Event buffering
        # - Metrics collection
        # - Error recovery
        # - Rate limiting
        return None

print("\n✅ Ready for production streaming!")

## Summary

You've learned how to:
- ✅ Use Storm API's streaming endpoint
- ✅ Handle Server-Sent Events (SSE)
- ✅ Build streaming UI components
- ✅ Handle streaming errors
- ✅ Outline a production-ready streaming client
- ✅ Follow streaming best practices

## Next Steps

- [Context Search](./03-context-search.ipynb) - Search for relevant contexts
- [Building a Chatbot](./04-chatbot-example.ipynb) - Complete chatbot with streaming
- [Advanced Topics](../06-advanced-topics/02-webhooks.md) - Webhook integration