# Lesson 5: Streaming Responses

In this lesson, we'll learn how to stream responses from Claude using the Anthropic API. Streaming allows you to receive and display partial responses as they're generated, creating a more interactive user experience.

## Learning Objectives
- Understand when and why to use streaming
- Learn how to implement streaming with the Anthropic Python SDK
- Handle streaming responses properly
- Implement error handling for streaming requests

## Setup

First, let's install the required packages and set up our API client.

In [None]:
# Install required packages
!pip install anthropic python-dotenv

In [None]:
import anthropic
import os
from dotenv import load_dotenv
import asyncio
import sys
import time

# Load environment variables
load_dotenv()

# Initialize the Anthropic client
client = anthropic.Anthropic(
    api_key=os.getenv("ANTHROPIC_API_KEY")
)

print("✅ Anthropic client initialized successfully!")

## Basic Streaming

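Let's start with a simple streaming example. The SDK's `client.messages.stream()` helper returns a context manager, and iterating over its `text_stream` yields text chunks as they arrive. (The lower-level alternative is to pass `stream=True` to `client.messages.create()`, which yields raw events instead.)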

In [None]:
def simple_stream_example():
    """Basic streaming example"""
    print("🚀 Starting simple streaming example...\n")
    
    with client.messages.stream(
        model="claude-3-sonnet-20240229",
        max_tokens=1000,
        messages=[
            {
                "role": "user",
                "content": "Write a short story about a robot learning to paint."
            }
        ]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    
    print("\n\n✅ Streaming completed!")

simple_stream_example()

## Handling Different Event Types

When streaming, you can access different types of events and data. Let's explore the various event types:

In [None]:
def detailed_stream_example():
    """Example showing different streaming events"""
    print("🔍 Detailed streaming example with event handling...\n")
    
    full_response = ""
    
    with client.messages.stream(
        model="claude-3-sonnet-20240229",
        max_tokens=500,
        messages=[
            {
                "role": "user",
                "content": "Explain quantum computing in simple terms."
            }
        ]
    ) as stream:
        # Handle the stream events
        for event in stream:
            if event.type == "message_start":
                print(f"📨 Message started - ID: {event.message.id}")
                print(f"   Model: {event.message.model}")
                print(f"   Usage: {event.message.usage}\n")
                
            elif event.type == "content_block_start":
                print(f"📝 Content block started - Type: {event.content_block.type}\n")
                
            elif event.type == "content_block_delta":
                if hasattr(event.delta, 'text'):
                    text_chunk = event.delta.text
                    full_response += text_chunk
                    print(text_chunk, end="", flush=True)
                    
            elif event.type == "content_block_stop":
                print("\n\n📄 Content block completed")
                
            elif event.type == "message_delta":
                # Usage is reported on the event itself, not on event.delta
                if hasattr(event, 'usage'):
                    print(f"\n📊 Usage update: {event.usage}")
                    
            elif event.type == "message_stop":
                print("\n🏁 Message completed")
    
    print(f"\n\n📋 Full response length: {len(full_response)} characters")
    print("✅ Detailed streaming completed!")

detailed_stream_example()

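The dispatch logic above can be exercised offline before it is wired to the API. The following is a minimal sketch that replays a hand-built event sequence. The `SimpleNamespace` objects and the `collect_text` helper are hypothetical stand-ins that only mimic the `type` and `delta.text` attributes used in the cell above; they are not SDK classes.

```python
from types import SimpleNamespace


def collect_text(events):
    """Concatenate the text carried by content_block_delta events."""
    parts = []
    for event in events:
        if event.type == "content_block_delta":
            # Only text deltas carry a .text attribute; skip anything else.
            text = getattr(event.delta, "text", None)
            if text is not None:
                parts.append(text)
    return "".join(parts)


# Hand-built events in the order the cell above handles them.
fake_events = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(type="content_block_start"),
    SimpleNamespace(type="content_block_delta", delta=SimpleNamespace(text="Hello, ")),
    SimpleNamespace(type="content_block_delta", delta=SimpleNamespace(text="world!")),
    SimpleNamespace(type="content_block_stop"),
    SimpleNamespace(type="message_delta"),
    SimpleNamespace(type="message_stop"),
]

assembled = collect_text(fake_events)
print(assembled)
```

Because the helper only reads attributes, the same function should also accept the events yielded by a real stream, though that is untested here.
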
## Async Streaming

For applications that need non-blocking operations, you can use async streaming:

In [None]:
# Initialize async client
async_client = anthropic.AsyncAnthropic(
    api_key=os.getenv("ANTHROPIC_API_KEY")
)

async def async_stream_example():
    """Async streaming example"""
    print("⚡ Starting async streaming example...\n")
    
    async with async_client.messages.stream(
        model="claude-3-sonnet-20240229",
        max_tokens=500,
        messages=[
            {
                "role": "user",
                "content": "Write a haiku about streaming data."
            }
        ]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
            # Simulate some async processing
            await asyncio.sleep(0.01)
    
    print("\n\n✅ Async streaming completed!")

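# Run the async example (top-level await works in notebooks;
# in a plain script, use asyncio.run(async_stream_example()) instead)
await async_stream_example()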

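One reason to prefer async streaming is that several streams can be consumed concurrently on a single event loop. The sketch below illustrates this with simulated streams only: `fake_text_stream`, `consume`, and `main` are hypothetical names, and the `asyncio.sleep` call stands in for network latency.

```python
import asyncio


async def fake_text_stream(chunks, delay=0.01):
    """Hypothetical stand-in for stream.text_stream: yields chunks with a pause."""
    for chunk in chunks:
        await asyncio.sleep(delay)  # simulates waiting on the network
        yield chunk


async def consume(name, chunks):
    """Collect one simulated stream into a single string."""
    parts = []
    async for text in fake_text_stream(chunks):
        parts.append(text)
    return name, "".join(parts)


async def main():
    # gather() runs both consumers concurrently, so the total wait is
    # roughly that of the slower stream rather than the sum of both.
    return await asyncio.gather(
        consume("a", ["one ", "two ", "three"]),
        consume("b", ["red ", "blue"]),
    )


# In a notebook cell, use `results = await main()` instead of asyncio.run().
results = asyncio.run(main())
print(results)
```
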
## Streaming with System Messages and Multi-turn Conversations

Streaming accepts the same `system` and `messages` parameters as a regular request, so earlier turns are passed in as a list of user and assistant messages:

In [None]:
def multi_turn_streaming():
    """Example of streaming in a multi-turn conversation"""
    print("💬 Multi-turn streaming conversation...\n")
    
    # Conversation history
    conversation = [
        {
            "role": "user",
            "content": "I'm learning about machine learning. Can you explain what a neural network is?"
        },
        {
            "role": "assistant",
            "content": "A neural network is a computational model inspired by biological neural networks. It consists of interconnected nodes (neurons) organized in layers that process information and learn patterns from data."
        },
        {
            "role": "user",
            "content": "That's helpful! Now can you explain how they learn, specifically about backpropagation?"
        }
    ]
    
    with client.messages.stream(
        model="claude-3-sonnet-20240229",
        max_tokens=800,
        system="You are a helpful AI tutor. Explain complex topics clearly with examples.",
        messages=conversation
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    
    print("\n\n✅ Multi-turn streaming completed!")

multi_turn_streaming()

## Error Handling for Streaming

Streams can fail partway through a response, so wrap the whole `with` block in `try`/`except`:

In [None]:
def streaming_with_error_handling():
    """Example of proper error handling with streaming"""
    print("🛡️ Streaming with error handling...\n")
    
    try:
        with client.messages.stream(
            model="claude-3-sonnet-20240229",
            max_tokens=300,
            messages=[
                {
                    "role": "user",
                    "content": "Tell me about the benefits of streaming API responses."
                }
            ]
        ) as stream:
            for text in stream.text_stream:
                print(text, end="", flush=True)
                
    except anthropic.RateLimitError as e:
        # Must come before APIError: RateLimitError is a subclass of it
        print(f"\n⏰ Rate limit exceeded: {e}")
    except anthropic.APIError as e:
        print(f"\n❌ API Error: {e}")
    except Exception as e:
        print(f"💥 Unexpected error: {e}")
    else:
        print("\n\n✅ Streaming completed successfully!")

streaming_with_error_handling()

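When a stream fails midway, some text has usually been received already, and it is often worth keeping. The following is a minimal sketch of that idea using a simulated stream. `consume_stream` and `broken_stream` are hypothetical helpers, and the broad `except Exception` is a simplification; real code would more likely catch the SDK's specific exception classes.

```python
def consume_stream(chunks):
    """Return (text, error): the text received so far and the exception, if any."""
    parts = []
    try:
        for chunk in chunks:
            parts.append(chunk)
    except Exception as exc:  # deliberately broad for this sketch
        return "".join(parts), exc
    return "".join(parts), None


def broken_stream():
    """Simulated stream that drops the connection after two chunks."""
    yield "The benefits "
    yield "of streaming "
    raise ConnectionError("simulated dropped connection")


text, error = consume_stream(broken_stream())
print(f"received: {text!r}")
print(f"error: {error}")
```

Returning the partial text alongside the error lets the caller decide whether to show it, discard it, or try again.
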
## Streaming Performance Comparison

Let's compare the perceived performance between streaming and non-streaming responses:

In [None]:
def performance_comparison():
    """Compare streaming vs non-streaming response times"""
    prompt = "Write a detailed explanation of how photosynthesis works, including the light and dark reactions."
    
    print("⏱️ Performance Comparison\n")
    
    # Non-streaming request
    print("1️⃣ Non-streaming request:")
    start_time = time.time()
    
    response = client.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=800,
        messages=[{"role": "user", "content": prompt}]
    )
    
    end_time = time.time()
    print(f"   Total time: {end_time - start_time:.2f} seconds")
    print(f"   Response length: {len(response.content[0].text)} characters")
    # Without streaming, nothing is visible until the whole response has arrived
    print(f"   Time to first token: {end_time - start_time:.2f} seconds (same as total)\n")
    
    # Streaming request
    print("2️⃣ Streaming request:")
    start_time = time.time()
    first_token_time = None
    char_count = 0
    
    with client.messages.stream(
        model="claude-3-sonnet-20240229",
        max_tokens=800,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            if first_token_time is None:
                first_token_time = time.time()
            char_count += len(text)
            # Don't print the actual text to keep output clean
    
    end_time = time.time()
    print(f"   Total time: {end_time - start_time:.2f} seconds")
    print(f"   Response length: {char_count} characters")
    # first_token_time stays None if the stream produced no text
    if first_token_time is not None:
        print(f"   Time to first token: {first_token_time - start_time:.2f} seconds")
    
    print("\n📈 Key benefit: Streaming provides much faster time-to-first-token!")

performance_comparison()

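The timing logic in the comparison above can be factored into a reusable helper and checked without calling the API. This is a sketch under stated assumptions: `measure_stream` and `slow_chunks` are hypothetical names, the stream is simulated with `time.sleep`, and `time.perf_counter()` is used in place of `time.time()` because it is meant for measuring short intervals.

```python
import time


def measure_stream(chunks):
    """Return (time_to_first_chunk, total_time, char_count) for any chunk iterable."""
    start = time.perf_counter()
    first_chunk_at = None
    char_count = 0
    for chunk in chunks:
        if first_chunk_at is None:
            first_chunk_at = time.perf_counter()
        char_count += len(chunk)
    end = time.perf_counter()
    # Time to first chunk is None when the iterable was empty.
    ttft = None if first_chunk_at is None else first_chunk_at - start
    return ttft, end - start, char_count


def slow_chunks():
    """Simulated stream: each chunk arrives after a short pause."""
    for chunk in ["Photo", "synthesis ", "converts ", "light."]:
        time.sleep(0.02)
        yield chunk


ttft, total, chars = measure_stream(slow_chunks())
print(f"first chunk: {ttft:.3f}s, total: {total:.3f}s, chars: {chars}")
```

Passing `stream.text_stream` in place of `slow_chunks()` should give the same three measurements for a real request.
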
## Building a Simple Chat Interface with Streaming

Here's a practical example of how you might use streaming in a chat application:

In [None]:
class StreamingChatBot:
    def __init__(self):
        self.conversation_history = []
        self.client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
    
    def add_user_message(self, message):
        """Add a user message to the conversation history"""
        self.conversation_history.append({
            "role": "user",
            "content": message
        })
    
    def get_streaming_response(self):
        """Get a streaming response from Claude"""
        full_response = ""
        
        try:
            with self.client.messages.stream(
                model="claude-3-sonnet-20240229",
                max_tokens=1000,
                messages=self.conversation_history,
                system="You are a helpful assistant. Be concise but thorough."
            ) as stream:
                print("🤖 Claude: ", end="", flush=True)
                for text in stream.text_stream:
                    print(text, end="", flush=True)
                    full_response += text
                print("\n")
                
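        except Exception as e:
            print(f"❌ Error: {e}")
            # Drop the unanswered user message so the next turn starts from a clean history
            if self.conversation_history and self.conversation_history[-1]["role"] == "user":
                self.conversation_history.pop()
            return None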
        
        # Add the response to conversation history
        self.conversation_history.append({
            "role": "assistant",
            "content": full_response
        })
        
        return full_response
    
    def chat(self, user_input):
        """Handle a single chat interaction"""
        print(f"👤 You: {user_input}")
        self.add_user_message(user_input)
        return self.get_streaming_response()

# Demo the streaming chatbot
print("💬 Streaming ChatBot Demo\n")
chatbot = StreamingChatBot()

# Simulate a conversation
chatbot.chat("Hi! Can you help me understand what makes streaming useful for chatbots?")
chatbot.chat("That's interesting! How would I implement this in a web application?")

print("\n✅ ChatBot demo completed!")

## Best Practices for Streaming

### When to Use Streaming
- **Interactive applications**: Chat interfaces, real-time content generation
- **Long responses**: When generating substantial amounts of text
- **User experience**: When you want to show progress and reduce perceived latency

### When NOT to Use Streaming
- **Batch processing**: When processing multiple requests programmatically
- **Short responses**: For brief answers where the overhead isn't worth it
- **Data analysis**: When you need the complete response before processing

### Implementation Tips
1. **Always use context managers** (`with` statements) for proper resource cleanup
2. **Handle errors gracefully** - network issues can interrupt streams
3. **Buffer partial responses** if you need to process the complete text
4. **Consider rate limits** - streaming requests count toward your rate limits
5. **Test error scenarios** - what happens if the stream is interrupted?

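Tip 3 can be taken one step further: instead of buffering the whole response, buffer only until a complete unit such as a sentence is available. Below is a minimal sketch of that approach. `sentences_from_chunks` is a hypothetical helper and its splitting rule is deliberately naive: it treats every `.`, `!`, or `?` as a sentence end, so decimals and abbreviations would be split incorrectly.

```python
def sentences_from_chunks(chunks, terminators=".!?"):
    """Yield complete sentences from an iterable of arbitrary text chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete sentence currently sitting in the buffer.
        while True:
            positions = [buffer.find(t) for t in terminators if t in buffer]
            if not positions:
                break
            end = min(positions) + 1
            sentence = buffer[:end].strip()
            if sentence:
                yield sentence
            buffer = buffer[end:]
    # Flush whatever is left when the stream ends.
    if buffer.strip():
        yield buffer.strip()


# Chunk boundaries rarely line up with sentence boundaries.
chunks = ["Streaming is use", "ful. It reduces ", "latency! Try", " it"]
sentences = list(sentences_from_chunks(chunks))
print(sentences)
```

A generator keeps the streaming property intact: each sentence is handed on as soon as it is complete, without waiting for the rest of the response.
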
## Summary

In this lesson, you learned:

- ✅ How to stream responses with the `client.messages.stream()` context manager
- ✅ Different streaming event types and how to handle them
- ✅ Async streaming for non-blocking operations
- ✅ Proper error handling for streaming requests
- ✅ Performance benefits of streaming (faster time-to-first-token)
- ✅ Building a practical streaming chat interface
- ✅ Best practices and when to use streaming

Streaming is particularly powerful for creating responsive, interactive applications where users can see content being generated in real time. The key advantage is the much shorter time-to-first-token, which makes applications feel more responsive even if the total generation time is similar.

## Exercises

1. **Modify the chatbot** to save conversation history to a file after each interaction
2. **Create a streaming function** that generates a story paragraph by paragraph
3. **Implement streaming with different models** and compare their response characteristics
4. **Build error recovery** - if a stream is interrupted, retry the request
5. **Create a streaming tokenizer** that counts tokens as they're received