Skip to content

Commit

Permalink
reverts to the double-queue approach to minimize setTimeout() calls
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoscaz committed Aug 22, 2023
1 parent 29baab5 commit b58c187
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 25 deletions.
12 changes: 4 additions & 8 deletions src/InstantRelay.spec.ts
Expand Up @@ -89,8 +89,7 @@ describe('instant-relay', () => {
return (message) => new Promise(() => {});
}, opts);
for (let i = 0; i < 4; i += 1) {
// @ts-ignore
ir.nodeMap.get('r')!.push({ id: i + '', type: 'msg' });
ir.nodeMap.get('r')!.queue.push({ id: i + '', type: 'msg' });
}
});

Expand All @@ -105,8 +104,7 @@ describe('instant-relay', () => {
let currDelta = 0;
let count = 0;
const loop = () => {
// @ts-ignore
ir.nodeMap.get('r')!.push({ id: count + '', type: 'msg' }).then(() => {
ir.nodeMap.get('r')!.queue.push({ id: count + '', type: 'msg' }).then(() => {
currTstmp = Date.now();
currDelta = currTstmp - prevTstmp;
if (count > 0) {
Expand Down Expand Up @@ -150,8 +148,7 @@ describe('instant-relay', () => {
}, {});

setImmediate(() => {
// @ts-ignore
ir.nodeMap.get('a')!.push({ id: '0', type: 'greeting' });
ir.nodeMap.get('a')!.queue.push({ id: '0', type: 'greeting' });
});

});
Expand Down Expand Up @@ -181,8 +178,7 @@ describe('instant-relay', () => {
}, {});

setImmediate(() => {
// @ts-ignore
ir.nodeMap.get('a')!.push({ id: '0', type: 'greeting' }, () => {});
ir.nodeMap.get('a')!.queue.push({ id: '0', type: 'greeting' });
});

});
Expand Down
4 changes: 2 additions & 2 deletions src/InstantRelay.ts
Expand Up @@ -4,8 +4,8 @@ import { makeNode } from './node';

export class InstantRelay<M> {

private readonly nodeMap: Map<string, InternalNode<M>>;
private readonly nodeArr: InternalNode<M>[];
readonly nodeMap: Map<string, InternalNode<M>>;
readonly nodeArr: InternalNode<M>[];

constructor() {
this.nodeMap = new Map();
Expand Down
26 changes: 12 additions & 14 deletions src/node.ts
Expand Up @@ -20,13 +20,13 @@ const makeSend = <M>(nodeMap: Map<string, InternalNode<M>>, senderId: string): S
crashWithError(new Error(`Unknown node with id "${recipientId}"`));
return;
}
await recipient.push(message);
await recipient.queue.push(message);
};
};

const makeBroadcast = <M>(nodeArr: InternalNode<M>[], senderId: string): BroadcastMessage<M> => {
return async (msg: M) => {
await Promise.all(nodeArr.map(node => node.id !== senderId ? node.push(msg) : null));
await Promise.all(nodeArr.map(node => node.id !== senderId ? node.queue.push(msg) : null));
};
};

Expand All @@ -51,27 +51,25 @@ export const makeNode = <M, O extends {}>(
const broadcast = makeBroadcast(nodeArr, id);
const handleMessage = factory(send, broadcast, { ...opts, id });

let queueLength = 0;
const handlingQueue = fastq.promise(handleMessage, concurrency);

const queue = fastq.promise(async (msg: M) => {
await handleMessage(msg);
}, concurrency);

queue.error((err, task) => {
handlingQueue.error((err, task) => {
if (err !== null) {
crashWithError(err);
return;
}
});

const push = async (msg: M) => {
queue.push(msg);
if ((queueLength = queue.length()) >= highWaterMark) {
await wait(throttle(queueLength));
let handlingQueueLength = 0;

const incomingQueue = fastq.promise(async (msg: M) => {
handlingQueue.push(msg);
if ((handlingQueueLength = handlingQueue.length()) >= highWaterMark) {
await wait(throttle(handlingQueueLength));
}
};
}, 1);

const node: InternalNode<M> = { id, push };
const node: InternalNode<M> = { id, queue: incomingQueue };

nodeMap.set(id, node);
nodeArr.push(node);
Expand Down
4 changes: 3 additions & 1 deletion src/types.ts
@@ -1,4 +1,6 @@

import fastq from 'fastq';

export interface HandleMessage<M> {
(message: M): Promise<void>;
}
Expand All @@ -23,5 +25,5 @@ export interface AddNodeOpts {

export interface InternalNode<M> {
readonly id: string;
readonly push: (msg: M) => Promise<void>;
readonly queue: fastq.queueAsPromised<M>;
}

0 comments on commit b58c187

Please sign in to comment.