Skip to content
This repository has been archived by the owner on Nov 27, 2022. It is now read-only.

Commit

Permalink
Review socket code.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Mar 29, 2020
1 parent 927f1ce commit 8333fb6
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions src/UpdateTracker.js
Expand Up @@ -48,24 +48,35 @@ export default class UpdateTracker {
}

/** Tracks updates to the given resource */
function trackResource(url, retryAttempt, backOffDelay) {
function trackResource(url, webSocketOptions = {}) {
// Try to find an existing socket for the host
const { protocol, host } = new URL(url);
let webSocket = webSockets[host];

// If none exists, create a new one
if (!webSocket || webSocket.reopen) {
// If no socket exists, create a new one
if (!webSocket) {
const socketUrl = `${protocol.replace('http', 'ws')}//${host}/`;
webSockets[host] = webSocket = new WebSocket(socketUrl);
Object.assign(webSocket, { enqueue, onmessage,
ready: new Promise(resolve => (webSocket.onopen = resolve)),
onclose: oncloseFor(host),
Object.assign(webSocket, {
host,
resources: new Set(),
});
setUpBackOff(webSocket, retryAttempt, backOffDelay);
reconnectionAttempts: 0,
reconnectionDelay: 1000,
enqueue,
onmessage: processMessage,
onclose: reconnect,
ready: new Promise(resolve => {
webSocket.onopen = () => {
webSocket.reconnectionAttempts = 0;
webSocket.reconnectionDelay = 1000;
resolve();
};
}),
}, webSocketOptions);
}

// Each WebSocket keeps track of subscribed resources so we can resubscribe later if needed
// Each WebSocket keeps track of subscribed resources
// so we can resubscribe later if needed
webSocket.resources.add(url);

// Subscribe to updates on the resource
Expand All @@ -89,11 +100,10 @@ export function resetWebSockets() {
async function enqueue(data) {
await this.ready;
this.send(data);
setUpBackOff(this);
}

/** Processes an update message from the WebSocket */
function onmessage({ data }) {
function processMessage({ data }) {
// Verify the message is an update notification
const match = /^pub +(.+)/.exec(data);
if (!match)
Expand All @@ -111,30 +121,24 @@ function onmessage({ data }) {
subscriber(update);
}

function oncloseFor(host) {
return function () {
let ws = webSockets[host];
webSockets[host].reopen = true;
reconnectAfterBackoff(ws);
};
}

/** Reconnect WebSocket after a backoff delay */
async function reconnectAfterBackoff(ws) {
if (ws.retry < 6) {
await new Promise(resolve => (setTimeout(resolve, ws.delay)));
const nextDelay = ws.delay * 2;
ws.resources.forEach(url => trackResource(url, ++ws.retry, nextDelay));
/** Reconnects a socket after a backoff delay */
async function reconnect() {
// Ensure this socket is no longer marked as active
if ((this.host in webSockets) && webSockets[this.host] === this)
delete webSockets[this.host];

// Try setting up a new socket
if (this.reconnectionAttempts < 6) {
// Wait a given backoff period before reconnecting
await new Promise(done => (setTimeout(done, this.reconnectionDelay)));
// Try reconnecting, and back off exponentially
this.resources.forEach(url => trackResource(url, {
reconnectionAttempts: this.reconnectionAttempts + 1,
reconnectionDelay: this.reconnectionDelay * 2,
}));
}
}

function setUpBackOff(webSocket, retryAttempt, backOffDelay) {
Object.assign(webSocket, {
delay: backOffDelay || 1000,
retry: retryAttempt || 0,
});
}

// Keep track of all fetched resources
auth.on('request', url => {
if (!fetchedUrls.has(url)) {
Expand Down

0 comments on commit 8333fb6

Please sign in to comment.