Skip to content

Commit bea711c

Browse files
committed
fix(daemon): rotate log file at cap and recover from tcpPortUsed ETIMEDOUT
The daemon's file logger silently dropped every write once the log file exceeded a hard 20MB cap, leaving long-running daemons with a frozen log file and `bitsocial logs -f` stuck on stale content. Compounding this, process.stderr.write was overridden to route only to the file logger, so once the cap tripped all stderr output was lost. Separately, the keepKuboUpInterval awaited tcpPortUsed.check outside its try/catch, so a transient ETIMEDOUT on 127.0.0.1:9138 became an unhandledRejection.

- Extract the file logger into common-utils/daemon-file-logger.ts with front-truncate rotation: when bytesWritten exceeds maxBytes, end the stream, keep the last trimToBytes (line-aligned), rewrite the file, and reopen in append mode. Concurrent writes during a trim are queued and drained afterwards; the queue is capped to bound memory.
- writeTimestampedLine now returns a boolean; the daemon's stderr and debug-module overrides fall back to the original stderr when it returns false, so debug/error output is never silently lost.
- Extract the keepKuboUp interval body into runKeepKuboUpTick(deps), exported for testing, with tcpPortUsedCheck inside the try/catch.

Refs #37
1 parent 61c852c commit bea711c

4 files changed

Lines changed: 415 additions & 32 deletions

File tree

src/cli/commands/daemon.ts

Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { loadChallengesIntoPKC } from "../../challenge-packages/challenge-utils.
2020
import { migrateDataDirectory } from "../../common-utils/data-migration.js";
2121
import { createBsoResolvers, DEFAULT_PROVIDERS } from "../../common-utils/resolvers.js";
2222
import { pruneStaleStates, writeDaemonState, deleteDaemonState } from "../../common-utils/daemon-state.js";
23+
import { createDaemonFileLogger, type DaemonFileLogger } from "../../common-utils/daemon-file-logger.js";
2324
import fs from "fs";
2425
import fsPromise from "fs/promises";
2526

@@ -46,6 +47,44 @@ const defaultPkcOptions: InputPKCOptions = {
4647
httpRoutersOptions: defaults.HTTP_TRACKERS
4748
};
4849

50+
/**
* Everything one run of `runKeepKuboUpTick` needs, passed in explicitly so the tick
* can be exercised in isolation with stubbed collaborators.
*/
export interface KeepKuboUpTickDeps {
51+
/** RPC endpoint; its `port` and `hostname` are what gets handed to `tcpPortUsedCheck`. */
pkcRpcUrl: URL;
52+
/** Resolves to whether `port` on `host` is already in use. May reject (e.g. a transient ETIMEDOUT). */
tcpPortUsedCheck: (port: number, host: string) => Promise<boolean>;
53+
/** Options parsed from the CLI flag; the tick only looks at whether `kuboRpcClientsOptions` is set. */
pkcOptionsFromFlag: { kuboRpcClientsOptions?: unknown } | undefined;
54+
/** When true the tick never calls `keepKuboUp`. */
usingDifferentProcessRpc: boolean;
55+
/** Whether a kubo process is currently held by the caller. */
hasKuboProcess: boolean;
56+
/** Whether a kubo start is already pending in the caller. */
hasPendingKuboStart: boolean;
57+
/** Invoked when the tick decides kubo should be (re)started; errors are reported through `onError`. */
keepKuboUp: () => Promise<void>;
58+
/** Invoked on every tick after the kubo step; errors are reported through `onError`. */
createOrConnectRpc: () => Promise<void>;
59+
/** Receives a formatted message whenever a step of the tick throws. */
onError: (message: string) => void;
60+
}
61+
62+
/**
 * Executes a single iteration of the keepKuboUp interval. Exported for unit tests.
 *
 * The tick has two independent steps, each guarded by its own try/catch so that a
 * failure in one never prevents the other and never escapes to the `setInterval`
 * callback (where it would surface as an unhandledRejection — issue #37 bug 3):
 *
 *  1. Probe the RPC port and, if warranted, call `deps.keepKuboUp()`.
 *  2. Call `deps.createOrConnectRpc()`.
 *
 * `keepKuboUp` is called only when we are not using another process' RPC and at least
 * one of the following holds: custom `kuboRpcClientsOptions` were supplied, the RPC
 * port is free, or kubo is neither running nor pending (retry after a failed restart,
 * e.g. a transient port conflict).
 *
 * @param deps Collaborators and state snapshot for this tick.
 * @returns A promise that always resolves; errors are forwarded to `deps.onError`.
 */
export async function runKeepKuboUpTick(deps: KeepKuboUpTickDeps): Promise<void> {
    const messageOf = (failure: unknown): string => (failure instanceof Error ? failure.message : String(failure));

    try {
        const portInUse = await deps.tcpPortUsedCheck(Number(deps.pkcRpcUrl.port), deps.pkcRpcUrl.hostname);
        const hasCustomKuboClients = Boolean(deps.pkcOptionsFromFlag?.kuboRpcClientsOptions);
        const kuboIsGone = !deps.hasKuboProcess && !deps.hasPendingKuboStart;

        const shouldEnsureKubo = !deps.usingDifferentProcessRpc && (hasCustomKuboClients || !portInUse || kuboIsGone);
        if (shouldEnsureKubo) {
            await deps.keepKuboUp();
        }
    } catch (failure) {
        deps.onError(`keepKuboUp tick error (will retry): ${messageOf(failure)}`);
    }

    try {
        await deps.createOrConnectRpc();
    } catch (failure) {
        deps.onError(`createOrConnectRpc tick error (will retry): ${messageOf(failure)}`);
    }
}
87+
4988
export default class Daemon extends Command {
5089
static override description = `Run a network-connected Bitsocial node. Once the daemon is running you can create and start your communities and receive publications from users. The daemon will also serve web ui on http that can be accessed through a browser on any machine. Within the web ui users are able to browse, create and manage their communities fully P2P.
5190
Options can be passed to the RPC's instance through flag --pkcOptions.optionName. For a list of pkc options (https://github.com/pkcprotocol/pkc-js?tab=readme-ov-file#pkcoptions)
@@ -115,24 +154,13 @@ export default class Daemon extends Command {
115154
private async _pipeDebugLogsToLogFile(
116155
logPath: string,
117156
Logger: PKCLoggerType
118-
): Promise<{ logFilePath: string; stdoutWrite: typeof process.stdout.write }> {
157+
): Promise<{ logFilePath: string; stdoutWrite: typeof process.stdout.write; fileLogger: DaemonFileLogger }> {
119158
const { logFilePath, deletedLogFile, logfilesCapacity } = await this._getNewLogfileByEvacuatingOldLogsIfNeeded(logPath);
120159

121-
const logFile = fs.createWriteStream(logFilePath, { flags: "a" });
160+
const fileLogger = createDaemonFileLogger({ logFilePath });
122161
const stdoutWrite = process.stdout.write.bind(process.stdout);
123162
const stderrWrite = process.stderr.write.bind(process.stderr);
124163

125-
const isLogFileOverLimit = () => logFile.bytesWritten > 20000000; // 20mb
126-
127-
const writeTimestampedLine = (text: string, stream: "stdout" | "stderr") => {
128-
if (isLogFileOverLimit()) return;
129-
if (!text || text.trim().length === 0) return;
130-
const timestamp = `[${new Date().toISOString()}] [${stream}] `;
131-
const lines = text.split("\n");
132-
const timestamped = lines.map((line, i) => (i === 0 ? timestamp + line : line)).join("\n");
133-
logFile.write(timestamped);
134-
};
135-
136164
// Redirect debug library output directly to the log file
137165
// instead of stderr, so only real errors appear in the terminal
138166
const require = createRequire(import.meta.url);
@@ -142,24 +170,32 @@ export default class Daemon extends Command {
142170
debugModule.inspectOpts.colors = true;
143171
debugModule.inspectOpts.hideDate = true;
144172
debugModule.log = (...args: any[]) => {
145-
writeTimestampedLine(formatWithOptions({ depth: Logger.inspectOpts?.depth || 10, colors: true }, ...args).trimStart() + EOL, "stderr");
173+
const text = formatWithOptions({ depth: Logger.inspectOpts?.depth || 10, colors: true }, ...args).trimStart() + EOL;
174+
const wrote = fileLogger.writeTimestampedLine(text, "stderr");
175+
// If the file logger could not accept the write (closed / pending buffer full),
176+
// fall back to original stderr so debug output is never silently lost
177+
if (!wrote) stderrWrite(text);
146178
};
147179

148180
const asString = (data: string | Uint8Array) => (typeof data === "string" ? data : Buffer.from(data).toString());
149181

150182
process.stdout.write = (...args) => {
151183
//@ts-expect-error
152184
const res = stdoutWrite(...args);
153-
writeTimestampedLine(asString(args[0]), "stdout");
185+
fileLogger.writeTimestampedLine(asString(args[0]), "stdout");
154186
return res;
155187
};
156188

157189
process.stderr.write = (...args) => {
158-
// Only write stderr to the log file, not to the terminal.
159-
// Debug output goes to stderr; we want it in logs only.
160-
// Real errors are caught by uncaughtException/unhandledRejection handlers
161-
// which use console.error -> stderr.write -> this override -> log file.
162-
writeTimestampedLine(asString(args[0]).trimStart(), "stderr");
190+
// Debug output goes to stderr; route it to the log file.
191+
// If the file logger is unavailable (closed, errored), fall back to original stderr
192+
// so output is never silently swallowed.
193+
const text = asString(args[0]);
194+
const wrote = fileLogger.writeTimestampedLine(text.trimStart(), "stderr");
195+
if (!wrote) {
196+
//@ts-expect-error
197+
return stderrWrite(...args);
198+
}
163199
return true;
164200
};
165201

@@ -184,9 +220,13 @@ export default class Daemon extends Command {
184220
console.error(err);
185221
});
186222

187-
process.on("exit", () => logFile.close());
223+
process.on("exit", () => {
224+
// close() returns a promise but exit handlers must be synchronous.
225+
// Best-effort: trigger the close; the underlying writeStream flushes on process exit.
226+
fileLogger.close().catch(() => {});
227+
});
188228

189-
return { logFilePath, stdoutWrite };
229+
return { logFilePath, stdoutWrite, fileLogger };
190230
}
191231

192232
async run() {
@@ -532,16 +572,17 @@ export default class Daemon extends Command {
532572

533573
keepKuboUpInterval = setInterval(async () => {
534574
if (mainProcessExited) return;
535-
const isRpcPortTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), pkcRpcUrl.hostname);
536-
try {
537-
if (!pkcOptionsFromFlag?.kuboRpcClientsOptions && !isRpcPortTaken && !usingDifferentProcessRpc) await keepKuboUp();
538-
else if (pkcOptionsFromFlag?.kuboRpcClientsOptions && !usingDifferentProcessRpc) await keepKuboUp();
539-
// Retry if kubo died and onKuboExit's restart attempt failed (e.g. transient port conflict)
540-
else if (!kuboProcess && !pendingKuboStart && !usingDifferentProcessRpc) await keepKuboUp();
541-
} catch (error) {
542-
log.trace(`keepKuboUp error (will retry): ${error instanceof Error ? error.message : String(error)}`);
543-
}
544-
await createOrConnectRpc();
575+
await runKeepKuboUpTick({
576+
pkcRpcUrl,
577+
tcpPortUsedCheck: (port, host) => tcpPortUsed.check(port, host),
578+
pkcOptionsFromFlag,
579+
usingDifferentProcessRpc,
580+
hasKuboProcess: !!kuboProcess,
581+
hasPendingKuboStart: !!pendingKuboStart,
582+
keepKuboUp,
583+
createOrConnectRpc,
584+
onError: (msg) => log.trace(msg)
585+
});
545586
}, 5000);
546587
} catch (err) {
547588
const errorMsg = err instanceof Error ? err.message : String(err);
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import fs from "fs";
2+
import fsPromise from "fs/promises";
3+
4+
/** Configuration accepted by `createDaemonFileLogger`. */
export interface DaemonFileLoggerOptions {
5+
/** Path of the log file; it is opened in append mode and rewritten in place when trimmed. */
logFilePath: string;
6+
/** Trigger a trim once the current writer's bytesWritten exceeds this. Defaults to 20 MB. */
7+
maxBytes?: number;
8+
/** Target file size after trim. Must be < maxBytes. Defaults to 15 MB. */
9+
trimToBytes?: number;
10+
/** Bound on how many bytes of writes are buffered while a trim cycle is running. Defaults to 5 MB. */
11+
pendingByteCap?: number;
12+
}
13+
14+
/** Handle returned by `createDaemonFileLogger`: a size-capped, append-only log writer. */
export interface DaemonFileLogger {
15+
/**
16+
* Append a timestamped line to the log file. Returns false when the write was
17+
* dropped (e.g. the logger is closed or the in-flight pending buffer is full),
18+
* letting the caller decide whether to fall back to terminal output.
19+
*/
20+
writeTimestampedLine(text: string, stream: "stdout" | "stderr"): boolean;
21+
/**
* Marks the logger as closed (later writes return false), waits for any trim that
* is in flight, then ends the underlying write stream.
*/
close(): Promise<void>;
22+
/** Force-run a trim cycle — for tests. */
23+
_trimNow(): Promise<void>;
24+
/** The `logFilePath` this logger was created with. */
readonly currentPath: string;
25+
/**
* `bytesWritten` of the current underlying write stream. The stream is recreated
* after every trim, so this presumably restarts from zero then — it is not the
* on-disk file size.
*/
readonly bytesWritten: number;
26+
}
27+
28+
/** Default trim trigger: 20 MB written through the current stream. */
const DEFAULT_MAX = 20_000_000;
/** Default size the log file is cut back to by a trim: 15 MB. */
const DEFAULT_TRIM_TO = 15_000_000;
/** Default cap on bytes held in memory while a trim is running: 5 MB. */
const DEFAULT_PENDING_CAP = 5_000_000;

/**
 * Creates an append-mode file logger that keeps the log file bounded by
 * front-truncating it ("trim") instead of refusing writes once it grows too large.
 *
 * Lifecycle of a trim: the current stream is ended (flushing it to disk), the last
 * `trimToBytes` of the file are read, any partial first line is discarded, the file
 * is rewritten with that tail and a fresh append stream is opened. Lines written
 * while a trim is running are buffered (up to `pendingByteCap` bytes) and flushed to
 * the new stream afterwards.
 *
 * @param options See {@link DaemonFileLoggerOptions}.
 * @returns The logger handle.
 * @throws Error when `trimToBytes` is not strictly less than `maxBytes`.
 */
export function createDaemonFileLogger(options: DaemonFileLoggerOptions): DaemonFileLogger {
    const maxBytes = options.maxBytes ?? DEFAULT_MAX;
    const trimToBytes = options.trimToBytes ?? DEFAULT_TRIM_TO;
    const pendingByteCap = options.pendingByteCap ?? DEFAULT_PENDING_CAP;
    if (trimToBytes >= maxBytes) {
        throw new Error(`trimToBytes (${trimToBytes}) must be less than maxBytes (${maxBytes})`);
    }
    const logFilePath = options.logFilePath;

    const openStream = (): fs.WriteStream => {
        const opened = fs.createWriteStream(logFilePath, { flags: "a" });
        // Logging must never crash the daemon: swallow stream errors here. A failed
        // stream becomes non-writable, which writeTimestampedLine reports as `false`.
        opened.on("error", () => {});
        return opened;
    };

    // Resolves once `target` has flushed and finished (or immediately if it is already dead).
    const endStream = (target: fs.WriteStream): Promise<void> =>
        new Promise<void>((resolve) => {
            if (target.destroyed || target.writableFinished) {
                resolve();
                return;
            }
            target.end(() => resolve());
        });

    let stream = openStream();
    let trimming: Promise<void> | undefined;
    let pending: string[] = [];
    let pendingBytes = 0;
    let closed = false;

    const reopenStream = () => {
        // The previous stream keeps its no-op "error" listener on purpose: a late error
        // on an abandoned stream with no listener would be thrown as an uncaught exception.
        stream = openStream();
    };

    const trim = async (): Promise<void> => {
        // End the current stream first so its buffered writes flush to disk before we read the file
        await endStream(stream);

        try {
            const stat = await fsPromise.stat(logFilePath).catch(() => null);
            if (stat && stat.size > trimToBytes) {
                const fd = await fsPromise.open(logFilePath, "r");
                try {
                    const buf = Buffer.alloc(trimToBytes);
                    const { bytesRead } = await fd.read(buf, 0, trimToBytes, stat.size - trimToBytes);
                    // Only keep what was actually read so a short read never writes zero padding
                    const window = buf.subarray(0, bytesRead);
                    // Skip a partial line at the start so the file always starts on a line boundary
                    const firstNewline = window.indexOf(0x0a);
                    const tail = firstNewline >= 0 ? window.subarray(firstNewline + 1) : window;
                    await fsPromise.writeFile(logFilePath, tail);
                } finally {
                    await fd.close();
                }
            }
        } finally {
            // Always reopen, even when the rewrite failed: the old stream is already ended,
            // so without a new one every later write would be silently discarded.
            reopenStream();
        }
    };

    const drainPending = () => {
        if (pending.length === 0) return;
        const drained = pending;
        pending = [];
        pendingBytes = 0;
        for (const chunk of drained) {
            stream.write(chunk);
        }
    };

    const scheduleTrim = () => {
        if (trimming) return;
        trimming = trim()
            .catch(() => {
                // Trim failed (FS error, etc.). The stream has been reopened regardless, so
                // buffered lines are still flushed below; the trim itself is attempted again
                // once the new stream has written more than maxBytes.
            })
            .finally(() => {
                trimming = undefined;
                drainPending();
            });
    };

    const writeTimestampedLine = (text: string, streamLabel: "stdout" | "stderr"): boolean => {
        if (closed) return false;
        if (!text || text.trim().length === 0) return false;
        const timestamp = `[${new Date().toISOString()}] [${streamLabel}] `;
        const lines = text.split("\n");
        const timestamped = lines.map((line, i) => (i === 0 ? timestamp + line : line)).join("\n");

        if (trimming) {
            // A trim cycle is in flight — buffer up to pendingByteCap, then drop with false
            // so the caller can fall back to writing to the terminal.
            // The cap is in bytes, so measure the encoded size, not the UTF-16 length.
            const size = Buffer.byteLength(timestamped);
            if (pendingBytes + size > pendingByteCap) return false;
            pending.push(timestamped);
            pendingBytes += size;
            return true;
        }

        // The stream errored or was destroyed: report the drop instead of pretending it was written
        if (!stream.writable) return false;

        stream.write(timestamped);

        if (stream.bytesWritten > maxBytes) scheduleTrim();
        return true;
    };

    return {
        writeTimestampedLine,
        async close() {
            closed = true;
            if (trimming) await trimming.catch(() => {});
            await endStream(stream);
        },
        async _trimNow() {
            scheduleTrim();
            if (trimming) await trimming;
        },
        get currentPath() {
            return logFilePath;
        },
        get bytesWritten() {
            return stream.bytesWritten;
        }
    };
}

0 commit comments

Comments
 (0)