
Refine chat cleanup, streaming suspend, and fallback handling #60

Merged

JohnRichard4096 merged 3 commits into main from fix/bugs on Apr 26, 2026

Conversation

@JohnRichard4096
Member

@JohnRichard4096 JohnRichard4096 commented Apr 26, 2026

close #59

Summary by Sourcery

Address stability and cleanup issues in chat management and streaming while bumping the package patch version.

Bug Fixes:

  • Prevent invalid future state errors when suspending and resuming streaming by guarding future completion and resetting internal suspend/resume signals.
  • Ensure fallback event triggering correctly ignores FallbackFailed exceptions during LLM retry handling.
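
The InvalidStateError guarded against here comes from completing an asyncio.Future that is already done. A minimal standalone reproduction of that failure mode (no project code involved, names are illustrative):

```python
# Minimal repro of the bug class being fixed: completing an already-done
# asyncio.Future raises InvalidStateError, hence the "is it done?" guard.
import asyncio

async def main() -> str:
    fut = asyncio.get_running_loop().create_future()
    fut.set_result(True)          # first completion: fine
    try:
        fut.set_result(True)      # second completion: invalid state
    except asyncio.InvalidStateError:
        return "needs a done() guard"
    return "no error"

print(asyncio.run(main()))
```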

Enhancements:

  • Increase the hard limit for per-session chat objects and log a warning when excessive objects are detected to aid memory leak detection.
  • Make chat object cleanup report whether it performed a cleanup, enabling callers to react to excessive object counts.

Build:

  • Bump project version from 0.8.4 to 0.8.4.1.

@sourcery-ai
Contributor

sourcery-ai Bot commented Apr 26, 2026

Reviewer's Guide

Adjusts chat session cleanup semantics, improves fallback event triggering and streaming suspend/resume robustness, and bumps the package patch version.

Sequence diagram for updated streaming suspend and resume flow

sequenceDiagram
    actor Client
    participant StreamingController
    participant SuspendSignal as Suspend_Future
    participant ResumeSignal as Resume_Future

    Client->>StreamingController: call _wait_for_continue(tag)
    alt first_suspend
        StreamingController->>SuspendSignal: await __suspend_signal
        SuspendSignal-->>StreamingController: result True
        StreamingController-->>Client: return True
    else already_suspended
        StreamingController->>StreamingController: check __suspend_signal.done()
        alt not done
            StreamingController->>SuspendSignal: set_result(True)
        end
        StreamingController->>ResumeSignal: create new __resume_signal
        StreamingController->>ResumeSignal: await __resume_signal
        ResumeSignal-->>StreamingController: result True
        StreamingController-->>Client: return True
    end

    Client->>StreamingController: call resume()
    StreamingController->>StreamingController: check __resume_signal and not done
    alt resume_signal_pending
        StreamingController->>ResumeSignal: set_result(True)
    end
    StreamingController->>StreamingController: _suspend_tags = None
    StreamingController->>StreamingController: __resume_signal = None
    StreamingController->>StreamingController: __suspend_signal = None
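
The flow in the diagram above can be sketched as a small class. This is an illustrative reconstruction from the diagram only, not the project's actual streaming.py; attribute and method names are assumptions:

```python
# Sketch of the suspend/resume flow: the suspend future is only completed
# when still pending, and resume() clears all internal state so the next
# cycle starts fresh. Names mirror the diagram, not the real implementation.
import asyncio

class StreamingController:
    def __init__(self) -> None:
        self._suspend_signal: asyncio.Future | None = None
        self._resume_signal: asyncio.Future | None = None
        self._suspend_tags: list[str] | None = None

    async def _wait_for_continue(self, tag: str) -> bool:
        loop = asyncio.get_running_loop()
        if self._suspend_signal is None:
            # First suspend: create the suspend future and wait on it.
            self._suspend_signal = loop.create_future()
            self._suspend_tags = [tag]
            return await self._suspend_signal
        # Already suspended: complete the suspend future only if pending.
        if not self._suspend_signal.done():
            self._suspend_signal.set_result(True)
        self._resume_signal = loop.create_future()
        return await self._resume_signal

    def resume(self) -> None:
        if self._resume_signal and not self._resume_signal.done():
            self._resume_signal.set_result(True)
        if self._suspend_signal and not self._suspend_signal.done():
            self._suspend_signal.set_result(True)
        # Always reset state to avoid stale futures on the next cycle.
        self._suspend_tags = None
        self._resume_signal = None
        self._suspend_signal = None
```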

Class diagram for updated ChatManager and streaming control

classDiagram
    class ChatManager {
        +dict~str, ChatObjectMeta~ running_chat_object
        +dict~str, ChatObjectMeta~ running_chat_object_id2map
        +bool clean_obj(k: str, maxitems: int)
        +list~ChatObjectMeta~ get_all_objs()
    }

    class ChatObjectMeta {
        +str stream_id
        +datetime start_at
        +datetime end_at
        +bool is_done
    }

    class StreamingController {
        -Future __suspend_signal
        -Future __resume_signal
        -list~str~ _suspend_tags
        +async bool _wait_for_continue(tag: str)
        +void resume()
        +bool queue_closed()
    }

    ChatManager "1" o-- "many" ChatObjectMeta : manages
    StreamingController "1" o-- "many" ChatObjectMeta : streams

File-Level Changes

Change Details Files
Make chat object cleanup return a success flag and use a higher hard limit with logging when too many chat objects are detected for a session.
  • Change ChatManager.clean_obj to return a boolean indicating whether cleanup occurred
  • Update clean_obj docstring to describe the return value
  • Use a much larger maxitems value (10000) when cleaning objects in ChatObject._entry to act as a hard safety limit instead of aggressive cleanup
  • Log a warning when the number of chat objects exceeds the hard limit so potential memory leaks can be investigated
src/amrita_core/chatmanager.py
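
A hedged sketch of what such a cleanup method might look like, folding in the reviewer's later suggestions (use `get` to avoid a KeyError, include the actual count in the warning). The names follow the changelog, but the internals are assumptions, not the actual chatmanager.py:

```python
# Sketch: cleanup that reports whether it ran, tolerates missing keys,
# and logs the object count against the hard limit for leak diagnosis.
import logging

logger = logging.getLogger(__name__)

class ChatManager:
    def __init__(self) -> None:
        self.running_chat_object: dict[str, list] = {}

    def clean_obj(self, k: str, maxitems: int = 10000) -> bool:
        """Trim chat objects for session ``k``.

        Returns True if a cleanup was performed, False otherwise.
        """
        objs = self.running_chat_object.get(k)
        if not objs:
            return False  # missing or empty key: no KeyError, nothing to do
        if len(objs) <= maxitems:
            return False
        logger.warning(
            "Session %s has %d chat objects (hard limit %d); possible leak",
            k, len(objs), maxitems,
        )
        self.running_chat_object[k] = objs[-maxitems:]  # keep newest entries
        return True
```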
Fix fallback event triggering to ignore FallbackFailed exceptions instead of treating them as normal events.
  • Update MatcherManager.trigger_event invocation in _process_chat to pass FallbackFailed via the exception_ignored parameter, aligning with the intended API
src/amrita_core/chatmanager.py
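
The idea behind an `exception_ignored` parameter can be illustrated with a toy trigger function. The real `MatcherManager.trigger_event` signature is not shown in this PR, so this is only a sketch of the pattern:

```python
# Sketch: an event trigger that swallows configured exception types
# (e.g. FallbackFailed) instead of treating them as normal failures.
class FallbackFailed(Exception):
    pass

def trigger_event(handlers, event, exception_ignored=()):
    results = []
    for handler in handlers:
        try:
            results.append(handler(event))
        except exception_ignored:
            continue  # ignored exception types are silently skipped
    return results
```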
Harden streaming suspend/resume behavior to avoid setting already-completed futures and to fully reset internal state on resume.
  • Guard suspend future completion in _wait_for_continue by checking if __suspend_signal is already done before calling set_result
  • On resume, always clear suspend-related tags and reset __resume_signal and __suspend_signal to None to avoid stale state and potential leaks
src/amrita_core/streaming.py
Bump project patch version to reflect bugfix changes.
  • Increment project version from 0.8.4 to 0.8.4.1
pyproject.toml

Assessment against linked issues

  • #59: Prevent asyncio.exceptions.InvalidStateError in Streaming._wait_for_continue by adding protective logic before calling self.__suspend_signal.set_result(...) (e.g., checking that the future is not already done).
  • #59: Improve lifecycle handling of suspend/resume signals in Streaming.resume to avoid stale or inconsistent future state after resuming.


@JohnRichard4096
Member Author

@sourcery-ai title

sourcery-ai Bot changed the title from "Fix/bugs" to "Refine chat cleanup, streaming suspend, and fallback handling" on Apr 26, 2026
Contributor

sourcery-ai Bot left a comment


Hey - I've found 1 issue and left some high-level feedback:

  • In ChatManager.clean_obj, accessing self.running_chat_object[k] directly will raise a KeyError if the key is missing; consider using get (and returning False early) to make this safer for callers that may invoke clean_obj defensively.
  • When logging the warning about "too many chat objects" in _entry, it may be more actionable to include the actual object count (and limit) in the log message to help diagnose memory leak patterns.
Individual Comments

Comment 1 (src/amrita_core/streaming.py, lines 153-156)
<code_context>
         """Resume to run when suspend."""
         if self.__resume_signal and not self.__resume_signal.done():
             self.__resume_signal.set_result(True)
-            self._suspend_tags = None
+        self._suspend_tags = None
+        self.__resume_signal = None
+        self.__suspend_signal = None  # Just for cleanup

     def queue_closed(self) -> bool:
</code_context>
<issue_to_address>
**issue (bug_risk):** Resetting `__suspend_signal` to `None` increases the risk of `AttributeError` in `_wait_for_continue` if the lifecycle ever gets out of sync.

`_wait_for_continue` calls `self.__suspend_signal.set_result(True)` assuming it’s always a `Future`, but `resume()` now assigns `None` to `__suspend_signal`. Any path that triggers a new suspend without recreating the `Future` will raise `AttributeError`. To decouple these lifecycles, either guard the call (e.g. `if self.__suspend_signal and not self.__suspend_signal.done():`) or avoid setting `__suspend_signal` to `None` and simply overwrite it on the next suspend cycle.
</issue_to_address>


Comment thread on src/amrita_core/streaming.py (now outdated)
@JohnRichard4096 JohnRichard4096 merged commit 9d0b83c into main Apr 26, 2026
3 checks passed
@JohnRichard4096 JohnRichard4096 deleted the fix/bugs branch April 26, 2026 06:36


Development

Successfully merging this pull request may close these issues:

[BUG] Consider add protect logic (#59)
