Skip to content

Commit

Permalink
some sonar fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tbence94 committed Jan 18, 2023
1 parent 586417a commit 2c99db0
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 56 deletions.
36 changes: 22 additions & 14 deletions src/GatheringClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,7 @@ class GatheringClient {
this._replyQueue = replyQueue.queue

await channel.consume(this._replyQueue, (reply) => {
if (!reply) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO REPLY`, reply)
return
}
if (!reply.properties) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO PROPERTIES ON REPLY`, reply)
return
}
if (!reply.properties.correlationId) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO CORRELATION ID ON REPLY`, reply)
return
}
if (!reply.properties.type) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO MESSAGE TYPE ON REPLY`, reply)
if (!this.isValidReply(reply)) {
return
}

Expand All @@ -83,6 +70,27 @@ class GatheringClient {
}
}

isValidReply (reply) {
if (!reply) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO REPLY`, reply)
return false
}
if (!reply.properties) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO PROPERTIES ON REPLY`, reply)
return false
}
if (!reply.properties.correlationId) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO CORRELATION ID ON REPLY`, reply)
return false
}
if (!reply.properties.type) {
this._logger.error(`QUEUE GATHERING CLIENT: INVALID REPLY ON '${this.name}': NO MESSAGE TYPE ON REPLY`, reply)
return false
}

return true
}

/**
* @param {*} data
* @param {Number} timeoutMs
Expand Down
99 changes: 67 additions & 32 deletions src/GatheringServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,39 +113,15 @@ class GatheringServer {
* @private
*/
async _handleGatheringAnnouncement (channel, msg) {
if (!msg || !msg.properties) {
this._logger.error(`QUEUE GATHERING SERVER: INVALID REQUEST ON '${this.name}': NO MESSAGE/PROPERTIES`, msg)
this._nack(channel, msg)
return
}
if (!msg.properties.correlationId) {
this._logger.error(`QUEUE GATHERING SERVER: INVALID REQUEST ON '${this.name}': NO CORRELATION ID`, msg)
this._nack(channel, msg)
return
}
if (!msg.properties.replyTo) {
this._logger.error(`QUEUE GATHERING SERVER: INVALID REQUEST ON '${this.name}': NO REPLY TO`, msg)
this._nack(channel, msg)
return
}

const correlationId = msg.properties.correlationId
const replyTo = msg.properties.replyTo
let request
const {
isValid,
correlationId,
replyTo,
request
} = this.unserializeMessage(channel, msg)

try {
request = QueueMessage.unserialize(msg.content)
if (request.status !== 'ok') {
this._logger.error(`QUEUE GATHERING SERVER: MESSAGE NOT OK '${this.name}' ${correlationId}`, request)
this._sendStatus(channel, replyTo, correlationId, 'error', 'message not OK')
this._nack(channel, msg)
return
}
} catch (err) {
this._logger.error(`QUEUE GATHERING SERVER: MALFORMED MESSAGE '${this.name}' ${correlationId}`, err)
this._sendStatus(channel, replyTo, correlationId, 'error', 'malformed message')
this._nack(channel, msg)
return
if (!isValid) {
return // Invalid request
}

let responseTimedOut = false
Expand Down Expand Up @@ -222,6 +198,65 @@ class GatheringServer {
channel.sendToQueue(replyTo, reply.serialize(), { correlationId, type: 'status' })
}

/**
* @returns {boolean} is valid
*/
verifyMessage (channel, msg) {
if (!msg || !msg.properties) {
this._logger.error(`QUEUE GATHERING SERVER: INVALID REQUEST ON '${this.name}': NO MESSAGE/PROPERTIES`, msg)
this._nack(channel, msg)
return false
}
if (!msg.properties.correlationId) {
this._logger.error(`QUEUE GATHERING SERVER: INVALID REQUEST ON '${this.name}': NO CORRELATION ID`, msg)
this._nack(channel, msg)
return false
}
if (!msg.properties.replyTo) {
this._logger.error(`QUEUE GATHERING SERVER: INVALID REQUEST ON '${this.name}': NO REPLY TO`, msg)
this._nack(channel, msg)
return false
}
return true
}

/**
* @param channel
* @param msg
* @returns {{request: QueueMessage, isValid: boolean, replyTo: *, correlationId: *}}
*/
unserializeMessage (channel, msg) {
if (!this.verifyMessage(channel, msg)) {
return { isValid: false }
}

const correlationId = msg.properties.correlationId
const replyTo = msg.properties.replyTo
let request

try {
request = QueueMessage.unserialize(msg.content)
if (request.status !== 'ok') {
this._logger.error(`QUEUE GATHERING SERVER: MESSAGE NOT OK '${this.name}' ${correlationId}`, request)
this._sendStatus(channel, replyTo, correlationId, 'error', 'message not OK')
this._nack(channel, msg)
return { isValid: false }
}
} catch (err) {
this._logger.error(`QUEUE GATHERING SERVER: MALFORMED MESSAGE '${this.name}' ${correlationId}`, err)
this._sendStatus(channel, replyTo, correlationId, 'error', 'malformed message')
this._nack(channel, msg)
return { isValid: false }
}

return {
isValid: true,
correlationId,
replyTo,
request
}
}

/**
* @param ch
* @param msg
Expand Down
9 changes: 3 additions & 6 deletions test/Gathering.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,9 @@ describe('GatheringClient && GatheringServer', () => {
gatheringServer1.consume((msg, queueMessage) => {
if (queueMessage.getAttachments().get('test').toString() !== buf.toString()) {
done(new Error('String received is not the same as the String sent'))
return true
return
}
done()
return true
})

gatheringClient.request(stringMessage, null, attachments).catch((err) => {
Expand All @@ -151,10 +150,8 @@ describe('GatheringClient && GatheringServer', () => {
it(`GatheringClient.request() throws an error if it doesn't receive a response sooner than ${timeoutMs}ms`, (done) => {
const objectMessage = { foo: 'bar', bar: 'foo' }

gatheringServer1.consume((msg) => {
const now = Date.now()
// eslint-disable-next-line no-empty
while (new Date().getTime() < now + timeoutMs + 100) { }
gatheringServer1.consume(async (msg) => {
await new Promise((resolve) => setTimeout(resolve, timeoutMs + 100))
return msg
})

Expand Down
6 changes: 2 additions & 4 deletions test/RPC.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,8 @@ describe('RPCClient && RPCServer', () => {
it(`RPCClient.call() throws an error if it doesn't receive a response sooner than ${timeoutMs}ms`, (done) => {
const objectMessage = { foo: 'bar', bar: 'foo' }

rpcServer.consume((msg) => {
const now = Date.now()
// eslint-disable-next-line no-empty
while (new Date().getTime() < now + timeoutMs + 100) { }
rpcServer.consume(async (msg) => {
await new Promise((resolve) => setTimeout(resolve, timeoutMs + 100))
return msg
})

Expand Down

0 comments on commit 2c99db0

Please sign in to comment.