# 02 — Streaming Token-by-Token Oversight

Demonstrates `StreamingKernel` and `AsyncStreamingKernel` for
real-time token-level coherence monitoring.

In [None]:
from director_ai.core.streaming import StreamingKernel, TokenEvent

## Sync Streaming
Process a token stream with 3 halt mechanisms.

In [None]:
kernel = StreamingKernel(hard_limit=0.3, window_size=5, window_threshold=0.5)

tokens = ["The", " quantum", " field", " maintains", " coherence", " across", " all", " layers"]
scores = [0.9, 0.85, 0.82, 0.80, 0.78, 0.75, 0.73, 0.70]
score_iter = iter(scores)

session = kernel.stream_tokens(tokens, lambda t: next(score_iter))

print(f"Tokens processed: {session.token_count}")
print(f"Output: {session.output}")
print(f"Avg coherence: {session.avg_coherence:.3f}")
print(f"Min coherence: {session.min_coherence:.3f}")
print(f"Halted: {session.halted}")

## Hard Limit Halt
Watch the kernel halt when coherence drops below threshold.

In [None]:
kernel2 = StreamingKernel(hard_limit=0.5)
degrading_scores = [0.9, 0.7, 0.6, 0.4, 0.3]  # 4th token triggers halt
si = iter(degrading_scores)

session2 = kernel2.stream_tokens(["a", "b", "c", "d", "e"], lambda t: next(si))
print(f"Halted: {session2.halted}")
print(f"Halt reason: {session2.halt_reason}")
print(f"Tokens before halt: {session2.halt_index}")

## Async Streaming (for WebSocket)
The `AsyncStreamingKernel` yields events as an async generator.

In [None]:
import asyncio
from director_ai.core.async_streaming import AsyncStreamingKernel

async def demo():
    kernel = AsyncStreamingKernel(hard_limit=0.3)
    tokens = ["Hello", " world", " from", " async"]
    events = []
    async for event in kernel.stream_tokens(tokens, lambda t: 0.85):
        events.append(event)
        print(f"  Token {event.index}: '{event.token}' → coherence={event.coherence:.2f}")
    print(f"Total events: {len(events)}")

await demo()