From b4a201182987914b269ac7e30cda134191b4bb15 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Thu, 29 Jan 2026 15:41:31 +0100 Subject: [PATCH 1/2] watch jsonl --- packages/storage/src/adapter.ts | 10 +++- packages/storage/src/jsonl-adapter.ts | 86 +++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/packages/storage/src/adapter.ts b/packages/storage/src/adapter.ts index 132451aa1..624811351 100644 --- a/packages/storage/src/adapter.ts +++ b/packages/storage/src/adapter.ts @@ -147,6 +147,8 @@ export interface StorageConfig { maxBatchBytes?: number; logBatches?: boolean; }; + /** Watch for file changes and auto-reload (JSONL adapter only) */ + watchForChanges?: boolean; } /** @@ -322,6 +324,7 @@ export async function createStorageAdapter( type: config?.type ?? envConfig.type ?? 'jsonl', path: config?.path ?? envConfig.path ?? dbPath, url: config?.url ?? envConfig.url, + watchForChanges: config?.watchForChanges, }; const storageType = finalConfig.type?.toLowerCase(); @@ -371,6 +374,7 @@ export async function createStorageAdapter( const adapter = new JsonlStorageAdapter({ baseDir, reason: 'upgrade to Node.js 22+ or run: npm rebuild better-sqlite3', + watchForChanges: finalConfig.watchForChanges, }); await adapter.init(); return adapter; @@ -393,7 +397,10 @@ export async function createStorageAdapter( const { JsonlStorageAdapter } = await import('./jsonl-adapter.js'); const baseDir = path.dirname(finalConfig.path!); console.error('[storage] Using JSONL storage'); - const adapter = new JsonlStorageAdapter({ baseDir }); + const adapter = new JsonlStorageAdapter({ + baseDir, + watchForChanges: finalConfig.watchForChanges, + }); await adapter.init(); return adapter; } @@ -417,6 +424,7 @@ export async function createStorageAdapter( const adapter = new JsonlStorageAdapter({ baseDir, reason: 'upgrade to Node.js 22+ or run: npm rebuild better-sqlite3', + watchForChanges: finalConfig.watchForChanges, }); await adapter.init(); return adapter; diff --git a/packages/storage/src/jsonl-adapter.ts b/packages/storage/src/jsonl-adapter.ts index 18be0bef8..4795f41af 100644 --- a/packages/storage/src/jsonl-adapter.ts +++ b/packages/storage/src/jsonl-adapter.ts @@ -19,10 +19,15 @@ export interface JsonlAdapterOptions { cleanupIntervalMs?: number; /** Optional reason for falling back to JSONL (surfaced in health check) */ reason?: string; + /** Watch for file changes and auto-reload (default: false) */ + watchForChanges?: boolean; + /** Debounce interval for file watching in milliseconds (default: 100ms) */ + watchDebounceMs?: number; } const DEFAULT_RETENTION_MS = 7 * 24 * 60 * 60 * 1000; const DEFAULT_CLEANUP_INTERVAL_MS = 60 * 60 * 1000; +const DEFAULT_WATCH_DEBOUNCE_MS = 100; interface MessageRecord { type: 'message'; @@ -70,6 +75,13 @@ export class JsonlStorageAdapter implements StorageAdapter { private sessions: Map = new Map(); private resumeIndex: Map = new Map(); + private watchForChanges: boolean; + private watchDebounceMs: number; + private messageWatcher?: fs.FSWatcher; + private sessionWatcher?: fs.FSWatcher; + private reloadDebounceTimer?: NodeJS.Timeout; + private sessionReloadDebounceTimer?: NodeJS.Timeout; + constructor(options: JsonlAdapterOptions) { this.baseDir = options.baseDir; this.messageDir = path.join(this.baseDir, 'messages'); @@ -77,6 +89,8 @@ export class JsonlStorageAdapter implements StorageAdapter { this.retentionMs = options.messageRetentionMs ?? DEFAULT_RETENTION_MS; this.cleanupIntervalMs = options.cleanupIntervalMs ?? DEFAULT_CLEANUP_INTERVAL_MS; this.fallbackReason = options.reason; + this.watchForChanges = options.watchForChanges ?? false; + this.watchDebounceMs = options.watchDebounceMs ?? DEFAULT_WATCH_DEBOUNCE_MS; } async init(): Promise { @@ -90,6 +104,10 @@ export class JsonlStorageAdapter implements StorageAdapter { if (this.cleanupIntervalMs > 0) { this.startCleanupTimer(); } + + if (this.watchForChanges) { + this.startFileWatching(); + } } async close(): Promise { @@ -97,6 +115,7 @@ export class JsonlStorageAdapter implements StorageAdapter { clearInterval(this.cleanupTimer); this.cleanupTimer = undefined; } + this.stopFileWatching(); this.messages.clear(); this.deletedMessages.clear(); this.sessions.clear(); @@ -405,6 +424,73 @@ export class JsonlStorageAdapter implements StorageAdapter { } } + private startFileWatching(): void { + // Watch message directory for changes + try { + this.messageWatcher = fs.watch(this.messageDir, (eventType, filename) => { + if (filename && filename.endsWith('.jsonl')) { + this.debouncedReloadMessages(); + } + }); + + if (this.messageWatcher.unref) { + this.messageWatcher.unref(); + } + } catch { + // Directory may not exist yet or watching not supported + } + + // Watch session file for changes + try { + this.sessionWatcher = fs.watch(this.sessionFile, () => { + this.debouncedReloadSessions(); + }); + + if (this.sessionWatcher.unref) { + this.sessionWatcher.unref(); + } + } catch { + // File may not exist yet or watching not supported + } + } + + private stopFileWatching(): void { + if (this.messageWatcher) { + this.messageWatcher.close(); + this.messageWatcher = undefined; + } + if (this.sessionWatcher) { + this.sessionWatcher.close(); + this.sessionWatcher = undefined; + } + if (this.reloadDebounceTimer) { + clearTimeout(this.reloadDebounceTimer); + this.reloadDebounceTimer = undefined; + } + if (this.sessionReloadDebounceTimer) { + clearTimeout(this.sessionReloadDebounceTimer); + this.sessionReloadDebounceTimer = undefined; + } + } + + private debouncedReloadMessages(): void { + if (this.reloadDebounceTimer) { + clearTimeout(this.reloadDebounceTimer); + } + this.reloadDebounceTimer = setTimeout(() => { + this.loadMessagesFromDisk().catch(() => {}); + }, this.watchDebounceMs); + } + + private debouncedReloadSessions(): void { + if (this.sessionReloadDebounceTimer) { + clearTimeout(this.sessionReloadDebounceTimer); + } + this.sessionReloadDebounceTimer = setTimeout(() => { + this.loadSessionsFromDisk().catch(() => {}); + }, this.watchDebounceMs); + } + private async loadMessagesFromDisk(): Promise { this.messages.clear(); this.deletedMessages.clear(); From afcf67f3dab7fb18b07a6c3567cfbc53bb2bb8f5 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Thu, 29 Jan 2026 15:42:31 +0100 Subject: [PATCH 2/2] tests --- packages/storage/src/jsonl-adapter.test.ts | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/packages/storage/src/jsonl-adapter.test.ts b/packages/storage/src/jsonl-adapter.test.ts index 00e39bc1b..a01c8b283 100644 --- a/packages/storage/src/jsonl-adapter.test.ts +++ b/packages/storage/src/jsonl-adapter.test.ts @@ -197,4 +197,35 @@ describe('JsonlStorageAdapter', () => { const sessions = await adapter.getSessions({ agentName: 'Agent' }); expect(sessions[0]?.messageCount).toBe(2); }); + + it('auto-reloads messages when watchForChanges is enabled', async () => { + // Create a watching adapter + const watchingAdapter = new JsonlStorageAdapter({ + baseDir, + cleanupIntervalMs: 0, + watchForChanges: true, + watchDebounceMs: 50, + }); + await watchingAdapter.init(); + + // Initially no messages (adapter doesn't share with the default one) + const before = await watchingAdapter.getMessages({}); + const countBefore = before.length; + + // Write a message directly to disk using another adapter (simulating daemon) + const writerAdapter = new JsonlStorageAdapter({ baseDir, cleanupIntervalMs: 0 }); + await writerAdapter.init(); + await writerAdapter.saveMessage(makeMessage({ id: 'watch-test', body: 'from daemon' })); + await writerAdapter.close(); + + // Wait for debounce + file watcher to trigger reload + await new Promise(resolve => setTimeout(resolve, 200)); + + // Should now see the new message + const after = await watchingAdapter.getMessages({}); + expect(after.length).toBe(countBefore + 1); + expect(after.some(m => m.id === 'watch-test')).toBe(true); + + await watchingAdapter.close(); + }); });