feat(memory) PR 2/5: capture — 5 lifecycle hooks + HTTP endpoints #3
fazleelahhee wants to merge 1 commit into feature/memory-claude-mem-parity
Conversation
PR 2 of 5 against `feature/memory-claude-mem-parity`, stacked on PR 1. Adds the auto-capture pipeline:

- `src/context_engine/memory/hooks.py` — aiohttp handlers for `/hooks/SessionStart`, `/hooks/UserPromptSubmit`, `/hooks/PostToolUse`, `/hooks/Stop`, `/hooks/SessionEnd`. Each writes the appropriate row(s) to memory.db. Compression for the just-ended turn is enqueued in `pending_compressions` (a UNIQUE constraint dedupes the Stop + next-prompt double-fire). All errors are logged and the endpoint still returns 202 — capture is best-effort and must never block the user's flow.
- `src/context_engine/memory/hook_server.py` — loopback aiohttp listener on a random free port, started as a background asyncio task from `cce serve`'s `_run_serve`. The port is written to `<storage_base>/serve.port`. Cleanly shut down on MCP server exit.
- `src/context_engine/memory/hook_installer.py` — installs `~/.cce/hooks/cce_hook.sh` (the thin shell shim that POSTs hook payloads to the local port) and wires `.claude/settings.json` with all 5 lifecycle entries. Idempotent. Preserves user-added hooks. Uninstall removes only entries whose command points at our script.
- `cce init` wires the installer in step 5; `_run_serve` spawns the hook server alongside the MCP stdio loop and watcher.

Tests (15 new, full suite 326 passed):

- `tests/memory/test_hooks.py`: integration tests via `aiohttp_client` for all 5 endpoints, including dedup, prompt-number assignment, the payload sidecar table, and 400 on a missing `session_id`.
- `tests/memory/test_hook_installer.py`: script write + chmod, idempotent reinstall, settings.json merge preserving user hooks, uninstall removing only ours.

`pyproject.toml`: pytest-aiohttp added to dev/dependency-groups.

No change to the recall surface — `session_recall` still reads JSON. PR 4 retires the JSON path and points recall at memory.db.
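A minimal sketch of that "log everything, still return 202" contract (the handler body, `sessions` columns, and the `memory_db` app key here are illustrative assumptions, not the PR's exact code):

```python
# Sketch of the best-effort capture contract: log failures and return 202
# no matter what, so a capture error never blocks the user's flow.
import logging

from aiohttp import web

log = logging.getLogger(__name__)


async def handle_session_start(request: web.Request) -> web.Response:
    try:
        payload = await request.json()
        session_id = payload.get("session_id")
        if not session_id:
            # The one deliberate non-202: reject structurally bad payloads.
            return web.json_response({"error": "missing session_id"}, status=400)
        conn = request.app["memory_db"]
        conn.execute(
            "INSERT OR IGNORE INTO sessions (id, status) VALUES (?, 'active')",
            (session_id,),
        )
        conn.commit()
    except Exception:
        log.exception("SessionStart capture failed")  # never propagate to the hook
    return web.json_response({"status": "accepted"}, status=202)
```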
Pull request overview
Adds the “capture” half of the memory system by introducing an internal loopback HTTP listener, wiring five Claude Code lifecycle hook endpoints into it, and installing project hook configuration + a forwarding shell script via cce init, with accompanying integration tests.
Changes:
- Implement 5 aiohttp POST endpoints to capture session/prompt/tool/stop/end events into `memory.db` and enqueue compression work.
- Start a loopback hook HTTP server during `cce serve` and write the discovered port to a file for the hook script to find.
- Add an installer/uninstaller for the hook script + Claude settings hooks, plus pytest coverage and dev dependency updates.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `tests/memory/test_hooks.py` | Integration tests for the 5 lifecycle-hook HTTP endpoints and DB side-effects. |
| `tests/memory/test_hook_installer.py` | Tests for installing/uninstalling the hook script and Claude settings entries. |
| `src/context_engine/memory/hooks.py` | New aiohttp handlers for SessionStart/UserPromptSubmit/PostToolUse/Stop/SessionEnd and a compression enqueue helper. |
| `src/context_engine/memory/hook_server.py` | New loopback aiohttp server startup that opens the memory DB and writes `serve.port`. |
| `src/context_engine/memory/hook_installer.py` | New installer that writes `~/.cce/hooks/cce_hook.sh` and wires the 5 lifecycle hooks into Claude settings. |
| `src/context_engine/cli.py` | Wire hook installation into `cce init` and start/stop the hook server during `cce serve`. |
| `pyproject.toml` | Add pytest-aiohttp to dev dependencies. |
```sh
HOOK_NAME="${1:-unknown}"
PORT_FILE="${HOME}/.cce/projects/$(basename "${PWD}")/serve.port"
[ -r "${PORT_FILE}" ] || exit 0
PORT="$(cat "${PORT_FILE}" 2>/dev/null)"
[ -n "${PORT}" ] || exit 0
```
The installed hook script hardcodes the port file path to ${HOME}/.cce/projects/.../serve.port, but the server writes it to Path(config.storage_path) / project_name / serve.port (configurable via storage.path). With non-default storage_path, capture won’t work. Consider writing the port file to a fixed location (e.g. under ~/.cce/hooks/), or generating the script with the resolved storage_path, or exporting the storage base via an env var that both server and script use.
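One shape the "generate the script with the resolved storage_path" option could take. `CCE_HOOK_TEMPLATE`, `install_hook_script`, and the curl forwarding line are hypothetical, not this PR's code:

```python
# Sketch: render the shim at install time with the configured storage path
# baked in, so non-default storage.path still finds serve.port.
from pathlib import Path

CCE_HOOK_TEMPLATE = """#!/bin/sh
HOOK_NAME="${{1:-unknown}}"
PORT_FILE="{storage_base}/$(basename "${{PWD}}")/serve.port"
[ -r "${{PORT_FILE}}" ] || exit 0
PORT="$(cat "${{PORT_FILE}}" 2>/dev/null)"
[ -n "${{PORT}}" ] || exit 0
exec curl -fsS -m 2 -X POST "http://127.0.0.1:${{PORT}}/hooks/${{HOOK_NAME}}" -H 'Content-Type: application/json' --data-binary @- >/dev/null 2>&1
"""


def install_hook_script(storage_base: Path, hooks_dir: Path) -> Path:
    """Write the shim with storage_base resolved at install time."""
    hooks_dir.mkdir(parents=True, exist_ok=True)
    script = hooks_dir / "cce_hook.sh"
    script.write_text(CCE_HOOK_TEMPLATE.format(storage_base=storage_base))
    script.chmod(0o755)  # the shim must be executable for Claude to run it
    return script
```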
```python
def install_settings(project_dir: Path) -> dict:
    """Wire all 5 lifecycle hooks into <project>/.claude/settings.json.

    Idempotent. Preserves any existing user hooks. Returns a summary dict
    with `added` (hook names we wrote) and `skipped` (hook names already
    present).
    """
    settings_dir = project_dir / ".claude"
    settings_path = settings_dir / "settings.json"
    settings_dir.mkdir(parents=True, exist_ok=True)
```
This installer writes to <project>/.claude/settings.json, but the existing cce init SessionStart hook uses .claude/settings.local.json and .gitignore is set up to ignore only settings.local.json (see src/context_engine/project_commands.py). Writing settings.json risks committing per-machine hook config. Consider installing these lifecycle hooks into settings.local.json as well (or updating gitignore/docs accordingly).
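A rough shape for targeting `settings.local.json` instead, merging with user hooks. The hook-entry layout below is an assumption about the settings schema, not code from this PR:

```python
# Sketch: write hook entries into settings.local.json (already gitignored)
# and merge idempotently so user-added hooks are never touched.
import json
from pathlib import Path

LIFECYCLE_HOOKS = ["SessionStart", "UserPromptSubmit", "PostToolUse", "Stop", "SessionEnd"]


def install_local_settings(project_dir: Path, hook_script: Path) -> None:
    settings_path = project_dir / ".claude" / "settings.local.json"
    settings_path.parent.mkdir(parents=True, exist_ok=True)
    settings = json.loads(settings_path.read_text()) if settings_path.exists() else {}
    hooks = settings.setdefault("hooks", {})
    for name in LIFECYCLE_HOOKS:
        entries = hooks.setdefault(name, [])
        command = f"{hook_script} {name}"
        # Only append our entry if it isn't wired yet; leave user entries alone.
        if not any(isinstance(h, dict) and h.get("command") == command for h in entries):
            entries.append({"command": command})
    settings_path.write_text(json.dumps(settings, indent=2) + "\n")
```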
```python
assert pending == [{"kind": "turn", "session_id": "abc", "prompt_number": 1}] \
    if isinstance(pending[0], dict) else len(pending) == 1
```
This assertion is brittle/incorrect with sqlite3.Row (the repo’s memory_db.connect() sets row_factory=sqlite3.Row). In the non-dict case it only checks len(pending) == 1 (and will IndexError if the list is empty), so it doesn’t actually validate the queued row content. Prefer asserting len(pending) == 1 and then checking pending[0]["kind"], session_id, and prompt_number explicitly.
Suggested change:

```diff
-assert pending == [{"kind": "turn", "session_id": "abc", "prompt_number": 1}] \
-    if isinstance(pending[0], dict) else len(pending) == 1
+assert len(pending) == 1
+assert pending[0]["kind"] == "turn"
+assert pending[0]["session_id"] == "abc"
+assert pending[0]["prompt_number"] == 1
```
```python
def _now_iso(epoch: int | None = None) -> str:
    return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(epoch or _now_epoch()))
```
_now_iso() uses epoch or _now_epoch(), which treats epoch=0 as falsy and substitutes the current time. Use an explicit if epoch is None check so epoch 0 (or other falsy-but-valid values) round-trip correctly.
Suggested change:

```diff
-    return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(epoch or _now_epoch()))
+    value = _now_epoch() if epoch is None else epoch
+    return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(value))
```
```python
    conn.execute(
        "INSERT OR IGNORE INTO prompts "
        "(session_id, prompt_number, prompt_text, created_at_epoch, created_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (session_id, int(prompt_number), str(prompt_text), epoch, _now_iso(epoch)),
    )
    conn.execute(
        "UPDATE sessions SET prompt_count = prompt_count + 1 WHERE id = ?",
        (session_id,),
    )
    # Enqueue previous turn for compression. The session may have N-1
    # prompts now; compress turn N-1.
    if int(prompt_number) > 1:
```
handle_user_prompt_submit increments sessions.prompt_count even when the prompt insert is skipped by INSERT OR IGNORE (e.g., retries / duplicate prompt_number). This can over-count prompts. Consider only updating prompt_count when the prompt insert actually inserted a row (e.g., check cursor.rowcount) or compute prompt_count from prompts instead of maintaining it manually.
Suggested change:

```diff
-    conn.execute(
-        "INSERT OR IGNORE INTO prompts "
-        "(session_id, prompt_number, prompt_text, created_at_epoch, created_at) "
-        "VALUES (?, ?, ?, ?, ?)",
-        (session_id, int(prompt_number), str(prompt_text), epoch, _now_iso(epoch)),
-    )
-    conn.execute(
-        "UPDATE sessions SET prompt_count = prompt_count + 1 WHERE id = ?",
-        (session_id,),
-    )
-    # Enqueue previous turn for compression. The session may have N-1
-    # prompts now; compress turn N-1.
-    if int(prompt_number) > 1:
+    insert_cursor = conn.execute(
+        "INSERT OR IGNORE INTO prompts "
+        "(session_id, prompt_number, prompt_text, created_at_epoch, created_at) "
+        "VALUES (?, ?, ?, ?, ?)",
+        (session_id, int(prompt_number), str(prompt_text), epoch, _now_iso(epoch)),
+    )
+    inserted = insert_cursor.rowcount == 1
+    if inserted:
+        conn.execute(
+            "UPDATE sessions SET prompt_count = prompt_count + 1 WHERE id = ?",
+            (session_id,),
+        )
+    # Enqueue previous turn for compression only when a new prompt was
+    # inserted. The session may have N-1 prompts now; compress turn N-1.
+    if inserted and int(prompt_number) > 1:
```
```python
def _find_free_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


async def start_hook_server(
    *,
    storage_base: Path,
    project_name: str,
) -> tuple[web.AppRunner, int]:
    """Spin up the hook HTTP listener. Returns (runner, port).

    Caller is responsible for `await runner.cleanup()` on shutdown.
    """
    db_path = memory_db.memory_db_path(storage_base)
    conn = memory_db.connect(db_path)

    app = web.Application()
    app["memory_db"] = conn
    app["project_name"] = project_name
    add_routes(app)

    async def _close_db(app):
        try:
            app["memory_db"].close()
        except Exception:
            log.exception("memory_db close failed")

    app.on_cleanup.append(_close_db)

    runner = web.AppRunner(app)
    await runner.setup()

    port = _find_free_port()
    site = web.TCPSite(runner, host="127.0.0.1", port=port)
    await site.start()
```
The port selection is subject to a TOCTOU race: _find_free_port() binds then closes the socket, and TCPSite later tries to bind the same port, which can be taken in between. Prefer letting the OS choose by passing port=0 to TCPSite and then reading the bound port from the created server socket.
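A sketch of the port=0 approach, assuming an aiohttp version where `AppRunner.addresses` exposes the bound sockets (3.3+):

```python
# Sketch: let the OS pick the port atomically instead of probing first,
# eliminating the bind/close/re-bind race.
from aiohttp import web


async def _start_site(runner: web.AppRunner) -> int:
    site = web.TCPSite(runner, host="127.0.0.1", port=0)  # port=0: OS chooses
    await site.start()
    # runner.addresses lists the bound socknames, e.g. [("127.0.0.1", 54321)]
    return runner.addresses[0][1]
```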
```python
    app = web.Application()
    app["memory_db"] = conn
    app["project_name"] = project_name
    add_routes(app)

    async def _close_db(app):
        try:
            app["memory_db"].close()
        except Exception:
            log.exception("memory_db close failed")

    app.on_cleanup.append(_close_db)

    runner = web.AppRunner(app)
    await runner.setup()

    port = _find_free_port()
    site = web.TCPSite(runner, host="127.0.0.1", port=port)
    await site.start()

    port_file = Path(storage_base) / "serve.port"
    port_file.parent.mkdir(parents=True, exist_ok=True)
    port_file.write_text(str(port))

    log.info("Memory hook server listening on 127.0.0.1:%d", port)
    return runner, port
```
If start_hook_server() raises after memory_db.connect(db_path) (e.g., runner/site setup failure), the SQLite connection will leak because it’s only closed via app.on_cleanup. Consider wrapping startup in try/except/finally to close conn on exceptions before returning/raising.
Suggested change:

```diff
-    app = web.Application()
-    app["memory_db"] = conn
-    app["project_name"] = project_name
-    add_routes(app)
-    async def _close_db(app):
-        try:
-            app["memory_db"].close()
-        except Exception:
-            log.exception("memory_db close failed")
-    app.on_cleanup.append(_close_db)
-    runner = web.AppRunner(app)
-    await runner.setup()
-    port = _find_free_port()
-    site = web.TCPSite(runner, host="127.0.0.1", port=port)
-    await site.start()
-    port_file = Path(storage_base) / "serve.port"
-    port_file.parent.mkdir(parents=True, exist_ok=True)
-    port_file.write_text(str(port))
-    log.info("Memory hook server listening on 127.0.0.1:%d", port)
-    return runner, port
+    runner: web.AppRunner | None = None
+    try:
+        app = web.Application()
+        app["memory_db"] = conn
+        app["project_name"] = project_name
+        add_routes(app)
+        async def _close_db(app):
+            try:
+                app["memory_db"].close()
+            except Exception:
+                log.exception("memory_db close failed")
+        app.on_cleanup.append(_close_db)
+        runner = web.AppRunner(app)
+        await runner.setup()
+        port = _find_free_port()
+        site = web.TCPSite(runner, host="127.0.0.1", port=port)
+        await site.start()
+        port_file = Path(storage_base) / "serve.port"
+        port_file.parent.mkdir(parents=True, exist_ok=True)
+        port_file.write_text(str(port))
+        log.info("Memory hook server listening on 127.0.0.1:%d", port)
+        return runner, port
+    except Exception:
+        if runner is not None:
+            try:
+                await runner.cleanup()
+            except Exception:
+                log.exception("hook server cleanup failed after startup error")
+        else:
+            try:
+                conn.close()
+            except Exception:
+                log.exception("memory_db close failed")
+        raise
```
Wire-up audit on main surfaced 8 items. All addressed.

- #1 production: RRF dedup broken in the dual-write window (live-confirmed duplicate in `session_recall`). Fixed by canonicalising `_content_key` through `grammar.compress(lite)`.
- #2 ux: dashboard returned raw compressed bytes. `/api/memory/*` now applies `grammar.expand` on read.
- #3: `cce sessions migrate` now compresses on insert (homogeneous storage).
- #4–#7: test gaps closed.
- #8: confirmed reserved-for-future column.

Tests: 603/603 (+25 new). CI green on Python 3.11 / 3.12 / 3.13. Live-verified the dedup fix on /home/fazle/trading/v3.
PR 2 of 5 — Capture
Stacked on PR 1 (foundation). Base: `feature/memory-claude-mem-parity`.
Wires the auto-capture half of the system:
Capture flow per turn
```
SessionStart → INSERT INTO sessions (status='active')
UserPromptSubmit (n) → INSERT INTO prompts; if n>1 enqueue compress(turn n-1)
PostToolUse → INSERT INTO tool_event_payloads + tool_events
Stop → enqueue compress(turn n)
SessionEnd → UPDATE sessions SET status='completed';
enqueue compress(session_rollup)
```
Double-enqueue (Stop + next UserPromptSubmit) is deduped by the UNIQUE constraint on `pending_compressions(kind, session_id, prompt_number)`.
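That mechanism in miniature, as a self-contained sketch (the real DDL lands in PR 1 and may differ):

```python
# Sketch of the enqueue dedup: INSERT OR IGNORE against the UNIQUE constraint
# makes the Stop + next-UserPromptSubmit double-fire a no-op.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE pending_compressions (
        kind TEXT NOT NULL,
        session_id TEXT NOT NULL,
        prompt_number INTEGER NOT NULL,
        UNIQUE (kind, session_id, prompt_number)
    )"""
)

def enqueue(kind: str, session_id: str, prompt_number: int) -> None:
    conn.execute(
        "INSERT OR IGNORE INTO pending_compressions "
        "(kind, session_id, prompt_number) VALUES (?, ?, ?)",
        (kind, session_id, prompt_number),
    )

enqueue("turn", "abc", 3)  # Stop fires
enqueue("turn", "abc", 3)  # next UserPromptSubmit fires again; ignored
assert conn.execute("SELECT COUNT(*) FROM pending_compressions").fetchone()[0] == 1
```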
Out of scope (lands later)
memory.db starts accumulating real capture data here; the recall surface is unchanged from main until PR 4.
Test plan