feat: support separate turns container for short-term/long-term memory separation#14
Conversation
There was a problem hiding this comment.
Pull request overview
Adds an optional second Cosmos container for short-term "turn" documents, while keeping derived memories (facts, summaries, episodic, procedural, user_summary) in the main memories container. Both sync and async CosmosMemoryClient are updated with constructor/create_memory_store parameters, a routing helper pair, and dual-container fallbacks on update/delete/get_thread.
Changes:
- New
cosmos_turns_containerconstructor /create_memory_store(turns_container=...)arg, with provisioning that mirrors the main container's partition key, indexing, vector, and full-text policies. - New
_container_for_type(writes) and_container_for_query(reads) helpers;add_cosmos,push_to_cosmos,get_memories,update_cosmos,delete_cosmos, andget_threadupdated to use them. get_threadspecial-cases "no type filter" by querying both containers and concatenating results.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| agent_memory_toolkit/cosmos_memory_client.py | Sync client: turns-container plumbing, routing helpers, dual-container CRUD/get_thread fallback. |
| agent_memory_toolkit/aio/cosmos_memory_client.py | Async client: same routing/provisioning changes mirrored for async APIs. |
Comments suppressed due to low confidence (1)
agent_memory_toolkit/cosmos_memory_client.py:1506
- When turns are in a separate container, concatenating
items_main + items_turnsdoes not preserve theORDER BY c.created_at DESCof the merged result. Each container's items are individually sorted, but the combined list is not. As a consequence,items[:recent_k]may include older entries from one container while skipping newer ones from the other, and the final reversed list will not be in true chronological order. The two result lists should be merged bycreated_at(e.g. viaheapq.mergewith a key, or by sorting the union) before slicing byrecent_k.
if self._turns_container_client is not None and not memory_types:
items_main = list(
self._container_client.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True,
)
)
items_turns = list(
self._turns_container_client.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True,
)
)
items = items_main + items_turns
else:
container = self._container_for_query(memory_types)
items = list(
container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True,
)
)
except Exception as exc:
raise CosmosOperationError(f"get_thread query failed: {exc}") from exc
if recent_k is not None:
items = items[:recent_k]
items.reverse()
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._container_client: Any = None | ||
| self._turns_container_client: Any = None # Separate container for turns (optional) | ||
| self._counter_container_client: Any = None |
There was a problem hiding this comment.
Fixed. Both async \close()\ and _drain_cosmos_client()\ now set \self._turns_container_client = None\ alongside the other container handles.
There was a problem hiding this comment.
Fixed. Both async \close()\ and _drain_cosmos_client()\ now set \self._turns_container_client = None\ alongside the other container handles.
There was a problem hiding this comment.
Fixed. Both async \close()\ and _drain_cosmos_client()\ now set \self._turns_container_client = None\ alongside the other container handles.
| def _container_for_query(self, memory_types: Optional[list[str]] = None) -> Any: | ||
| """Return the appropriate container for a read query. | ||
|
|
||
| If the query targets only turns and a separate turns container is | ||
| configured, route to the turns container. For mixed or non-turn | ||
| queries, use the main container. | ||
| """ | ||
| if ( | ||
| self._turns_container_client is not None | ||
| and memory_types | ||
| and all(t == "turn" for t in memory_types) | ||
| ): | ||
| return self._turns_container_client | ||
| return self._container_client | ||
|
|
There was a problem hiding this comment.
By design. \search_cosmos, \add_tags,
remove_tags, and \get_procedural_memories\ operate exclusively on derived memories (facts, episodic, procedural, summaries) which always live in the main container. Raw turns are not expected to be searched via vector/full-text or tagged - they are retrieved via \get_thread()\ which queries both containers. The docstring for _container_for_query\ now clarifies this contract explicitly.
| cosmos_key: Optional[str] = None, | ||
| cosmos_database: Optional[str] = None, | ||
| cosmos_container: Optional[str] = None, | ||
| cosmos_turns_container: Optional[str] = None, |
There was a problem hiding this comment.
Acknowledged. Added inline comment in both clients noting that change-feed triggers bound to the main container will NOT fire for turns in a dedicated turns container. Function App infra update is out of scope for this PR and will be addressed in a follow-up.
| else: | ||
| self._turns_container_client = None | ||
|
|
||
| self._init_pipeline() |
There was a problem hiding this comment.
Fixed. \ProcessingPipeline\ now accepts an optional \cosmos_turns_container\ parameter. \�xtract_memories, \generate_thread_summary, and \generate_user_summary\ all query both containers and merge results.
7b9754b to
34933db
Compare
34933db to
632822c
Compare
632822c to
bf7c9d9
Compare
| raise CosmosNotConnectedError() | ||
|
|
||
| def _container_for_type(self, memory_type: str) -> Any: | ||
| """Return the appropriate container client based on memory type. | ||
|
|
||
| When ``cosmos_turns_container`` is configured, turn-type documents | ||
| are routed to the dedicated turns container. All other memory types | ||
| (fact, summary, episodic, procedural, user_summary) use the main | ||
| memories container. | ||
| """ | ||
| if memory_type == "turn" and self._turns_container_client is not None: | ||
| return self._turns_container_client | ||
| return self._container_client | ||
|
|
||
| def _container_for_query(self, memory_types: Optional[list[str]] = None) -> Any: | ||
| """Return the appropriate container for a read query. | ||
|
|
||
| If the query targets only turns and a separate turns container is | ||
| configured, route to the turns container. For mixed or non-turn | ||
| queries, use the main container. | ||
|
|
||
| If a dedicated turns container is configured, this helper can only |
There was a problem hiding this comment.
Fixed — same as above. \ProcessingPipeline\ now receives the turns container and queries both in all thread-wide operations (\�xtract_memories, \generate_thread_summary, \generate_user_summary).
There was a problem hiding this comment.
Fixed — same as above. \ProcessingPipeline\ now receives the turns container and queries both in all thread-wide operations.
| thread_id=thread_id, | ||
| role=role, | ||
| memory_types=memory_types, |
There was a problem hiding this comment.
By design. \get_memories()\ is intended for derived/processed memories (facts, episodic, procedural, summaries), not raw turns. Raw turns are retrieved via \get_thread()\ which correctly queries both containers. The _container_for_query\ docstring now documents this explicitly.
|
|
||
| # Connect turns container if configured separately | ||
| if self._cosmos_turns_container: | ||
| turns_handle = db.get_container_client(self._cosmos_turns_container) | ||
| self._turns_container_client = turns_handle | ||
| logger.info( | ||
| "Connected turns container: %s/%s", | ||
| self._cosmos_database, | ||
| self._cosmos_turns_container, | ||
| ) | ||
| else: |
There was a problem hiding this comment.
Fixed. \connect_cosmos()\ now accepts an optional \ urns_container\ parameter that overrides the constructor setting, in both sync and async clients.
| cosmos_key: Optional[str] = None, | ||
| cosmos_database: Optional[str] = None, | ||
| cosmos_container: Optional[str] = None, | ||
| cosmos_turns_container: Optional[str] = None, |
There was a problem hiding this comment.
Same as above — documented as a known limitation. Function App infra update is out of scope for this PR.
| raise CosmosNotConnectedError() | ||
|
|
||
| def _container_for_type(self, memory_type: str) -> Any: | ||
| """Return the appropriate container client based on memory type. | ||
|
|
||
| When ``cosmos_turns_container`` is configured, turn-type documents | ||
| are routed to the dedicated turns container. All other memory types | ||
| (fact, summary, episodic, procedural, user_summary) use the main | ||
| memories container. | ||
| """ | ||
| if memory_type == "turn" and self._turns_container_client is not None: | ||
| return self._turns_container_client | ||
| return self._container_client | ||
|
|
||
| def _container_for_query(self, memory_types: Optional[list[str]] = None) -> Any: | ||
| """Return the appropriate container for a read query. | ||
|
|
||
| If the query targets only turns and a separate turns container is | ||
| configured, route to the turns container. For mixed or non-turn | ||
| queries, use the main container. | ||
|
|
||
| If a dedicated turns container is configured, this helper can only |
There was a problem hiding this comment.
Acknowledged. Existing tests pass. Dedicated unit tests for multi-container routing will be added in a follow-up PR.
| if tags: | ||
| for i, tag in enumerate(tags): | ||
| qb.add_array_contains("c.tags", f"@tag_{i}", tag) | ||
| if exclude_tags: |
There was a problem hiding this comment.
Fixed. \get_thread\ now uses _containers_for_query()\ to iterate all relevant containers, merges results, and re-sorts by \created_at DESC\ before applying
ecent_k\ slicing. Both sync and async.
bf7c9d9 to
faf6b4e
Compare
faf6b4e to
ea795ba
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
agent_memory_toolkit/cosmos_memory_client.py:1229
get_memoriesuses_container_for_query, which returns only the main container whenmemory_typesisNone/empty or contains a mix of turn and non-turn types. With a dedicated turns container configured, this silently drops everyturnrecord from results in those cases — even though the docstring statesNone/emptymemory_types"returns all types". Callers like list-all or mixed-type queries (e.g.["turn", "fact"]) will see incomplete data with no error. Consider routingget_memoriesthrough_containers_for_query(merging and re-applyingrecent_k/sorting after the union, similar toget_thread) so that all requested types are returned regardless of which container holds them.
try:
container = self._container_for_query(memory_types)
items = list(
container.query_items(
query=query,
parameters=parameters or None,
enable_cross_partition_query=True,
)
)
agent_memory_toolkit/aio/cosmos_memory_client.py:976
- Same issue as the sync client: async
get_memoriesroutes through_container_for_query, so whenmemory_typesisNone/empty or mixed (e.g.["turn", "fact"]), turn records held in the separate turns container are silently omitted from results even though the docstring promises all types. Use_containers_for_queryand merge results (handlingrecent_k/ORDER BYafter the union) so the read path is consistent withget_thread.
try:
container = self._container_for_query(memory_types)
items_iter = container.query_items(
query=query,
parameters=parameters or None,
)
results = [item async for item in items_iter]
| ) | ||
| self._cosmos_client = client | ||
| self._container_client = container_handle | ||
|
|
||
| # Create and connect separate turns container if configured | ||
| if self._cosmos_turns_container: | ||
| turns_handle = db.create_container_if_not_exists( | ||
| **_build_container_kwargs( | ||
| container_id=self._cosmos_turns_container, | ||
| partition_key=partition_key, | ||
| offer_throughput=offer_throughput, | ||
| default_ttl=-1, | ||
| indexing_policy=idx_policy, | ||
| vector_embedding_policy=vec_policy, | ||
| full_text_policy=ft_policy, | ||
| ) | ||
| ) | ||
| self._turns_container_client = turns_handle | ||
| logger.info( | ||
| "Created turns container: %s/%s", | ||
| self._cosmos_database, | ||
| self._cosmos_turns_container, | ||
| ) | ||
| else: | ||
| self._turns_container_client = None | ||
|
|
There was a problem hiding this comment.
Same as above - documented as a known limitation via inline comments. Function App infra changes will be a follow-up PR.
| # Also fetch from the turns container if it differs from the main one | ||
| if self._turns_container is not self._container: | ||
| items.extend( | ||
| list( | ||
| self._turns_container.query_items( | ||
| query=query, | ||
| parameters=parameters, | ||
| partition_key=[user_id, thread_id], | ||
| ) | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Fixed. generate_user_summary now also queries the turns container when configured.
| """Return the appropriate container for a read query. | ||
|
|
||
| If the query targets only turns and a separate turns container is | ||
| configured, route to the turns container. For mixed or non-turn | ||
| queries, use the main container. | ||
|
|
||
| In the default single-container configuration this returns only the | ||
| main memories container. When a dedicated turns container is | ||
| configured: | ||
|
|
||
| * turn-only queries should search only the turns container | ||
| * non-turn-only queries should search only the main memories container | ||
| * mixed or unspecified queries may need to inspect both containers | ||
| """ | ||
| if not memory_types: | ||
| return self._container_client | ||
|
|
||
| has_turn = any(t == "turn" for t in memory_types) | ||
| has_not_turn = any(t != "turn" for t in memory_types) | ||
|
|
||
| if self._turns_container_client is not None: | ||
| if has_turn and not has_not_turn: | ||
| return self._turns_container_client | ||
| if not has_turn: | ||
| return self._container_client | ||
| # Mixed: fall back to main (turns in separate container won't | ||
| # appear but this preserves backward-compat for callers that | ||
| # don't handle multi-container merging) | ||
| return self._container_client | ||
| return self._container_client | ||
|
|
There was a problem hiding this comment.
Fixed. Rewrote the docstring to accurately describe the single-container behavior and direct callers needing both containers to _containers_for_query()\ instead.
ea795ba to
05485fb
Compare
05485fb to
c7fa331
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (2)
agent_memory_toolkit/cosmos_memory_client.py:1232
- When a dedicated turns container is configured,
_container_for_query(None)returns only the main container, soget_memories()called withoutmemory_types(the documented "returns all types" default) will silently omit all turn documents. The same applies to mixed queries likememory_types=["turn", "fact"], where turns are silently dropped. Eitherget_memories()should use_containers_for_query()(and merge results, similar toget_thread()), or these cases should raise/log explicitly rather than return incomplete data. The current behavior contradicts the docstring at lines 1167-1169 and is a silent data-loss footgun.
try:
container = self._container_for_query(memory_types)
items = list(
container.query_items(
query=query,
parameters=parameters or None,
enable_cross_partition_query=True,
)
)
agent_memory_toolkit/aio/cosmos_memory_client.py:979
- Same issue as the sync client: with a turns container configured, async
get_memories(memory_types=None)queries only the main container and silently drops turn documents, despite the docstring stating thatNone/emptymemory_types"returns all types". Mixedmemory_types=["turn", "fact"]exhibits the same silent loss. Consider routing this through_containers_for_query()and merging async iterators, asget_thread()does at lines 1254-1261.
try:
container = self._container_for_query(memory_types)
items_iter = container.query_items(
query=query,
parameters=parameters or None,
)
results = [item async for item in items_iter]
| # Mixed: fall back to main (turns in separate container won't | ||
| # appear but this preserves backward-compat for callers that | ||
| # don't handle multi-container merging) |
There was a problem hiding this comment.
By design for the single-container helper. _container_for_query() is explicitly a single-container router for derived-memory callers. Callers needing turns use _containers_for_query() (plural) which returns all relevant containers. get_thread already uses the plural version. Docstring now makes this contract clear.
…y separation Add optional cosmos_turns_container parameter to CosmosMemoryClient and AsyncCosmosMemoryClient that routes turn-type documents to a dedicated container while keeping derived memories (facts, summaries, episodic, procedural) in the main container. Changes: - New _container_for_type() routes writes by memory_type - New _container_for_query() returns single container for callers that target derived memories only (get_memories, search_cosmos). Docstring clarifies it does NOT merge across containers. - New _containers_for_query() returns list of all relevant containers for multi-container merging (no ValueError on mixed types) - get_thread uses _containers_for_query and merges with chronological sort before recent_k slicing - ProcessingPipeline accepts cosmos_turns_container and queries both containers in extract_memories, generate_thread_summary, and generate_user_summary - Pipeline uses getattr guard for _turns_container for backward compat with tests constructing via __new__ - connect_cosmos() now accepts turns_container override parameter (symmetric with create_memory_store) - create_memory_store provisions turns container when configured - update_cosmos/delete_cosmos check both containers (main first, then turns fallback) - close() and _drain_cosmos_client() clear _turns_container_client - Async _init_pipeline builds a sync turns container handle for pipeline - Added inline comment noting change-feed triggers bound to main container will NOT fire for turns in dedicated turns container - Fully backward compatible: omit the param for single-container behavior Note: search_cosmos, add_tags, remove_tags, get_procedural_memories query the main container only by design — these operate on derived memories, not raw turns. Raw turns are retrieved via get_thread(). The change-feed Function App trigger requires a follow-up PR to support the turns container binding.
c7fa331 to
8787f2e
Compare
Add optional cosmos_turns_container parameter to CosmosMemoryClient and AsyncCosmosMemoryClient that routes turn-type documents to a dedicated container while keeping derived memories (facts, summaries, episodic, procedural) in the main container.