Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/storage/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ export interface StorageConfig {
maxBatchBytes?: number;
logBatches?: boolean;
};
/** Watch for file changes and auto-reload (JSONL adapter only) */
watchForChanges?: boolean;
}

/**
Expand Down Expand Up @@ -322,6 +324,7 @@ export async function createStorageAdapter(
type: config?.type ?? envConfig.type ?? 'jsonl',
path: config?.path ?? envConfig.path ?? dbPath,
url: config?.url ?? envConfig.url,
watchForChanges: config?.watchForChanges,
};

const storageType = finalConfig.type?.toLowerCase();
Expand Down Expand Up @@ -371,6 +374,7 @@ export async function createStorageAdapter(
const adapter = new JsonlStorageAdapter({
baseDir,
reason: 'upgrade to Node.js 22+ or run: npm rebuild better-sqlite3',
watchForChanges: finalConfig.watchForChanges,
});
await adapter.init();
return adapter;
Expand All @@ -393,7 +397,10 @@ export async function createStorageAdapter(
const { JsonlStorageAdapter } = await import('./jsonl-adapter.js');
const baseDir = path.dirname(finalConfig.path!);
console.error('[storage] Using JSONL storage');
const adapter = new JsonlStorageAdapter({ baseDir });
const adapter = new JsonlStorageAdapter({
baseDir,
watchForChanges: finalConfig.watchForChanges,
});
await adapter.init();
return adapter;
}
Expand All @@ -417,6 +424,7 @@ export async function createStorageAdapter(
const adapter = new JsonlStorageAdapter({
baseDir,
reason: 'upgrade to Node.js 22+ or run: npm rebuild better-sqlite3',
watchForChanges: finalConfig.watchForChanges,
});
await adapter.init();
return adapter;
Expand Down
31 changes: 31 additions & 0 deletions packages/storage/src/jsonl-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,35 @@ describe('JsonlStorageAdapter', () => {
const sessions = await adapter.getSessions({ agentName: 'Agent' });
expect(sessions[0]?.messageCount).toBe(2);
});

it('auto-reloads messages when watchForChanges is enabled', async () => {
// Create a watching adapter
const watchingAdapter = new JsonlStorageAdapter({
baseDir,
cleanupIntervalMs: 0,
watchForChanges: true,
watchDebounceMs: 50,
});
await watchingAdapter.init();

// Initially no messages (adapter doesn't share with the default one)
const before = await watchingAdapter.getMessages({});
const countBefore = before.length;

// Write a message directly to disk using another adapter (simulating daemon)
const writerAdapter = new JsonlStorageAdapter({ baseDir, cleanupIntervalMs: 0 });
await writerAdapter.init();
await writerAdapter.saveMessage(makeMessage({ id: 'watch-test', body: 'from daemon' }));
await writerAdapter.close();

// Wait for debounce + file watcher to trigger reload
await new Promise(resolve => setTimeout(resolve, 200));

// Should now see the new message
const after = await watchingAdapter.getMessages({});
expect(after.length).toBe(countBefore + 1);
expect(after.some(m => m.id === 'watch-test')).toBe(true);

await watchingAdapter.close();
});
});
86 changes: 86 additions & 0 deletions packages/storage/src/jsonl-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ export interface JsonlAdapterOptions {
cleanupIntervalMs?: number;
/** Optional reason for falling back to JSONL (surfaced in health check) */
reason?: string;
/** Watch for file changes and auto-reload (default: false) */
watchForChanges?: boolean;
/** Debounce interval for file watching in milliseconds (default: 100ms) */
watchDebounceMs?: number;
}

const DEFAULT_RETENTION_MS = 7 * 24 * 60 * 60 * 1000;
const DEFAULT_CLEANUP_INTERVAL_MS = 60 * 60 * 1000;
const DEFAULT_WATCH_DEBOUNCE_MS = 100;

interface MessageRecord {
type: 'message';
Expand Down Expand Up @@ -70,13 +75,22 @@ export class JsonlStorageAdapter implements StorageAdapter {
private sessions: Map<string, StoredSession> = new Map();
private resumeIndex: Map<string, string> = new Map();

private watchForChanges: boolean;
private watchDebounceMs: number;
private messageWatcher?: fs.FSWatcher;
private sessionWatcher?: fs.FSWatcher;
private reloadDebounceTimer?: NodeJS.Timeout;
private sessionReloadDebounceTimer?: NodeJS.Timeout;

constructor(options: JsonlAdapterOptions) {
this.baseDir = options.baseDir;
this.messageDir = path.join(this.baseDir, 'messages');
this.sessionFile = path.join(this.baseDir, 'sessions.jsonl');
this.retentionMs = options.messageRetentionMs ?? DEFAULT_RETENTION_MS;
this.cleanupIntervalMs = options.cleanupIntervalMs ?? DEFAULT_CLEANUP_INTERVAL_MS;
this.fallbackReason = options.reason;
this.watchForChanges = options.watchForChanges ?? false;
this.watchDebounceMs = options.watchDebounceMs ?? DEFAULT_WATCH_DEBOUNCE_MS;
}

async init(): Promise<void> {
Expand All @@ -90,13 +104,18 @@ export class JsonlStorageAdapter implements StorageAdapter {
if (this.cleanupIntervalMs > 0) {
this.startCleanupTimer();
}

if (this.watchForChanges) {
this.startFileWatching();
}
}

async close(): Promise<void> {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = undefined;
}
this.stopFileWatching();
this.messages.clear();
this.deletedMessages.clear();
this.sessions.clear();
Expand Down Expand Up @@ -405,6 +424,73 @@ export class JsonlStorageAdapter implements StorageAdapter {
}
}

private startFileWatching(): void {
// Watch message directory for changes
try {
this.messageWatcher = fs.watch(this.messageDir, (eventType, filename) => {
if (filename && filename.endsWith('.jsonl')) {
this.debouncedReloadMessages();
}
});

if (this.messageWatcher.unref) {
this.messageWatcher.unref();
}
} catch {
// Directory may not exist yet or watching not supported
}

// Watch session file for changes
try {
this.sessionWatcher = fs.watch(this.sessionFile, () => {
this.debouncedReloadSessions();
});

if (this.sessionWatcher.unref) {
this.sessionWatcher.unref();
}
} catch {
// File may not exist yet or watching not supported
}
}

private stopFileWatching(): void {
if (this.messageWatcher) {
this.messageWatcher.close();
this.messageWatcher = undefined;
}
if (this.sessionWatcher) {
this.sessionWatcher.close();
this.sessionWatcher = undefined;
}
if (this.reloadDebounceTimer) {
clearTimeout(this.reloadDebounceTimer);
this.reloadDebounceTimer = undefined;
}
if (this.sessionReloadDebounceTimer) {
clearTimeout(this.sessionReloadDebounceTimer);
this.sessionReloadDebounceTimer = undefined;
}
}

private debouncedReloadMessages(): void {
if (this.reloadDebounceTimer) {
clearTimeout(this.reloadDebounceTimer);
}
this.reloadDebounceTimer = setTimeout(() => {
this.loadMessagesFromDisk().catch(() => {});
}, this.watchDebounceMs);
}

private debouncedReloadSessions(): void {
if (this.sessionReloadDebounceTimer) {
clearTimeout(this.sessionReloadDebounceTimer);
}
this.sessionReloadDebounceTimer = setTimeout(() => {
this.loadSessionsFromDisk().catch(() => {});
}, this.watchDebounceMs);
}

private async loadMessagesFromDisk(): Promise<void> {
this.messages.clear();
this.deletedMessages.clear();
Expand Down
Loading