Skip to content

feat(workflow): add observer/watch APIs for workflow streams#1100

Merged
omeraplak merged 1 commit intomainfrom
feat/workflow-observer-watch-api
Feb 22, 2026
Merged

feat(workflow): add observer/watch APIs for workflow streams#1100
omeraplak merged 1 commit intomainfrom
feat/workflow-observer-watch-api

Conversation

@omeraplak
Copy link
Copy Markdown
Member

@omeraplak omeraplak commented Feb 22, 2026

PR Checklist

Please check if your PR fulfills the following requirements:

Bugs / Features

What is the current behavior?

Workflow stream consumers can only observe events via the main async iterator (for await ... of stream).
There is no callback-based watch API, no ReadableStream observer surface, and no legacy bridge method on WorkflowStreamResult.

What is the new behavior?

Adds observer/watch APIs to workflow stream results:

  • watch(cb)
  • watchAsync(cb)
  • observeStream()
  • streamLegacy()

Available on:

  • workflow.stream(...)
  • workflow.timeTravelStream(...)
  • resumed stream results returned from .resume(...)

Also includes:

  • WorkflowStreamController subscription registry + callback isolation
  • tests for ordering, unsubscribe, multi-watcher behavior, throw isolation, and observeStream close semantics
  • docs updates in website/docs/workflows/streaming.md including SDK-only note for watch APIs and REST SSE equivalents
  • changeset with usage examples

fixes (issue)

N/A

Notes for reviewers

Validation run:

  • pnpm --filter @voltagent/core test -- src/workflow/stream.spec.ts src/workflow/core.spec.ts

Scope note:

  • docs/workflow-parity-plans/* intentionally not included in this PR.

Summary by cubic

Adds callback-based observers and a ReadableStream view to workflow streams so clients can watch events without consuming the async iterator. Improves integration options and keeps existing iterator behavior unchanged.

  • New Features
    • Added watch(cb), watchAsync(cb), observeStream(), and streamLegacy() to WorkflowStreamResult on workflow.stream(...), workflow.timeTravelStream(...), and resumed streams.
    • Implemented subscription registry with unsubscribe support and callback isolation (one watcher error won’t break others).
    • observeStream() provides a closing ReadableStream; streamLegacy() exposes { stream, getWorkflowState } for compatibility.
    • Added tests for event ordering, unsubscribe, multi-watchers, error isolation, and observeStream close behavior.
    • Updated docs (workflows/streaming.md) with SDK-only watch APIs and REST SSE equivalents.

Written for commit 66c152f. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Added observer APIs for workflow streams: watch(), watchAsync(), observeStream(), and streamLegacy() enable non-destructive subscription to stream events and workflow state retrieval.
  • Tests

    • Added comprehensive test coverage for event ordering, subscription lifecycle, callback error handling, and stream closure behavior.
  • Documentation

    • Updated documentation with usage examples for the new observer and streaming capabilities.

@changeset-bot
Copy link
Copy Markdown

changeset-bot bot commented Feb 22, 2026

🦋 Changeset detected

Latest commit: 66c152f

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
@voltagent/core Minor

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@joggrbot

This comment has been minimized.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Feb 22, 2026

No actionable comments were generated in the recent review. 🎉


📝 Walkthrough

Walkthrough

This PR introduces observer-based streaming APIs for workflow stream results, enabling callback-based event subscriptions and readable stream consumption without blocking main iteration. New methods include watch(), watchAsync(), observeStream(), and streamLegacy() across all streaming contexts.

Changes

Cohort / File(s) Summary
Stream Controller Enhancement
packages/core/src/workflow/stream.ts
Implements watcher subscription system with watch() and watchAsync() methods, asynchronous event notification to watchers with error handling, and observeStream() to expose a ReadableStream interface with lifecycle management.
Workflow Streaming Integration
packages/core/src/workflow/core.ts
Exposes stream observation APIs on WorkflowStreamResult objects returned from workflow.stream(), workflow.timeTravelStream(), and resumed streams, forwarding calls to the underlying StreamController and providing streamLegacy() with memory-based state access.
Type Definitions
packages/core/src/workflow/types.ts
Adds method signatures to WorkflowStreamResult for watch(), watchAsync(), observeStream(), and streamLegacy(), defining callback shapes and return types for the new observer surface.
Test Coverage
packages/core/src/workflow/stream.spec.ts, packages/core/src/workflow/core.spec.ts
Validates event ordering, unsubscribe behavior, multiple watchers isolation, error resilience, observeStream ReadableStream semantics, and integration with main workflow iteration across sync/async watchers.
Documentation & Changelog
website/docs/workflows/streaming.md, .changeset/blue-bags-remember.md
Documents new run-level observer APIs, SDK usage patterns for watching and observing streams, and compatibility surface for legacy stream access with state queries.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 New watchers hop and play,
Streams observed in every way,
No consuming, just a peek,
Callbacks answer what you seek,
stream.observe() makes workflows sleek! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(workflow): add observer/watch APIs for workflow streams' clearly and specifically summarizes the main change—introducing callback and stream-based observer APIs for workflow streaming results.
Description check ✅ Passed The PR description follows the template structure, completes all required checklist items (tests, docs, changesets added), clearly explains current vs. new behavior, lists all new APIs and their availability, and includes validation details and notes for reviewers.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/workflow-observer-watch-api

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 7 files

@cloudflare-workers-and-pages
Copy link
Copy Markdown

Deploying voltagent with  Cloudflare Pages  Cloudflare Pages

Latest commit: 66c152f
Status: ✅  Deploy successful!
Preview URL: https://328132e1.voltagent.pages.dev
Branch Preview URL: https://feat-workflow-observer-watch.voltagent.pages.dev

View logs

@omeraplak omeraplak merged commit 314ed40 into main Feb 22, 2026
23 checks passed
@omeraplak omeraplak deleted the feat/workflow-observer-watch-api branch February 22, 2026 18:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant