Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions desktop/playwright.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export default defineConfig({
"**/channel-browser.spec.ts",
"**/messaging.spec.ts",
"**/mentions.spec.ts",
"**/relay-reconnect.spec.ts",
"**/workflows.spec.ts",
],
use: {
Expand Down
53 changes: 20 additions & 33 deletions desktop/src/shared/api/relayClientSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,16 @@ import { RelayStallWatchdog } from "@/shared/api/relayStallWatchdog";
import { buildThreadReferenceTags } from "@/features/messages/lib/threading";

const RECONNECT_BASE_DELAY_MS = 1_000,
RECONNECT_MAX_DELAY_MS = 30_000;
const RECONNECT_REPLAY_SKEW_SECS = 5,
RECONNECT_MAX_DELAY_MS = 30_000,
RECONNECT_REPLAY_SKEW_SECS = 5,
EVENT_BATCH_MS = 16;

/**
* Application-level liveness probe.
*
* Tungstenite auto-pongs and the OS keeps the TCP socket open, so a
* half-open WS (Warp's orange-icon state, an asleep VPN, etc.) presents as
* "fully connected" to the WS layer indefinitely — no Close, no Error.
*
* We work around that by periodically sending a cheap NIP-01 `REQ` with
* `limit: 0` and waiting for the matching `EOSE`. A single missed probe
* (no EOSE within `STALL_PROBE_TIMEOUT_MS`) — or a send-side failure on the
* probe itself — flips state to `stalled` and force-resets the socket so
* the existing reconnect path runs.
*
* The filter intentionally matches nothing real so the relay only ever
* answers with EOSE.
* Passive liveness check. The relay sends heartbeat pings every 30s; if no
* inbound frame arrives for two heartbeat windows, treat the socket as stalled.
*/
const STALL_PROBE_INTERVAL_MS = 20_000;
const STALL_PROBE_TIMEOUT_MS = 10_000;
const STALL_CHECK_INTERVAL_MS = 10_000;
const STALL_IDLE_TIMEOUT_MS = 60_000;

export class RelayClient {
private wsId: number | null = null;
Expand All @@ -74,6 +62,7 @@ export class RelayClient {
private hasConnectedOnce = false;
private notifyReconnectListeners = false;
private onMessageChannel: Channel<unknown> | null = null;
private connectionGeneration = 0;

/**
* Sticky terminal flag. Set when `resetConnection` is called with
Expand All @@ -89,9 +78,8 @@ export class RelayClient {

private connectionStateEmitter = new RelayConnectionStateEmitter("idle");
private stallWatchdog = new RelayStallWatchdog({
intervalMs: STALL_PROBE_INTERVAL_MS,
probeTimeoutMs: STALL_PROBE_TIMEOUT_MS,
sendRaw: (payload) => this.sendRaw(payload),
intervalMs: STALL_CHECK_INTERVAL_MS,
idleTimeoutMs: STALL_IDLE_TIMEOUT_MS,
onStall: (error) => {
this.connectionStateEmitter.set("stalled");
this.resetConnection(error);
Expand All @@ -111,6 +99,7 @@ export class RelayClient {
this.reconnectTimeout = null;
}
this.stallWatchdog.stop();
this.connectionGeneration++;
this.keepAliveRequested = false;
this.relayUrl = null;
this.hasConnectedOnce = false;
Expand Down Expand Up @@ -460,8 +449,9 @@ export class RelayClient {
this.relayUrl = await getRelayWsUrl();
}

const generation = ++this.connectionGeneration;
this.onMessageChannel = new Channel<unknown>((message) => {
void this.handleWsMessage(message);
void this.handleWsMessage(message, generation);
});

this.wsId = await invoke<number>("plugin:websocket|connect", {
Expand Down Expand Up @@ -687,7 +677,10 @@ export class RelayClient {
});
}

private async handleWsMessage(message: unknown) {
private async handleWsMessage(message: unknown, generation: number) {
if (generation !== this.connectionGeneration) return;
this.stallWatchdog.recordInbound();

if (
typeof message === "object" &&
message !== null &&
Expand Down Expand Up @@ -726,10 +719,9 @@ export class RelayClient {

const [type, ...rest] = data;
if (type === "AUTH" && typeof rest[0] === "string") {
await this.handleAuthChallenge(rest[0]);
await this.handleAuthChallenge(rest[0], generation);
return;
}

if (type === "EVENT" && typeof rest[0] === "string" && rest[1]) {
this.handleEvent(rest[0], rest[1] as RelayEvent);
return;
Expand All @@ -753,7 +745,7 @@ export class RelayClient {
}
}

private async handleAuthChallenge(challenge: string) {
private async handleAuthChallenge(challenge: string, generation: number) {
if (!this.relayUrl) {
this.relayUrl = await getRelayWsUrl();
}
Expand All @@ -763,7 +755,7 @@ export class RelayClient {
relayUrl: this.relayUrl,
});

if (!this.authRequest) {
if (generation !== this.connectionGeneration || !this.authRequest) {
return;
}

Expand Down Expand Up @@ -809,12 +801,6 @@ export class RelayClient {
}

private handleEose(subId: string) {
if (this.stallWatchdog.handleEose(subId)) {
// Probe round-trip succeeded — silently CLOSE the sub.
void this.closeSubscription(subId).catch(() => {});
return;
}

const subscription = this.subscriptions.get(subId);
if (!subscription) {
return;
Expand Down Expand Up @@ -971,6 +957,7 @@ export class RelayClient {
) {
this.onMessageChannel = null;
this.stallWatchdog.stop();
this.connectionGeneration++;
if (this.flushTimeout !== null) window.clearTimeout(this.flushTimeout);
this.flushTimeout = null;
this.eventBuffer = [];
Expand Down
137 changes: 59 additions & 78 deletions desktop/src/shared/api/relayStallWatchdog.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,123 +3,104 @@ import test from "node:test";

import { RelayStallWatchdog } from "./relayStallWatchdog.ts";

// Shim `window` to expose the timer + crypto APIs the watchdog uses. The
// real RelayClient runs in a Tauri WebView where `window` exists; under
// node:test we wire it to the same globals.
// Shim `window` to expose the timer APIs the watchdog uses. The real
// RelayClient runs in a Tauri WebView where `window` exists; under node:test we
// wire it to the same globals.
if (typeof globalThis.window === "undefined") {
globalThis.window = {
setInterval: (...args) => setInterval(...args),
clearInterval: (id) => clearInterval(id),
setTimeout: (...args) => setTimeout(...args),
clearTimeout: (id) => clearTimeout(id),
};
}

const sleep = (ms) => new Promise((r) => setTimeout(r, ms));

function makeWatchdog(overrides = {}) {
const sends = [];
const stalls = [];
let now = overrides.now ?? 1;
const wd = new RelayStallWatchdog({
intervalMs: overrides.intervalMs ?? 30,
probeTimeoutMs: overrides.probeTimeoutMs ?? 30,
sendRaw:
overrides.sendRaw ??
(async (payload) => {
sends.push(payload);
}),
intervalMs: overrides.intervalMs ?? 20,
idleTimeoutMs: overrides.idleTimeoutMs ?? 50,
onStall: (err) => {
stalls.push(err);
},
now: overrides.now,
now: () => now,
});
return { wd, sends, stalls };
return {
advance: (ms) => {
now += ms;
},
setNow: (value) => {
now = value;
},
stalls,
wd,
};
}

test("first probe carries the expected NIP-01 REQ shape", async () => {
const { wd, sends } = makeWatchdog();
test("does not send probes while watching for stalls", async () => {
const { wd } = makeWatchdog();
wd.start();
// Wait until a probe is observed.
for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5);
await sleep(45);
wd.stop();
assert.equal(sends.length, 1);
const [verb, subId, filter] = sends[0];
assert.equal(verb, "REQ");
assert.match(subId, /^probe-/);
assert.deepEqual(filter.kinds, [9999]);
assert.equal(filter.limit, 0);
assert.ok(typeof filter.since === "number");
// The passive watchdog has no send callback by construction. This test is a
// regression guard for the WARP bug: liveness checks must not write to a
// socket already suspected of being half-open.
assert.equal(typeof wd.recordInbound, "function");
});

test("EOSE for the current probe clears in-flight + lets the next probe fire", async () => {
const { wd, sends, stalls } = makeWatchdog();
test("idle timeout without inbound frames triggers onStall", async () => {
const { advance, stalls, wd } = makeWatchdog();
wd.start();
for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5);
const firstSubId = sends[0][1];
// Resolve the probe.
assert.equal(wd.handleEose(firstSubId), true);
// Within the next interval+probe window, another probe should fire.
for (let i = 0; i < 50 && sends.length < 2; i++) await sleep(5);
advance(60);
for (let i = 0; i < 20 && stalls.length === 0; i++) await sleep(5);
wd.stop();
assert.ok(sends.length >= 2, `expected ≥2 probes, got ${sends.length}`);
assert.equal(stalls.length, 0, "no stall expected when EOSE arrives");
});

test("EOSE for a non-probe subId returns false", () => {
const { wd } = makeWatchdog();
assert.equal(wd.handleEose("live-abc"), false);
assert.equal(stalls.length, 1);
assert.match(stalls[0].message, /no inbound frames/i);
});

test("timeout without EOSE triggers onStall", async () => {
const { wd, stalls } = makeWatchdog();
test("inbound frames reset the idle timer", async () => {
const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 });
wd.start();
// intervalMs (30) before first send + probeTimeoutMs (30) — wait a bit
// past their sum.
for (let i = 0; i < 50 && stalls.length === 0; i++) await sleep(10);
advance(40);
wd.recordInbound();
advance(40);
await sleep(30);
assert.equal(
stalls.length,
0,
"recent inbound frame should keep socket alive",
);
advance(20);
for (let i = 0; i < 20 && stalls.length === 0; i++) await sleep(5);
wd.stop();
assert.ok(stalls.length >= 1, "expected at least one stall");
assert.match(stalls[0].message, /stalled/i);
assert.equal(stalls.length, 1);
});

test("send-side failure triggers onStall immediately", async () => {
const { wd, stalls } = makeWatchdog({
sendRaw: async () => {
throw new Error("ws is dead");
},
});
wd.start();
for (let i = 0; i < 50 && stalls.length === 0; i++) await sleep(5);
wd.stop();
assert.ok(stalls.length >= 1, "expected stall on send failure");
assert.match(stalls[0].message, /ws is dead/);
test("recordInbound is ignored while stopped", async () => {
const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 });
wd.recordInbound();
advance(100);
await sleep(30);
assert.equal(stalls.length, 0);
});

test("stop() cancels a pending stall timeout", async () => {
const { wd, sends, stalls } = makeWatchdog();
test("stop() cancels the idle check", async () => {
const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 });
wd.start();
for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5);
// Probe is in-flight; stop before it can time out.
wd.stop();
// Wait well past the timeout window.
await sleep(80);
assert.equal(stalls.length, 0, "stop() should cancel the pending stall");
advance(100);
await sleep(35);
assert.equal(stalls.length, 0);
});

test("start() is idempotent — does not create duplicate intervals", async () => {
const { wd, sends } = makeWatchdog({ intervalMs: 25, probeTimeoutMs: 200 });
const { advance, stalls, wd } = makeWatchdog({ idleTimeoutMs: 50 });
wd.start();
wd.start();
wd.start();
// Allow one probe to fire and resolve it so the *next* probe can fire if
// the interval was somehow doubled.
for (let i = 0; i < 50 && sends.length === 0; i++) await sleep(5);
wd.handleEose(sends[0][1]);
// Within one more interval window, exactly one more probe should fire
// (not two), which is the contract for `start()` being idempotent.
await sleep(45);
advance(60);
for (let i = 0; i < 20 && stalls.length === 0; i++) await sleep(5);
wd.stop();
assert.ok(
sends.length <= 2,
`expected ≤2 probes despite triple-start(), got ${sends.length}`,
);
assert.equal(stalls.length, 1);
});
Loading
Loading