Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retries and add more guarding code #75

Merged
merged 9 commits into from
Nov 8, 2023
58 changes: 34 additions & 24 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export default function main() {
let trackingHost: string = TRACKING_HOST;
let sessionUserId: string | undefined = undefined;
let persistUser: boolean = !isForNode;
let sseChannelActive: boolean = false;
pavkam marked this conversation as resolved.
Show resolved Hide resolved
let sseChannel: AblySSEChannel | undefined = undefined;
let liveFeedback: boolean = !isForNode;
let feedbackPromptHandler: FeedbackPromptHandler | undefined = undefined;
Expand Down Expand Up @@ -101,8 +102,6 @@ export default function main() {
return userId!;
}

// methods

function init(key: Key, options: Options = {}) {
reset();
if (!key) {
Expand Down Expand Up @@ -162,7 +161,7 @@ export default function main() {
reset();
}
sessionUserId = userId;
if (liveFeedback && !sseChannel) {
if (liveFeedback && !sseChannelActive) {
await initLiveSatisfaction(userId);
}
}
Expand Down Expand Up @@ -264,7 +263,7 @@ export default function main() {
err("Feedback prompting is not supported in Node.js environment");
}

if (sseChannel) {
if (sseChannelActive) {
err("Feedback prompting already initialized. Use reset() first.");
}

Expand All @@ -275,28 +274,38 @@ export default function main() {

userId = resolveUser(userId);

const res = await request(`${getUrl()}/feedback/prompting-init`, {
userId,
});
log(`feedback prompting status sent`, res);
const body: { success: boolean; channel?: string } = await res.json();
if (!body.success || !body.channel) {
log(`feedback prompting not enabled`);
return res;
}
// while initializeing, consider the channel active
pavkam marked this conversation as resolved.
Show resolved Hide resolved
sseChannelActive = true;
try {
const res = await request(`${getUrl()}/feedback/prompting-init`, {
userId,
});

log(`feedback prompting status sent`, res);
const body: { success: boolean; channel?: string } = await res.json();
if (!body.success || !body.channel) {
log(`feedback prompting not enabled`);
return res;
}

log(`feedback prompting enabled`);
log(`feedback prompting enabled`);

feedbackPromptingUserId = userId;
sseChannel = openAblySSEChannel(
`${getUrl()}/feedback/prompting-auth`,
userId,
body.channel,
(message) => handleFeedbackPromptRequest(userId!, message),
{ debug },
);
log(`feedback prompting connection established`);
return res;
sseChannel = openAblySSEChannel(
`${getUrl()}/feedback/prompting-auth`,
userId,
body.channel,
(message) => handleFeedbackPromptRequest(userId!, message),
{ debug },
);

feedbackPromptingUserId = userId;

log(`feedback prompting connection established`);
return res;
} finally {
// check that SSE channel has actually been opened, otherwise reset the value
sseChannelActive = !!sseChannel;
}
}

function handleFeedbackPromptRequest(userId: User["userId"], message: any) {
Expand Down Expand Up @@ -483,6 +492,7 @@ export default function main() {
function reset() {
sessionUserId = undefined;
feedbackPromptingUserId = undefined;
sseChannelActive = false;
if (sseChannel) {
closeAblySSEChannel(sseChannel);
log(`feedback prompting connection closed`);
Expand Down
51 changes: 34 additions & 17 deletions src/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,21 @@ export class AblySSEChannel {
errorCode >= ABLY_TOKEN_ERROR_MIN &&
errorCode <= ABLY_TOKEN_ERROR_MAX
) {
this.log("token expired, refreshing");
await this.connect().catch((x) =>
this.warn("failed to refresh token", x),
);
this.log("event source token expired, refresh required");
}
return;
}

if ((e as any)?.target?.readyState === 2) {
this.log("event source connection closed");
} else {
this.warn("unexpected error occured", e);
const connectionState = (e as any)?.target?.readyState;
if (connectionState === 2) {
this.log("event source connection closed", e);
}
if (connectionState === 1) {
this.warn("event source connection failed to open", e);
} else {
this.warn("event source unexpected error occured", e);
}
}

this.disconnect();
}

private onMessage(e: MessageEvent) {
Expand Down Expand Up @@ -170,25 +172,40 @@ export class AblySSEChannel {

public open(options?: { retryInterval?: number; retryCount?: number }) {
const retryInterval = options?.retryInterval ?? 1000 * 30;
let retryCount = options?.retryCount ?? 3;
const retryCount = options?.retryCount ?? 3;
let retriesRemaining = retryCount;

const tryConnect = async () => {
await this.connect().catch((e) =>
this.warn(`failed to connect. ${retryCount} retries remaining`, e),
);
try {
await this.connect();
retriesRemaining = retryCount;
} catch (e) {
if (retriesRemaining > 0) {
this.warn(
`failed to connect, ${retriesRemaining} retries remaining`,
e,
);
} else {
this.warn(`failed to connect, no retries remaining`, e);
}
}
};

void tryConnect();

this.retryInterval = setInterval(() => {
if (!this.eventSource && this.retryInterval) {
if (retryCount <= 0) {
if (retriesRemaining <= 0) {
this.warn(
"failed to initiate a connection to feedback prompting, all retries exhausted",
);

clearInterval(this.retryInterval);
this.retryInterval = null;
return;
}

retryCount--;
retriesRemaining--;
void tryConnect();
}
}, retryInterval);
Expand All @@ -198,8 +215,8 @@ export class AblySSEChannel {
if (this.retryInterval) {
clearInterval(this.retryInterval);
this.retryInterval = null;
this.disconnect();
}
this.disconnect();
}

public isOpen() {
Expand Down
97 changes: 85 additions & 12 deletions test/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ vitest.mock("reconnecting-eventsource", () => {
return {
default: vi.fn().mockReturnValue({
addEventListener: vi.fn(),
close: vi.fn(),
}),
};
});
Expand Down Expand Up @@ -225,7 +226,7 @@ describe("message handling", () => {
expect(callback).toHaveBeenCalledTimes(1);
});

test("does not respond to unknown errors", async () => {
test("disconnects on unknown event source errors without data", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
Expand All @@ -246,18 +247,39 @@ describe("message handling", () => {
expect(errorCallback).toBeDefined();

await errorCallback!({} as any);
expect(close).not.toHaveBeenCalled();
expect(close).toHaveBeenCalledTimes(1);
});

test("disconnects on unknown event source errors with data", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
const addEventListener = (event: string, cb: (e: Event) => void) => {
if (event === "error") {
errorCallback = cb as typeof errorCallback;
}
};

const close = vi.fn();
vi.mocked(ReconnectingEventSource).mockReturnValue({
addEventListener,
close,
} as any);

await sse.connect();

expect(errorCallback).toBeDefined();

await errorCallback!(
new MessageEvent("error", {
data: JSON.stringify({ code: 400 }),
}),
);

expect(close).not.toHaveBeenCalled();
expect(close).toHaveBeenCalledTimes(1);
});

test("resets the connection and refreshes token for ably expiry errors", async () => {
test("disconnects when ably reports token is expired", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
Expand All @@ -275,21 +297,17 @@ describe("message handling", () => {

await sse.connect();

setupAuthNock(true);
setupTokenNock(true);

await errorCallback!(
new MessageEvent("error", {
data: JSON.stringify({ code: 40140 }),
}),
);

expect(close).toHaveBeenCalled();
expect(vi.mocked(ReconnectingEventSource)).toHaveBeenCalledTimes(2);
});
});

describe("automatic auth retries", () => {
describe("automatic retries", () => {
const nockWait = (n: nock.Scope) => {
return new Promise((resolve) => {
n.on("replied", () => {
Expand All @@ -298,13 +316,15 @@ describe("automatic auth retries", () => {
});
};

afterEach(() => {
expect(nock.isDone()).toBe(true);

beforeEach(() => {
vi.clearAllMocks();
nock.cleanAll();
});

afterEach(() => {
expect(nock.isDone()).toBe(true);
});

test("opens and connects to a channel", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

Expand Down Expand Up @@ -348,6 +368,59 @@ describe("automatic auth retries", () => {
expect(sse.isOpen()).toBe(true);
});

test("resets retry count on successfull connect", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

// mock event source
let errorCallback: ((e: Event) => Promise<void>) | undefined = undefined;
const addEventListener = (event: string, cb: (e: Event) => void) => {
if (event === "error") {
errorCallback = cb as typeof errorCallback;
}
};

const close = vi.fn();
vi.mocked(ReconnectingEventSource).mockReturnValue({
addEventListener,
close,
} as any);

// make initial failed attempt
vi.useFakeTimers();
const n = setupAuthNock(false);
sse.open({ retryInterval: 100, retryCount: 1 });
await nockWait(n);

const attempt = async () => {
const n1 = setupAuthNock(true);
const n2 = setupTokenNock(true);

vi.advanceTimersByTime(100);

await nockWait(n1);
await nockWait(n2);

await flushPromises();
vi.advanceTimersByTime(100);

expect(sse.isConnected()).toBe(true);

// simulate an error
vi.advanceTimersByTime(100);

await errorCallback!({} as any);
await flushPromises();

expect(sse.isConnected()).toBe(false);
};

await attempt();
await attempt();
await attempt();

vi.useRealTimers();
});

test("reconnects if manually disconnected", async () => {
const sse = new AblySSEChannel(userId, channel, ablyAuthUrl, vi.fn());

Expand Down
Loading
Loading