Skip to content

Commit

Permalink
fix: ensure tail exits when the WebSocket disconnects (#5275)
Browse files Browse the repository at this point in the history
* fix: ensure tail exits when the WebSocket disconnects

Previously when the we tail WebSocket disconnected, e.g. because of an Internet failure,
the `wrangler tail` command would just hang and neither exit nor any longer receive tail messages.

Now the process exits with an exit code of 1, and outputs an error message.

The error message is formatted appropriately, if the tail format is set to `json`.

Fixes #3927

* fixup! fix: ensure tail exits when the WebSocket disconnects
  • Loading branch information
petebacondarwin committed Mar 18, 2024
1 parent 1720f0a commit e1f2576
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 18 deletions.
14 changes: 14 additions & 0 deletions .changeset/lazy-cherries-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"wrangler": patch
---

fix: ensure tail exits when the WebSocket disconnects

Previously when the we tail WebSocket disconnected, e.g. because of an Internet failure,
the `wrangler tail` command would just hang and neither exit nor any longer receive tail messages.

Now the process exits with an exit code of 1, and outputs an error message.

The error message is formatted appropriately, if the tail format is set to `json`.

Fixes #3927
8 changes: 8 additions & 0 deletions packages/wrangler/src/__tests__/helpers/mock-web-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { WebSocket } from "mock-socket";
* A version of the mock WebSocket that supports the methods that we use.
*/
export class MockWebSocket extends WebSocket {
private pongListener: undefined | ((...args: unknown[]) => void);
on(event: string | symbol, listener: (...args: unknown[]) => void): this {
switch (event) {
case "message":
Expand All @@ -17,12 +18,19 @@ export class MockWebSocket extends WebSocket {
case "close":
this.onclose = listener;
break;
case "pong":
this.pongListener = listener;
break;
default:
throw new Error("Unknown event type: " + event.toString());
}
return this;
}

ping(data: Buffer) {
this.pongListener?.(data);
}

terminate() {
this.close();
}
Expand Down
53 changes: 46 additions & 7 deletions packages/wrangler/src/__tests__/tail.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { mockAccountId, mockApiToken } from "./helpers/mock-account-id";
import { mockConsoleMethods } from "./helpers/mock-console";
import { clearDialogs, mockConfirm } from "./helpers/mock-dialogs";
import { useMockIsTTY } from "./helpers/mock-istty";
import { MockWebSocket as MockWebSocket2 } from "./helpers/mock-web-socket";
import { createFetchResult, msw, mswSucessScriptHandlers } from "./helpers/msw";
import { runInTempDir } from "./helpers/run-in-tmp";
import { runWrangler } from "./helpers/run-wrangler";
Expand Down Expand Up @@ -213,13 +214,6 @@ describe("tail", () => {
expect(api.requests.deletion.count).toStrictEqual(1);
});

it("errors when the websocket closes unexpectedly", async () => {
const api = mockWebsocketAPIs();
await api.closeHelper();

await expect(runWrangler("tail test-worker")).rejects.toThrow();
});

it("activates debug mode when the cli arg is passed in", async () => {
const api = mockWebsocketAPIs();
await runWrangler("tail test-worker --debug");
Expand Down Expand Up @@ -733,6 +727,51 @@ describe("tail", () => {
expect(std.warn).toMatchInlineSnapshot(`""`);
});
});

describe("disconnects", () => {
it("errors when the websocket is already closed", async () => {
const api = mockWebsocketAPIs();
await api.closeHelper();

await expect(runWrangler("tail test-worker")).rejects.toThrow();
});

it("errors when the websocket stops reacting to pings (pretty format)", async () => {
const api = mockWebsocketAPIs();
jest.useFakeTimers({
doNotFake: ["setTimeout"],
});
// Block the websocket from replying to the ping
jest.spyOn(MockWebSocket2.prototype, "ping").mockImplementation();
await runWrangler("tail test-worker --format=pretty");
await api.ws.connected;
// The ping is sent every 2 secs, so it should not fail until the second ping is due.
await jest.advanceTimersByTimeAsync(10000);
await expect(
jest.advanceTimersByTimeAsync(10000)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Tail disconnected, exiting."`
);
});

it("errors when the websocket stops reacting to pings (json format)", async () => {
const api = mockWebsocketAPIs();
jest.useFakeTimers({
doNotFake: ["setTimeout"],
});
// Block the websocket from replying to the ping
jest.spyOn(MockWebSocket2.prototype, "ping").mockImplementation();
await runWrangler("tail test-worker --format=json");
await api.ws.connected;
// The ping is sent every 10 secs, so it should not fail until the second ping is due.
await jest.advanceTimersByTimeAsync(10000);
await expect(
jest.advanceTimersByTimeAsync(10000)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"\\"Tail disconnected, exiting.\\""`
);
});
});
});

/* helpers */
Expand Down
17 changes: 17 additions & 0 deletions packages/wrangler/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,20 @@ export class JsonFriendlyFatalError extends FatalError {
super(message);
}
}

/**
* Create either a FatalError or JsonFriendlyFatalError depending upon `isJson` parameter.
*
* If `isJson` is true, then the `message` is JSON stringified.
*/
export function createFatalError(
message: unknown,
isJson: boolean,
code?: number
): Error {
if (isJson) {
return new JsonFriendlyFatalError(JSON.stringify(message), code);
} else {
return new FatalError(`${message}`, code);
}
}
62 changes: 51 additions & 11 deletions packages/wrangler/src/tail/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import onExit from "signal-exit";
import { fetchResult, fetchScriptContent } from "../cfetch";
import { readConfig } from "../config";
import { confirm } from "../dialogs";
import { UserError } from "../errors";
import { createFatalError, UserError } from "../errors";
import {
getLegacyScriptName,
isLegacyEnv,
Expand Down Expand Up @@ -166,14 +166,6 @@ export async function tailHandler(args: TailArgs) {
);
}

onExit(async () => {
tail.terminate();
await deleteTail();
await metrics.sendMetricsEvent("end log stream", {
sendMetrics: config.send_metrics,
});
});

const printLog: (data: RawData) => void =
args.format === "pretty" ? prettyPrintLogs : jsonPrintLogs;

Expand Down Expand Up @@ -201,11 +193,59 @@ export async function tailHandler(args: TailArgs) {
logger.log(`Connected to ${scriptDisplayName}, waiting for logs...`);
}

tail.on("close", async () => {
const cancelPing = startWebSocketPing();
tail.on("close", exit);
onExit(exit);

async function exit() {
cancelPing();
tail.terminate();
await deleteTail();
await metrics.sendMetricsEvent("end log stream", {
sendMetrics: config.send_metrics,
});
});
}

/**
* Start pinging the websocket to see if it is still connected.
*
* We need to know if the connection to the tail drops.
* To do this we send a ping message to the backend every few seconds.
* If we don't get a matching pong message back before the next ping is due
* then we have probably lost the connect.
*/
function startWebSocketPing() {
/** The corelation message to send to tail when pinging. */
const PING_MESSAGE = Buffer.from("wrangler tail ping");
/** How long to wait between pings. */
const PING_INTERVAL = 10000;

let waitingForPong = false;

const pingInterval = setInterval(() => {
if (waitingForPong) {
// We didn't get a pong back quickly enough so assume the connection died and exit.
// This approach relies on the fact that throwing an error inside a `setInterval()` callback
// causes the process to exit.
// This is a bit nasty but otherwise we have to make wholesale changes to how the `tail` command
// works, since currently all the tests assume that `runWrangler()` will return immediately.
console.log(args.format);
throw createFatalError(
"Tail disconnected, exiting.",
args.format === "json",
1
);
}
waitingForPong = true;
tail.ping(PING_MESSAGE);
}, PING_INTERVAL);

tail.on("pong", (data) => {
if (data.equals(PING_MESSAGE)) {
waitingForPong = false;
}
});

return () => clearInterval(pingInterval);
}
}

0 comments on commit e1f2576

Please sign in to comment.