Skip to content

Commit

Permalink
Protobuf support (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
tunderdomb committed Aug 2, 2022
1 parent 7643983 commit 0ffbc87
Show file tree
Hide file tree
Showing 23 changed files with 576 additions and 123 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
"eslint-plugin-standard": "^5.0.0",
"mocha": "^10.0.0",
"nyc": "^15.1.0",
"protobufjs": "^6.11.3",
"seed-random": "^2.2.0"
},
"peerDependencies": {
"protobufjs": "^6.11.3"
}
}
20 changes: 17 additions & 3 deletions src/GatheringClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,32 @@ class GatheringClient {
this._correlationIdMap = new Map()
this._replyQueue = ''

const { queueMaxSize, timeoutMs, serverCount = 0 } = options
const {
queueMaxSize,
timeoutMs,
serverCount = 0,
assertQueueOptions,
assertExchange = true,
assertExchangeOptions = null
} = options

this._rpcQueueMaxSize = queueMaxSize
this._rpcTimeoutMs = timeoutMs
this._gatheringServerCount = serverCount

this._assertExchange = assertExchange === true
this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {})
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {})
}

async initialize () {
try {
const channel = await this._connection.getChannel()
await channel.assertExchange(this.name, 'fanout', { durable: true })
if (this._assertExchange) {
await channel.assertExchange(this.name, 'fanout', this._assertExchangeOptions)
}

const replyQueue = await channel.assertQueue('', { exclusive: true })
const replyQueue = await channel.assertQueue('', this._assertQueueOptions)
this._replyQueue = replyQueue.queue

await channel.consume(this._replyQueue, (reply) => {
Expand Down
19 changes: 16 additions & 3 deletions src/GatheringServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,31 @@ class GatheringServer {
this.name = name
this.statusQueue = name

const { prefetchCount, timeoutMs } = options || {}
const {
prefetchCount,
timeoutMs,
assertQueueOptions,
assertExchange = true,
assertExchangeOptions = null
} = options || {}

this._prefetchCount = prefetchCount
this._responseTimeoutMs = timeoutMs

this._assertExchange = assertExchange === true
this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {})
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {})

this.actions = new Map()
}

async initialize () {
try {
const channel = await this._connection.getChannel()
await channel.assertExchange(this.name, 'fanout', { durable: true })
const serverQueue = await channel.assertQueue('', { exclusive: true })
if (this._assertExchange) {
await channel.assertExchange(this.name, 'fanout', this._assertExchangeOptions)
}
const serverQueue = await channel.assertQueue('', this._assertQueueOptions)
const serverQueueName = serverQueue.queue

await channel.prefetch(this._prefetchCount)
Expand Down
36 changes: 36 additions & 0 deletions src/ProtoQueueMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const QueueMessage = require('./QueueMessage')

class ProtoQueueMessage extends QueueMessage {
serialize () {
const errorMessage = this.ContentSchema.verify(this.data)

if (errorMessage) {
throw new Error(`Error verifying ProtoQueueMessage data: ${errorMessage}`)
}

const message = this.ContentSchema.create(this.data)

return this.ContentSchema.encode(message).finish()
}

static unserialize (buffer, ContentSchema) {
try {
const message = ContentSchema.decode(buffer)
const data = ContentSchema.toObject(message, {
enums: String, // enums as string names
longs: Number, // longs as strings (requires long.js)
bytes: Buffer, // bytes as base64 encoded strings
defaults: true, // includes default values
arrays: true, // populates empty arrays (repeated fields) even if defaults=false
objects: true, // populates empty objects (map fields) even if defaults=false
oneofs: true // includes virtual oneof fields set to the present field's name
})

return new ProtoQueueMessage('ok', data, null, ContentSchema)
} catch (err) {
return new ProtoQueueMessage('error', `Cannot decode protobuf: ${err.message}`, null, ContentSchema)
}
}
}

module.exports = ProtoQueueMessage
23 changes: 20 additions & 3 deletions src/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,27 @@ class Publisher {
* @param {QueueConnection} queueConnection
* @param {Console} logger
* @param {String} exchange
* @param {Object} options
*/
constructor (queueConnection, logger, exchange) {
constructor (queueConnection, logger, exchange, options) {
this._connection = queueConnection
this._logger = logger
this.exchange = exchange
this.routingKey = ''
this.options = options

const {
MessageModel,
ContentSchema,
assertExchange = true,
assertExchangeOptions = null
} = options || {}

this._assertExchange = assertExchange === true
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {})

this.MessageModel = MessageModel || QueueMessage
this.ContentSchema = ContentSchema || JSON
}

/**
Expand All @@ -20,7 +35,9 @@ class Publisher {
* @returns {Promise}
*/
assertExchangeOrQueue (channel) {
return channel.assertExchange(this.exchange, 'fanout', { durable: true })
if (this._assertExchange) {
return channel.assertExchange(this.exchange, 'fanout', this._assertExchangeOptions)
}
}

async initialize () {
Expand Down Expand Up @@ -58,7 +75,7 @@ class Publisher {
const channel = await this._connection.getChannel()
let param
try {
param = new QueueMessage('ok', message, timeOut)
param = new this.MessageModel('ok', message, timeOut, this.ContentSchema)
if (attachments instanceof Map) {
for (const [key, value] of attachments) {
param.addAttachment(key, value)
Expand Down
18 changes: 15 additions & 3 deletions src/QueueClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,30 @@ class QueueClient extends Publisher {
* @param {QueueConnection} queueConnection
* @param {Console} logger
* @param {String} name
* @param {Object} options
*/
constructor (queueConnection, logger, name) {
super(queueConnection, logger, '')
constructor (queueConnection, logger, name, options) {
super(queueConnection, logger, '', options)
this.routingKey = name
this._assertQueue = null

const {
assertQueue = true,
assertQueueOptions
} = options || {}

this._assertQueue = assertQueue === true
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {})
}

/**
* @param channel
* @returns {Promise}
*/
assertExchangeOrQueue (channel) {
return channel.assertQueue(this.routingKey, { durable: true })
if (this._assertQueue) {
return channel.assertQueue(this.routingKey, this._assertQueueOptions)
}
}
}

Expand Down
20 changes: 16 additions & 4 deletions src/QueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,24 @@ class QueueManager {
/**
* @param {String} exchangeName
* @param {Publisher|function() : Publisher} OverrideClass
* @param {Object} [options]
* @return Publisher
*/
getPublisher (exchangeName, OverrideClass = Publisher) {
getPublisher (exchangeName, OverrideClass = Publisher, options = {}) {
if (this.publishers.has(exchangeName)) {
return this.publishers.get(exchangeName)
}

if (arguments.length === 2 && typeof OverrideClass !== 'function') {
options = OverrideClass
OverrideClass = Publisher
}

if (OverrideClass !== Publisher && !(OverrideClass.prototype instanceof Publisher)) {
throw new Error('Override must be a subclass of Publisher')
}

const publisher = new OverrideClass(this.connection, this._logger, exchangeName)
const publisher = new OverrideClass(this.connection, this._logger, exchangeName, options)

this.publishers.set(exchangeName, publisher)

Expand Down Expand Up @@ -274,18 +280,24 @@ class QueueManager {
/**
* @param {String} queueName
* @param {QueueClient|function() : QueueClient} OverrideClass
* @param {Object} [options={}]
* @return QueueClient
*/
getQueueClient (queueName, OverrideClass = QueueClient) {
getQueueClient (queueName, OverrideClass = QueueClient, options = {}) {
if (this.queueClients.has(queueName)) {
return this.queueClients.get(queueName)
}

if (arguments.length === 2 && typeof OverrideClass !== 'function') {
options = OverrideClass
OverrideClass = QueueClient
}

if (OverrideClass !== QueueClient && !(OverrideClass.prototype instanceof QueueClient)) {
throw new Error('Override must be a subclass of QueueClient')
}

const queueClient = new OverrideClass(this.connection, this._logger, queueName)
const queueClient = new OverrideClass(this.connection, this._logger, queueName, options)

this.queueClients.set(queueName, queueClient)

Expand Down
39 changes: 25 additions & 14 deletions src/QueueMessage.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
class QueueMessage {
constructor (status, data, timeOut) {
constructor (status, data, timeOut, ContentSchema = JSON) {
this.status = status
this.data = data
this.timeOut = timeOut
this.attachments = new Map()
this.ContentSchema = ContentSchema
}

static fromJSON (jsonString) {
Expand Down Expand Up @@ -42,21 +43,31 @@ class QueueMessage {
return Buffer.concat([formatBuf, lengthBuf, jsonBuf, ...attachmentBuffers])
}

static unserialize (buffer) {
if (buffer.toString('utf8', 0, 1) === '+') {
const jsonLength = buffer.slice(1, 5).readUInt32BE()
const { status, data, timeOut, attachArray } = JSON.parse(buffer.toString('utf8', 5, 5 + jsonLength))
let prevAttachmentLength = 5 + jsonLength
const queueMessage = new QueueMessage(status, data, timeOut)
for (const [key, length] of attachArray) {
queueMessage.addAttachment(key, buffer.slice(prevAttachmentLength, prevAttachmentLength + length))
prevAttachmentLength = prevAttachmentLength + length
/**
* @param {Buffer} buffer
* @param ContentSchema
* @returns {QueueMessage}
*/
static unserialize (buffer, ContentSchema = JSON) {
if (!ContentSchema || ContentSchema === JSON) {
if (buffer.toString('utf8', 0, 1) === '+') {
const jsonLength = buffer.slice(1, 5).readUInt32BE()
const { status, data, timeOut, attachArray } = JSON.parse(buffer.toString('utf8', 5, 5 + jsonLength))
let prevAttachmentLength = 5 + jsonLength
const queueMessage = new this(status, data, timeOut, ContentSchema)
for (const [key, length] of attachArray) {
queueMessage.addAttachment(key, buffer.slice(prevAttachmentLength, prevAttachmentLength + length))
prevAttachmentLength = prevAttachmentLength + length
}

return queueMessage
} else if (buffer.toString('utf8', 0, 1) === '{') {
return this.fromJSON(buffer.toString('utf8'))
} else {
throw new Error('Impossible to deserialize the message: unrecognized format')
}
return queueMessage
} else if (buffer.toString('utf8', 0, 1) === '{') {
return this.fromJSON(buffer.toString('utf8'))
} else {
throw new Error('Impossible to deserialize the message')
throw new Error('Impossible to deserialize the message: unknown content schema')
}
}

Expand Down
18 changes: 15 additions & 3 deletions src/QueueServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@ class QueueServer extends Subscriber {
*/
constructor (queueConnection, logger, name, options) {
super(queueConnection, logger, name, options)
this._prefetchCount = options.prefetchCount
const {
assertQueue = true,
assertQueueOptions = null,
prefetchCount
} = options || {}

this._assertQueue = null
this._prefetchCount = prefetchCount

this._assertQueue = assertQueue === true
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {})
}

/**
Expand All @@ -18,13 +28,15 @@ class QueueServer extends Subscriber {
async initialize () {
try {
const channel = await this._connection.getChannel()
await channel.assertQueue(this.name, { durable: true })
if (this._assertQueue) {
await channel.assertQueue(this.name, this._assertQueueOptions)
}
await channel.prefetch(this._prefetchCount)
await channel.consume(this.name, (msg) => {
this._processMessage(channel, msg)
})
} catch (err) {
this._logger.error('CANNOT INITIALIZE QUEUE SERVER', err)
this._logger.error('CANNOT INITIALIZE QUEUE SERVER', this.name, this._assertQueueOptions, err)
throw err
}
}
Expand Down
Loading

0 comments on commit 0ffbc87

Please sign in to comment.