From 05971565a25ab42a78cccd13d214bd7cc94985fc Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 17 Sep 2021 19:16:21 +0200 Subject: [PATCH] fix: process incoming messages in a queue To allow processing pubsub messages that have steps that are slow but async, process the messages in a queue. Makes the concurrency configurable with a default of 10x messages. --- packages/interfaces/package.json | 1 + packages/interfaces/src/pubsub/index.js | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/packages/interfaces/package.json b/packages/interfaces/package.json index f14558647..3759a35ca 100644 --- a/packages/interfaces/package.json +++ b/packages/interfaces/package.json @@ -67,6 +67,7 @@ "libp2p-crypto": "^0.19.5", "multiaddr": "^10.0.0", "multiformats": "^9.1.2", + "p-queue": "^6.6.2", "peer-id": "^0.15.0", "protobufjs": "^6.10.2", "uint8arrays": "^3.0.0" diff --git a/packages/interfaces/src/pubsub/index.js b/packages/interfaces/src/pubsub/index.js index 85c4ada02..798d7a325 100644 --- a/packages/interfaces/src/pubsub/index.js +++ b/packages/interfaces/src/pubsub/index.js @@ -5,6 +5,7 @@ const { EventEmitter } = require('events') const errcode = require('err-code') const { pipe } = require('it-pipe') +const { default: Queue } = require('p-queue') const MulticodecTopology = require('../topology/multicodec-topology') const { codes } = require('./errors') @@ -50,6 +51,7 @@ const { * @property {SignaturePolicyType} [globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled * @property {boolean} [canRelayMessage = false] - if can relay messages not subscribed * @property {boolean} [emitSelf = false] - if publish should emit to self, if subscribed + * @property {number} [messageProcessingConcurrency = 10] - handle this many incoming pubsub messages concurrently */ /** @@ -67,7 +69,8 @@ class PubsubBaseProtocol extends EventEmitter { libp2p, globalSignaturePolicy = SignaturePolicy.StrictSign, canRelayMessage = false, - emitSelf = false + emitSelf = false, + messageProcessingConcurrency = 10 }) { if (typeof debugName !== 'string') { throw new Error('a debugname `string` is required') @@ -162,6 +165,11 @@ class PubsubBaseProtocol extends EventEmitter { */ this.topicValidators = new Map() + /** + * @type {number} + */ + this.queue = new Queue({ concurrency: messageProcessingConcurrency }) + this._registrarId = undefined this._onIncomingStream = this._onIncomingStream.bind(this) this._onPeerConnected = this._onPeerConnected.bind(this) @@ -402,14 +410,19 @@ class PubsubBaseProtocol extends EventEmitter { } if (msgs.length) { - // @ts-ignore RPC message is modified - await Promise.all(msgs.map(async (message) => { + this.queue.addAll(msgs.map(message => async () => { if (!(this.canRelayMessage || (message.topicIDs && message.topicIDs.some((topic) => this.subscriptions.has(topic))))) { this.log('received message we didn\'t subscribe to. Dropping.') return } - const msg = utils.normalizeInRpcMessage(message, idB58Str) - await this._processRpcMessage(msg) + + try { + const msg = utils.normalizeInRpcMessage(message, idB58Str) + + await this._processRpcMessage(msg) + } catch (err) { + this.log.err(err) + } })) } return true