# Release 0.8.0 with embedding adapters and streaming refactor #48

**Merged**: JohnRichard4096 merged 19 commits into `main` on Apr 11, 2026.
## Conversation
Contributor
### Reviewer's Guide

Implements AmritaCore 0.8.0 with new adapter types and embedding support, standardized suspend breakpoints, AnyIO-based streaming/backpressure, thread-safe managers, safer templates and adapter type validation, improved MCP client management, and corresponding doc/tests/scripts updates.

#### Sequence diagram for call_completion with adapter type validation

```mermaid
sequenceDiagram
  autonumber
  actor Dev as Developer
  participant App as ApplicationCode
  participant CC as call_completion
  participant CFR as _call_with_reflection
  participant AM as AdapterManager
  participant MA as ModelAdapter
  participant LLM as ModelOrEmbeddingService
  Dev->>App: await call_completion(preset, messages)
  App->>CC: call_completion(preset, messages, config)
  CC->>CFR: _call_with_reflection(preset, _call_api, config, messages)
  CFR->>AM: safe_get_adapter(preset.protocol)
  AM-->>CFR: adapter_class
  alt adapter_class is None
    CFR-->>CC: raise ValueError("Undefined protocol adapter")
    CC-->>App: exception
    App-->>Dev: error
  else adapter_class found
    CFR->>MA: get_type()
    MA-->>CFR: ada_type
    alt "text-gen" not in ada_type
      CFR-->>CC: raise RuntimeError("Invalid adapter type for text-gen...")
      CC-->>App: exception
      App-->>Dev: error
    else ada_type includes "text-gen"
      CC->>MA: call_api(serialized_messages)
      loop streaming chunks
        MA->>LLM: send request / receive chunk
        LLM-->>MA: completion_chunk
        MA-->>CC: yield completion_chunk
        CC-->>App: yield completion_chunk
      end
      CC-->>App: final UniResponse
      App-->>Dev: response
    end
  end
```
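The validation branch in the sequence diagram above can be sketched in plain Python. All class names here are illustrative stand-ins for the diagram's participants (`FakeEmbeddingAdapter` and `check_text_gen` are invented for this sketch; only the error messages and the `get_type()` contract follow the diagram):

```python
# Minimal sketch of the adapter-type validation shown in the sequence
# diagram. These classes are illustrative, not the real AmritaCore code.

class ModelAdapter:
    @classmethod
    def get_type(cls) -> tuple[str, ...]:
        # Per the class-diagram note: the default adapter type is "text-gen".
        return ("text-gen",)


class FakeEmbeddingAdapter(ModelAdapter):
    @classmethod
    def get_type(cls) -> tuple[str, ...]:
        # Embedding adapters override get_type with "embed".
        return ("embed",)


def check_text_gen(adapter_cls: "type[ModelAdapter] | None") -> "type[ModelAdapter]":
    # Mirrors _call_with_reflection: a missing adapter raises ValueError,
    # an adapter of the wrong type raises RuntimeError.
    if adapter_cls is None:
        raise ValueError("Undefined protocol adapter")
    if "text-gen" not in adapter_cls.get_type():
        raise RuntimeError("Invalid adapter type for text-gen")
    return adapter_cls
```

The key point the diagram encodes is that both failure modes are rejected *before* any request is sent to the model service.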
#### Updated class diagram for adapter and embedding system

```mermaid
classDiagram
  direction LR
  class ModelAdapter {
    <<abstract>>
    +__init_subclass__() void
    +call_api(messages, kwargs) AsyncGenerator
    +call_tools(messages, tools, tool_choice) UniResponse
    +call_embed(texts, kwargs) Sequence~EmbeddingChunk~
    +get_adapter_protocol() str
    +get_type() ADAPTER_TYPE
    +protocol str
  }
  class AdapterManager {
    <<singleton>>
    -_adapter_class: dict~str, type_ModelAdapter~
    +register_adapter(adapter_cls: type_ModelAdapter) void
    +safe_get_adapter(protocol: str) type_ModelAdapter | None
  }
  class EmbeddingChunk {
    +embedding: Sequence~float~
    +index: int
  }
  class ADAPTER_TYPE {
    <<enumeration>>
    "text-gen"
    "embed"
  }
  class ModelPreset {
    +protocol: str
    +model: str
  }
  class UniResponse {
  }
  class MessageContent {
  }
  class call_completion {
    <<function>>
    +call_completion(preset, messages, config) Any
  }
  class _call_with_reflection {
    <<function>>
    +_call_with_reflection(preset, caller, config, messages) Any
  }
  ModelAdapter --> EmbeddingChunk : uses
  ModelAdapter ..> ADAPTER_TYPE : returns
  AdapterManager o--> ModelAdapter : manages
  ModelPreset --> AdapterManager : protocol
  call_completion ..> ModelPreset : uses
  call_completion ..> _call_with_reflection : calls
  _call_with_reflection ..> AdapterManager : uses
  _call_with_reflection ..> ModelAdapter : obtains adapter
  UniResponse <|-- MessageContent
  note for ModelAdapter "get_type default returns 'text-gen'; embedding adapters override with 'embed'"
```
#### Updated class diagram for ChatObject streaming and suspend

```mermaid
classDiagram
  direction LR
  class ChatObject {
    +context: MemoryModel
    +session_id: str
    +user_input
    +train: Message
    +strategy: type_AgentStrategy
    +template: Template
    +jinja2_vars: dict~str, Any~
    -_send_stream: ObjectSendStream
    -_receive_stream: ObjectReceiveStream
    -_q_tout: float | None
    -_is_running: bool
    -_is_done: bool
    -_queue_done: bool
    -_task: Task
    +begin() ChatObject
    +get_response_generator() AsyncGenerator
    +yield_response(response) void
    +set_queue_done() void
    +wait_to_suspend(tags* str, timeout: float|None) void
    +resume() void
    +_wait_for_continue(tag: str|None) bool
    +suspend_with_tag(tag: str)
    +monitoring(func)
    +_entry() void
    +_run() void
    +_run_agent(ctx: StrategyContext) void
    +_process_chat(messages) UniResponse
    +_prepare_send_messages() list
  }
  class SuspendEnum {
    <<enumeration>>
    MEMORY
    SINGLE_TOOL
    PRECOMPLE
    COMPLE
  }
  class ObjectSendStream {
    +send(item) void
    +aclose() void
  }
  class ObjectReceiveStream {
    +__aiter__() ObjectReceiveStream
    +__anext__() Any
  }
  class MemoryLimiter {
    +usage: int
    +memory: MemoryModel
    +__aenter__() MemoryLimiter
    +__aexit__()
    +run_enforce() void
  }
  class MemoryModel {
  }
  class Message {
  }
  class StrategyContext {
  }
  class AgentStrategy {
    +single_execute() bool
    +on_exception(exc: BaseException) void
    +on_limited() void
    +on_post_process() void
  }
  class MatcherManager {
    +trigger_event(event, config, hook_args, hook_kwargs) void
  }
  class CompletionEvent {
  }
  ChatObject --> MemoryModel : context
  ChatObject --> Message : train
  ChatObject --> AgentStrategy : strategy
  ChatObject --> StrategyContext : uses
  ChatObject --> ObjectSendStream : uses
  ChatObject --> ObjectReceiveStream : uses
  ChatObject --> MemoryLimiter : uses
  ChatObject ..> SuspendEnum : uses values
  ChatObject ..> MatcherManager : triggers events
  ChatObject ..> CompletionEvent : creates
  note for ChatObject "Streaming now uses AnyIO memory object streams with backpressure; queue_timeout controls send timeout; suspend breakpoints use standardized SuspendEnum tags"
```
#### Class diagram for thread-safe managers and ContextThreadsafe

```mermaid
classDiagram
  direction LR
  class ContextThreadsafe {
    <<abstract>>
    -_ctx_lock: asyncio.Lock
    +__aenter__() ContextThreadsafe
    +__aexit__(exc_type, exc_val, exc_tb) void
  }
  class SessionsManager {
    <<singleton>>
    +sessions: dict~str, SessionData~
    +is_session_registered(session_id: str) bool
    +register(session_id: str, data: SessionData) void
    +get_session(session_id: str) SessionData
  }
  class SessionData {
    +memory: MemoryModel
    +config: AmritaConfig
  }
  class MultiToolsManager {
    +_models: dict~str, ToolData~
    +_disabled_tools: set~str~
    +register_tool(tool: ToolData) void
    +remove_tool(name: str) void
    +has_tool(name: str) bool
    +__aenter__()
    +__aexit__()
  }
  class MultiClientManager {
    +clients: list~MCPClient~
    +script_to_clients: dict~str, MCPClient~
    +name_to_clients: dict~str, MCPClient~
    +tools_manager: MultiToolsManager
    +register_only(client: MCPClient) void
    +update_tools(client: MCPClient) void
    +_load_this(client: MCPClient, fail_then_raise: bool)
    +unregister_client(script_name, lock: bool)
    +__aenter__()
    +__aexit__()
  }
  class AdapterManager {
    <<singleton>>
    -_adapter_class: dict~str, type_ModelAdapter~
    +register_adapter(adapter_cls: type_ModelAdapter) void
    +safe_get_adapter(protocol: str) type_ModelAdapter | None
    +__aenter__()
    +__aexit__()
  }
  class MultiPresetManager {
    +presets: dict~str, ModelPreset~
    +__aenter__()
    +__aexit__()
  }
  class MCPClient {
    +server_script: str
    +openai_tools: list
    +bound_to(tm: ClientManager)
  }
  class ClientManager {
  }
  ContextThreadsafe <|-- SessionsManager
  ContextThreadsafe <|-- MultiToolsManager
  ContextThreadsafe <|-- MultiClientManager
  ContextThreadsafe <|-- AdapterManager
  ContextThreadsafe <|-- MultiPresetManager
  SessionsManager --> SessionData
  SessionData --> MemoryModel
  MultiClientManager --> MultiToolsManager : uses
  MultiClientManager --> MCPClient : manages
  ClientManager <|-- MultiClientManager
  AdapterManager --> ModelAdapter
  MultiPresetManager --> ModelPreset
  note for ContextThreadsafe "Provides optional async-context lock via __aenter__/__aexit__ for thread safety in Python 3.14+ (no GIL). All manager subclasses share a class-level asyncio.Lock."
```
### File-Level Changes

### Assessment against linked issues
Contributor
Hey - I've found 6 issues, and left some high level feedback:

- The ChatObject API docs (both EN and ZH) still describe an `overflow_queue_size` parameter and dual-queue behavior that no longer exist in the implementation—please update these sections so the documented constructor signature and backpressure description match the new AnyIO single-stream API.
- The new `SuspendEnum` ZH documentation file mixes Chinese with Korean phrases (e.g., “값”, “사용标准断点의 외부 컨트롤러”), which will confuse readers—consider normalizing this page to consistent Chinese terminology.
- In `logging.py`, `logger_id` is now wrapped in a `Ref[int]` while much existing code is likely to treat it as a plain `int`; either keep `logger_id` as an `int` or introduce a separate `logger_id_ref` to avoid breaking existing usages that pass `logger_id` directly to `logger.remove` or similar.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The ChatObject API docs (both EN and ZH) still describe an `overflow_queue_size` parameter and dual-queue behavior that no longer exist in the implementation—please update these sections so the documented constructor signature and backpressure description match the new AnyIO single-stream API.
- The new `SuspendEnum` ZH documentation file mixes Chinese with Korean phrases (e.g., “값”, “사용标准断点의 외부 컨트롤러”), which will confuse readers—consider normalizing this page to consistent Chinese terminology.
- In `logging.py`, `logger_id` is now wrapped in a `Ref[int]` while much existing code is likely to treat it as a plain `int`; either keep `logger_id` as an `int` or introduce a separate `logger_id_ref` to avoid breaking existing usages that pass `logger_id` directly to `logger.remove` or similar.
## Individual Comments
### Comment 1
<location path="src/amrita_core/chatmanager.py" line_range="423-426" />
<code_context>
- _q_ovf_tout: float
+ _send_stream: ObjectSendStream
+ _receive_stream: ObjectReceiveStream
+ _q_tout: float | None
_is_running: bool = False # Whether it is running
_is_done: bool = False # Whether it has completed
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle `queue_timeout=None` before passing it to `anyio.fail_after`.
`_q_tout` is `float | None`, but `_put_to_queue` always calls `anyio.fail_after(self._q_tout)`, which requires a non-negative float. If `queue_timeout=None` is passed, this will raise a `TypeError` at runtime. If `None` means "no timeout", either skip `fail_after` when `_q_tout is None` or normalize `None` (e.g. to `float("inf")`) in `__init__`.
</issue_to_address>
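The suggested fix can be sketched with the standard library (using `asyncio.Queue` and `asyncio.wait_for` as stand-ins for the AnyIO stream and `anyio.fail_after`; the helper name `put_with_timeout` is hypothetical):

```python
import asyncio


async def put_with_timeout(queue: asyncio.Queue, item, timeout: "float | None") -> None:
    # None means "no timeout": skip the guard entirely instead of passing
    # None to a timeout helper that expects a float (the bug described above).
    if timeout is None:
        await queue.put(item)
    else:
        await asyncio.wait_for(queue.put(item), timeout)


async def demo() -> str:
    q: asyncio.Queue = asyncio.Queue(maxsize=1)
    await put_with_timeout(q, "a", None)      # room available: returns immediately
    try:
        await put_with_timeout(q, "b", 0.01)  # queue full: guard fires
    except asyncio.TimeoutError:
        return "timed out"
    return "sent"


print(asyncio.run(demo()))  # prints "timed out"
```

The alternative mentioned in the comment—normalizing `None` to `float("inf")` in `__init__`—works just as well and keeps `_put_to_queue` branch-free.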
### Comment 2
<location path="src/amrita_core/threadsafe.py" line_range="10-11" />
<code_context>
 def __init_subclass__(cls) -> None:
     super().__init_subclass__()
-    if not getattr(cls, "__abstract__", False):
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Reconsider creating a class-level `asyncio.Lock` at import time for cross-loop/thread usage.
Because `_ctx_lock` is created once per subclass at definition time, it’s tied to whatever event loop is current then. In multi-loop or multi-threaded environments, using that same class-level `asyncio.Lock` from a different loop can raise `RuntimeError`. To better align with the goal of being thread-safe in Python 3.14+ (no GIL), consider either lazily creating the lock on first `__aenter__` per loop, or using per-instance locks if you don’t need cross-instance serialization.
</issue_to_address>
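A minimal sketch of the lazy, per-loop variant the reviewer suggests (the class name `LazyContextThreadsafe` and its attributes are illustrative, not the actual `threadsafe.py` code):

```python
import asyncio


class LazyContextThreadsafe:
    # Illustrative sketch: one lock per (subclass, running event loop),
    # created on first use rather than at class-definition time.
    _locks: dict  # maps event loop -> asyncio.Lock

    def __init_subclass__(cls) -> None:
        super().__init_subclass__()
        cls._locks = {}  # each subclass serializes independently

    @classmethod
    def _get_lock(cls) -> asyncio.Lock:
        loop = asyncio.get_running_loop()
        lock = cls._locks.get(loop)
        if lock is None:
            # First use on this loop: create the lock here, so it is never
            # tied to a loop that only existed at import time.
            lock = cls._locks[loop] = asyncio.Lock()
        return lock

    async def __aenter__(self):
        await self._get_lock().acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._get_lock().release()
```

Note the trade-off: each event loop (and thus each thread running its own loop) gets its own lock, so this serializes within a loop but not across loops; if true cross-thread mutual exclusion is required under a no-GIL build, a `threading.Lock`-based primitive would likely be needed instead.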
### Comment 3
<location path="src/amrita_core/logging.py" line_range="11-12" />
<code_context>
from typing import TYPE_CHECKING, Protocol
-import loguru
+from loguru import _defaults as _lg_def
+from loguru import _logger as _lg_log
+
+from amrita_core.utils import Ref
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Avoid relying on loguru internals (`_logger`, `_defaults`) to construct `Logger`.
This change instantiates `Logger` via `loguru._logger.Logger` and checks `_defaults.LOGURU_AUTOINIT`, both of which are internal APIs and may break with loguru updates, even with the `<0.8.0` pin. Prefer configuring the public `loguru.logger` (e.g., `logger = loguru.logger` with `remove()/add()`), or at least isolate the internal usage in a helper to minimize future breakage scope.
Suggested implementation:
```python
from typing import TYPE_CHECKING, Protocol
import loguru
from amrita_core.utils import Ref
if TYPE_CHECKING:
    from loguru import Logger, Record

# Use the public loguru logger instance rather than constructing Logger via internals.
logger: "Logger" = loguru.logger
```
1. Remove any remaining usages of `_lg_def` (for example, `_lg_def.LOGURU_AUTOINIT`) elsewhere in this file, since the import is gone.
2. If there is custom initialization logic that depended on `LOGURU_AUTOINIT` or manually creating `Core()`, refactor it to operate on the public `logger`:
- Use `logger.remove()` to clear default handlers when needed.
- Use `logger.add(...)` to configure outputs, levels, formats, etc.
3. If you need a helper to encapsulate loguru configuration (to keep the public API usage localized), create a small function such as `configure_logger()` in this module that performs any `remove()/add()` calls on `logger`, and call it from module import or from your application bootstrap code.
</issue_to_address>
### Comment 4
<location path="src/amrita_core/agent/strategy.py" line_range="199" />
<code_context>
+from amrita_core.agent.strategy import AgentStrategy
+
+class CustomAgentStrategy(AgentStrategy):
+    async def on_exception(self, exc: BaseException) -> None:
+        """Custom exception handling logic"""
+        # Log the exception
</code_context>
<issue_to_address>
**question (bug_risk):** Changing the default `on_exception` to a no-op may hide errors in custom strategies.
The previous default raised `NoExceptionHandler`, making missing error handling in strategies explicit. With a silent `pass`, failures in `_run_agent` or `_run_strategy` may appear successful, especially where callers expect `on_exception` to surface or log errors. If you want a more lenient default, consider at least logging the exception so unhandled errors are still visible.
</issue_to_address>
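One way to keep the lenient default while still surfacing failures, along the lines the reviewer suggests (the logger name and the class body here are illustrative, not the actual `strategy.py` code):

```python
import asyncio
import logging

logger = logging.getLogger("amrita_core.agent")  # hypothetical logger name


class LenientAgentStrategy:
    async def on_exception(self, exc: BaseException) -> None:
        # Lenient default: do not re-raise, but log the full traceback so
        # failures in _run_agent/_run_strategy remain visible instead of
        # being swallowed by a silent `pass`.
        logger.error("Unhandled exception in agent strategy", exc_info=exc)


# Exercising the default handler must not raise.
asyncio.run(LenientAgentStrategy().on_exception(ValueError("boom")))
```

Passing the exception instance via `exc_info=exc` makes the traceback appear in the log even though the handler runs outside an `except` block.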
### Comment 5
<location path="tests/test_protocol.py" line_range="117-125" />
<code_context>
msg = ImageMessage(unsupported_data)
assert msg.get_content() == "[Unsupported image format]"
- # Skip complex curl tests for now as they require more sophisticated mocking
- @pytest.mark.skip(reason="Complex aiohttp mocking requires more setup")
- @pytest.mark.asyncio
- async def test_image_message_curl_image(self):
- pass
-
- @pytest.mark.skip(reason="Complex aiohttp mocking requires more setup")
- @pytest.mark.asyncio
- async def test_image_message_curl_image_error(self):
- pass
-
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding basic tests for the new `EmbeddingChunk` model alongside protocol tests.
Removing the skipped curl tests is fine, but the new `EmbeddingChunk` model and related protocol types currently have no coverage. Since this file already tests core protocol/model types, it would be a good place to add a simple `EmbeddingChunk` test that:
- Instantiates `EmbeddingChunk(embedding=[0.1, 0.2, 0.3], index=0)`.
- Verifies the fields and that `.model_dump()` returns the expected structure.
This gives minimal regression coverage for the new type without needing full embedding adapter tests.
Suggested implementation:
```python
msg = ImageMessage(unsupported_data)
assert msg.get_content() == "[Unsupported image format]"
class TestEmbeddingChunk:
    def test_embedding_chunk_fields_and_model_dump(self):
        chunk = EmbeddingChunk(embedding=[0.1, 0.2, 0.3], index=0)
        assert chunk.embedding == [0.1, 0.2, 0.3]
        assert chunk.index == 0
        dumped = chunk.model_dump()
        assert dumped == {
            "embedding": [0.1, 0.2, 0.3],
            "index": 0,
        }
```
If `EmbeddingChunk` is not yet imported in `tests/test_protocol.py`, add an import at the top of the file consistent with how other protocol/model types are imported. For example, if other models are imported via:
```python
from mypkg.protocol import ImageMessage, ...
```
extend that import to include `EmbeddingChunk`:
```python
from mypkg.protocol import ImageMessage, EmbeddingChunk, ...
```
Adjust the module path (`mypkg.protocol`) to match your existing codebase.
</issue_to_address>
### Comment 6
<location path="README.md" line_range="67" />
<code_context>
This project is licensed under the MIT License - see the [LICENSE](./LICENSE) file for details.
-**Significants**
+### Significants
All versions of AmritaCore are released under the MIT License (Although the past versions are released under the AGPLv3 License, when this readme is created, we will release all versions under the MIT License).
</code_context>
<issue_to_address>
**suggestion (typo):** Consider renaming the heading `Significants` to a more idiomatic English term.
Here you may want a more natural term such as “Significance”, “Notes”, or “Remarks”, which will read more clearly to English speakers.
```suggestion
### Notes
```
</issue_to_address>
Deploying amritacore with

| | |
| --- | --- |
| Latest commit: | 4cae744 |
| Status: | ✅ Deploy successful! |
| Preview URL: | https://ae5a5fcf.amritacore.pages.dev |
| Branch Preview URL: | https://feat-and-fix.amritacore.pages.dev |
**Member (Author):**

> @sourcery-ai title

**Member (Author):**

> @sourcery-ai summary
close #47
### Summary by Sourcery
Release AmritaCore 0.8.0 with enhanced safety, streaming, and concurrency primitives, updated adapter and embedding support, standardized suspend breakpoints, and improved tooling and documentation.
New Features:
Bug Fixes:
Enhancements:
Build:
CI:
Documentation:
Tests: