Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@
"type": "git",
"url": "https://github.com/AgentWorkforce/trajectories"
},
"files": [
"dist"
],
"files": ["dist"],
"engines": {
"node": ">=20.0.0"
},
Expand Down
255 changes: 172 additions & 83 deletions src/storage/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@
* Active trajectories go in active/, completed in completed/YYYY-MM/.
*/

import { randomUUID } from "node:crypto";
import { type Dirent, existsSync } from "node:fs";
import {
  mkdir,
  readFile,
  readdir,
  rename,
  unlink,
  writeFile,
} from "node:fs/promises";
import { join } from "node:path";
import { validateTrajectory } from "../core/schema.js";
import type {
Expand Down Expand Up @@ -98,6 +106,35 @@ export interface ReconcileSummary {
skippedIoError: number;
}

/**
* Per-path promise-chain mutex for index.json access.
*
* Keyed by the absolute index path, so multiple FileStorage instances in
* the same process that target the same `.trajectories` directory share
* the same lock. This is an in-process mutex only — it does not protect
* against writers in other processes. Cross-process safety is provided
* by the atomic tmp-file + rename in `saveIndex` (rename is atomic on
* POSIX, so readers never observe a half-written index).
*
* Implementation: store the tail of a promise chain per path. Each new
* critical section chains onto `.then(task)` so it only runs after the
* previous task resolves. We swallow errors on the tail so one failed
* task doesn't poison the chain for subsequent callers.
*/
const indexLocks = new Map<string, Promise<unknown>>();

/**
 * Serialize `task` behind the per-path promise-chain mutex for `path`.
 *
 * Returns the promise of `task` itself, so callers observe its value or
 * rejection directly. The stored tail swallows errors so one failed task
 * doesn't poison the chain for subsequent callers, and the map entry is
 * evicted once the chain drains so the map can't grow without bound when
 * many distinct index paths are touched over a process's lifetime.
 */
function withIndexLock<T>(path: string, task: () => Promise<T>): Promise<T> {
  const prev = indexLocks.get(path) ?? Promise.resolve();
  // Stored tails never reject (they're swallowed below), but pass `task`
  // as the rejection handler too, defensively, so the chain always advances.
  const next = prev.then(task, task);
  // Swallowed tail: a rejection in `task` must not propagate to the next
  // queued caller.
  const tail = next.then(
    () => undefined,
    () => undefined,
  );
  indexLocks.set(path, tail);
  // Evict once this tail settles, but only if no later caller has already
  // chained past it — otherwise we'd drop a live chain.
  void tail.then(() => {
    if (indexLocks.get(path) === tail) {
      indexLocks.delete(path);
    }
  });
  return next;
}

/**
* File system storage adapter
*/
Expand Down Expand Up @@ -133,12 +170,13 @@ export class FileStorage implements StorageAdapter {
await mkdir(this.activeDir, { recursive: true });
await mkdir(this.completedDir, { recursive: true });

// Create index if it doesn't exist
// Create index if it doesn't exist. Take the lock so a parallel
// initialize() in the same process doesn't race its seed write.
if (!existsSync(this.indexPath)) {
await this.saveIndex({
version: 1,
lastUpdated: new Date().toISOString(),
trajectories: {},
await withIndexLock(this.indexPath, async () => {
if (!existsSync(this.indexPath)) {
await this.saveIndex(this.emptyIndex());
}
});
}

Expand Down Expand Up @@ -172,58 +210,60 @@ export class FileStorage implements StorageAdapter {
skippedIoError: 0,
};

const index = await this.loadIndex();
const before = Object.keys(index.trajectories).length;
await withIndexLock(this.indexPath, async () => {
const index = await this.loadIndex();
const before = Object.keys(index.trajectories).length;

const discovered: string[] = [];
const discovered: string[] = [];

// Walk active/ — intentionally NOT recursive; active trajectories
// always live at the flat root.
try {
const activeFiles = await readdir(this.activeDir);
for (const file of activeFiles) {
if (!file.endsWith(".json")) continue;
discovered.push(join(this.activeDir, file));
// Walk active/ — intentionally NOT recursive; active trajectories
// always live at the flat root.
try {
const activeFiles = await readdir(this.activeDir);
for (const file of activeFiles) {
if (!file.endsWith(".json")) continue;
discovered.push(join(this.activeDir, file));
}
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") throw error;
}
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") throw error;
}

// Walk completed/ recursively so we transparently support every
// historical layout without guessing depth.
await this.walkJsonFilesInto(this.completedDir, discovered);

for (const filePath of discovered) {
summary.scanned += 1;
const result = await this.readTrajectoryFile(filePath);
if (!result.ok) {
if (result.reason === "malformed_json") {
summary.skippedMalformedJson += 1;
} else if (result.reason === "schema_violation") {
summary.skippedSchemaViolation += 1;
} else {
summary.skippedIoError += 1;
// Walk completed/ recursively so we transparently support every
// historical layout without guessing depth.
await this.walkJsonFilesInto(this.completedDir, discovered);

for (const filePath of discovered) {
summary.scanned += 1;
const result = await this.readTrajectoryFile(filePath);
if (!result.ok) {
if (result.reason === "malformed_json") {
summary.skippedMalformedJson += 1;
} else if (result.reason === "schema_violation") {
summary.skippedSchemaViolation += 1;
} else {
summary.skippedIoError += 1;
}
continue;
}
continue;
}
const trajectory = result.trajectory;
if (index.trajectories[trajectory.id]) {
summary.alreadyIndexed += 1;
continue;
const trajectory = result.trajectory;
if (index.trajectories[trajectory.id]) {
summary.alreadyIndexed += 1;
continue;
}
index.trajectories[trajectory.id] = {
title: trajectory.task.title,
status: trajectory.status,
startedAt: trajectory.startedAt,
completedAt: trajectory.completedAt,
path: filePath,
};
summary.added += 1;
}
index.trajectories[trajectory.id] = {
title: trajectory.task.title,
status: trajectory.status,
startedAt: trajectory.startedAt,
completedAt: trajectory.completedAt,
path: filePath,
};
summary.added += 1;
}

if (Object.keys(index.trajectories).length !== before) {
await this.saveIndex(index);
}
if (Object.keys(index.trajectories).length !== before) {
await this.saveIndex(index);
}
});

// Only log when something interesting happened. Noise is worse than
// silence here — the CLI spinner is the user's feedback.
Expand Down Expand Up @@ -490,21 +530,22 @@ export class FileStorage implements StorageAdapter {
await unlink(activePath);
}

// Remove from completed (search subdirectories)
const index = await this.loadIndex();
const entry = index.trajectories[id];
if (entry?.path && existsSync(entry.path)) {
await unlink(entry.path);
// Also remove markdown if exists
const mdPath = entry.path.replace(".json", ".md");
if (existsSync(mdPath)) {
await unlink(mdPath);
// Read + mutate + write the index under the lock so we can't clobber
// a concurrent save's update.
await withIndexLock(this.indexPath, async () => {
const index = await this.loadIndex();
const entry = index.trajectories[id];
if (entry?.path && existsSync(entry.path)) {
await unlink(entry.path);
// Also remove markdown if exists
const mdPath = entry.path.replace(".json", ".md");
if (existsSync(mdPath)) {
await unlink(mdPath);
}
}
}

// Update index
delete index.trajectories[id];
await this.saveIndex(index);
delete index.trajectories[id];
await this.saveIndex(index);
});
}

/**
Expand Down Expand Up @@ -611,43 +652,91 @@ export class FileStorage implements StorageAdapter {
return result.ok ? result.trajectory : null;
}

/**
 * Read and parse the on-disk index.
 *
 * Tolerances (belt-and-braces against the read/write race):
 * - ENOENT: first-run, return an empty index silently.
 * - Empty file: a concurrent writer truncated index.json in "w" mode
 *   right before we read. Return an empty index silently — this is
 *   not a real corruption, just an interleaving the mutex + atomic
 *   rename should already prevent. Logging here would be noise.
 * - Non-empty but malformed JSON: genuinely corrupted on disk (hand
 *   edit, disk error, etc). Log it and return an empty index so the
 *   caller can recover, but keep the log so the problem is visible.
 */
private async loadIndex(): Promise<TrajectoryIndex> {
  let content: string;
  try {
    content = await readFile(this.indexPath, "utf-8");
  } catch (error) {
    // ENOENT means index doesn't exist yet - expected on first run.
    if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
      console.error(
        "Error loading trajectory index, using empty index:",
        error,
      );
    }
    return this.emptyIndex();
  }

  // Empty file == treat as empty index. Happens when readFile sneaks
  // in between a writer's truncate and its write. Defense in depth
  // against the race the in-process mutex already eliminates.
  if (content.length === 0) {
    return this.emptyIndex();
  }

  try {
    return JSON.parse(content) as TrajectoryIndex;
  } catch (error) {
    console.error(
      "Error loading trajectory index, using empty index:",
      error,
    );
    return this.emptyIndex();
  }
}

/** Build a fresh, empty index: version 1, zero entries, stamped now. */
private emptyIndex(): TrajectoryIndex {
  const now = new Date().toISOString();
  return { version: 1, lastUpdated: now, trajectories: {} };
}

/**
 * Atomic write: stage into a process-unique temp path in the same directory
 * and then rename over the live file. `rename` is atomic on POSIX, so
 * concurrent readers in any process either see the old complete file or
 * the new complete file — never a half-written / zero-byte state.
 *
 * Callers MUST hold `withIndexLock(this.indexPath, ...)` so the in-process
 * read-modify-write cycle stays serialized; the unique temp name also keeps
 * parallel writers in other processes from colliding on a shared tmp path.
 */
private async saveIndex(index: TrajectoryIndex): Promise<void> {
  index.lastUpdated = new Date().toISOString();
  const tmpPath = `${this.indexPath}.${process.pid}.${randomUUID()}.tmp`;
  await writeFile(tmpPath, JSON.stringify(index, null, 2), "utf-8");
  await rename(tmpPath, this.indexPath);
}

/**
 * Insert or refresh the index entry for `trajectory`, pointing at
 * `filePath`. The whole read-modify-write cycle runs under the per-path
 * index lock so it can't interleave with other in-process index writers.
 */
private async updateIndex(
  trajectory: Trajectory,
  filePath: string,
): Promise<void> {
  await withIndexLock(this.indexPath, async () => {
    const index = await this.loadIndex();
    index.trajectories[trajectory.id] = {
      title: trajectory.task.title,
      status: trajectory.status,
      startedAt: trajectory.startedAt,
      completedAt: trajectory.completedAt,
      path: filePath,
    };
    await this.saveIndex(index);
  });
}
}
Loading
Loading