
feat(stdlib): add streaming event types, events() iterator, and OTEL bridge (#902) #1095

Merged: jakelorocco merged 15 commits into generative-computing:main from planetf1:feat/902-streaming-events-work on May 19, 2026

@planetf1 commented May 19, 2026

Pull Request

Issue

Fixes #902

Description

Wave 4 (final wave) of the streaming validation epic (#891). Adds typed streaming events, an events() async iterator, and OTEL instrumentation to StreamChunkingResult.

What changed

mellea/stdlib/streaming.py

  • Eight typed event dataclasses: StreamEvent base + ChunkEvent, QuickCheckEvent, StreamingDoneEvent, FullValidationEvent, RetryEvent, CompletedEvent, ErrorEvent
  • StreamChunkingResult.events() — single-consumer async iterator; raises RuntimeError on second call
  • OTEL application span wrapping the orchestrator lifetime, with span events and metric helper calls (record_requirement_check, record_requirement_failure, record_sampling_outcome, record_error)
  • ErrorEvent replaces MelleaLogger.warning stopgap; removes TODO(#902) comment
  • StreamingDoneEvent emitted after the flush loop (not before), so the documented event order holds exactly: QC/Chunk pairs → StreamingDoneEvent → FullValidationEvent → CompletedEvent
  • _orchestration_exception stash moved to first statement in except block so acomplete() always has it even if a subsequent await is interrupted by CancelledError
  • Terminal finally bookkeeping uses put_nowait() to eliminate await points under a pending CancelledError
  • mot.is_computed() guard in except block prevents double-cancel when exception arrives after natural stream end
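
The single-consumer contract for events() can be illustrated with a minimal, self-contained sketch. This is not the mellea implementation: `EventSourceSketch`, its `emit()`/`close()` helpers, and the `None` sentinel are assumptions made for illustration. Only the behaviour "a second call raises RuntimeError" and the `_events_consumed` guard name come from this PR.

```python
import asyncio

_SENTINEL = None  # hypothetical end-of-stream marker


class EventSourceSketch:
    """Hypothetical stand-in for a result object exposing events()."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._events_consumed = False

    def emit(self, event):
        # Unbounded queue, so put_nowait() cannot raise QueueFull.
        self._queue.put_nowait(event)

    def close(self):
        self._queue.put_nowait(_SENTINEL)

    def events(self):
        # A plain method (not an async generator) so the guard fires
        # at call time rather than on first iteration.
        if self._events_consumed:
            raise RuntimeError("events() may only be consumed once")
        self._events_consumed = True
        return self._drain()

    async def _drain(self):
        while True:
            item = await self._queue.get()
            if item is _SENTINEL:
                return
            yield item


async def demo():
    src = EventSourceSketch()
    src.emit("chunk")
    src.emit("done")
    src.close()
    seen = [event async for event in src.events()]
    try:
        src.events()
        second_call_raised = False
    except RuntimeError:
        second_call_raised = True
    return seen, second_call_raised


seen, second_call_raised = asyncio.run(demo())
```

Making the guard part of a regular method is one design choice among several; an async-generator method would defer the error until the caller starts iterating.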

mellea/stdlib/__init__.py

  • RetryEvent removed from __all__ (never emitted by v1 orchestrator; class remains accessible via mellea.stdlib.streaming.RetryEvent)

Docs

  • docs/docs/how-to/use-async-and-streaming.md — new "Streaming with per-chunk validation" section covering events(), astream(), and the tri-state table
  • docs/docs/concepts/requirements-system.md — new "Streaming validation" section covering stream_validate, state isolation, and the clone-per-attempt contract
  • docs/examples/streaming/streaming_chunking.py updated to events() API; new examples: word_chunking.py, paragraph_chunking.py, custom_chunking.py
  • docs/docs/examples/index.md — streaming/ row added
  • docs/docs/tutorials/06-streaming-validation.md — new Tutorial 06 covering stream_with_chunking end-to-end: basic call → early exit → chunking strategy choice → astream() shortcut → custom ChunkingStrategy subclass
  • docs/docs/docs.json — Tutorial 06 added to navigation

Review findings addressed (3-reviewer panel, 2026-05-19)

  • B1: renamed quick_check_requirements= → requirements= in all 4 new doc examples and the how-to guide (the old name caused a TypeError at runtime)
  • W1: _orchestration_exception stash moved before all awaits in the except block
  • W2: StreamingDoneEvent moved after the flush loop; docstrings updated
  • W3: acomplete() docstring now accurately describes the raise-once contract
  • N1: removed duplicate comment in acomplete()
  • N2: RetryEvent removed from mellea.stdlib.__all__
  • N3: added test_events_single_consumer_guard_raises_on_second_call
  • N4: pvr.reason replaced with "" in record_requirement_failure to avoid unbounded metric label cardinality
  • N5: added @pytest.mark.asyncio to test_unknown_chunking_alias_raises_value_error

Context

Wave 3 (#942) was squash-merged as 9e8a9636; this branch is rebased onto upstream/main via cherry-pick.

Testing

  • Tests added to the respective file if code was changed
  • New code has 100% coverage if code was added
  • Ensure existing tests and github automation passes (a maintainer will kick off the github automation when the rest of the PR is populated)

uv run pytest test/stdlib/test_streaming.py (40 tests, all passing)

Streaming tests cover: event ordering, single-consumer guard, early-exit semantics, OTEL span emission, ErrorEvent on exception, CancelledError raise-once contract, metric cardinality (pvr.reason not in label), @pytest.mark.asyncio on alias-error test.

Attribution

  • AI coding assistants used

Adding a new component, requirement, sampling strategy, or tool?

If your PR adds or modifies one of the types below, check the matching box. A checklist of type-specific review items will be posted as a comment.

  • Component
  • Requirement
  • Sampling Strategy
  • Tool

🤖 Generated with Claude Code

planetf1 added 12 commits May 19, 2026 15:02
…bridge (generative-computing#902)

Adds eight typed event dataclasses (StreamEvent base + ChunkEvent,
QuickCheckEvent, StreamingDoneEvent, FullValidationEvent, RetryEvent,
CompletedEvent, ErrorEvent) with auto-populated timestamps.

Wires event emission into _orchestrate_streaming:
- ChunkEvent emitted per chunk passed to the consumer
- QuickCheckEvent after each stream_validate batch (pass or fail)
- StreamingDoneEvent when the raw stream ends naturally
- FullValidationEvent after the post-stream validate() calls complete
- ErrorEvent replaces the MelleaLogger.warning stopgap in the except
  branch (removes TODO(generative-computing#902) marker)
- CompletedEvent in the finally block, guaranteed on every exit path

Adds StreamChunkingResult.events() single-consumer async iterator backed
by an independent queue — can be consumed concurrently with astream().

Wraps the orchestrator in trace_application("stream_with_chunking") to
open an OTEL application span for the full orchestration lifetime. Calls
record_requirement_check, record_requirement_failure, record_sampling_outcome,
and record_error at the appropriate emission points. Uses set_span_error on
early-exit fail and on unhandled exceptions.

Exports all eight event types from mellea.stdlib.__init__.

Assisted-by: Claude Code
…mputing#902)

Ten new tests for the Wave 4 additions:

- test_stream_event_types_have_auto_timestamp: all seven event types
  auto-populate timestamp on construction
- test_event_emission_order_happy_path: full sequence (ChunkEvent,
  QuickCheckEvent, StreamingDoneEvent, FullValidationEvent,
  CompletedEvent) in correct order on a two-sentence generation
- test_streaming_done_event_carries_full_text: StreamingDoneEvent.full_text
  matches result.full_text
- test_event_emission_on_early_exit: no StreamingDoneEvent or
  FullValidationEvent, QuickCheckEvent(passed=False) present,
  CompletedEvent(success=False)
- test_error_event_on_stream_validate_exception: ErrorEvent emitted with
  correct exception_type and detail, no log warning
- test_record_requirement_check_called_per_chunk: metric helper called
  once per sentence chunk
- test_record_requirement_failure_called_on_fail: called with requirement
  class name and reason string
- test_record_sampling_outcome_success: called with "stream_with_chunking"
  and success=True
- test_record_sampling_outcome_failure_on_early_exit: called with success=False
- test_concurrent_astream_and_events: astream() and events() consumed
  concurrently via asyncio.gather without interference

26 tests total, all passing.

Assisted-by: Claude Code
…ative-computing#902)

use-async-and-streaming.md: new "Streaming with per-chunk validation"
section covering stream_with_chunking() motivation, a minimal example
with MaxSentencesReq, the stream_validate tri-state table, and both
consumption patterns (astream() and events()) with a match dispatch
example. Notes single-consumer discipline and concurrent usage.

requirements-system.md: new "Streaming validation" section explaining
stream_validate() as the streaming counterpart to validate(), the
tri-state PartialValidationResult semantics, state isolation via
per-clone copy, and a cross-link to the how-to page.

Assisted-by: Claude Code
…omputing#902)

Replaces the astream() chunk loop with an events() loop using structural
pattern matching. Shows six event cases: ChunkEvent,
QuickCheckEvent (pass and fail variants), StreamingDoneEvent,
FullValidationEvent, and CompletedEvent.

Updates the module docstring to describe the events() consumption pattern.

Assisted-by: Claude Code
…g#902)

The streaming/ directory (introduced in Wave 3) was missing from
docs/docs/examples/index.md, causing the CI examples-catalogue check
to fail. Add an entry under Core concepts alongside async/.

Assisted-by: Claude Code
…es (generative-computing#902)

The docstring quality gate (--fail-on-quality) requires Args: sections
in class docstrings for constructor parameters. Dataclass fields are
constructor parameters, so they need Args:, not Attributes:.

The seven event subclasses (ChunkEvent, QuickCheckEvent, StreamingDoneEvent,
FullValidationEvent, RetryEvent, CompletedEvent, ErrorEvent) previously used
Attributes: which the auditor could not resolve to __init__ params.

StreamEvent keeps Attributes: for `timestamp` because it is init=False
and does not appear as a constructor parameter.
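
The init=False behaviour can be seen in a small standalone sketch. The class names and the `text` field are hypothetical; the point is only that a base-class field declared with `init=False` and a `default_factory` is populated automatically, is not a constructor parameter, and does not block subclasses from adding required fields.

```python
import time
from dataclasses import dataclass, field


@dataclass
class StreamEventSketch:
    # Not a constructor parameter: filled in automatically at construction.
    timestamp: float = field(init=False, default_factory=time.time)


@dataclass
class ChunkEventSketch(StreamEventSketch):
    # A required field after a defaulted base field is allowed here because
    # the ordering check only considers fields that take part in __init__.
    text: str  # hypothetical payload field


event = ChunkEventSketch(text="Hello.")

try:
    ChunkEventSketch(text="x", timestamp=0.0)
    timestamp_accepted = True
except TypeError:
    # timestamp is init=False, so passing it to the constructor is rejected.
    timestamp_accepted = False
```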

Assisted-by: Claude Code
…tive-computing#902)

- Fix QuickCheckEvent.passed to reflect per-chunk result (was using
  cumulative failed_indices set, causing false negatives on all chunks
  after the first failure)
- Replace synthetic RuntimeError objects in early-exit set_span_error
  calls with set_span_status_error helper (no phantom exception events
  in OTEL traces); add set_span_status_error to mellea/telemetry/tracing.py
- Reorder result.completed = False to top of except block so the flag
  is set before ErrorEvent is enqueued (consistent consumer observation)
- Update acomplete() Raises: docstring to reflect that Exception types
  surface via astream(), only BaseException propagates directly
- Add events() docstring note that events() itself never raises
- Add _event_queue comment noting unconditional production / opt-in consumption
- Add StreamEvent docstring note for subclassers on init=False fields
- Add RetryEvent "not emitted in v1" comment in __init__.__all__
- Fix test: move import time to module level in test_streaming.py
- Add docstring to test_unknown_chunking_alias_raises_value_error
- Rewrite how-to streaming section to lead with events() as primary API;
  demote astream()-only example to secondary; add case _: pass fallback
  to all match event: blocks

Assisted-by: Claude Code
…enerative-computing#902)

Consistent with the how-to doc; covers RetryEvent and any future types.

Assisted-by: Claude Code
…omputing#902)

Three new streaming examples alongside the existing streaming_chunking.py:

- word_chunking.py: WordChunker alias — forbidden-word detection at the
  highest granularity; O(1) set check per token, early exit on first bad word
- paragraph_chunking.py: ParagraphChunker alias — per-paragraph word-count
  gate; validates entire \n\n-delimited blocks, useful for structure/length
  checks that require full paragraph context
- custom_chunking.py: ChunkingStrategy subclass — LineChunker splitting on
  single \n; validates numbered-list output line-by-line; demonstrates
  split()+flush() extension pattern

All three verified running against granite4:micro (Ollama local).
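
The split()+flush() extension pattern might look roughly like the sketch below. It is deliberately standalone rather than a ChunkingStrategy subclass, because the real base-class signatures are not shown on this page; the method signatures here (accumulated text in, list of chunks out) are assumptions.

```python
class LineChunkerSketch:
    """Hypothetical line-based chunker: split() during streaming, flush() at the end."""

    def split(self, accumulated: str) -> list[str]:
        # Everything before the last newline is a complete line; the final
        # piece may still be growing, so it is held back.
        *complete, _partial = accumulated.split("\n")
        return [line for line in complete if line.strip()]

    def flush(self, accumulated: str) -> list[str]:
        # Once the stream has ended, the held-back tail becomes a chunk too.
        tail = accumulated.rsplit("\n", 1)[-1]
        return [tail] if tail.strip() else []


chunker = LineChunkerSketch()
partial_output = "1. apples\n2. pears\n3. plu"
during_stream = chunker.split(partial_output)
at_end = chunker.flush(partial_output)
```

Note that this sketch re-scans the whole accumulated text on every call, which matches the O(n) re-scan cost mentioned elsewhere in this PR only by assumption.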

Assisted-by: Claude Code
…lly guard, telemetry pop race fix, super().__init__() in test doubles, e2e marker, ValueError test)

- Set result.completed=False in finally block before cancel_generation() so
  external CancelledError (BaseException, bypasses except Exception) does not
  leave result.completed=True and emit a misleading CompletedEvent/metric
- Add regression test test_cancelled_task_sets_completed_false (27th test);
  documents Python 3.12 C Task cancellation-before-start behaviour and the
  asyncio.sleep(0) scheduling requirement
- Document O(n) re-scan cost in ChunkingStrategy class docstring and split()
  Args; note copy()-cloning constraint for stateful subclasses

Assisted-by: Claude Code
…generative-computing#902)

- Fix misleading StreamEvent docstring (init=False ordering explanation)
- Fix events() docstring: QuickCheckEvent fires before ChunkEvent, not after
- Add _events_consumed guard to events() for single-consumer enforcement
- Move StreamingDoneEvent emission to before flush loop (token stream is done
  regardless of flush validation outcome)
- Guard FullValidationEvent/final_validations list with list() copy to prevent
  aliasing between result attribute and event payload
- Add cancelled-task guard in acomplete() to avoid CancelledError from
  task.exception() on externally-cancelled tasks
- Switch terminal finally bookkeeping to put_nowait() to eliminate await points
  and guarantee _done.set() runs under pending CancelledError
- Add mot.is_computed() guard in except block to avoid double-cancel
- Remove inline comment from __all__ RetryEvent entry
- Fix word_chunking.py example: preserve original-case word list for LLM prompt
- Add test for no-requirements path omitting FullValidationEvent
- Fix test: assert QuickCheckEvent precedes ChunkEvent within each pair
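
The reasoning behind put_nowait() in the terminal finally block can be sketched as follows. This is an illustrative reduction, not the orchestrator code: `orchestrate_sketch`, the `state` dict, and the tuple payload are assumptions. The idea is that cleanup without await points cannot be interrupted by a further cancellation, so the done flag is always set.

```python
import asyncio


async def orchestrate_sketch(queue, done, state):
    try:
        await asyncio.sleep(3600)  # stand-in for consuming the model stream
        state["completed"] = True
    finally:
        # No await in cleanup: put_nowait() and Event.set() are synchronous,
        # so both run to completion even while a cancellation is in flight.
        queue.put_nowait(("completed", state["completed"]))
        done.set()


async def demo():
    queue = asyncio.Queue()
    done = asyncio.Event()
    state = {"completed": False}
    task = asyncio.create_task(orchestrate_sketch(queue, done, state))
    await asyncio.sleep(0)  # let the task start before cancelling it
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return done.is_set(), queue.get_nowait()


done_set, last_item = asyncio.run(demo())
```

The `asyncio.sleep(0)` before `cancel()` mirrors the scheduling requirement noted in the regression test description: a task cancelled before it starts never enters its try block.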

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
…acomplete() raise semantics (generative-computing#902)

- Add result._orchestration_exception = exc in _orchestrate_streaming except
  block so acomplete() can surface exceptions even when astream() was never
  called (regression introduced during cherry-pick conflict resolution)
- Rename quick_check_requirements -> requirements in two test call sites from
  bd2a5ae cherry-pick that post-dated the parameter rename
- Fix test_cancelled_task_sets_completed_false to expect CancelledError from
  acomplete() — consistent with test_external_task_cancellation_releases_consumers
  and test_external_cancellation_acomplete_raise_once (raise-once contract for
  external cancellation)

Assisted-by: Claude Code
@github-actions bot added the enhancement label ("New feature or request") May 19, 2026
planetf1 added 3 commits May 19, 2026 16:34
- B1: rename quick_check_requirements= -> requirements= in all doc
  examples and how-to guide (was causing TypeError in e2e tests)
- W1: move result._orchestration_exception = exc to first statement in
  except block so acomplete() always has a stash even if a subsequent
  await is interrupted by CancelledError
- W2: emit StreamingDoneEvent after the flush loop, not before — so the
  documented event order (QC/Chunk pairs, StreamingDoneEvent,
  FullValidationEvent, CompletedEvent) holds exactly; update docstrings
- W3: fix acomplete() docstring which incorrectly claimed ordinary
  exceptions do not propagate; describe the actual raise-once contract
- N1: remove duplicate comment in acomplete()
- N2: remove RetryEvent from mellea.stdlib __all__ (never emitted by
  v1 orchestrator; class remains accessible via mellea.stdlib.streaming)
- N3: add test_events_single_consumer_guard_raises_on_second_call
- N4: pass "" instead of pvr.reason to record_requirement_failure to
  avoid unbounded metric label cardinality; update assertion in test
- N5: add @pytest.mark.asyncio to test_unknown_chunking_alias_raises_value_error
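
To illustrate the N4 concern, here is a toy model of label cardinality. `record_failure_sketch` is a hypothetical helper backed by a Counter, not the real record_requirement_failure; it only shows why a free-text reason used as a metric label creates one time series per distinct message, while a constant label keeps the series count bounded.

```python
from collections import Counter

# Each distinct (requirement, reason) pair models one metric time series.
series = Counter()


def record_failure_sketch(requirement: str, reason: str) -> None:
    series[(requirement, reason)] += 1


# Free-text reasons vary per failure, so every call creates a new series.
reasons = [f"too long: {n} sentences" for n in range(100)]
for reason in reasons:
    record_failure_sketch("MaxSentencesReq", reason)
unbounded_series = len(series)

# Passing a constant label collapses all failures into a single series.
series.clear()
for _ in reasons:
    record_failure_sketch("MaxSentencesReq", "")
bounded_series = len(series)
```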

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Five-step tutorial covering stream_with_chunking() end-to-end:
- Step 1: basic call, events() iterator, acomplete()
- Step 2: early-exit cancellation and streaming_failures
- Step 3: choosing word/sentence/paragraph chunking strategy
- Step 4: custom ChunkingStrategy subclass (LineChunker + flush)
- Step 5: astream() for raw chunk access

Adds the tutorial to docs.json navigation.

Motivated by: streaming validation (wave 4 / generative-computing#902) is a substantial
new API surface that deserves its own tutorial rather than being a
late step in Tutorial 02 (which covers async mechanics) or Tutorial 04
(which covers post-generation reliability).

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
- Step 3: print every 5th word to show granularity contrast with
  sentence chunking; replace weak 'model tends to comply' note with
  concrete early-exit instruction (change _FORBIDDEN to 'and'/'the')
- Steps 4/5 reordered: astream() (simpler) now precedes custom
  ChunkingStrategy subclassing (more complex)
- Step 5: add comment + prose explaining why validate() always returns
  True in NumberedLineReq (streaming check guarantees any emitted line
  passed; validate() is a post-stream formality)
- Step 5: fix incorrect built-in list (was 'LineChunker(), WordChunker()'
  — LineChunker is the tutorial's custom class, not a built-in); now
  names WordChunker, SentenceChunker, ParagraphChunker
- Step 5: add cross-reference to docs/examples/streaming/custom_chunking.py

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
@planetf1 planetf1 marked this pull request as ready for review May 19, 2026 16:35
@planetf1 planetf1 requested a review from a team as a code owner May 19, 2026 16:35
@github-actions

This comment is managed by a bot. Editing it is fine — checking off boxes, adding notes — but please leave the HTML comment marker on the first line alone, otherwise checklist updates will break.

Requirement PR Checklist

Use this checklist when adding or modifying requirements in mellea/stdlib/requirements/.

Base Class

  • Extends appropriate base class:
    • Requirement - standard requirement
    • ALoraRequirement - uses specialized Intrinsic/Adapter for generation-based validation

Validation Logic

  • validation_fn defined (if using Python-based validation)
    • re-usable functionality within the validation_fn should be separated out into mellea/stdlib/tools/
  • validate returns a ValidationResult with
    • a thunk and context if using a backend to generate
    • a specific reason and score when possible

Integration

  • Requirement exported in mellea/stdlib/requirements/__init__.py or, if you are adding a library of requirements, from your sub-module


@jakelorocco left a comment


LGTM; the docs are a bit lengthy, but I'm okay with having all of those examples / code snippets for now; thank you

@jakelorocco jakelorocco added this pull request to the merge queue May 19, 2026
Merged via the queue into generative-computing:main with commit 40d0741 May 19, 2026
10 checks passed
@planetf1

Thanks for the merge! If any of the docs (tutorial length, example verbosity, etc.) need trimming or adjusting, happy to address those as follow-up issues rather than hold things up.


Linked issue (may be closed by merging this pull request): feat(stdlib): add standard streaming event types
