# Lesson 11: Real-Time Streaming Responses

🔴 **Advanced** · ⏱ **20 min**

---

By default, lyzr-adk waits for the complete response before returning it. Streaming changes this: you receive tokens as they're generated, enabling real-time output in chatbots, CLIs, and interactive applications. This lesson covers how streaming works, when to use it, and its constraints.

## What you'll learn

- Understand the difference between streaming and non-streaming responses
- Iterate over response chunks as they arrive in real time
- Build a real-time streaming display and CLI-style chat loop
- Understand which features are incompatible with streaming (RAI guardrails, structured outputs)

## Prerequisites

> **Note:** This is an **optional advanced lesson**. It is not required to complete the core lyzr-adk series.

Before starting this lesson, you should have completed (or be familiar with):

- **Lesson 1**: Getting Started (agent creation basics)
- **Lesson 2**: Providers and Models
- **Lesson 3**: Agent Lifecycle
- **Lesson 4**: Structured Outputs
- **Lesson 5**: Memory and Sessions

You will also need:
- A valid `LYZR_API_KEY` set as an environment variable (or replace the placeholder in the setup cell)

In [None]:
!pip install lyzr-adk -q

In [None]:
import os
import time
from lyzr import Studio

API_KEY = os.getenv("LYZR_API_KEY", "YOUR_LYZR_API_KEY")
studio = Studio(api_key=API_KEY)
print("Ready!")

## Streaming vs Non-Streaming

Understanding the trade-offs between the two modes helps you choose the right one for each situation.

| | Non-Streaming (default) | Streaming |
|---|---|---|
| **How it works** | Waits for complete response | Yields chunks as generated |
| **Latency to first token** | High (waits for the full response) | Low (first chunk arrives as soon as it is generated) |
| **Use case** | Batch processing, structured data | Chat UIs, CLIs, live display |
| **Response type** | `response.response` (string) | Iterator of string chunks |
| **With RAI?** | ✅ Yes | ❌ No |
| **Structured output?** | ✅ Yes | ❌ No |

The key API difference is a single argument: `stream=True` vs `stream=False` (default).

```python
# Non-streaming (default)
response = agent.run("message", stream=False)
print(response.response)          # full string

# Streaming
for chunk in agent.run("message", stream=True):
    print(chunk, end="", flush=True)  # chunk is a string fragment
```
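To get a feel for the two return shapes without calling the API, the sketch below uses stand-in functions. `fake_complete` and `fake_stream` are hypothetical helpers, not part of lyzr-adk; they only mimic "one full string" versus "an iterator of string fragments", and the `delay` argument stands in for generation latency.

```python
import time
from typing import Iterator


def fake_complete(text: str) -> str:
    """Mimic non-streaming mode: return the whole string at once."""
    return text


def fake_stream(text: str, delay: float = 0.0) -> Iterator[str]:
    """Mimic streaming mode: yield the text one word-sized fragment at a time."""
    words = text.split(" ")
    for i, word in enumerate(words):
        time.sleep(delay)  # stands in for generation latency (assumption)
        # Re-attach the space that split() removed, except after the last word
        yield word if i == len(words) - 1 else word + " "


message = "Streaming yields fragments as they are produced."

# Non-streaming shape: a single string
full = fake_complete(message)
print(full)

# Streaming shape: an iterator consumed fragment by fragment
for chunk in fake_stream(message, delay=0.02):
    print(chunk, end="", flush=True)
print()
```

Joining the fragments reproduces the full string, which is why the two modes carry the same content and differ only in when it becomes available.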

## Creating an Agent for Streaming

Agent creation is identical for streaming and non-streaming. The `stream` parameter is only passed to `agent.run()`, not to `create_agent()`.

In [None]:
# Create an agent (same as always)
stream_agent = studio.create_agent(
    name="Stream Demo Agent",
    provider="openai/gpt-4o",
    role="Storyteller and explainer",
    goal="Give detailed, engaging responses",
    instructions="Be thorough and descriptive. Use complete sentences."
)
print(f"Agent created: {stream_agent.id}")

## Your First Streaming Response

Pass `stream=True` to `agent.run()` and iterate over the result. Each iteration yields a string chunk, a fragment of the response as it is generated.

Two important details for real-time display:
- `end=""` prevents `print` from adding a newline after each chunk
- `flush=True` forces the output buffer to flush immediately so tokens appear as they arrive

In [None]:
print("Streaming response (tokens appear as they arrive):\n")
print("-" * 50)

# stream=True returns an iterator of string chunks
for chunk in stream_agent.run("Explain how neural networks learn in simple terms.", stream=True):
    print(chunk, end="", flush=True)  # flush=True ensures immediate display

print("\n" + "-" * 50)
print("\n✅ Stream complete!")

## Collecting Streamed Chunks

Sometimes you want to display chunks in real time *and* have the complete response available afterward, for logging, post-processing, or analysis. Simply accumulate chunks in a list and join them.

In [None]:
# Collect all chunks to reconstruct the full response
chunks = []
print("Streaming and collecting:\n")

for chunk in stream_agent.run("What are the three laws of robotics?", stream=True):
    chunks.append(chunk)
    print(chunk, end="", flush=True)

print("\n")

# Reconstruct the full text
full_response = "".join(chunks)
word_count = len(full_response.split())
print(f"\n📊 Stats: {len(chunks)} chunks received, {word_count} words total")
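If the display-and-collect pattern above shows up in several places, it could be wrapped in a small helper. The sketch below is one possible shape; `stream_and_collect` is a hypothetical name, and the list of strings stands in for the iterator that `agent.run(..., stream=True)` returns, so the block runs without an API key.

```python
import io
import sys
from typing import Iterable, Optional, TextIO


def stream_and_collect(chunks: Iterable[str], out: Optional[TextIO] = None) -> str:
    """Write each chunk to `out` as it arrives and return the joined text."""
    if out is None:
        out = sys.stdout  # resolved at call time so notebook redirection is respected
    collected = []
    for chunk in chunks:
        collected.append(chunk)
        out.write(chunk)
        out.flush()  # same purpose as print(..., flush=True)
    return "".join(collected)


# Stand-in for a streaming response: any iterable of strings works
demo_chunks = ["A robot ", "may not ", "injure a human."]
buffer = io.StringIO()
full_text = stream_and_collect(demo_chunks, out=buffer)
print(full_text)
```

Passing an explicit `out` target makes the helper easy to test and lets the same code write to a log file or an in-memory buffer instead of the terminal.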

## Streaming with Sessions and Memory

Streaming is fully compatible with memory and sessions. You can pass `session_id` to `agent.run()` exactly as you would in non-streaming mode; the agent maintains conversation context across turns.

This makes streaming suitable for real-time chat interfaces where continuity across turns is required.

In [None]:
import uuid

stream_agent.add_memory(max_messages=10)
session = str(uuid.uuid4())

# Turn 1: stream with session
print("Turn 1:")
for chunk in stream_agent.run("My name is Alex and I love astronomy.", stream=True, session_id=session):
    print(chunk, end="", flush=True)
print("\n")

# Turn 2: stream follow-up (agent remembers)
print("Turn 2 (agent should remember Alex):")
for chunk in stream_agent.run("What's my name and what do I love?", stream=True, session_id=session):
    print(chunk, end="", flush=True)
print("\n")

## Comparing Speed: Non-Streaming vs Streaming

The total time to receive the complete response is roughly the same in both modes, because the LLM generates the same number of tokens either way. The meaningful difference is **time to first token**:

- **Non-streaming**: you wait until the entire response is generated before seeing anything
- **Streaming**: the first token appears almost immediately, making the interaction feel much faster to the user

This perceived responsiveness is the primary reason to use streaming in interactive applications.

In [None]:
question = "List 5 interesting facts about black holes."

# Non-streaming: measure time to full response
start = time.time()
response = stream_agent.run(question, stream=False)
total_time = time.time() - start
print(f"Non-streaming: {total_time:.2f}s to full response")
print(f"Response: {response.response[:100]}...\n")

# Streaming: measure time to first token
start = time.time()
first_token_time = None
all_chunks = []
for chunk in stream_agent.run(question, stream=True):
    if first_token_time is None:
        first_token_time = time.time() - start
    all_chunks.append(chunk)
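# Guard against an empty stream, where no first token was ever recorded
first_token_display = f"{first_token_time:.2f}s" if first_token_time is not None else "n/a"
print(f"Streaming: {first_token_display} to first token, {time.time()-start:.2f}s total")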
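The same measurement can be factored into a reusable function. The following is a minimal sketch, assuming only that the stream is an iterable of strings; `measure_stream` and `slow_chunks` are hypothetical names, and `slow_chunks` simulates a streaming response so the block runs offline. It uses `time.perf_counter()`, a monotonic clock intended for measuring intervals, and returns `None` for the first-chunk time when the stream is empty.

```python
import time
from typing import Iterable, Iterator, Optional, Tuple


def measure_stream(chunks: Iterable[str]) -> Tuple[Optional[float], float, str]:
    """Return (seconds to first chunk, total seconds, full text)."""
    start = time.perf_counter()
    first_chunk_at = None
    parts = []
    for chunk in chunks:
        if first_chunk_at is None:
            first_chunk_at = time.perf_counter() - start
        parts.append(chunk)
    total = time.perf_counter() - start
    return first_chunk_at, total, "".join(parts)


def slow_chunks(n: int, delay: float) -> Iterator[str]:
    """Hypothetical stand-in for a streaming response."""
    for i in range(n):
        time.sleep(delay)  # simulated generation latency
        yield f"chunk{i} "


first, total, text = measure_stream(slow_chunks(5, 0.02))
if first is None:
    print("No chunks received")
else:
    print(f"{first:.2f}s to first chunk, {total:.2f}s total")
```

To time a real agent, the simulated stream would be swapped for `stream_agent.run(question, stream=True)`.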

## Common Mistakes: Incompatibilities with Streaming

Two features are **incompatible** with streaming:

### 1. Structured Outputs (`response_format`)
Structured outputs require the complete response to be available before it can be parsed and validated against a schema. Streaming yields raw chunks, making schema validation impossible mid-stream.

**Rule:** If you need a `response_format`, use `stream=False`.

### 2. RAI Guardrails
RAI (Responsible AI) policies inspect the full response content before returning it (for toxicity checks, content filtering, and so on). This inspection step requires the complete response, which is unavailable during streaming.

**Rule:** If you have `add_rai_policy()` on an agent, use `stream=False`.

In [None]:
from pydantic import BaseModel

class Summary(BaseModel):
    text: str
    word_count: int

# ❌ Mistake 1: structured output with streaming
try:
    for chunk in stream_agent.run("Summarize AI", stream=True, response_format=Summary):
        print(chunk)
except Exception as e:
    print(f"❌ Structured output + streaming error: {e}")

print()

# ❌ Mistake 2: RAI policy + streaming
try:
    policy = studio.create_rai_policy(name="Test", toxicity=True)
    rai_agent = studio.create_agent(
        name="RAI Stream Test", provider="openai/gpt-4o",
        role="Test", goal="Test", instructions="Test"
    )
    rai_agent.add_rai_policy(policy)
    for chunk in rai_agent.run("Hello", stream=True):
        print(chunk)
except Exception as e:
    print(f"❌ RAI + streaming error: {e}")

print("\n✅ Solution: use stream=False when using RAI or structured outputs.")
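One way to avoid these mistakes is to decide the `stream` value in a single place. The sketch below encodes the two rules from this section; `choose_stream_mode` is a hypothetical helper, not a lyzr-adk function. It assumes your own code tracks whether a RAI policy was attached, since this lesson does not show a way to query that from the agent.

```python
from typing import Any, Optional


def choose_stream_mode(wants_stream: bool,
                       response_format: Optional[Any] = None,
                       has_rai_policy: bool = False) -> bool:
    """Return the value to pass as `stream`.

    Falls back to False whenever a feature that needs the complete
    response (structured output or a RAI policy) is in use.
    """
    if response_format is not None or has_rai_policy:
        return False
    return wants_stream


print(choose_stream_mode(True))                        # True
print(choose_stream_mode(True, response_format=dict))  # False
print(choose_stream_mode(True, has_rai_policy=True))   # False
print(choose_stream_mode(False))                       # False
```

The result could then be passed along as `agent.run(message, stream=choose_stream_mode(...))`, keeping in mind that the return type differs between the two modes and the caller still has to handle both.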

## Exercise: Build a Streaming CLI Chat Loop

Put it all together. Your task is to build a simple CLI-style chat loop that:

1. Creates a conversational agent with memory enabled
2. Starts a new session with a unique ID
3. Reads user input with `input()`
4. Streams the agent's response token by token
5. Maintains conversation context across turns (via `session_id`)
6. Exits cleanly when the user types `quit`, `exit`, or `q`

Fill in the `TODO` sections in the cell below.

In [None]:
import uuid

# TODO: Create an agent suitable for chat
chat_agent = studio.create_agent(
    name=...,
    provider="openai/gpt-4o",
    role=...,
    goal=...,
    instructions=...
)
chat_agent.add_memory(max_messages=20)

# TODO: Start a chat session
chat_session = str(uuid.uuid4())

# Simple streaming chat loop
print("Chat started! Type 'quit' to exit.\n")
while True:
    user_input = input("You: ").strip()
    if user_input.lower() in ["quit", "exit", "q"]:
        print("Chat ended.")
        break
    if not user_input:
        continue

    print("Agent: ", end="", flush=True)
    # TODO: Stream the response using stream=True and session_id=chat_session
    ...
    print()  # newline after each response

## Summary

### When to use streaming vs non-streaming

| Scenario | Recommendation |
|---|---|
| Chat UI or CLI with real-time output | `stream=True` |
| Batch processing or automation | `stream=False` |
| Structured output (`response_format`) | `stream=False` |
| RAI guardrails (`add_rai_policy`) | `stream=False` |
| Memory and sessions | Either (both work) |
| Tools and function calling | Either (both work) |
| Contexts and knowledge bases | Either (both work) |

### Compatibility matrix

| Feature | Non-Streaming | Streaming |
|---|---|---|
| Memory / sessions | ✅ | ✅ |
| Tools / functions | ✅ | ✅ |
| Contexts | ✅ | ✅ |
| Knowledge bases | ✅ | ✅ |
| Structured outputs | ✅ | ❌ |
| RAI guardrails | ✅ | ❌ |

### Key takeaways

- `agent.run("...", stream=True)` returns an iterator of string chunks
- Use `print(chunk, end="", flush=True)` for real-time terminal display
- Collect chunks into a list and `"".join(chunks)` to reconstruct the full response
- Streaming dramatically reduces **time to first token**, improving perceived responsiveness
- RAI guardrails and structured outputs require `stream=False`

## Next Steps

You have completed Lesson 11. From here you can:

- **Lesson 12: Image and File Generation**, which explores multimodal output capabilities (coming soon)
- **Back to Lesson 10: Capstone Project**, if you haven't completed it yet. The capstone brings together memory, tools, RAI, and knowledge bases into a full end-to-end build

---

| Lesson | Topic |
|---|---|
| [01](./01_getting_started.ipynb) | Getting Started |
| [02](./02_providers_and_models.ipynb) | Providers and Models |
| [03](./03_agent_lifecycle.ipynb) | Agent Lifecycle |
| [04](./04_structured_outputs.ipynb) | Structured Outputs |
| [05](./05_memory_and_sessions.ipynb) | Memory and Sessions |
| [06](./06_tools_and_functions.ipynb) | Tools and Functions |
| [07](./07_knowledge_bases_rag.ipynb) | Knowledge Bases (RAG) |
| [08](./08_contexts.ipynb) | Contexts |
| [09](./09_rai_guardrails.ipynb) | RAI Guardrails |
| [10](./10_capstone_project.ipynb) | Capstone Project |
| **11** | **Streaming (this lesson)** |