v0.1.19 — Streaming Handler Support

@decebal decebal released this 18 Mar 12:48
ff58f9e

Streaming Handler Support (#52)

Handlers can now send incremental updates during execution — enabling LLM token streaming, multi-step workflow progress, and real-time agent loops.

Highlights

  • StreamSender: Bounded channel sender with typed send() via the IntoStreamItem trait
  • CancellationToken: Auto-cancelled when StreamReceiver is dropped, plus explicit cancel() and token.cancelled() for tokio::select!
  • Tauri IPC bridge: allframe_stream / allframe_stream_cancel commands with per-stream UUID event channels
  • TypeScript codegen: StreamObserver, StreamSubscription, callStreamHandler with auto-listener-cleanup
  • RxJS adapter: toObservable() with lazy import("rxjs") (zero hard dependency)
  • #[tauri_compat(streaming)]: Macro variant for streaming handler migration
  • register_stream*: Bridge impl Stream to channel-based StreamSender
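
To make the "bounded channel sender with typed send()" idea concrete, here is a minimal std-only sketch of the general pattern. It is not AllFrame's implementation: IntoItem, Sender, and run_demo are hypothetical names, and std::sync::mpsc::sync_channel stands in for whatever async channel the library actually uses. It shows the two properties a handler relies on: a full buffer applies backpressure, and send() starts failing once the receiving side is gone.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical conversion trait, standing in for the idea behind IntoStreamItem.
trait IntoItem {
    fn into_item(self) -> String;
}

impl IntoItem for &str {
    fn into_item(self) -> String {
        self.to_string()
    }
}

impl IntoItem for u32 {
    fn into_item(self) -> String {
        self.to_string()
    }
}

/// Hypothetical sender wrapper: bounded capacity plus a typed `send`.
struct Sender {
    inner: SyncSender<String>,
}

impl Sender {
    /// Blocks while the buffer is full; returns Err(()) once the receiver is dropped.
    fn send<T: IntoItem>(&self, item: T) -> Result<(), ()> {
        self.inner.send(item.into_item()).map_err(|_| ())
    }
}

/// A producer tries to send `total` items; the consumer reads at most `take`
/// items and then drops the receiver. Returns the received items and the
/// number of sends that succeeded.
fn run_demo(total: u32, take: usize) -> (Vec<String>, u32) {
    // Capacity 2: at most two items can be buffered ahead of the consumer.
    let (tx, rx) = sync_channel::<String>(2);
    let sender = Sender { inner: tx };

    let producer = thread::spawn(move || {
        let mut sent: u32 = 0;
        for i in 0..total {
            // Same shape as `if tx.send(..).await.is_err() { break; }` in a handler.
            if sender.send(i).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });

    let received: Vec<String> = rx.iter().take(take).collect();
    drop(rx); // consumer goes away; pending and future sends now fail
    let sent = producer.join().unwrap();
    (received, sent)
}
```

Calling run_demo(100, 3) yields the first three items, while the producer stops after at most five successful sends (three consumed plus two buffered), rather than producing all 100.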

Adoption Guide

1. Add a streaming handler (callback-based):

router.register_streaming("stream_chat", |args: ChatArgs, tx: StreamSender| async move {
    for token in llm.stream_tokens(args.prompt).await {
        if tx.send(Json(token)).await.is_err() { break; }
    }
    Json(ChatResult { done: true })
});

2. Or return an impl Stream:

router.register_stream_with_args("stream_events", |args: EventArgs| async move {
    event_source.subscribe(args.topic).await  // returns impl Stream
});

3. Use CancellationToken for cooperative cancellation:

router.register_streaming("long_task", |tx: StreamSender| async move {
    let token = tx.cancellation_token();
    loop {
        tokio::select! {
            _ = token.cancelled() => break,
            result = do_work() => { tx.send(result).await.ok(); }
        }
    }
    "done".to_string()
});
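
The "auto-cancelled when the receiver is dropped" behaviour behind this step can be sketched without any async runtime. The following is a simplified illustration under stated assumptions, not AllFrame's CancellationToken: Token, Receiver, and run_until_cancelled are hypothetical names, and a shared atomic flag polled between steps replaces the awaitable token.cancelled() future used with tokio::select!.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical token: a shared flag, not the library's CancellationToken.
#[derive(Clone, Default)]
struct Token(Arc<AtomicBool>);

impl Token {
    /// Explicit cancellation.
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical receiver handle that cancels its token when dropped.
struct Receiver {
    token: Token,
}

impl Drop for Receiver {
    fn drop(&mut self) {
        self.token.cancel();
    }
}

/// Runs up to `max_steps` units of work, checking the token before each one.
/// Returns how many steps completed.
fn run_until_cancelled(token: &Token, max_steps: u32, mut step: impl FnMut(u32)) -> u32 {
    let mut done = 0;
    while done < max_steps && !token.is_cancelled() {
        step(done);
        done += 1;
    }
    done
}
```

In this sketch the worker never has to be told about the consumer directly: dropping the Receiver flips the flag, and the loop exits at its next check. An explicit token.cancel() has the same effect.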

4. Migrate from app.emit() using #[tauri_compat(streaming)]:

// Before (raw Tauri events):
#[tauri::command]
async fn stream_chat(prompt: String, app: AppHandle) -> String {
    for token in stream { app.emit("chat-token", &token).unwrap(); }
    "done".to_string()
}

// After (AllFrame streaming):
#[tauri_compat(streaming)]
async fn stream_chat(prompt: String, tx: StreamSender) -> String {
    for token in stream { tx.send(token).await.ok(); }
    "done".to_string()
}

// Register:
router.register_streaming_with_args::<StreamChatArgs, _, _, _>("stream_chat", stream_chat);

5. Frontend (auto-generated TypeScript):

// Generated by router.generate_ts_client()
const sub = await streamChat({ prompt: "Hello" }, {
  next: (token) => appendToUI(token),
  error: (err) => showError(err),
  complete: (result) => finalize(result),
});

// Cancel anytime:
sub.unsubscribe();

// Or with RxJS:
const obs$ = await toObservable((observer) => streamChat({ prompt: "Hello" }, observer));
obs$.pipe(takeUntil(cancel$)).subscribe(token => appendToUI(token));

Full Changelog

See CHANGELOG.md for the complete list of changes.