Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/interfaces/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"libp2p-crypto": "^0.19.5",
"multiaddr": "^10.0.0",
"multiformats": "^9.1.2",
"p-queue": "^6.6.2",
"peer-id": "^0.15.0",
"protobufjs": "^6.10.2",
"uint8arrays": "^3.0.0"
Expand Down
23 changes: 18 additions & 5 deletions packages/interfaces/src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { EventEmitter } = require('events')
const errcode = require('err-code')

const { pipe } = require('it-pipe')
const { default: Queue } = require('p-queue')

const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')
Expand Down Expand Up @@ -50,6 +51,7 @@ const {
* @property {SignaturePolicyType} [globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
* @property {boolean} [canRelayMessage = false] - if can relay messages not subscribed
* @property {boolean} [emitSelf = false] - if publish should emit to self, if subscribed
* @property {number} [messageProcessingConcurrency = 10] - handle this many incoming pubsub messages concurrently
*/

/**
Expand All @@ -67,7 +69,8 @@ class PubsubBaseProtocol extends EventEmitter {
libp2p,
globalSignaturePolicy = SignaturePolicy.StrictSign,
canRelayMessage = false,
emitSelf = false
emitSelf = false,
messageProcessingConcurrency = 10
}) {
if (typeof debugName !== 'string') {
throw new Error('a debugname `string` is required')
Expand Down Expand Up @@ -162,6 +165,11 @@ class PubsubBaseProtocol extends EventEmitter {
*/
this.topicValidators = new Map()

/**
* @type {number}
*/
this.queue = new Queue({ concurrency: messageProcessingConcurrency })

this._registrarId = undefined
this._onIncomingStream = this._onIncomingStream.bind(this)
this._onPeerConnected = this._onPeerConnected.bind(this)
Expand Down Expand Up @@ -402,14 +410,19 @@ class PubsubBaseProtocol extends EventEmitter {
}

if (msgs.length) {
// @ts-ignore RPC message is modified
await Promise.all(msgs.map(async (message) => {
this.queue.addAll(msgs.map(message => async () => {
if (!(this.canRelayMessage || (message.topicIDs && message.topicIDs.some((topic) => this.subscriptions.has(topic))))) {
this.log('received message we didn\'t subscribe to. Dropping.')
return
}
const msg = utils.normalizeInRpcMessage(message, idB58Str)
await this._processRpcMessage(msg)

try {
const msg = utils.normalizeInRpcMessage(message, idB58Str)

await this._processRpcMessage(msg)
} catch (err) {
this.log.err(err)
}
}))
}
return true
Expand Down