Introduce SuspendObjectStream and expand tool schema validation #52
Merged
JohnRichard4096 merged 15 commits into main on Apr 19, 2026
Conversation
Contributor
Reviewer's Guide

Refactors ChatObject to inherit a new generic SuspendObjectStream for unified suspend/resume and streaming behavior, adds usage-aggregation utilities, extends function-parameter JSON Schema validation and docs, enhances ReAct reasoning flows and the built-in reasoning tool schema, and updates documentation and tests to match the new streaming and validation model.

Sequence diagram for enhanced reasoning flow with think_and_reason tool

```mermaid
sequenceDiagram
actor User
participant ChatClient as ChatClient
participant ChatObject as ChatObject
participant Agent as BaseReActAgentStrategy
participant ToolsCaller as tools_caller
participant ReasonTool as REASONING_TOOL
participant LLM as call_completion
User->>ChatClient: send user_input
ChatClient->>ChatObject: create ChatObject(...)
ChatClient->>ChatObject: begin()
ChatObject->>Agent: _run_strategy()
Agent->>Agent: _generate_reasoning_msg(tools_ctx, then)
Agent->>ToolsCaller: tools_caller(reasoning_trigger_msg, [REASONING_TOOL,...])
ToolsCaller->>ReasonTool: invoke think_and_reason
ReasonTool-->>ToolsCaller: UniResponse~None,list~ToolCall~~
ToolsCaller-->>Agent: tool_response with ToolCall
Agent->>Agent: _generate_reasoning_content(tool_call, reasoning_trigger_msg)
Agent->>ChatObject: yield_response(MessageWithMetadata summary)
Agent->>LLM: call_completion(reasoning_trigger_msg,...)
LLM-->>Agent: streaming chunks
loop reasoning content streaming
Agent->>ChatObject: yield_response(MessageWithMetadata reasoning_chunk)
end
LLM-->>Agent: UniResponse~str,None~ ct
Agent->>ChatObject: update extra_usage via gather_usage
Agent-->>Agent: then(self, tool_call, ct)
Agent-->>ChatObject: append reasoning to context
ChatObject-->>ChatClient: stream final answer
ChatClient-->>User: display reasoning-aware response
```
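To ground the streaming hop in the middle of this flow, here is a minimal sketch of an agent strategy forwarding reasoning chunks into the ChatObject queue. `yield_response`, `MessageWithMetadata`, `gather_usage`, and `extra_usage` come from this PR; the function shape, chunk source, and import paths are assumptions.

```python
# Minimal sketch, not the PR's exact code. `chat` is a running ChatObject,
# `llm_chunks` an async iterator of text chunks from call_completion, and
# `final_usage` the usage object of the reasoning call.
from amrita_core.utils import gather_usage  # per this PR's diff
# from ... import MessageWithMetadata       # module path not shown in the PR


async def stream_reasoning(chat, llm_chunks, final_usage):
    async for chunk in llm_chunks:
        # Tag each chunk as reasoning output so consumers can distinguish
        # it from the final answer stream.
        await chat.yield_response(MessageWithMetadata(chunk, {"type": "reasoning"}))
    # Fold the reasoning call's token usage into the chat's extra_usage.
    chat.extra_usage = gather_usage(chat.extra_usage, final_usage)
```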
Sequence diagram for get_last_response with optional streaming

```mermaid
sequenceDiagram
participant Producer as AsyncGenerator
participant GetLast as get_last_response
participant Target as SuspendObjectStream
loop iterate generator
Producer-->>GetLast: yield chunk
alt chunk is UniResponse
GetLast->>GetLast: resp = chunk
else chunk is RESPONSE_TYPE and Target provided
GetLast->>Target: yield_response(wrapper(chunk))
else
GetLast->>GetLast: ignore chunk
end
end
alt resp is None
GetLast-->>GetLast: raise RuntimeError
else
GetLast-->>Caller: return resp (UniResponse~str,None~)
end
```
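Read as code, the flow above corresponds roughly to the following sketch. `UniResponse`, `yield_response`, and the `RuntimeError` on an empty stream come from the diagram; the signature, the `isinstance` check, and the import path are assumptions.

```python
# Rough Python rendering of the get_last_response flow; the real helper
# in the PR may differ in signature and in how chunk types are checked.
from amrita_core.types import UniResponse  # import path assumed


async def get_last_response(producer, target=None, wrapper=lambda c: c):
    resp = None
    async for chunk in producer:
        if isinstance(chunk, UniResponse):
            # Remember the most recent full response.
            resp = chunk
        elif target is not None:
            # Forward streamable RESPONSE_TYPE chunks to the consumer stream.
            await target.yield_response(wrapper(chunk))
        # Any other chunk is ignored.
    if resp is None:
        raise RuntimeError("Generator finished without yielding a UniResponse.")
    return resp
```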
Class diagram for SuspendObjectStream and ChatObject streaming refactor

```mermaid
classDiagram
class SuspendObjectStream~ObjectTypeT~ {
-ObjectSendStream _send_stream
-ObjectReceiveStream _receive_stream
-CALLBACK_TYPE _callback_fun
-aiologic.Lock _callback_lock
-bool _queue_done
-bool _has_consumer
-float _q_tout
-tuple~str~ _suspend_tags
-asyncio.Future __suspend_signal
-asyncio.Future __resume_signal
-object __done_marker
+SuspendObjectStream(queue_size int=45, queue_timeout float|None=10.0, callback CALLBACK_TYPE|None=None)
+wait_to_suspend(tags str, timeout float|None) async
+resume() void
+queue_closed() bool
+set_queue_done() async
+yield_response(response ObjectTypeT) async
+yield_response_iteration(iterator AsyncGenerator~ObjectTypeT,None~) async
+get_response_generator() AsyncGenerator~ObjectTypeT,None~
+set_callback_func(func CALLBACK_TYPE) void
+suspend(func Callable, tag str|None) Callable$ static
+suspend_with_tag(tag str) Callable$ static
-_wait_for_continue(tag str|None) async bool
-_response_generator() async AsyncGenerator~ObjectTypeT,None~
-_put_to_queue(item Any) async
}
class ChatObject {
+str stream_id
+str user_input
+Message user_message
+Message system_message
+datetime last_call
+str session_id
+UniResponse~str,None~ response
+UniResponseUsage~int~ extra_usage
+ModelPreset preset
+AmritaConfig config
+SessionData session
+type AgentStrategy strategy
+Template template
+dict~str,Any~ jinja2_vars
+bool _is_running
+bool _is_done
+Task~None~ _task
+BaseException _err
+float _q_tout
+dict~str,Any~ _hook_kwargs
+tuple~Any,...~ _hook_args
+tuple~type~BaseException~~ _raised_exc
+ChatObject(..., queue_size int=45, queue_timeout float|None=10.0, callback RESPONSE_CALLBACK_TYPE|None=None)
+begin() ChatObject
+get_exception() BaseException|None
+get_response_generator() AsyncGenerator~RESPONSE_TYPE,None~
+full_response() async str
+set_queue_done() async
+yield_response(response RESPONSE_TYPE) async
+yield_response_iteration(iterator AsyncGenerator~RESPONSE_TYPE,None~) async
+set_callback_func(func RESPONSE_CALLBACK_TYPE) void
+wait_to_suspend(tags str, timeout float|None) async
+resume() void
+_entry() async
+_run() async
+_run_strategy() async
+_run_agent(ctx StrategyContext) async
+_process_chat(send_messages CONTENT_LIST_TYPE) async UniResponse~str,None~
}
class UniResponseUsage~T~ {
+T prompt_tokens
+T completion_tokens
+T total_tokens
}
SuspendObjectStream <|-- ChatObject : inherits
UniResponseUsage~int~ --> ChatObject : extra_usage
```
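To make the producer/consumer split concrete, here is a minimal usage sketch built only from methods in the diagram above; the import path and the string payloads are illustrative.

```python
import asyncio

from amrita_core.streaming import SuspendObjectStream  # import path assumed


async def produce(stream: SuspendObjectStream) -> None:
    await stream.yield_response("partial answer...")
    await stream.yield_response("final answer")
    # Close the queue so the consumer's generator terminates.
    await stream.set_queue_done()


async def main() -> None:
    stream = SuspendObjectStream(queue_timeout=10.0)
    producer = asyncio.create_task(produce(stream))
    # Only one consumer may attach; a second call to
    # get_response_generator() raises RuntimeError.
    async for chunk in stream.get_response_generator():
        print(chunk)
    await producer


asyncio.run(main())
```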
Class diagram for FunctionPropertySchema JSON Schema extensions

```mermaid
classDiagram
class FunctionPropertySchema~T~ {
+str|list~str~ type
+str description
+bool required
+list~T~ enum
+Any const
+Any default
+float minimum
+float maximum
+bool exclusiveMinimum
+bool exclusiveMaximum
+float multipleOf
+int minLength
+int maxLength
+str pattern
+FunctionPropertySchema properties
+list~str~ required
+FunctionPropertySchema items
+int minItems
+int maxItems
+bool uniqueItems
+bool|dict~str,Any~ additionalProperties
+str format
+bool nullable
+FunctionPropertySchema validator() Self
}
FunctionPropertySchema~T~ ..> FunctionPropertySchema~T~ : recursive properties/items
```
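As an illustration of the extended schema fields, a nested parameter definition might look like the following; keyword names mirror the diagram, and the constructor call itself is an assumption about the model API.

```python
# Hypothetical nested schema using the new JSON Schema fields; the real
# FunctionPropertySchema constructor may differ.
from amrita_core.schemas import FunctionPropertySchema  # import path assumed

tags_schema = FunctionPropertySchema(
    type="array",
    description="Search tags",
    items=FunctionPropertySchema(
        type="string",
        minLength=1,
        maxLength=32,
        pattern=r"^[a-z0-9_-]+$",
    ),
    minItems=1,
    maxItems=10,
    uniqueItems=True,
)

query_schema = FunctionPropertySchema(
    type=["object", "null"],  # union types are allowed
    description="Structured query",
    properties={"tags": tags_schema},
    required=["tags"],
    additionalProperties=False,
)
```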
File-Level Changes
Assessment against linked issues
Author

@sourcery-ai title
Contributor
Hey - I've found 6 issues and left some high-level feedback:
- In `chatmanager.py`, `RESPONSE_TYPE` is still used (e.g., `ChatObject(SuspendObjectStream[RESPONSE_TYPE])`, `RESPONSE_CALLBACK_TYPE`) but its local `TypeAlias` definition was removed; either reintroduce the alias or import it from a shared module to avoid a `NameError` and type-checking issues.
- The updated `FunctionPropertySchema.validator` uses `elif has_array` after `if has_object`, so schemas with union types that include both `"object"` and `"array"` will only get object validation; consider splitting these into independent `if has_object:` and `if has_array:` blocks so each type branch is validated when present in a union (see the sketch after this list).
- In `SuspendObjectStream`, `_wait_for_continue` is effectively part of the public suspend API (used in tests and docs) but remains a private method; if it is intended to be called by users, consider renaming or wrapping it with a public helper to make this clearer and reduce reliance on underscored internals.
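A minimal sketch of the independent-branch fix from the second point; `has_object`/`has_array` mirror the names in the review, while `validate_object_keywords` and `validate_array_keywords` are hypothetical stand-ins for the validator's real per-type checks:

```python
# Sketch only: validate each branch of a union type independently rather
# than chaining if/elif. The two helpers below are hypothetical stand-ins.
def validate_type_branches(schema) -> None:
    declared = schema.type if isinstance(schema.type, list) else [schema.type]
    has_object = "object" in declared
    has_array = "array" in declared

    if has_object:
        validate_object_keywords(schema)  # properties, required, additionalProperties, ...
    # Independent `if` (not `elif`): with type=["object", "array"] both
    # branches now run, where the current code skips the array checks.
    if has_array:
        validate_array_keywords(schema)  # items, minItems, maxItems, uniqueItems, ...
```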
## Individual Comments
### Comment 1
<location path="src/amrita_core/utils.py" line_range="94-98" />
<code_context>
        self.value = value
+
+
+def gather_usage(
+    base: UniResponseUsage[int],
+    *args: UniResponseUsage[int]
+    | UniResponseUsage[None]
+    | UniResponseUsage[int | None]
+    | None,
+) -> UniResponseUsage[int]:
+    """Gather usages
+
+    Args:
+        base(UniResponseUsage[int]): Base object of usage.
+        *args: Usages to gather.
+
+    Returns:
+        UniResponseUsage[int]: the gathered usage (base)
+    """
+    u = base
+    for usage in args:
+        if usage is None:
+            continue
+        u.prompt_tokens += n2zero(usage.prompt_tokens)
+        u.completion_tokens += n2zero(usage.completion_tokens)
+        u.total_tokens += usage.total_tokens or n2zero(usage.prompt_tokens) + n2zero(
+            usage.completion_tokens
+        )
+    return u
</code_context>
<issue_to_address>
**suggestion (bug_risk):** gather_usage may miscount total_tokens in some edge cases and relies on truthiness of numeric fields.
Using `usage.total_tokens or ...` conflates `0` and `None`. A legitimate `total_tokens == 0` will be treated as falsy and replaced with `prompt + completion`, which can lead to double-counting or inconsistent totals. It also means `total_tokens` is recomputed whenever it’s `None`, so the aggregated total may not match providers that define totals differently. Please branch explicitly on `is None`, e.g.:
```python
if usage.total_tokens is not None:
    u.total_tokens += usage.total_tokens
else:
    u.total_tokens += n2zero(usage.prompt_tokens) + n2zero(usage.completion_tokens)
```
```suggestion
        u.prompt_tokens += n2zero(usage.prompt_tokens)
        u.completion_tokens += n2zero(usage.completion_tokens)
        if usage.total_tokens is not None:
            u.total_tokens += usage.total_tokens
        else:
            u.total_tokens += n2zero(usage.prompt_tokens) + n2zero(
                usage.completion_tokens
            )
```
</issue_to_address>
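The truthiness pitfall in this comment is easy to reproduce in isolation (contrived numbers, purely illustrative):

```python
# `x or y` falls through to y whenever x is falsy, and 0 is falsy in
# Python, so a provider-reported total of 0 is silently recomputed.
total_tokens = 0
prompt, completion = 5, 7

print(total_tokens or (prompt + completion))  # 12 -- the legitimate 0 is lost

# Branching on None preserves 0:
print(total_tokens if total_tokens is not None else prompt + completion)  # 0
```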
### Comment 2
<location path="src/amrita_core/builtins/agent.py" line_range="128-130" />
<code_context>
+            MessageWithMetadata(
+                summary,
+                {
+                    "type": "reasoning",
+                    "extra_type": "pre_resolve",
+                    "last_strp": last_step,
+                    "summary": summary,
+                },
</code_context>
<issue_to_address>
**issue (bug_risk):** Metadata key "last_strp" looks like a typo and may break downstream consumers.
In the emitted reasoning metadata, the key is currently:
```python
"last_strp": last_step,
```
Please rename this to `"last_step"` to match the variable name and any existing consumers expecting that field.
</issue_to_address>
### Comment 3
<location path="tests/test_object_stream.py" line_range="10" />
<code_context>
+
+@pytest.mark.asyncio
+async def test_chatobject_suspend_tags():
+    obj = SuspendObjectStream()
+    suspend = False
+
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for queue/stream lifecycle and single-consumer guarantees
Since `SuspendObjectStream` manages an internal queue and enforces a single consumer, please add tests that:
- Push multiple items with `yield_response`, call `set_queue_done()`, then iterate `get_response_generator()` and assert all items are yielded in order, the generator stops at the done marker, and `queue_closed()` is `True`.
- Call `get_response_generator()` twice (or once plus setting a callback) and assert a `RuntimeError("Response is already being consumed.")` on the second consumer.
These will cover both normal streaming and the single-consumer constraint.
Suggested implementation:
```python
import asyncio

import pytest

from amrita_core.streaming import SuspendObjectStream


@pytest.mark.asyncio
async def test_chatobject_suspend_tags():
    obj = SuspendObjectStream()
    suspend = False

    # Basic sanity checks for the suspend object stream
    assert isinstance(obj, SuspendObjectStream)
    assert suspend is False


@pytest.mark.asyncio
async def test_suspend_object_stream_yields_items_in_order_and_closes_queue():
    obj = SuspendObjectStream()

    # Push multiple items into the internal queue
    await obj.yield_response("first")
    await obj.yield_response("second")
    await obj.yield_response("third")

    # Mark the queue as done
    await obj.set_queue_done()

    # Consume all items from the response generator
    results = []
    async for item in obj.get_response_generator():
        results.append(item)

    # All items should be yielded in order and the queue should be closed
    assert results == ["first", "second", "third"]
    assert obj.queue_closed() is True


@pytest.mark.asyncio
async def test_suspend_object_stream_enforces_single_consumer():
    obj = SuspendObjectStream()

    # First consumer acquires the response generator
    gen1 = obj.get_response_generator()
    assert gen1 is not None

    # Second consumer attempt should raise a RuntimeError
    with pytest.raises(RuntimeError, match="Response is already being consumed."):
        obj.get_response_generator()
```
If `yield_response` is a synchronous method in your implementation, remove the `await` keywords before `obj.yield_response(...)`.
`set_queue_done()` is awaited above since the new API declares it async; if it or `queue_closed()` differs in sync/async signature in your codebase, adjust the `await`s accordingly.
</issue_to_address>
### Comment 4
<location path="tests/test_object_stream.py" line_range="33" />
<code_context>
-
-
-@pytest.mark.asyncio
-async def test_chatobject_suspend():
-    obj = ChatObject(
-        train={"role": "system", "content": "system message"},
</code_context>
<issue_to_address>
**suggestion (testing):** Test callback mode of SuspendObjectStream in addition to queue mode
These tests only cover the queue pathway (no callback set). Please also add a test for the callback mode that:
- Constructs `SuspendObjectStream` with a callback (or via `set_callback_func`).
- Calls `yield_response` multiple times and asserts the callback receives the expected values (e.g., via an async-safe accumulator/event).
- Optionally asserts that `get_response_generator()` with a callback set raises the expected `RuntimeError`.
That way both modes are exercised and protected against regressions.
Suggested implementation:
```python
    try:
        await obj.wait_to_suspend(timeout=2)
        obj.resume()
        await asyncio.wait_for(hd, 0.2)
        assert suspend, "Suspend not called"
    finally:
        hd.cancel()


@pytest.mark.asyncio
async def test_suspendobjectstream_callback_mode():
    obj = SuspendObjectStream()
    received: list[Any] = []
    all_received = asyncio.Event()

    def callback(value: Any) -> None:
        received.append(value)
        # Once we've seen multiple responses, signal the test to continue
        if len(received) >= 2:
            all_received.set()

    # Configure callback mode
    obj.set_callback_func(callback)

    # In callback mode, responses should be delivered via the callback
    await obj.yield_response("first")
    await obj.yield_response("second")

    await asyncio.wait_for(all_received.wait(), timeout=1.0)
    assert received == ["first", "second"]

    # In callback mode, queue-based consumption should not be available
    with pytest.raises(RuntimeError):
        _ = obj.get_response_generator()
```
This test assumes:
1. `SuspendObjectStream.set_callback_func` exists and accepts a sync callback.
2. `SuspendObjectStream.yield_response` is `async` and delivers the value to the callback.
3. `SuspendObjectStream.get_response_generator()` raises `RuntimeError` when a callback is set.
If any of these differ in your implementation, adjust the test accordingly (e.g., use the constructor to pass the callback, tweak the exception type/message, or adapt to a sync `yield_response`).
Also ensure `Any` is imported (`from typing import Any`) at the top of the test file if it is not already.
</issue_to_address>
### Comment 5
<location path="docs/guide/function-implementation.md" line_range="127" />
<code_context>
#### Exception Handling Best Practices
-Since version 0.8.0, the default `on_exception()` method in [AgentStrategy](../api-reference/classes/AgentStrategy.md) no longer raises exceptions by default. This change provides more flexibility for custom error handling:
+Since the default `on_exception()` method in [AgentStrategy](../api-reference/classes/AgentStrategy.md) no longer raises exceptions by default. This change provides more flexibility for custom error handling:
```python
</code_context>
<issue_to_address>
**issue (typo):** This sentence starting with "Since" is grammatically incomplete and should be rephrased.
"Since the default `on_exception()` method..." is a sentence fragment. Either drop "Since" (e.g., "The default `on_exception()` method... no longer raises...") or add a main clause after the comma (e.g., "Since the default ... no longer raises exceptions by default, this change provides more flexibility...").
```suggestion
The default `on_exception()` method in [AgentStrategy](../api-reference/classes/AgentStrategy.md) no longer raises exceptions by default. This change provides more flexibility for custom error handling:
```
</issue_to_address>
### Comment 6
<location path="docs/guide/builtins.md" line_range="24" />
<code_context>
- **Parameters**:
- - `content`: Describe what needs to be done next (required).
+ - `last_step`: The last step you took (if there are no steps that you had done, please leave this blank).
+ - `summary`: What are you thinking about (not thinking content) - a brief summary of your current focus or intention.
### 9.1.3 PROCESS_MESSAGE (Process Message Tool)
</code_context>
<issue_to_address>
**nitpick (typo):** The parenthetical phrase "not thinking content" is awkward and could be clarified.
Consider rephrasing this parenthetical to something like "(not the full thinking process)" or "(not your detailed reasoning)" so the meaning is grammatically clear to readers.
```suggestion
- `summary`: What are you thinking about (not your detailed reasoning) - a brief summary of your current focus or intention.
```
</issue_to_address>
Deploying amritacore with Cloudflare Pages

| | |
| --- | --- |
| Latest commit: | e697213 |
| Status: | ✅ Deploy successful! |
| Preview URL: | https://b1b44ec2.amritacore.pages.dev |
| Branch Preview URL: | https://feat-reasoning-and-streaming.amritacore.pages.dev |
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
close #51
Summary by Sourcery
Introduce a reusable SuspendObjectStream abstraction for suspendable streaming, refactor ChatObject and reasoning agents to use it with enhanced reasoning and usage tracking, expand tool schema validation and documentation, and bump the library version.
Enhancements:

- Introduce a reusable SuspendObjectStream abstraction for suspendable streaming and refactor ChatObject and the reasoning agents to use it, with enhanced reasoning flows and usage tracking.
- Expand function-parameter JSON Schema validation.

Build:

- Bump the library version.

Documentation:

- Update the streaming, tool schema, and built-in reasoning documentation to match.

Tests:

- Update tests for the new streaming and validation model.