diff --git a/package.json b/package.json index 893a7374..8a5a672d 100644 --- a/package.json +++ b/package.json @@ -73,7 +73,6 @@ "cross-fetch": "^4.0.0", "is-bundling-for-browser-or-node": "^1.1.1", "js-cookie": "^3.0.5", - "preact": "^10.16.0", - "reconnecting-eventsource": "^1.6.2" + "preact": "^10.16.0" } } diff --git a/src/sse.ts b/src/sse.ts index c5b76742..b1258e57 100644 --- a/src/sse.ts +++ b/src/sse.ts @@ -1,5 +1,4 @@ import fetch from "cross-fetch"; -import ReconnectingEventSource from "reconnecting-eventsource"; import { ABLY_REALTIME_HOST, ABLY_REST_HOST } from "./config"; @@ -15,7 +14,8 @@ const ABLY_TOKEN_ERROR_MIN = 40140; const ABLY_TOKEN_ERROR_MAX = 40149; export class AblySSEChannel { - private eventSource: ReconnectingEventSource | null = null; + private isOpen: boolean = false; + private eventSource: EventSource | null = null; private retryInterval: ReturnType | null = null; private debug: boolean; @@ -100,10 +100,17 @@ export class AblySSEChannel { private async onError(e: Event) { if (e instanceof MessageEvent) { - const errorPayload = JSON.parse(e.data); - const errorCode = Number(errorPayload?.code); + let errorCode: number | undefined; + + try { + const errorPayload = JSON.parse(e.data); + errorCode = errorPayload?.code && Number(errorPayload.code); + } catch (error: any) { + this.warn("received unparseable error message", error, e); + } if ( + errorCode && errorCode >= ABLY_TOKEN_ERROR_MIN && errorCode <= ABLY_TOKEN_ERROR_MAX ) { @@ -111,10 +118,10 @@ export class AblySSEChannel { } } else { const connectionState = (e as any)?.target?.readyState; + if (connectionState === 2) { this.log("event source connection closed", e); - } - if (connectionState === 1) { + } else if (connectionState === 1) { this.warn("event source connection failed to open", e); } else { this.warn("event source unexpected error occured", e); @@ -125,16 +132,30 @@ export class AblySSEChannel { } private onMessage(e: MessageEvent) { - if (e.data) { - const message = JSON.parse(e.data); - if (message.data) { - const payload = JSON.parse(message.data); + let payload: any; - this.log("received message", payload); - this.messageHandler(payload); + try { + if (e.data) { + const message = JSON.parse(e.data); + if (message.data) { + payload = JSON.parse(message.data); + } + } + } catch (error: any) { + this.warn("received unparseable message", error, e); + return; + } + + if (payload) { + this.log("received message", payload); - return; + try { + this.messageHandler(payload); + } catch (error: any) { + this.warn("failed to handle message", error, payload); } + + return; } this.warn("received invalid message", e); @@ -145,29 +166,45 @@ export class AblySSEChannel { } public async connect() { - this.disconnect(); - const token = await this.refreshToken(); + if (this.isOpen) { + this.warn("channel connection already open"); + return; + } - this.eventSource = new ReconnectingEventSource( - `${ABLY_REALTIME_HOST}/sse?v=1.2&accessToken=${encodeURIComponent( - token.token, - )}&channels=${encodeURIComponent(this.channel)}&rewind=1`, - ); + this.isOpen = true; + try { + const token = await this.refreshToken(); - this.eventSource.addEventListener("error", (e) => this.onError(e)); - this.eventSource.addEventListener("open", (e) => this.onOpen(e)); - this.eventSource.addEventListener("message", (m) => this.onMessage(m)); + this.eventSource = new EventSource( + `${ABLY_REALTIME_HOST}/sse?v=1.2&accessToken=${encodeURIComponent( + token.token, + )}&channels=${encodeURIComponent(this.channel)}&rewind=1`, + ); - this.log("channel connection opened"); + this.eventSource.addEventListener("error", (e) => this.onError(e)); + this.eventSource.addEventListener("open", (e) => this.onOpen(e)); + this.eventSource.addEventListener("message", (m) => this.onMessage(m)); + + this.log("channel connection opened"); + } finally { + this.isOpen = !!this.eventSource; + } } public disconnect() { + if (!this.isOpen) { + this.warn("channel connection already closed"); + return; + } + if (this.eventSource) { this.eventSource.close(); this.eventSource = null; this.log("channel connection closed"); } + + this.isOpen = false; } public open(options?: { retryInterval?: number; retryCount?: number }) { @@ -194,12 +231,8 @@ export class AblySSEChannel { void tryConnect(); this.retryInterval = setInterval(() => { - if (!this.eventSource && this.retryInterval) { + if (!this.isOpen && this.retryInterval) { if (retriesRemaining <= 0) { - this.warn( - "failed to initiate a connection to feedback prompting, all retries exhausted", - ); - clearInterval(this.retryInterval); this.retryInterval = null; return; @@ -216,15 +249,16 @@ export class AblySSEChannel { clearInterval(this.retryInterval); this.retryInterval = null; } + this.disconnect(); } - public isOpen() { - return this.retryInterval !== null; + public isActive() { + return !!this.retryInterval; } public isConnected() { - return this.eventSource !== null; + return this.isOpen; } } diff --git a/test/sse.test.ts b/test/sse.test.ts index 667cefaf..676bd9ef 100644 --- a/test/sse.test.ts +++ b/test/sse.test.ts @@ -1,15 +1,6 @@ import flushPromises from "flush-promises"; import nock from "nock"; -import ReconnectingEventSource from "reconnecting-eventsource"; -import { - afterEach, - beforeEach, - describe, - expect, - test, - vi, - vitest, -} from "vitest"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { ABLY_REST_HOST } from "../src/config"; import { @@ -30,13 +21,8 @@ const tokenDetails = { const userId = "foo"; const channel = "channel"; -vitest.mock("reconnecting-eventsource", () => { - return { - default: vi.fn().mockReturnValue({ - addEventListener: vi.fn(), - close: vi.fn(), - }), - }; +Object.defineProperty(window, "EventSource", { + value: vi.fn(), }); function setupAuthNock(success: boolean | number) { @@ -81,7 +67,7 @@ describe("connection handling", () => { await expect(sse.connect()).rejects.toThrowError(); - expect(vi.mocked(ReconnectingEventSource)).not.toHaveBeenCalled(); + expect(vi.mocked(window.EventSource)).not.toHaveBeenCalled(); }); test("rejects if auth endpoint is not 200", async () => { @@ -91,7 +77,7 @@ describe("connection handling", () => { await expect(sse.connect()).rejects.toThrowError(); - expect(vi.mocked(ReconnectingEventSource)).not.toHaveBeenCalled(); + expect(vi.mocked(window.EventSource)).not.toHaveBeenCalled(); }); test("rejects if token endpoint rejects", async () => { @@ -102,7 +88,7 @@ describe("connection handling", () => { await expect(sse.connect()).rejects.toThrowError(); - expect(vi.mocked(ReconnectingEventSource)).not.toHaveBeenCalled(); + expect(vi.mocked(window.EventSource)).not.toHaveBeenCalled(); }); test("obtains token, connects and subscribes, then closes", async () => { @@ -111,7 +97,7 @@ describe("connection handling", () => { const addEventListener = vi.fn(); const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener, close, } as any); @@ -121,7 +107,7 @@ describe("connection handling", () => { await sse.connect(); - expect(vi.mocked(ReconnectingEventSource)).toHaveBeenCalledTimes(1); + expect(vi.mocked(window.EventSource)).toHaveBeenCalledTimes(1); expect(addEventListener).toHaveBeenCalledTimes(3); expect(addEventListener).toHaveBeenCalledWith( "error", @@ -141,11 +127,11 @@ describe("connection handling", () => { expect(sse.isConnected()).toBe(false); }); - test("disconnects and re-requests token on re-connect", async () => { + test("does not try to re-connect if already connecting", async () => { const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener: vi.fn(), close, } as any); @@ -153,22 +139,40 @@ describe("connection handling", () => { setupAuthNock(true); setupTokenNock(true); - await sse.connect(); + const c1 = sse.connect(); + const c2 = sse.connect(); + + await c1; + await c2; + + expect(close).toHaveBeenCalledTimes(0); + expect(vi.mocked(window.EventSource)).toHaveBeenCalledTimes(1); + }); + + test("does not re-connect if already connected", async () => { + const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); + + const close = vi.fn(); + vi.mocked(window.EventSource).mockReturnValue({ + addEventListener: vi.fn(), + close, + } as any); setupAuthNock(true); setupTokenNock(true); + await sse.connect(); await sse.connect(); - expect(close).toHaveBeenCalledTimes(1); - expect(vi.mocked(ReconnectingEventSource)).toHaveBeenCalledTimes(2); + expect(close).toHaveBeenCalledTimes(0); + expect(vi.mocked(window.EventSource)).toHaveBeenCalledTimes(1); }); test("disconnects only if connected", async () => { const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ close, } as any); @@ -202,7 +206,7 @@ describe("message handling", () => { } }; - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener, } as any); @@ -237,7 +241,7 @@ describe("message handling", () => { }; const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener, close, } as any); @@ -261,7 +265,7 @@ describe("message handling", () => { }; const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener, close, } as any); @@ -290,7 +294,7 @@ describe("message handling", () => { }; const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener, close, } as any); @@ -353,6 +357,7 @@ describe("automatic retries", () => { const n2 = setupAuthNock(true); const n3 = setupTokenNock(true); + await flushPromises(); expect(sse.isConnected()).toBe(false); @@ -365,7 +370,34 @@ describe("automatic retries", () => { await flushPromises(); expect(sse.isConnected()).toBe(true); - expect(sse.isOpen()).toBe(true); + expect(sse.isActive()).toBe(true); + }); + + test("only, ever, allow one connection to SSE", async () => { + const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); + + const n1 = setupAuthNock(true); + const n2 = setupTokenNock(true); + + vi.useFakeTimers(); + sse.open({ retryInterval: 1, retryCount: 100 }); + + for (let i = 0; i < 10; i++) { + await flushPromises(); + vi.advanceTimersByTime(1); + } + + await nockWait(n1); + await nockWait(n2); + + await flushPromises(); + + expect(sse.isConnected()).toBe(true); + expect(sse.isActive()).toBe(true); + + expect(vi.mocked(window.EventSource)).toHaveBeenCalledOnce(); + + vi.useRealTimers(); }); test("resets retry count on successfull connect", async () => { @@ -380,7 +412,7 @@ describe("automatic retries", () => { }; const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener, close, } as any); @@ -388,8 +420,11 @@ describe("automatic retries", () => { // make initial failed attempt vi.useFakeTimers(); const n = setupAuthNock(false); + sse.open({ retryInterval: 100, retryCount: 1 }); + await nockWait(n); + await flushPromises(); const attempt = async () => { const n1 = setupAuthNock(true); @@ -424,6 +459,11 @@ describe("automatic retries", () => { test("reconnects if manually disconnected", async () => { const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); + vi.mocked(window.EventSource).mockReturnValue({ + addEventListener: vi.fn(), + close: vi.fn(), + } as any); + const n1 = setupAuthNock(true); const n2 = setupTokenNock(true); @@ -450,7 +490,7 @@ describe("automatic retries", () => { await flushPromises(); expect(sse.isConnected()).toBe(true); - expect(sse.isOpen()).toBe(true); + expect(sse.isActive()).toBe(true); }); test("opens and does not connect later to a failed channel if no retries", async () => { @@ -465,13 +505,14 @@ describe("automatic retries", () => { }); await nockWait(n1); + await flushPromises(); vi.advanceTimersByTime(100); vi.useRealTimers(); await flushPromises(); - expect(sse.isOpen()).toBe(false); + expect(sse.isActive()).toBe(false); }); test("closes an open channel", async () => { @@ -481,7 +522,7 @@ describe("automatic retries", () => { const n2 = setupTokenNock(true); const close = vi.fn(); - vi.mocked(ReconnectingEventSource).mockReturnValue({ + vi.mocked(window.EventSource).mockReturnValue({ addEventListener: vi.fn(), close, } as any); @@ -498,7 +539,7 @@ describe("automatic retries", () => { expect(sse.isConnected()).toBe(false); expect(close).toHaveBeenCalledTimes(1); - expect(sse.isOpen()).toBe(false); + expect(sse.isActive()).toBe(false); }); }); @@ -524,7 +565,7 @@ describe("helper open and close functions", () => { const sse = openAblySSEChannel(ablyAuthUrl, userId, channel, vi.fn()); - expect(sse.isOpen()).toBe(true); + expect(sse.isActive()).toBe(true); await nockWait(n1); await nockWait(n2); @@ -533,6 +574,6 @@ describe("helper open and close functions", () => { closeAblySSEChannel(sse); expect(sse.isConnected()).toBe(false); - expect(sse.isOpen()).toBe(false); + expect(sse.isActive()).toBe(false); }); }); diff --git a/yarn.lock b/yarn.lock index 5d77a479..caf91fc3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4202,11 +4202,6 @@ rechoir@^0.8.0: dependencies: resolve "^1.20.0" -reconnecting-eventsource@^1.6.2: - version "1.6.2" - resolved "https://registry.yarnpkg.com/reconnecting-eventsource/-/reconnecting-eventsource-1.6.2.tgz#b7f5b03b1c76291f6fbcb0203004892a57ae253b" - integrity sha512-vHhoxVLbA2YcfljWMKEbgR1KVTgwIrnyh/bzVJc+gfQbGcUIToLL6jNhkUL4E+9FbnAcfUVNLIw2YCiliTg/4g== - regexp.prototype.flags@^1.5.0: version "1.5.0" resolved "https://registry.yarnpkg.com/regexp.prototype.flags/-/regexp.prototype.flags-1.5.0.tgz#fe7ce25e7e4cca8db37b6634c8a2c7009199b9cb"