diff --git a/src/main.ts b/src/main.ts index eb766c00..e72c8265 100644 --- a/src/main.ts +++ b/src/main.ts @@ -47,8 +47,9 @@ export default function main() { let trackingHost: string = TRACKING_HOST; let sessionUserId: string | undefined = undefined; let persistUser: boolean = !isForNode; + let liveSatisfactionActive: boolean = false; let sseChannel: AblySSEChannel | undefined = undefined; - let liveFeedback: boolean = !isForNode; + let liveSatisfactionEnabled: boolean = !isForNode; let feedbackPromptHandler: FeedbackPromptHandler | undefined = undefined; let feedbackPromptingUserId: string | undefined = undefined; let feedbackPosition: FeedbackPosition | undefined = undefined; @@ -101,8 +102,6 @@ export default function main() { return userId!; } - // methods - function init(key: Key, options: Options = {}) { reset(); if (!key) { @@ -127,18 +126,18 @@ export default function main() { } if (typeof options.feedback?.enableLiveFeedback !== "undefined") { - liveFeedback = options.feedback.enableLiveFeedback; + liveSatisfactionEnabled = options.feedback.enableLiveFeedback; } if (typeof options.feedback?.enableLiveSatisfaction !== "undefined") { - liveFeedback = options.feedback.enableLiveSatisfaction; + liveSatisfactionEnabled = options.feedback.enableLiveSatisfaction; } - if (liveFeedback && isForNode) { + if (liveSatisfactionEnabled && isForNode) { err("Feedback prompting is not supported in Node.js environment"); } - if (liveFeedback && !persistUser) { + if (liveSatisfactionEnabled && !persistUser) { err("Feedback prompting is not supported when persistUser is disabled"); } @@ -162,7 +161,7 @@ export default function main() { reset(); } sessionUserId = userId; - if (liveFeedback && !sseChannel) { + if (liveSatisfactionEnabled && !liveSatisfactionActive) { await initLiveSatisfaction(userId); } } @@ -264,7 +263,7 @@ export default function main() { err("Feedback prompting is not supported in Node.js environment"); } - if (sseChannel) { + if (liveSatisfactionActive) { err("Feedback prompting already initialized. Use reset() first."); } @@ -275,28 +274,38 @@ export default function main() { userId = resolveUser(userId); - const res = await request(`${getUrl()}/feedback/prompting-init`, { - userId, - }); - log(`feedback prompting status sent`, res); - const body: { success: boolean; channel?: string } = await res.json(); - if (!body.success || !body.channel) { - log(`feedback prompting not enabled`); - return res; - } + // while initializing, consider the channel active + liveSatisfactionActive = true; + try { + const res = await request(`${getUrl()}/feedback/prompting-init`, { + userId, + }); + + log(`feedback prompting status sent`, res); + const body: { success: boolean; channel?: string } = await res.json(); + if (!body.success || !body.channel) { + log(`feedback prompting not enabled`); + return res; + } - log(`feedback prompting enabled`); + log(`feedback prompting enabled`); - feedbackPromptingUserId = userId; - sseChannel = openAblySSEChannel( - `${getUrl()}/feedback/prompting-auth`, - userId, - body.channel, - (message) => handleFeedbackPromptRequest(userId!, message), - { debug }, - ); - log(`feedback prompting connection established`); - return res; + sseChannel = openAblySSEChannel( + `${getUrl()}/feedback/prompting-auth`, + userId, + body.channel, + (message) => handleFeedbackPromptRequest(userId!, message), + { debug }, + ); + + feedbackPromptingUserId = userId; + + log(`feedback prompting connection established`); + return res; + } finally { + // check that SSE channel has actually been opened, otherwise reset the value + liveSatisfactionActive = !!sseChannel; + } } function handleFeedbackPromptRequest(userId: User["userId"], message: any) { @@ -483,6 +492,7 @@ export default function main() { function reset() { sessionUserId = undefined; feedbackPromptingUserId = undefined; + liveSatisfactionActive = false; if (sseChannel) { closeAblySSEChannel(sseChannel); log(`feedback prompting connection closed`); diff --git a/src/sse.ts b/src/sse.ts index c7dca885..c5b76742 100644 --- a/src/sse.ts +++ b/src/sse.ts @@ -107,19 +107,21 @@ export class AblySSEChannel { errorCode >= ABLY_TOKEN_ERROR_MIN && errorCode <= ABLY_TOKEN_ERROR_MAX ) { - this.log("token expired, refreshing"); - await this.connect().catch((x) => - this.warn("failed to refresh token", x), - ); + this.log("event source token expired, refresh required"); } - return; - } - - if ((e as any)?.target?.readyState === 2) { - this.log("event source connection closed"); } else { - this.warn("unexpected error occured", e); + const connectionState = (e as any)?.target?.readyState; + if (connectionState === 2) { + this.log("event source connection closed", e); + } + if (connectionState === 1) { + this.warn("event source connection failed to open", e); + } else { + this.warn("event source unexpected error occured", e); + } } + + this.disconnect(); } private onMessage(e: MessageEvent) { @@ -170,25 +172,40 @@ export class AblySSEChannel { public open(options?: { retryInterval?: number; retryCount?: number }) { const retryInterval = options?.retryInterval ?? 1000 * 30; - let retryCount = options?.retryCount ?? 3; + const retryCount = options?.retryCount ?? 3; + let retriesRemaining = retryCount; const tryConnect = async () => { - await this.connect().catch((e) => - this.warn(`failed to connect. ${retryCount} retries remaining`, e), - ); + try { + await this.connect(); + retriesRemaining = retryCount; + } catch (e) { + if (retriesRemaining > 0) { + this.warn( + `failed to connect, ${retriesRemaining} retries remaining`, + e, + ); + } else { + this.warn(`failed to connect, no retries remaining`, e); + } + } }; void tryConnect(); this.retryInterval = setInterval(() => { if (!this.eventSource && this.retryInterval) { - if (retryCount <= 0) { + if (retriesRemaining <= 0) { + this.warn( + "failed to initiate a connection to feedback prompting, all retries exhausted", + ); + clearInterval(this.retryInterval); this.retryInterval = null; return; } - retryCount--; + retriesRemaining--; void tryConnect(); } }, retryInterval); @@ -198,8 +215,8 @@ export class AblySSEChannel { if (this.retryInterval) { clearInterval(this.retryInterval); this.retryInterval = null; - this.disconnect(); } + this.disconnect(); } public isOpen() { diff --git a/test/sse.test.ts b/test/sse.test.ts index 2ba9db89..667cefaf 100644 --- a/test/sse.test.ts +++ b/test/sse.test.ts @@ -34,6 +34,7 @@ vitest.mock("reconnecting-eventsource", () => { return { default: vi.fn().mockReturnValue({ addEventListener: vi.fn(), + close: vi.fn(), }), }; }); @@ -225,7 +226,7 @@ describe("message handling", () => { expect(callback).toHaveBeenCalledTimes(1); }); - test("does not respond to unknown errors", async () => { + test("disconnects on unknown event source errors without data", async () => { const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); let errorCallback: ((e: Event) => Promise) | undefined = undefined; @@ -246,7 +247,28 @@ describe("message handling", () => { expect(errorCallback).toBeDefined(); await errorCallback!({} as any); - expect(close).not.toHaveBeenCalled(); + expect(close).toHaveBeenCalledTimes(1); + }); + + test("disconnects on unknown event source errors with data", async () => { + const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); + + let errorCallback: ((e: Event) => Promise) | undefined = undefined; + const addEventListener = (event: string, cb: (e: Event) => void) => { + if (event === "error") { + errorCallback = cb as typeof errorCallback; + } + }; + + const close = vi.fn(); + vi.mocked(ReconnectingEventSource).mockReturnValue({ + addEventListener, + close, + } as any); + + await sse.connect(); + + expect(errorCallback).toBeDefined(); await errorCallback!( new MessageEvent("error", { @@ -254,10 +276,10 @@ describe("message handling", () => { }), ); - expect(close).not.toHaveBeenCalled(); + expect(close).toHaveBeenCalledTimes(1); }); - test("resets the connection and refreshes token for ably expiry errors", async () => { + test("disconnects when ably reports token is expired", async () => { const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); let errorCallback: ((e: Event) => Promise) | undefined = undefined; @@ -275,9 +297,6 @@ describe("message handling", () => { await sse.connect(); - setupAuthNock(true); - setupTokenNock(true); - await errorCallback!( new MessageEvent("error", { data: JSON.stringify({ code: 40140 }), @@ -285,11 +304,10 @@ describe("message handling", () => { ); expect(close).toHaveBeenCalled(); - expect(vi.mocked(ReconnectingEventSource)).toHaveBeenCalledTimes(2); }); }); -describe("automatic auth retries", () => { +describe("automatic retries", () => { const nockWait = (n: nock.Scope) => { return new Promise((resolve) => { n.on("replied", () => { @@ -298,13 +316,15 @@ describe("automatic auth retries", () => { }); }; - afterEach(() => { - expect(nock.isDone()).toBe(true); - + beforeEach(() => { vi.clearAllMocks(); nock.cleanAll(); }); + afterEach(() => { + expect(nock.isDone()).toBe(true); + }); + test("opens and connects to a channel", async () => { const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); @@ -348,6 +368,59 @@ describe("automatic auth retries", () => { expect(sse.isOpen()).toBe(true); }); + test("resets retry count on successfull connect", async () => { + const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); + + // mock event source + let errorCallback: ((e: Event) => Promise) | undefined = undefined; + const addEventListener = (event: string, cb: (e: Event) => void) => { + if (event === "error") { + errorCallback = cb as typeof errorCallback; + } + }; + + const close = vi.fn(); + vi.mocked(ReconnectingEventSource).mockReturnValue({ + addEventListener, + close, + } as any); + + // make initial failed attempt + vi.useFakeTimers(); + const n = setupAuthNock(false); + sse.open({ retryInterval: 100, retryCount: 1 }); + await nockWait(n); + + const attempt = async () => { + const n1 = setupAuthNock(true); + const n2 = setupTokenNock(true); + + vi.advanceTimersByTime(100); + + await nockWait(n1); + await nockWait(n2); + + await flushPromises(); + vi.advanceTimersByTime(100); + + expect(sse.isConnected()).toBe(true); + + // simulate an error + vi.advanceTimersByTime(100); + + await errorCallback!({} as any); + await flushPromises(); + + expect(sse.isConnected()).toBe(false); + }; + + await attempt(); + await attempt(); + await attempt(); + + vi.useRealTimers(); + }); + test("reconnects if manually disconnected", async () => { const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn()); diff --git a/test/usage.test.ts b/test/usage.test.ts index ac7794cb..674d94cb 100644 --- a/test/usage.test.ts +++ b/test/usage.test.ts @@ -381,6 +381,68 @@ describe("feedback prompting", () => { "Feedback prompting already initialized. Use reset() first.", ); }); + + test("rejects if feedback prompting already initialized (loop)", async () => { + nock(`${TRACKING_HOST}/${KEY}`) + .post(/.*\/feedback\/prompting-init/) + .reply(200, { success: true, channel: "test-channel" }); + + const bucketInstance = bucket(); + bucketInstance.init(KEY, { + persistUser: false, + feedback: { enableLiveSatisfaction: false }, + }); + + const init = bucketInstance.initLiveSatisfaction("foo"); + + const p1 = bucketInstance.initLiveSatisfaction("foo"); + const p2 = bucketInstance.initLiveSatisfaction("moo"); + + await expect(p1).rejects.toThrowError( + "Feedback prompting already initialized. Use reset() first.", + ); + await expect(p2).rejects.toThrowError( + "Feedback prompting already initialized. Use reset() first.", + ); + + await expect(init).resolves.not.toBeUndefined(); + }); + + test("does not think it is connected if the connection fails", async () => { + nock(`${TRACKING_HOST}/${KEY}`) + .post(/.*\/feedback\/prompting-init/) + .reply(200, { success: false }); + + const bucketInstance = bucket(); + bucketInstance.init(KEY, { + persistUser: false, + feedback: { enableLiveSatisfaction: false }, + }); + + await bucketInstance.initLiveSatisfaction("foo"); + + nock(`${TRACKING_HOST}/${KEY}`) + .post(/.*\/feedback\/prompting-init/) + .reply(200, { success: true, channel: "test-channel" }); + + vi.mocked(openAblySSEChannel).mockImplementation(() => { + throw new Error("something bad happened"); + }); + + await expect(() => + bucketInstance.initLiveSatisfaction("foo"), + ).rejects.toThrowError("something bad happened"); + + nock(`${TRACKING_HOST}/${KEY}`) + .post(/.*\/feedback\/prompting-init/) + .reply(200, { success: true, channel: "test-channel" }); + + vi.mocked(openAblySSEChannel).mockImplementation(() => { + return "fake_client" as any; + }); + + await bucketInstance.initLiveSatisfaction("foo"); + }); }); describe("feedback state management", () => {