fix(agentic): reasoning model token tracking, event-loop safety, streaming and tool call passthrough #989

Merged
brajrajnagar merged 1 commit into master (Mar 16, 2026)

Conversation
Contributor
Pull request overview
Improves AgenticModelClass streaming/tool-loop behavior by fixing token accounting (especially for reasoning models and streaming providers that emit usage per chunk) and addressing event-loop safety when consuming sync streaming iterators.
Changes:
- Adjust token accumulation to derive `completion_tokens` from `total_tokens - prompt_tokens` and add drain/reset semantics for thread-local token tracking.
- Add streaming helpers to prevent usage double-counting and to avoid blocking the background event loop when iterating sync streaming responses.
- Refactor repeated tool-result extraction and responses input normalization into shared helpers; expand tests accordingly.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| clarifai/runners/models/agentic_class.py | Token tracking fixes, streaming wrappers/executor bridging, helper refactors, and MCP streaming token finalization changes. |
| tests/runners/test_agentic_class.py | Updates and expands tests for token draining/finalization and new helper methods; adjusts MCP streaming test expectations. |
Comments suppressed due to low confidence (1)
tests/runners/test_agentic_class.py:78
- The updated token-tracking logic prefers `total_tokens - prompt_tokens` when `total_tokens` is present, but the tests currently only cover the `total_tokens is None` path. Add a test case where `total_tokens` is set (and `completion_tokens` differs) to ensure reasoning-token-inclusive accounting behaves as intended.
```python
def test_add_tokens_from_usage(self, model):
    """Test adding tokens from response with usage attribute."""
    mock_response = MagicMock()
    mock_usage = MagicMock()
    mock_usage.prompt_tokens = 10
    mock_usage.completion_tokens = 20
    mock_usage.total_tokens = None
    mock_response.usage = mock_usage
    model._add_tokens(mock_response)
    assert model._thread_local.tokens['prompt'] == 10
    assert model._thread_local.tokens['completion'] == 20
```
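The missing `total_tokens`-set case the comment asks for could be sketched as below, using a standalone stand-in for the new accounting (`derive_completion_tokens` is illustrative, not the actual helper in `agentic_class.py`):

```python
from types import SimpleNamespace

def derive_completion_tokens(usage):
    # Stand-in mirroring the PR's accounting: prefer total - prompt when
    # total_tokens is present so reasoning tokens are included.
    total = getattr(usage, "total_tokens", None)
    prompt = getattr(usage, "prompt_tokens", 0) or 0
    if total is not None:
        return total - prompt
    return getattr(usage, "completion_tokens", 0) or getattr(usage, "output_tokens", 0) or 0

# o1-style usage: 5 reasoning tokens are in total_tokens but not completion_tokens.
usage = SimpleNamespace(prompt_tokens=10, completion_tokens=20, total_tokens=35)
print(derive_completion_tokens(usage))  # 25, not 20
```

A test built this way would fail against the old code path (which would report 20) and pass against the new derivation.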
| """Yield chunks with usage=None on all but the last usage-bearing chunk. | ||
|
|
||
| def _create_stream_request(self, messages, tools, max_tokens, temperature, top_p): | ||
| Some providers(Gemini) send a usage object on every chunk; keeping only the last one |
Comment on lines
+805
to
+823
```python
def _stream_with_nulled_usage(self, chunks):
    """Yield chunks with usage=None on all but the last usage-bearing chunk.

    Some providers (Gemini) send a usage object on every chunk; keeping only the last one
    avoids accumulating duplicated token counts across the multi-turn tool loop.
    We buffer the most recent usage chunk and null out all earlier ones before yielding.
    """
    self.buffered_usage_chunk = None
    for chunk in chunks:
        if getattr(chunk, "usage", None) is not None:
            if self.buffered_usage_chunk is not None:
                # Null out earlier usage so _set_usage only counts the final summary.
                self.buffered_usage_chunk.usage = None
                yield self.buffered_usage_chunk
            self.buffered_usage_chunk = chunk
        else:
            yield chunk
    if self.buffered_usage_chunk is not None:
        yield self.buffered_usage_chunk
```
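In isolation, the wrapper's behavior can be demonstrated with a free-function version and hypothetical chunk objects (a sketch, not the clarifai class):

```python
from types import SimpleNamespace

def stream_with_nulled_usage(chunks):
    # Buffer the latest usage-bearing chunk; null usage on earlier ones so
    # only the final provider summary is ever counted.
    buffered = None
    for chunk in chunks:
        if getattr(chunk, "usage", None) is not None:
            if buffered is not None:
                buffered.usage = None
                yield buffered
            buffered = chunk
        else:
            yield chunk
    if buffered is not None:
        yield buffered

# Gemini-style stream: a (cumulative) usage object on every chunk.
chunks = [SimpleNamespace(text=t, usage=SimpleNamespace(total_tokens=n))
          for t, n in [("a", 5), ("b", 10), ("c", 15)]]
out = list(stream_with_nulled_usage(chunks))
print([c.usage.total_tokens if c.usage else None for c in out])  # [None, None, 15]
```

Without the wrapper, naive accumulation over this stream would count 5 + 10 + 15 = 30 tokens instead of the correct final 15.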
Comment on lines +812 to +823
Comment on lines +467 to +473

```python
completion_tokens = (
    (total_tokens - prompt_tokens)
    if total_tokens is not None
    else (
        getattr(usage, 'completion_tokens', 0) or getattr(usage, 'output_tokens', 0) or 0
    )
)
```
```python
# empty accumulator.
bg_tokens = pool._run_async(self._clear_bg_tokens())
if bg_tokens['prompt'] > 0 or bg_tokens['completion'] > 0:
    logger.info(
```
luv-bansal reviewed Mar 16, 2026
luv-bansal approved these changes Mar 16, 2026
Summary
- **Fix token tracking for reasoning models:** `_add_tokens` now derives `completion_tokens` as `total_tokens - prompt_tokens` when `total_tokens` is available. Reasoning models (o1, o3) exclude reasoning tokens from `completion_tokens` but include them in `total_tokens`, so this ensures reasoning tokens are counted in the reported usage.
- **Fix streaming token double-counting:** Added a `_stream_with_nulled_usage` wrapper that nulls out usage on all but the final usage-bearing chunk. Providers like Gemini emit a usage object on every stream chunk, causing token counts to be multiplied by the number of chunks.
- **Fix token collection for MCP streaming:** Token accumulation during MCP streaming happens inside the pool's background event loop thread (via `_sync_to_async_iter`). Replaced the `_finalize_tokens()` call (which reads from the wrong thread's `_thread_local`) with `pool._run_async(_clear_bg_tokens())` to correctly collect tokens from the background thread. Added `_drain_tokens()` to atomically read-and-reset the accumulator.
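The drain-and-reset semantics can be sketched with a minimal thread-local accumulator (names here are illustrative, not the actual clarifai implementation):

```python
import threading

class TokenAccumulator:
    # Hypothetical sketch of the drain semantics described above.
    def __init__(self):
        self._local = threading.local()

    def _tokens(self):
        # Lazily create this thread's counter dict.
        if not hasattr(self._local, "tokens"):
            self._local.tokens = {"prompt": 0, "completion": 0}
        return self._local.tokens

    def add(self, prompt, completion):
        t = self._tokens()
        t["prompt"] += prompt
        t["completion"] += completion

    def drain(self):
        # Read-and-reset this thread's accumulator in one step so a later
        # finalize pass cannot double-count the same tokens.
        t = self._tokens()
        drained = dict(t)
        t["prompt"] = 0
        t["completion"] = 0
        return drained

acc = TokenAccumulator()
acc.add(10, 25)
print(acc.drain())  # {'prompt': 10, 'completion': 25}
print(acc.drain())  # {'prompt': 0, 'completion': 0}
```

Because the storage is `threading.local`, a drain must run on the same thread that accumulated, which is why the PR routes it through `pool._run_async` on the background loop.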
- **Fix event loop blocking in streaming:** Added `_sync_to_async_iter`, which wraps sync iterators (OpenAI streaming responses) to run each `next()` call in a thread pool executor, preventing the background event loop thread from blocking during I/O waits.
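The executor bridging can be illustrated with a standalone sketch (a free-function version under assumed names; the real helper may differ):

```python
import asyncio

def sync_to_async_iter(sync_iter):
    # Each blocking next() call runs in the default thread pool executor,
    # so the event loop thread itself never blocks on provider I/O.
    async def gen():
        loop = asyncio.get_running_loop()
        sentinel = object()
        it = iter(sync_iter)
        while True:
            item = await loop.run_in_executor(None, next, it, sentinel)
            if item is sentinel:
                break
            yield item
    return gen()

async def main():
    return [x async for x in sync_to_async_iter(range(3))]

print(asyncio.run(main()))  # [0, 1, 2]
```

Using `next(it, sentinel)` avoids raising `StopIteration` inside the executor thread; the sentinel cleanly signals exhaustion back to the async side.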
- **Add `reasoning_effort` support:** Passed through to `_create_stream_request` and `_stream_chat_with_tools` so reasoning-capable models honor the parameter end-to-end.
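The passthrough pattern amounts to only including the parameter in the provider request when it is set; a sketch with an illustrative builder (not the actual `_create_stream_request` signature):

```python
def build_stream_request(messages, tools=None, max_tokens=None,
                         temperature=None, top_p=None, reasoning_effort=None):
    # Include optional parameters only when provided, so non-reasoning
    # providers never see an unexpected reasoning_effort key.
    request = {"messages": messages, "stream": True}
    for key, value in [("tools", tools), ("max_tokens", max_tokens),
                       ("temperature", temperature), ("top_p", top_p),
                       ("reasoning_effort", reasoning_effort)]:
        if value is not None:
            request[key] = value
    return request

req = build_stream_request([{"role": "user", "content": "hi"}],
                           reasoning_effort="high")
print(req["reasoning_effort"])  # high
```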
- **Preserve extended tool call attributes:** `_accumulate_tool_delta` now uses `model_dump` to capture all fields generically (e.g. `thought_signature` from reasoning models) instead of only copying `id`/`type`/`function`. `_finalize_tool_calls` preserves all accumulated attributes accordingly.
- **Refactor:** Extracted `_extract_tool_content` and `_normalize_input_items` to eliminate duplicated inline logic across all four `_execute_*_tools*` methods and two input-normalization sites.
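The generic accumulation idea can be sketched over plain dicts (as if each delta came from `model_dump()`; function names here are hypothetical):

```python
def accumulate_tool_delta(acc, delta):
    # Merge every field generically so provider extensions (e.g. a
    # thought_signature field) survive; function.arguments fragments
    # concatenate across streamed deltas.
    for key, value in delta.items():
        if value is None:
            continue
        if key == "function":
            fn = acc.setdefault("function", {})
            for fkey, fval in value.items():
                if fval is None:
                    continue
                if fkey == "arguments":
                    fn["arguments"] = fn.get("arguments", "") + fval
                else:
                    fn[fkey] = fval
        else:
            acc[key] = value
    return acc

acc = {}
accumulate_tool_delta(acc, {"id": "call_1", "type": "function",
                            "function": {"name": "search", "arguments": '{"q":'}})
accumulate_tool_delta(acc, {"thought_signature": "sig123",
                            "function": {"arguments": ' "cats"}'}})
print(acc["function"]["arguments"])  # {"q": "cats"}
print(acc["thought_signature"])     # sig123
```

Hand-copying a fixed field list would silently drop `thought_signature` here; iterating the dumped fields keeps it.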
Verification: tested the changes with gemini 3.1 flash lite and the GPT-5.4 wrapper.