Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retries and add more guarding code #75

Merged
merged 9 commits into from
Nov 8, 2023
13 changes: 13 additions & 0 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,19 @@ export default function main() {

log(`feedback prompting enabled`);

if (sseChannel) {
pavkam marked this conversation as resolved.
Show resolved Hide resolved
if (feedbackPromptingUserId !== userId) {
err(
"Feedback prompting already initialized for a different user. Are you calling initLiveSatisfaction() or user() in a loop?",
);
} else {
warn(
"Feedback prompting already initialized for this user. Are you calling initLiveSatisfaction() or user() in a loop?",
);
return res;
}
}

feedbackPromptingUserId = userId;
sseChannel = openAblySSEChannel(
`${getUrl()}/feedback/prompting-auth`,
Expand Down
51 changes: 34 additions & 17 deletions src/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,21 @@ export class AblySSEChannel {
errorCode >= ABLY_TOKEN_ERROR_MIN &&
errorCode <= ABLY_TOKEN_ERROR_MAX
) {
this.log("token expired, refreshing");
await this.connect().catch((x) =>
this.warn("failed to refresh token", x),
);
this.log("event source token expired, refresh required");
}
return;
}

if ((e as any)?.target?.readyState === 2) {
this.log("event source connection closed");
} else {
this.warn("unexpected error occured", e);
const connectionState = (e as any)?.target?.readyState;
if (connectionState === 2) {
this.log("event source connection closed", e);
}
if (connectionState === 1) {
this.warn("event source connection failed to open", e);
} else {
this.warn("event source unexpected error occured", e);
}
}

this.disconnect();
}

private onMessage(e: MessageEvent) {
Expand Down Expand Up @@ -170,25 +172,40 @@ export class AblySSEChannel {

public open(options?: { retryInterval?: number; retryCount?: number }) {
const retryInterval = options?.retryInterval ?? 1000 * 30;
let retryCount = options?.retryCount ?? 3;
const retryCount = options?.retryCount ?? 3;
let retriesRemaining = retryCount;

const tryConnect = async () => {
await this.connect().catch((e) =>
this.warn(`failed to connect. ${retryCount} retries remaining`, e),
);
try {
await this.connect();
retriesRemaining = retryCount;
} catch (e) {
if (retriesRemaining > 0) {
this.warn(
`failed to connect, ${retriesRemaining} retries remaining`,
e,
);
} else {
this.warn(`failed to connect, no retries remaining`, e);
}
}
};

void tryConnect();

this.retryInterval = setInterval(() => {
if (!this.eventSource && this.retryInterval) {
if (retryCount <= 0) {
if (retriesRemaining <= 0) {
this.warn(
"failed to initiate a connection to feedback prompting, all retries exhausted",
);

clearInterval(this.retryInterval);
this.retryInterval = null;
return;
}

retryCount--;
retriesRemaining--;
void tryConnect();
}
}, retryInterval);
Expand All @@ -198,8 +215,8 @@ export class AblySSEChannel {
if (this.retryInterval) {
clearInterval(this.retryInterval);
this.retryInterval = null;
this.disconnect();
}
this.disconnect();
}

public isOpen() {
Expand Down
97 changes: 85 additions & 12 deletions test/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ vitest.mock("reconnecting-eventsource", () => {
return {
default: vi.fn().mockReturnValue({
addEventListener: vi.fn(),
close: vi.fn(),
}),
};
});
Expand Down Expand Up @@ -225,7 +226,7 @@ describe("message handling", () => {
expect(callback).toHaveBeenCalledTimes(1);
});

test("does not respond to unknown errors", async () => {
test("disconnects on unknown event source errors without data", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
Expand All @@ -246,18 +247,39 @@ describe("message handling", () => {
expect(errorCallback).toBeDefined();

await errorCallback!({} as any);
expect(close).not.toHaveBeenCalled();
expect(close).toHaveBeenCalledTimes(1);
});

test("disconnects on unknown event source errors with data", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
const addEventListener = (event: string, cb: (e: Event) => void) => {
if (event === "error") {
errorCallback = cb as typeof errorCallback;
}
};

const close = vi.fn();
vi.mocked(ReconnectingEventSource).mockReturnValue({
addEventListener,
close,
} as any);

await sse.connect();

expect(errorCallback).toBeDefined();

await errorCallback!(
new MessageEvent("error", {
data: JSON.stringify({ code: 400 }),
}),
);

expect(close).not.toHaveBeenCalled();
expect(close).toHaveBeenCalledTimes(1);
});

test("resets the connection and refreshes token for ably expiry errors", async () => {
test("disconnects when ably reports token is expired", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
Expand All @@ -275,21 +297,17 @@ describe("message handling", () => {

await sse.connect();

setupAuthNock(true);
setupTokenNock(true);

await errorCallback!(
new MessageEvent("error", {
data: JSON.stringify({ code: 40140 }),
}),
);

expect(close).toHaveBeenCalled();
expect(vi.mocked(ReconnectingEventSource)).toHaveBeenCalledTimes(2);
});
});

describe("automatic auth retries", () => {
describe("automatic retries", () => {
const nockWait = (n: nock.Scope) => {
return new Promise((resolve) => {
n.on("replied", () => {
Expand All @@ -298,13 +316,15 @@ describe("automatic auth retries", () => {
});
};

afterEach(() => {
expect(nock.isDone()).toBe(true);

beforeEach(() => {
vi.clearAllMocks();
nock.cleanAll();
});

afterEach(() => {
expect(nock.isDone()).toBe(true);
});

test("opens and connects to a channel", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

Expand Down Expand Up @@ -348,6 +368,59 @@ describe("automatic auth retries", () => {
expect(sse.isOpen()).toBe(true);
});

test("resets retry count on successfull connect", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

// mock event source
let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
const addEventListener = (event: string, cb: (e: Event) => void) => {
if (event === "error") {
errorCallback = cb as typeof errorCallback;
}
};

const close = vi.fn();
vi.mocked(ReconnectingEventSource).mockReturnValue({
addEventListener,
close,
} as any);

// make initial failed attempt
vi.useFakeTimers();
const n = setupAuthNock(false);
sse.open({ retryInterval: 100, retryCount: 1 });
await nockWait(n);

const attempt = async () => {
const n1 = setupAuthNock(true);
const n2 = setupTokenNock(true);

vi.advanceTimersByTime(100);

await nockWait(n1);
await nockWait(n2);

await flushPromises();
vi.advanceTimersByTime(100);

expect(sse.isConnected()).toBe(true);

// simulate an error
vi.advanceTimersByTime(100);

await errorCallback!({} as any);
await flushPromises();

expect(sse.isConnected()).toBe(false);
};

await attempt();
await attempt();
await attempt();

vi.useRealTimers();
});

test("reconnects if manually disconnected", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

Expand Down
Loading