Code refactoring, async code fixes and error handling #2

Merged: jcodella merged 2 commits into AzureCosmosDB:main from aayush3011:users/akataria/asyncFixes on Apr 7, 2026

Conversation

@aayush3011
Contributor

Motivation

  1. Code duplication: The same logic was copy-pasted across modules. Refactoring and cleanup improve maintainability, modularity, and long-term velocity.
  2. Zero error handling: LLM calls, Cosmos operations, and HTTP polling had no try/except, no retries, no logging. Failures surfaced as raw urllib or openai exceptions with no context.
  3. The async implementation had bugs: A NameError (_os undefined at line 62), deprecated asyncio.get_event_loop() calls (3 sites), and a new aiohttp.ClientSession created per request causing connection churn.

What This PR Does

1 - Code Cleanup & Modularity

Broke the two monoliths into focused, single-responsibility modules:

| New Module | What It Owns | What It Replaced |
|---|---|---|
| models.py | Pydantic MemoryRecord, MemoryRole/MemoryType enums, SearchResult | Raw dict construction via _make_memory() |
| exceptions.py | 10-class exception hierarchy under AgentMemoryError | Bare ValueError, KeyError, RuntimeError, TimeoutError scattered throughout |
| cosmos_memory_client.py | CosmosMemoryStore: sync Cosmos CRUD, vector search, container provisioning | ~400 lines of inline Cosmos logic in memory.py |
| embeddings.py | EmbeddingsClient: sync Azure OpenAI embedding generation | Same logic duplicated in memory.py, async_memory.py, and activities.py |
| processing.py | ProcessingClient: sync Durable Functions HTTP-start + poll | 3× duplicated poll loops in memory.py |
| _query_builder.py | Shared _QueryBuilder helper for Cosmos query construction | 6+ copy-pasted condition-builder blocks |
| aio/ subpackage | Async mirrors: AsyncCosmosMemoryStore, AsyncEmbeddingsClient, AsyncProcessingClient, AsyncAgentMemory | async_memory.py monolith (deleted) |

memory.py dropped from 1,069 → 574 lines. It's now a thin orchestrator composing the extracted modules.
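
To illustrate the kind of helper that replaces the copy-pasted condition-builder blocks, here is a minimal sketch of a parameterized filter builder. It is not the PR's `_QueryBuilder`: the class name `QueryBuilder`, the `add_filter()` and `where_clause()` methods, and the `c.` alias are assumptions made for illustration. Only `get_parameters()` returning a copy and the skipping of `None` values come from the test summary in this PR.

```python
from typing import Any


class QueryBuilder:
    """Illustrative stand-in for a shared Cosmos filter builder (hypothetical API)."""

    def __init__(self) -> None:
        self._conditions: list[str] = []
        self._parameters: list[dict[str, Any]] = []

    def add_filter(self, field: str, value: Any) -> "QueryBuilder":
        # None means "no filter on this field", so it is skipped entirely.
        if value is None:
            return self
        name = f"@{field}"
        self._conditions.append(f"c.{field} = {name}")
        # Cosmos SQL parameters are passed as {"name": ..., "value": ...} dicts.
        self._parameters.append({"name": name, "value": value})
        return self

    def where_clause(self) -> str:
        # An empty builder contributes nothing to the query text.
        if not self._conditions:
            return ""
        return "WHERE " + " AND ".join(self._conditions)

    def get_parameters(self) -> list[dict[str, Any]]:
        # Return a new list so callers cannot grow or shrink the builder's own list.
        return list(self._parameters)


builder = QueryBuilder().add_filter("user_id", "u1").add_filter("thread_id", None)
query = f"SELECT * FROM c {builder.where_clause()}"
```

Keeping values in parameters rather than interpolating them into the query string is what lets one helper serve every filter combination safely.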

Key design decisions:

  • Plain kwargs, no config objects. Constructors accept endpoint, credential, database, etc. directly — matching the azure-cosmos SDK convention.
  • aio/ subpackage for async. Mirrors azure.cosmos / azure.cosmos.aio. Some code duplication is intentional for independent readability.
  • Backward compatible. from agent_memory_toolkit import AsyncAgentMemory still works via re-export.

2 - Async Fixes

| Bug | Where | Fix |
|---|---|---|
| _os NameError | async_memory.py:62 | Eliminated: async code uses same kwargs pattern as sync |
| Deprecated asyncio.get_event_loop() | 3 call sites in async polling | AsyncProcessingClient uses asyncio.get_running_loop().time() |
| New aiohttp.ClientSession per call | Every orchestrator invocation | Lazy-created, reused single session with close() + async with support |

3 - Error Handling & Logging

Logging — Every module now uses logging.getLogger(__name__):

  • INFO: connections, operation start/complete, batch sizes
  • DEBUG: query field names (no PII), embedding dimensions, poll status
  • WARNING: empty results, retry attempts
  • ERROR: failures with exc_info=True
  • Endpoints masked (last 8 chars), no user content or credentials logged
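
A minimal sketch of this logging convention follows. It assumes "masked (last 8 chars)" means that only the last eight characters of an endpoint stay visible. The helper names `mask_endpoint()` and `connect()`, and the `opener` callable, are hypothetical and are not taken from the PR.

```python
import logging

# One logger per module, named after the module, as the PR describes.
logger = logging.getLogger(__name__)


def mask_endpoint(endpoint: str, visible: int = 8) -> str:
    """Hide everything except the last `visible` characters (assumed policy)."""
    if len(endpoint) <= visible:
        return "*" * len(endpoint)
    return "***" + endpoint[-visible:]


def connect(endpoint: str, opener) -> bool:
    """Log the start, success, and failure of a connection without leaking the URL."""
    masked = mask_endpoint(endpoint)
    logger.info("Connecting to %s", masked)
    try:
        opener(endpoint)
    except OSError:
        # exc_info=True attaches the traceback to the ERROR record.
        logger.error("Connection to %s failed", masked, exc_info=True)
        return False
    logger.info("Connected to %s", masked)
    return True


def _refuse(_endpoint: str) -> None:
    # Simulated failure; ConnectionRefusedError is a subclass of OSError.
    raise ConnectionRefusedError("simulated failure")


ok = connect("https://example.invalid:443/", lambda _endpoint: None)
failed = connect("https://example.invalid:443/", _refuse)
```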

Exception hierarchy:

AgentMemoryError (catchable base)
├── ConfigurationError          → missing endpoint, credential
├── ValidationError             → invalid role, memory_type
├── CosmosNotConnectedError     → Cosmos op before connect()
├── CosmosOperationError        → query/write failures
├── MemoryNotFoundError         → carries memory_id, user_id, thread_id
├── EmbeddingError              → OpenAI API failures
├── ProcessingError             → Durable Functions failures
├── OrchestrationTimeoutError   → carries timeout, status_url
└── AuthenticationError         → Azure auth failures
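
As a rough illustration of how such a hierarchy carries structured context, the sketch below defines the base class and two of the subtypes. The constructor signatures and messages are assumptions; the PR only states which attributes each exception carries.

```python
from typing import Optional


class AgentMemoryError(Exception):
    """Base class so callers can catch every toolkit failure with one except."""


class MemoryNotFoundError(AgentMemoryError):
    """Carries the identifiers of the memory that could not be found."""

    def __init__(
        self,
        memory_id: str,
        user_id: Optional[str] = None,
        thread_id: Optional[str] = None,
    ) -> None:
        self.memory_id = memory_id
        self.user_id = user_id
        self.thread_id = thread_id
        super().__init__(f"Memory {memory_id!r} not found")


class OrchestrationTimeoutError(AgentMemoryError):
    """Carries the timeout that elapsed and the status URL being polled."""

    def __init__(self, timeout: float, status_url: str) -> None:
        self.timeout = timeout
        self.status_url = status_url
        super().__init__(f"Orchestration did not complete within {timeout}s")


def describe(exc: AgentMemoryError) -> str:
    # Callers can branch on the specific type or fall back to the base class.
    if isinstance(exc, MemoryNotFoundError):
        return f"missing:{exc.memory_id}"
    return f"other:{type(exc).__name__}"


try:
    raise MemoryNotFoundError("m-1", user_id="u-1")
except AgentMemoryError as caught:
    summary = describe(caught)
```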

Activities (azure_functions/activities.py):

  • _call_llm_with_retry() — exponential backoff for 429/500/503 (3 attempts, 2s base)
  • _validate_required() on all 6 activity functions
  • Cosmos ops wrapped with CosmosHttpResponseError handling
  • DRY helpers extracted: _build_transcript(), _load_prompt(), _get_llm_model()
  • Model env var standardized: LLM_MODEL → AI_FOUNDRY_LLM → "gpt-4o"
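
The retry policy in the first bullet (3 attempts, 2s base, retrying on 429/500/503) might look roughly like the sketch below. It is not the PR's `_call_llm_with_retry()`: `call_with_retry()`, `HttpStatusError`, and the injectable `sleep` parameter are hypothetical, and the real helper presumably catches the openai client's own exception types.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Status codes treated as transient, taken from the PR description.
RETRYABLE_STATUS = {429, 500, 503}


class HttpStatusError(Exception):
    """Hypothetical stand-in for an API error that exposes a status code."""

    def __init__(self, status_code: int) -> None:
        self.status_code = status_code
        super().__init__(f"HTTP {status_code}")


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except HttpStatusError as exc:
            # Give up on non-transient errors or when attempts are exhausted.
            if exc.status_code not in RETRYABLE_STATUS or attempt == max_attempts:
                raise
            # Exponential backoff: base_delay, then 2x, then 4x, and so on.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable: the loop always returns or raises")


# Demo: fail twice with 429, then succeed. Delays are recorded, not slept.
delays: list = []
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise HttpStatusError(429)
    return "ok"


result = call_with_retry(flaky, sleep=delays.append)
```

Passing `sleep` in as a parameter keeps the backoff testable offline, which fits the fully mocked test suite described later in this PR.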

File Change Summary

New Files (11)

| File | Lines | Purpose |
|---|---|---|
| models.py | 173 | MemoryRecord (Pydantic), enums, SearchResult, OrchestrationResult |
| exceptions.py | 129 | Exception hierarchy with structured context attributes |
| _query_builder.py | 43 | Shared _QueryBuilder for Cosmos queries |
| cosmos_memory_client.py | 561 | Sync CosmosMemoryStore |
| embeddings.py | 159 | Sync EmbeddingsClient |
| processing.py | 198 | Sync ProcessingClient |
| aio/__init__.py | 17 | Async subpackage exports |
| aio/cosmos_memory_client.py | 523 | Async AsyncCosmosMemoryStore |
| aio/embeddings.py | 158 | Async AsyncEmbeddingsClient |
| aio/processing.py | 202 | Async AsyncProcessingClient |
| aio/memory.py | 530 | Async AsyncAgentMemory |

Modified Files (5)

| File | Before → After | What Changed |
|---|---|---|
| memory.py | 1,069 → 574 | Thin orchestrator composing sub-modules. Same public API. |
| __init__.py | 6 → 18 | Exports models, exceptions, re-exports AsyncAgentMemory from aio |
| activities.py | 650 → 829 | Retry, error handling, DRY helpers, logging, input validation |
| README.md | | Updated project structure, Python badge (3.11+), async import example, install instructions |
| Samples/Demo_async.ipynb | | Updated import to from agent_memory_toolkit.aio import AsyncAgentMemory |

Deleted Files (3)

| File | Reason |
|---|---|
| async_memory.py | Replaced by aio/memory.py |
| requirements.txt | Consolidated into pyproject.toml [project] dependencies |
| requirements-dev.txt | Consolidated into pyproject.toml [project.optional-dependencies] dev |

New: Packaging & CI

| File | Purpose |
|---|---|
| pyproject.toml | Full [project] metadata (name, version, requires-python >= 3.11, deps, dev deps), [tool.pytest], [tool.ruff], [tool.coverage] |
| uv.lock | Dependency lock file |
| .github/workflows/ci.yml | CI pipeline: lint (ruff) + unit tests (pytest + coverage), matrix across Python 3.11 / 3.12 / 3.13 |

New Test Files (12)

| File | Lines | Tests | What It Covers |
|---|---|---|---|
| tests/conftest.py | 82 | | Shared fixtures |
| tests/unit/test_models.py | 207 | 20 | MemoryRecord, enums, serialization |
| tests/unit/test_exceptions.py | 111 | 16 | Exception hierarchy, attrs |
| tests/unit/test_query_builder.py | 69 | 6 | Query builder filters |
| tests/unit/test_cosmos_memory_client.py | 385 | 22 | Sync Cosmos CRUD + search |
| tests/unit/test_embeddings.py | 203 | 11 | Sync embeddings |
| tests/unit/test_processing.py | 214 | 10 | Sync processing/polling |
| tests/unit/test_memory.py | 287 | 17 | Sync AgentMemory orchestrator |
| tests/unit/aio/test_cosmos_memory_client.py | 399 | 27 | Async Cosmos + context mgr |
| tests/unit/aio/test_embeddings.py | 241 | 14 | Async embeddings + cleanup |
| tests/unit/aio/test_processing.py | 287 | 13 | Async polling + aiohttp |
| tests/unit/aio/test_memory.py | 360 | 28 | Async AgentMemory + lifecycle |

Backward Compatibility

| Concern | Status |
|---|---|
| AgentMemory(cosmos_endpoint=..., ...) constructor | ✅ Unchanged |
| from agent_memory_toolkit import AsyncAgentMemory | ✅ Works (re-exported from aio) |
| All public method signatures | ✅ Unchanged |
| All return types (list[dict]) | ✅ Unchanged |
| _make_memory(), VALID_ROLES, VALID_TYPES | ✅ Still exported from memory.py |
| Activity function signatures + decorators | ✅ Unchanged |
| Orchestrator (function_app.py) | ✅ Not modified |

New preferred async import path:

from agent_memory_toolkit.aio import AsyncAgentMemory  # mirrors azure.cosmos.aio

Architecture

AgentMemory / AsyncAgentMemory              ← thin orchestrator, plain kwargs
    ├── CosmosMemoryStore                   ← all Cosmos CRUD + vector search
    │     └── _QueryBuilder                 ← shared query helper
    ├── EmbeddingsClient                    ← Azure OpenAI embeddings (lazy init)
    └── ProcessingClient                    ← Durable Functions HTTP polling
          │
          ▼
    Azure Durable Functions (activities.py)
    ├── _call_llm_with_retry()              ← exponential backoff
    ├── _build_transcript()                 ← DRY transcript builder
    └── _validate_required()                ← input validation

Dependencies

| Dependency | Change |
|---|---|
| pydantic (v2) | Added: MemoryRecord validation and Cosmos serialization |
| azure-cosmos, azure-identity, openai, aiohttp | Unchanged |
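
To show the validation and serialization behaviour attributed to MemoryRecord (enum validation, None omission in to_cosmos_dict(), and a from_cosmos_dict() round-trip that tolerates Cosmos system fields), here is a dependency-free sketch. It deliberately swaps Pydantic for a standard-library dataclass so that it runs anywhere. The field names and enum values are assumptions, and the real model may map keys differently.

```python
from dataclasses import asdict, dataclass, fields
from enum import Enum
from typing import Any, Optional


class MemoryRole(str, Enum):
    # Hypothetical values; the PR does not list the actual roles.
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class MemoryRecord:
    id: str
    user_id: str
    role: MemoryRole
    content: str
    thread_id: Optional[str] = None

    def __post_init__(self) -> None:
        # Coerce strings to the enum; unknown roles raise ValueError.
        self.role = MemoryRole(self.role)

    def to_cosmos_dict(self) -> dict:
        out: dict = {}
        for key, value in asdict(self).items():
            if value is None:
                continue  # omit unset optional fields from the stored document
            out[key] = value.value if isinstance(value, Enum) else value
        return out

    @classmethod
    def from_cosmos_dict(cls, doc: dict) -> "MemoryRecord":
        # Keep only known fields; this drops Cosmos system properties
        # such as _rid, _self, _etag, _attachments, and _ts.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in doc.items() if k in known})


record = MemoryRecord(id="m1", user_id="u1", role="user", content="hi")
document = record.to_cosmos_dict()
stored = {**document, "_etag": "abc", "_ts": 1700000000}
restored = MemoryRecord.from_cosmos_dict(stored)
```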

Dev Dependencies (pip install ".[dev]")

| Dependency | Purpose |
|---|---|
| pytest >= 8.0 | Test runner |
| pytest-asyncio >= 0.23 | Async test support (auto mode) |
| pytest-cov >= 5.0 | Coverage reporting |
| pytest-mock >= 3.12 | Mock fixtures |
| ruff >= 0.4 | Linting + formatting |

CI/CD Pipeline (.github/workflows/ci.yml)

| Job | Python Versions | What It Runs |
|---|---|---|
| lint | 3.11, 3.12, 3.13 | ruff check + ruff format --check |
| test | 3.11, 3.12, 3.13 | pytest tests/unit/ with coverage, uploads coverage.xml per version |

Triggers on push to main and all PRs to main. 6 jobs total (3 versions × 2 jobs).


Unit Tests — 184 Tests, 87% Coverage

All tests are fully mocked with zero cloud dependencies. Every test runs offline.

Sync Unit Tests (tests/unit/)

| Test File | Tests | What It Covers |
|---|---|---|
| test_models.py | 20 | MemoryRecord creation with defaults, enum validation (valid + invalid roles/types), to_cosmos_dict() key mapping + None omission, from_cosmos_dict() round-trip + Cosmos system field handling, SearchResult, OrchestrationResult |
| test_exceptions.py | 16 | All 9 exception subtypes inherit from AgentMemoryError, ConfigurationError auto-message from parameter kwarg, MemoryNotFoundError context attributes, CosmosNotConnectedError default message, OrchestrationTimeoutError attributes, base class catch-all |
| test_query_builder.py | 6 | Empty/single/multiple filters, None value skipping, get_parameters() returns copy |
| test_cosmos_memory_client.py | 22 | connect() success + missing config errors, create_store() partition key + vector index + full-text policy, upsert/upsert_batch, get_memories with all filter combos + recent_k, get_thread, update/delete (success + MemoryNotFoundError), vector_search (vector-only + hybrid/RRF), get_user_summary, _require_connected guard |
| test_embeddings.py | 11 | generate() success + lazy init reuse, api_key vs credential auth paths, missing config errors, API failure → EmbeddingError wrapping, generate_batch() order preservation via index sort, empty batch, dimensions kwarg |
| test_processing.py | 10 | invoke_orchestrator() immediate completion + multi-poll + Failed → ProcessingError + timeout → OrchestrationTimeoutError, missing endpoint, function key in URL, convenience method payloads (thread_summary_only, extract_facts_only, user_summary_only) |
| test_memory.py | 17 | Constructor credential resolution (on/off), all local CRUD (valid + invalid role + not found), Cosmos delegation (connect, add, push, get, search composing embeddings + vector_search), processing delegation, CosmosNotConnectedError guard |

Async Unit Tests (tests/unit/aio/)

| Test File | Tests | What It Covers |
|---|---|---|
| test_cosmos_memory_client.py | 27 | Async mirror of sync tests + close(), async with context manager, upsert_batch with batch_size |
| test_embeddings.py | 14 | Async mirror + close(), async with context manager |
| test_processing.py | 13 | Async mirror + close(), async with context manager, lazy aiohttp.ClientSession |
| test_memory.py | 28 | Async mirror + constructor, local CRUD (sync), async Cosmos/processing delegation, close() delegates to all sub-clients, async with context manager |

Uncovered lines are primarily Cosmos SDK interaction paths (create_store container provisioning, update/delete Cosmos error branches) that require live Azure services. These will be covered by integration tests in a future PR.

Lint

ruff check and ruff format --check both pass with 0 errors across all source and test files.


Suggested Review Order

  1. models.py + exceptions.py — foundational types
  2. _query_builder.py — tiny shared helper
  3. Sync sub-clients: cosmos_memory_client.py, embeddings.py, processing.py
  4. memory.py — see how it composes the above
  5. aio/ — async mirrors
  6. activities.py — error handling, retry, DRY helpers
  7. tests/conftest.py → tests/unit/ → tests/unit/aio/: test coverage

aayush3011 marked this pull request as ready for review on April 6, 2026 22:14
jcodella requested a review from Copilot on April 7, 2026 19:42

Copilot AI left a comment

Pull request overview

Refactors the Agent Memory Toolkit into smaller sync/async client modules, fixes async implementation issues, and adds structured error handling + logging across Cosmos/LLM/processing paths while modernizing packaging and CI.

Changes:

  • Split monolithic memory implementations into dedicated modules (models/exceptions/query builder + Cosmos/embeddings/processing clients, with async mirrors under aio/).
  • Add more resilient Durable Functions activities (input validation, retries, batch embeddings, richer logging).
  • Introduce pyproject.toml packaging, CI (ruff + pytest/coverage matrix), and extensive offline unit test coverage.

Reviewed changes

Copilot reviewed 35 out of 40 changed files in this pull request and generated 6 comments.

| File | Description |
|---|---|
| agent_memory_toolkit/models.py | Adds Pydantic models/enums for validated memory and result envelopes. |
| agent_memory_toolkit/exceptions.py | Introduces a unified exception hierarchy for catchable, typed failures. |
| agent_memory_toolkit/_query_builder.py | Adds a small shared helper for building parameterized Cosmos WHERE clauses. |
| agent_memory_toolkit/_utils.py | Centralizes shared validation/query/policy helpers used by sync+async clients. |
| agent_memory_toolkit/cosmos_memory_client.py | New sync Cosmos client encapsulating CRUD + vector/hybrid search. |
| agent_memory_toolkit/embeddings.py | New sync Azure OpenAI embeddings client (lazy init + batch support). |
| agent_memory_toolkit/processing.py | New sync Durable Functions HTTP-start + polling client. |
| agent_memory_toolkit/memory.py | Sync orchestrator updated to compose the extracted sub-clients. |
| agent_memory_toolkit/aio/__init__.py | Exports async API surface mirroring sync package layout. |
| agent_memory_toolkit/aio/cosmos_memory_client.py | New async Cosmos client for CRUD + vector/hybrid search. |
| agent_memory_toolkit/aio/embeddings.py | New async embeddings client (lazy init + batch support). |
| agent_memory_toolkit/aio/processing.py | New async Durable Functions client using a reusable aiohttp session. |
| agent_memory_toolkit/aio/memory.py | New async orchestrator composing async sub-clients + lifecycle management. |
| agent_memory_toolkit/__init__.py | Re-exports key public types/exceptions and AsyncAgentMemory entrypoint. |
| azure_functions/activities.py | Adds validation, retries/backoff, batch embeddings, and improved logging/error handling. |
| tests/conftest.py | Adds shared fixtures for sample docs/embeddings/credentials. |
| tests/unit/test_models.py | Unit tests for models, validation, and Cosmos serialization round-trips. |
| tests/unit/test_exceptions.py | Unit tests for exception inheritance and structured attributes. |
| tests/unit/test_query_builder.py | Unit tests for query builder filter composition/copy semantics. |
| tests/unit/test_cosmos_memory_client.py | Unit tests for sync Cosmos store CRUD/search behavior (mocked). |
| tests/unit/test_embeddings.py | Unit tests for sync embeddings (auth modes, lazy init, ordering). |
| tests/unit/test_processing.py | Unit tests for sync Durable Functions polling and error paths. |
| tests/unit/test_memory.py | Unit tests for sync orchestrator delegation and local CRUD. |
| tests/unit/aio/test_cosmos_memory_client.py | Unit tests for async Cosmos store behavior (mocked + async iterators). |
| tests/unit/aio/test_embeddings.py | Unit tests for async embeddings client behavior and cleanup. |
| tests/unit/aio/test_processing.py | Unit tests for async processing client session reuse, polling, close/ctx mgr. |
| tests/unit/aio/test_memory.py | Unit tests for async orchestrator lifecycle and delegation. |
| .github/workflows/ci.yml | Adds CI jobs for ruff + pytest/coverage across Python 3.11–3.13. |
| pyproject.toml | Defines packaging metadata, dependencies, pytest/ruff/coverage config. |
| uv.lock | Adds a lockfile for reproducible dependency resolution. |
| README.md | Updates structure, install instructions, and async import guidance. |
| Docs/local_testing.md | Updates Python version guidance and install commands for new packaging. |
| Docs/azure_testing.md | Updates prerequisites and install commands for new packaging. |
| Docs/design_patterns.md | Updates async import path to agent_memory_toolkit.aio. |
| Samples/Demo_async.ipynb | Updates async demo import path to agent_memory_toolkit.aio. |
| requirements.txt | Removed in favor of pyproject.toml dependencies. |
| agent_memory_toolkit/async_memory.py | Deleted legacy async monolith in favor of aio/ package. |

Comment thread agent_memory_toolkit/aio/cosmos_memory_client.py
Comment thread tests/unit/aio/test_cosmos_memory_client.py
Comment thread azure_functions/activities.py
Comment thread agent_memory_toolkit/aio/memory.py
Comment thread agent_memory_toolkit/processing.py
Comment thread agent_memory_toolkit/aio/processing.py
jcodella merged commit 4dea57d into AzureCosmosDB:main on Apr 7, 2026
4 checks passed