Skip to content

Commit

Permalink
Remove the reconnecting event source and add some checks (#77)
Browse files Browse the repository at this point in the history
* feat: remove the reconnecting event source

* feat: additional race-condition protection
  • Loading branch information
pavkam committed Nov 10, 2023
1 parent 7e7f3f4 commit 735f627
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 79 deletions.
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
"cross-fetch": "^4.0.0",
"is-bundling-for-browser-or-node": "^1.1.1",
"js-cookie": "^3.0.5",
"preact": "^10.16.0",
"reconnecting-eventsource": "^1.6.2"
"preact": "^10.16.0"
}
}
98 changes: 66 additions & 32 deletions src/sse.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import fetch from "cross-fetch";
import ReconnectingEventSource from "reconnecting-eventsource";

import { ABLY_REALTIME_HOST, ABLY_REST_HOST } from "./config";

Expand All @@ -15,7 +14,8 @@ const ABLY_TOKEN_ERROR_MIN = 40140;
const ABLY_TOKEN_ERROR_MAX = 40149;

export class AblySSEChannel {
private eventSource: ReconnectingEventSource | null = null;
private isOpen: boolean = false;
private eventSource: EventSource | null = null;
private retryInterval: ReturnType<typeof setInterval> | null = null;
private debug: boolean;

Expand Down Expand Up @@ -100,21 +100,28 @@ export class AblySSEChannel {

private async onError(e: Event) {
if (e instanceof MessageEvent) {
const errorPayload = JSON.parse(e.data);
const errorCode = Number(errorPayload?.code);
let errorCode: number | undefined;

try {
const errorPayload = JSON.parse(e.data);
errorCode = errorPayload?.code && Number(errorPayload.code);
} catch (error: any) {
this.warn("received unparseable error message", error, e);
}

if (
errorCode &&
errorCode >= ABLY_TOKEN_ERROR_MIN &&
errorCode <= ABLY_TOKEN_ERROR_MAX
) {
this.log("event source token expired, refresh required");
}
} else {
const connectionState = (e as any)?.target?.readyState;

if (connectionState === 2) {
this.log("event source connection closed", e);
}
if (connectionState === 1) {
} else if (connectionState === 1) {
this.warn("event source connection failed to open", e);
} else {
this.warn("event source unexpected error occured", e);
Expand All @@ -125,16 +132,30 @@ export class AblySSEChannel {
}

private onMessage(e: MessageEvent) {
if (e.data) {
const message = JSON.parse(e.data);
if (message.data) {
const payload = JSON.parse(message.data);
let payload: any;

this.log("received message", payload);
this.messageHandler(payload);
try {
if (e.data) {
const message = JSON.parse(e.data);
if (message.data) {
payload = JSON.parse(message.data);
}
}
} catch (error: any) {
this.warn("received unparseable message", error, e);
return;
}

if (payload) {
this.log("received message", payload);

return;
try {
this.messageHandler(payload);
} catch (error: any) {
this.warn("failed to handle message", error, payload);
}

return;
}

this.warn("received invalid message", e);
Expand All @@ -145,29 +166,45 @@ export class AblySSEChannel {
}

public async connect() {
this.disconnect();
const token = await this.refreshToken();
if (this.isOpen) {
this.warn("channel connection already open");
return;
}

this.eventSource = new ReconnectingEventSource(
`${ABLY_REALTIME_HOST}/sse?v=1.2&accessToken=${encodeURIComponent(
token.token,
)}&channels=${encodeURIComponent(this.channel)}&rewind=1`,
);
this.isOpen = true;
try {
const token = await this.refreshToken();

this.eventSource.addEventListener("error", (e) => this.onError(e));
this.eventSource.addEventListener("open", (e) => this.onOpen(e));
this.eventSource.addEventListener("message", (m) => this.onMessage(m));
this.eventSource = new EventSource(
`${ABLY_REALTIME_HOST}/sse?v=1.2&accessToken=${encodeURIComponent(
token.token,
)}&channels=${encodeURIComponent(this.channel)}&rewind=1`,
);

this.log("channel connection opened");
this.eventSource.addEventListener("error", (e) => this.onError(e));
this.eventSource.addEventListener("open", (e) => this.onOpen(e));
this.eventSource.addEventListener("message", (m) => this.onMessage(m));

this.log("channel connection opened");
} finally {
this.isOpen = !!this.eventSource;
}
}

public disconnect() {
if (!this.isOpen) {
this.warn("channel connection already closed");
return;
}

if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;

this.log("channel connection closed");
}

this.isOpen = false;
}

public open(options?: { retryInterval?: number; retryCount?: number }) {
Expand All @@ -194,12 +231,8 @@ export class AblySSEChannel {
void tryConnect();

this.retryInterval = setInterval(() => {
if (!this.eventSource && this.retryInterval) {
if (!this.isOpen && this.retryInterval) {
if (retriesRemaining <= 0) {
this.warn(
"failed to initiate a connection to feedback prompting, all retries exhausted",
);

clearInterval(this.retryInterval);
this.retryInterval = null;
return;
Expand All @@ -216,15 +249,16 @@ export class AblySSEChannel {
clearInterval(this.retryInterval);
this.retryInterval = null;
}

this.disconnect();
}

public isOpen() {
return this.retryInterval !== null;
public isActive() {
return !!this.retryInterval;
}

public isConnected() {
return this.eventSource !== null;
return this.isOpen;
}
}

Expand Down
Loading

0 comments on commit 735f627

Please sign in to comment.