feat: Phase 1 local performance optimizations #126
- Add per-connection write queues with backpressure
  - Configurable high/low water marks (default 1500/500)
  - Max queue size of 2000 messages
  - Socket drain handling for slow consumers
- Add batched SQLite writes
  - Configurable batch size (default 50 messages)
  - Time-based flush (default 100ms)
  - Memory-based flush (default 1MB)
  - Metrics for monitoring batch behavior
- Add token bucket rate limiter with generous defaults
  - 500 messages/sec sustained rate
  - 1000 message burst capacity
  - Per-agent tracking with auto-cleanup

These changes optimize the local daemon experience while preparing the architecture for cloud sync improvements.
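The water-mark behavior described above can be sketched roughly like this. Class and callback names are illustrative, not the PR's actual `connection.ts` API:

```typescript
// Sketch of a per-connection write queue with high/low water marks.
// Defaults mirror the commit message (1500/500, max 2000).
class WriteQueue {
  private queue: string[] = [];
  private paused = false;

  constructor(
    private highWaterMark = 1500,
    private lowWaterMark = 500,
    private maxSize = 2000,
    private onBackpressure?: (paused: boolean) => void
  ) {}

  /** Enqueue a message; returns false if the queue is full and the message was dropped. */
  push(msg: string): boolean {
    if (this.queue.length >= this.maxSize) return false;
    this.queue.push(msg);
    if (!this.paused && this.queue.length >= this.highWaterMark) {
      this.paused = true;
      this.onBackpressure?.(true); // tell the producer to slow down
    }
    return true;
  }

  /** Dequeue one message; resumes the producer once we fall below the low water mark. */
  shift(): string | undefined {
    const msg = this.queue.shift();
    if (this.paused && this.queue.length <= this.lowWaterMark) {
      this.paused = false;
      this.onBackpressure?.(false);
    }
    return msg;
  }

  get length(): number {
    return this.queue.length;
  }
}
```

The hysteresis gap between the two marks is what prevents the producer from rapidly flapping between paused and resumed.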
- Add SyncQueue with adaptive batching
  - Size trigger (100 messages)
  - Time trigger (200ms)
  - Bytes trigger (512KB)
- Add gzip compression for large payloads
  - Threshold: 1KB
  - Typical 60-80% reduction for message batches
- Add disk spillover for offline resilience
  - Automatic spill on sync failure
  - Recovery on startup
  - Configurable max spill files
- Add retry with exponential backoff
  - 3 retries by default
  - Exponential delay (1s, 2s, 4s)
- Integrate into CloudSyncService
  - Real-time queueMessageForSync() method
  - Stats via getSyncQueueStats()
  - Graceful shutdown with flush

This ensures zero message loss even during network outages and significantly reduces bandwidth usage.
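As a rough sketch of the compression step: the 1KB threshold matches the description above, but the `{ compressed, body }` wrapper shape is an assumption for illustration, not the real SyncQueue wire format:

```typescript
import { gzipSync, gunzipSync } from "node:zlib";

// Payloads under the threshold pass through untouched; larger ones are gzipped.
const COMPRESS_THRESHOLD = 1024; // 1KB, per the commit message

function packPayload(json: string): { compressed: boolean; body: Buffer } {
  const raw = Buffer.from(json, "utf8");
  if (raw.length < COMPRESS_THRESHOLD) return { compressed: false, body: raw };
  return { compressed: true, body: gzipSync(raw) };
}

function unpackPayload(p: { compressed: boolean; body: Buffer }): string {
  return (p.compressed ? gunzipSync(p.body) : p.body).toString("utf8");
}
```

Message batches compress well because they are highly repetitive JSON, which is where the quoted 60-80% reduction comes from.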
- cloud-sync.test.ts: await stop() before testing disconnected state
- connection.test.ts: wait for setImmediate drain before checking writes
- Add bulk-ingest.ts with multi-row INSERT and staging table strategies
- Optimized connection pool (20 max, 30s idle timeout, 10s connection timeout)
- Auto-select strategy based on batch size:
  - < 1000 rows: multi-row INSERT with chunking
  - > 1000 rows: staging table with single dedup INSERT SELECT
- Enhanced /api/daemons/messages/stats with pool health metrics
- Proper JSONB serialization for data/payloadMeta fields
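The multi-row INSERT strategy can be illustrated by the placeholder generation it requires. Table and column names here are hypothetical, and the real bulk-ingest.ts likely does more (e.g. per-column JSONB casts):

```typescript
// Build a parameterized multi-row INSERT: one VALUES tuple per row, with
// sequential $n placeholders, deduplicated via ON CONFLICT DO NOTHING.
function buildMultiRowInsert(
  table: string,
  columns: string[],
  rowCount: number
): string {
  const tuples: string[] = [];
  for (let r = 0; r < rowCount; r++) {
    // Row r uses placeholders $[r*cols+1] .. $[r*cols+cols]
    const params = columns.map((_, c) => `$${r * columns.length + c + 1}`);
    tuples.push(`(${params.join(", ")})`);
  }
  return (
    `INSERT INTO ${table} (${columns.join(", ")}) VALUES ${tuples.join(", ")} ` +
    `ON CONFLICT DO NOTHING`
  );
}
```

A single statement carrying many rows amortizes round-trip and parse cost, which is why it wins for small-to-medium batches before the staging-table strategy takes over.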
Pull request overview
This PR implements Phase 1 local performance optimizations for the agent relay system, focusing on three key areas: write queue management, batched SQLite operations, and rate limiting. The changes aim to improve throughput and reliability for high-volume message processing while maintaining backward compatibility.
Changes:
- Added per-connection write queues with configurable backpressure handling to prevent blocking on slow consumers
- Implemented batched SQLite writes with configurable size, time, and memory-based flush triggers
- Added token bucket rate limiting with generous defaults (500 msg/sec sustained, 1000 burst) and per-agent tracking
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/storage/batched-sqlite-adapter.ts | New batched SQLite adapter that buffers writes and flushes based on size/time/memory thresholds |
| src/storage/adapter.ts | Updated storage configuration to support 'sqlite-batched' type and batch configuration options |
| src/daemon/sync-queue.ts | New optimized cloud sync queue with compression, disk spillover, and retry logic |
| src/daemon/router.ts | Integrated rate limiter into message routing with configurable limits |
| src/daemon/rate-limiter.ts | New token bucket rate limiter implementation with per-agent tracking |
| src/daemon/connection.ts | Added write queue with backpressure handling and socket drain management |
| src/daemon/connection.test.ts | Updated test to await write queue drain using setImmediate |
| src/daemon/cloud-sync.ts | Integrated optimized sync queue with spill recovery on startup |
| src/daemon/cloud-sync.test.ts | Updated tests to await async stop() method |
| src/cloud/db/index.ts | Exported new bulk ingest utilities for high-volume operations |
| src/cloud/db/drizzle.ts | Enhanced connection pool with optimized settings and error logging |
| src/cloud/db/bulk-ingest.ts | New bulk insert utilities using raw SQL for performance |
| src/cloud/api/daemons.ts | Updated message sync endpoint to use optimized bulk insert with health monitoring |
```typescript
/**
 * Queue a message for batched writing.
 * May trigger an immediate flush if thresholds are exceeded.
 */
async saveMessage(message: StoredMessage): Promise<void> {
```
The saveMessage method lacks test coverage for the batching behavior, including verification that messages are properly queued and flushed based on size, time, and memory thresholds. Consider adding tests that verify flush conditions and ensure no message loss occurs during batched operations.
```typescript
/**
 * Flush all pending messages to SQLite.
 */
async flush(): Promise<void> {
```
The flush method lacks test coverage for concurrent flush scenarios and error handling during batch writes. Consider adding tests that verify the re-queuing behavior when writes fail and ensure that concurrent flushes are properly synchronized.
```typescript
/**
 * Queue a message for sync to cloud.
 * May trigger an immediate flush if thresholds are exceeded.
 */
async enqueue(message: StoredMessage): Promise<void> {
```
The enqueue method lacks test coverage for compression triggers, disk spillover behavior, and recovery scenarios. Consider adding tests that verify compression is applied when payloads exceed the threshold and that spilled messages are properly recovered on startup.
```typescript
try {
  await fs.mkdir(this.config.spillDir, { recursive: true });

  const filename = `spill-${Date.now()}-${Math.random().toString(36).slice(2, 10)}.json`;
```
Using Math.random() for file naming could lead to collisions in high-concurrency scenarios. Consider using a more robust identifier such as a UUID or an atomic counter to ensure uniqueness.
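A minimal version of the suggested fix, using `crypto.randomUUID()` (available since Node 14.17) while keeping the same filename shape:

```typescript
import { randomUUID } from "node:crypto";

// A v4 UUID makes collisions practically impossible even when many spill
// files are created within the same millisecond.
function spillFilename(): string {
  return `spill-${Date.now()}-${randomUUID()}.json`;
}
```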
```typescript
/**
 * Try to acquire a token for the given agent.
 * Returns true if the message should be allowed, false if rate limited.
 */
tryAcquire(agentName: string): boolean {
```
The tryAcquire method lacks test coverage for the token bucket refill logic, especially edge cases such as rapid successive calls and boundary conditions when tokens are at exactly 1.0. Consider adding tests that verify correct token accounting across various time intervals.
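A minimal token bucket with the semantics described in the PR (500 msg/s sustained, 1000 burst); only a single bucket is shown, whereas the PR tracks one per agent. The injectable clock is what makes the refill boundary conditions testable:

```typescript
// Token bucket: refill continuously at ratePerSec, capped at burst;
// each allowed message spends one token.
class TokenBucket {
  private tokens: number;
  private last: number;

  constructor(
    private ratePerSec = 500,
    private burst = 1000,
    private now: () => number = Date.now // ms clock, injectable for tests
  ) {
    this.tokens = burst;
    this.last = this.now();
  }

  tryAcquire(): boolean {
    const t = this.now();
    const elapsed = (t - this.last) / 1000;
    this.last = t;
    this.tokens = Math.min(this.burst, this.tokens + elapsed * this.ratePerSec);
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

With a fake clock, the "tokens at exactly 1.0" boundary mentioned above can be hit deterministically by advancing time by exactly one token's worth of refill.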
```typescript
/**
 * Drain the write queue to the socket.
 * Respects socket backpressure by waiting for 'drain' events.
 */
private drain(): void {
```
The drain method lacks test coverage for socket backpressure scenarios, including verification that the drain event handler is properly registered and that messages are queued when the socket buffer is full. Consider adding tests that simulate slow consumers and verify correct backpressure handling.
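The backpressure contract under review boils down to honoring `write()`'s return value. A simplified drain loop over a narrowed Writable-like interface (so a fake socket can drive it in tests) might look like:

```typescript
// Narrowed socket surface: just enough to exercise backpressure handling.
interface WritableLike {
  write(chunk: string): boolean;
  once(event: "drain", cb: () => void): void;
}

// Write until the queue is empty or the kernel buffer fills; in the latter
// case, stop and resume only when the socket emits 'drain'.
function drainQueue(socket: WritableLike, queue: string[]): void {
  while (queue.length > 0) {
    const msg = queue.shift()!;
    if (!socket.write(msg)) {
      socket.once("drain", () => drainQueue(socket, queue));
      return;
    }
  }
}
```

A slow-consumer test then just needs a fake whose `write` returns false and a manually triggered drain callback.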
```typescript
export async function bulkInsertMessages(
  pool: Pool,
  messages: NewAgentMessage[],
  chunkSize = 100
): Promise<BulkInsertResult> {
```
The bulkInsertMessages function lacks test coverage for chunk processing, error handling within chunks, and deduplication behavior. Consider adding tests that verify correct handling of partial failures and that duplicate messages are properly counted.
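Two small helpers illustrate the chunking and dedup accounting involved; the `{ inserted, duplicates }` shape only loosely mirrors the PR's `BulkInsertResult` and is an assumption here:

```typescript
interface BulkResultSketch {
  inserted: number;
  duplicates: number;
}

// Split a batch into fixed-size chunks for separate INSERT statements.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// With ON CONFLICT DO NOTHING, the driver's rowCount reflects only rows
// actually inserted, so duplicates = attempted - rowCount.
function tallyChunk(attempted: number, rowCount: number): BulkResultSketch {
  return { inserted: rowCount, duplicates: attempted - rowCount };
}
```

Tests for the real function can reuse this framing: feed a chunk containing known duplicates and assert the per-chunk tallies sum correctly across partial failures.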
```typescript
export async function streamingBulkInsert(
  pool: Pool,
  messages: NewAgentMessage[]
): Promise<BulkInsertResult> {
```
The streamingBulkInsert function lacks test coverage for transaction rollback scenarios and staging table cleanup. Consider adding tests that verify the temporary table is properly cleaned up on commit/rollback and that errors during staging don't leave orphaned data.
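For context, a staging-table flow of this kind generally follows the SQL shape below; statement details and table names are assumptions, not the PR's exact SQL:

```typescript
// Load rows into a TEMP table inside one transaction, then a single
// deduplicating INSERT ... SELECT. ON COMMIT DROP means the temp table
// never outlives the transaction, addressing the cleanup concern above.
const stagingFlow: string[] = [
  "BEGIN",
  "CREATE TEMP TABLE staging_messages (LIKE agent_messages INCLUDING DEFAULTS) ON COMMIT DROP",
  "COPY staging_messages FROM STDIN", // or multi-row INSERTs into staging
  "INSERT INTO agent_messages SELECT * FROM staging_messages ON CONFLICT (id) DO NOTHING",
  "COMMIT",
];
```

Because table creation happens inside the transaction, a rollback also discards the staging table, so errors during staging cannot leave orphaned data.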
- Replace Math.random() with UUID for spill file naming to avoid collisions in high-concurrency scenarios
- Add comprehensive tests for batched-sqlite-adapter.ts covering batch size, time, and memory threshold triggers
- Add tests for sync-queue.ts including enqueue behavior, compression, and disk spillover with recovery
- Add tests for rate-limiter.ts covering token bucket refill logic and boundary conditions
- Add tests for connection.ts drain/backpressure handling including socket buffer full scenarios and queue cleanup
- Add tests for bulk-ingest.ts covering chunk processing, error handling, and deduplication behavior

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add git-remote.ts utility for parsing git remote URLs
- Add findByRepoFullName() to workspace queries for repo lookup
- Include repoFullName in message sync payload from daemon
- Auto-link daemon to workspace when syncing if repo matches
- Better error messages when workspace can't be resolved

This enables messages to sync to the cloud automatically without requiring explicit workspace linking, as long as the repo is connected to a workspace in the dashboard.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
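A sketch of the kind of parsing git-remote.ts presumably does, covering the common HTTPS and SSH remote forms; the real utility may handle more cases:

```typescript
// Extract "owner/repo" from common git remote URL forms:
//   https://github.com/owner/repo(.git)
//   git@github.com:owner/repo(.git)
//   ssh://git@github.com/owner/repo(.git)
function parseRepoFullName(remoteUrl: string): string | null {
  const m = remoteUrl.match(
    /^(?:https:\/\/[^/]+\/|git@[^:]+:|ssh:\/\/git@[^/]+\/)([^/]+\/[^/]+?)(?:\.git)?\/?$/
  );
  return m ? m[1] : null;
}
```

The extracted full name is what a lookup like findByRepoFullName() would key on when auto-linking a daemon to its workspace.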
…socket-baseline-architecture-zpoVU
- Prefix unused caught errors with _ (codex-auth-helper, dashboard-server)
- Prefix unused parameters with _ (workspaces, sync-queue, trajectory/config, auth-detection)
- Remove unused imports (bulk-ingest.test, drizzle, intro-expiration, daemon/api, cli-auth.test, rate-limiter.test, sync-queue.test, user-directory.test, batched-sqlite-adapter.test, pty-wrapper, tmux-wrapper)
- Change let to const where variable is never reassigned (sync-queue.test)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Runs the health check endpoint on a separate worker thread to ensure responses even when the main event loop is blocked by heavy compute tasks (e.g., next build).

Includes:
- health-worker.ts: Minimal HTTP server in worker thread
- health-worker-manager.ts: Manages worker lifecycle and stats updates
- Updated Fly provisioner to use dedicated health port (3889)
- Integrated into dashboard-server with fallback to main thread

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
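The idea can be demonstrated with a tiny eval-mode worker that keeps answering even if the main thread later blocks; here it just echoes a status over postMessage rather than running the PR's real HTTP server on port 3889:

```typescript
import { Worker } from "node:worker_threads";

// Spawn a worker whose event loop is independent of the main thread's,
// so it can answer health probes while the main loop is busy.
function startHealthWorker(): Worker {
  return new Worker(
    `const { parentPort } = require("node:worker_threads");
     parentPort.on("message", () => parentPort.postMessage({ status: "ok" }));`,
    { eval: true }
  );
}
```

The production version would bind an HTTP listener inside the worker instead, but the isolation property being relied on is the same.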
Allows users to send Ctrl+C (SIGINT) to agents stuck in loops, giving them an opportunity to refocus.

Includes:
- AgentManager: interrupt() and interruptByName() methods
- Daemon API: POST /agents/:id/interrupt and /agents/by-name/:name/interrupt
- Dashboard API: interruptAgent() client method
- LogViewerPanel: Interrupt button with warning color styling

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
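At its core the interrupt is a SIGINT delivery to the agent's child process. A minimal sketch (the helper name is illustrative, not the PR's AgentManager API, and exercising it assumes a POSIX environment):

```typescript
import { ChildProcess } from "node:child_process";

// kill() with "SIGINT" is the programmatic equivalent of pressing Ctrl+C
// in the agent's terminal; it returns false if the signal could not be sent
// (e.g. the process already exited).
function interruptAgent(proc: ChildProcess): boolean {
  return proc.kill("SIGINT");
}
```

Agents running under a PTY (as these do) may instead receive a literal Ctrl+C byte through the terminal; signal delivery shown here is the simplest portable equivalent.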
cc/ @willwashburn |
Summary
Local Performance Optimizations
- Per-connection write queues with backpressure
- Batched SQLite writes
- Token bucket rate limiter with generous defaults
Cloud Sync Improvements
- Auto-detect workspace from git remote: parse the repo full name (e.g. AgentWorkforce/relay) from the remote URL
- Auto-link daemon to workspace
- New DB query: findByRepoFullName
Workspace Reliability
Dashboard Improvements
- /agents/by-name/:name/interrupt endpoint

Test plan
- npm test - all 1197 tests pass
- npm run build - compiles successfully

🤖 Generated with Claude Code