A complete SQS poller implementation that makes message processing easier, built on top of the AWS SDK for JavaScript v3.
npm i --save sqs-poller
const { SQS } = require('@aws-sdk/client-sqs')
const { Poller } = require('sqs-poller')

const sqs = new SQS({
  endpoint: 'your sqs endpoint',
  region: 'your sqs region',
})

const poller = new Poller({
  queueUrl: 'your sqs endpoint + queue name',
  sqsClient: sqs,
})

poller.start({
  eachMessage: async (message) => {
    await doSomethingWithMessage(message)
  },
  // or
  eachBatch: async function (messages) {
    await doSomethingWithBatch(messages)
  }
})

poller.on('error', console.error)
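The usage snippet leaves doSomethingWithMessage and doSomethingWithBatch undefined. As a minimal sketch, they could look like the following, assuming each handler receives SQS message objects whose payload is a JSON string in message.Body. The parseBody helper and the JSON payload format are assumptions made for illustration; sqs-poller does not require them.

```javascript
// Hypothetical helper: extracts a JSON payload from an SQS message.
// Assumes producers send JSON strings in message.Body.
function parseBody (message) {
  try {
    return JSON.parse(message.Body)
  } catch (err) {
    throw new Error(`Message ${message.MessageId} has an invalid JSON body: ${err.message}`)
  }
}

// Possible shape of an eachMessage handler: parse, then run business logic.
async function doSomethingWithMessage (message) {
  const payload = parseBody(message)
  console.log('processing', message.MessageId, payload)
}

// Possible shape of an eachBatch handler: process the whole batch concurrently.
// The returned promise rejects as soon as any message in the batch fails.
async function doSomethingWithBatch (messages) {
  await Promise.all(messages.map(doSomethingWithMessage))
}
```

Throwing from a handler rejects its promise, which is the condition the error diagnostics channels described in this document report on.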
new Poller(options) creates a new Poller instance.
- options <object>
  - queueUrl <string> The complete URL of the queue, consisting of your AWS SQS endpoint + queue name. Required.
  - sqsClient <sqsClient> An instance of the SQS client class provided by AWS SDK v3. Required.
  - messageAttributeNames <string | QueueAttributeName> The names of the message attributes. Optional.
  - maxNumberOfMessages <number> The maximum number of messages to return. Optional. Default: 10
  - visibilityTimeout <number> The duration (in seconds) that received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. Optional. Default: 20
  - waitTimeSeconds <number> The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. Optional. Default: 10
  - pollingTimeout <number> The interval (in milliseconds) between a finished poll call and the next one. Optional. Default: 1
  - shutdownTimeout <number> The duration (in milliseconds) for which the poller waits for in-flight messages before stopping. Optional. Default: 5000
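To make the required options and documented defaults concrete, here is a minimal sketch that validates a constructor options object and fills in the defaults listed above. resolvePollerOptions is a hypothetical helper written for this illustration, not part of sqs-poller, and the package's own internal handling may differ. The queue URL and the empty client object are placeholders.

```javascript
// Defaults as documented in the constructor options list.
const DOCUMENTED_DEFAULTS = {
  maxNumberOfMessages: 10,
  visibilityTimeout: 20, // seconds
  waitTimeSeconds: 10, // seconds
  pollingTimeout: 1, // milliseconds
  shutdownTimeout: 5000 // milliseconds
}

// Hypothetical helper (not part of sqs-poller): checks the two required
// options and merges the rest with the documented defaults.
function resolvePollerOptions (options = {}) {
  for (const required of ['queueUrl', 'sqsClient']) {
    if (options[required] === undefined || options[required] === null) {
      throw new TypeError(`Missing required option: ${required}`)
    }
  }
  return { ...DOCUMENTED_DEFAULTS, ...options }
}

const resolved = resolvePollerOptions({
  queueUrl: 'http://localhost:9324/queue/my-queue', // placeholder URL
  sqsClient: {}, // placeholder; pass a real SQS client instance here
  waitTimeSeconds: 20 // overrides the documented default of 10
})

console.log(resolved.waitTimeSeconds, resolved.maxNumberOfMessages)
```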
poller.start(options) starts message processing with the given handler for each message or each batch.
- options <object>
  - eachMessage <Function> (message) => Promise<any> The handler to process one message at a time. Either eachMessage or eachBatch must be passed. If both are passed, the eachMessage handler is used. Conditionally required.
  - eachBatch <Function> (messages) => Promise<any> The handler to process a message batch (up to ten messages) at a time. It is required if eachMessage is not passed. Conditionally required.
  - beforePoll <Function> (message) => Promise<any> The optional handler executed before performing a receiveMessage call. It can be used in scenarios where a business rule needs to be verified before polling new messages, such as rate limiting. Optional.
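As an illustration of the rate-limiting scenario mentioned for beforePoll, the sketch below enforces a minimum interval between polls. createPollRateLimiter, reserveDelay, and the injectable clock are hypothetical names introduced here, not part of sqs-poller. The sketch also assumes the poller awaits the promise returned by beforePoll before it calls receiveMessage.

```javascript
// Hypothetical rate limiter: allows at most one poll per minIntervalMs.
// The clock is injectable so the scheduling logic can be checked without timers.
function createPollRateLimiter (minIntervalMs, now = Date.now) {
  let nextAllowedAt = 0

  // Returns how long the caller must wait (in ms) and reserves the next slot.
  function reserveDelay () {
    const current = now()
    const delay = Math.max(0, nextAllowedAt - current)
    nextAllowedAt = current + delay + minIntervalMs
    return delay
  }

  // Handler suitable for the beforePoll option: resolves once polling is allowed.
  async function beforePoll () {
    const delay = reserveDelay()
    if (delay > 0) {
      await new Promise((resolve) => setTimeout(resolve, delay))
    }
  }

  return { reserveDelay, beforePoll }
}

// Usage sketch (poller and doSomethingWithMessage come from the usage example):
// const limiter = createPollRateLimiter(200)
// poller.start({ eachMessage: doSomethingWithMessage, beforePoll: limiter.beforePoll })
```

Keeping the delay calculation separate from the timer makes the business rule easy to unit test with a fake clock.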
poller.stop() stops message processing, waiting for in-flight messages to finish before resolving the promise. It can be used for a graceful shutdown as follows:
process.once('SIGTERM', async (signal) => {
  await poller.stop()
  console.log('Poller has stopped')
  process.exit(0)
})
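A process may receive more than one termination signal, for example SIGINT followed by SIGTERM. The sketch below wraps poller.stop() so that it runs only once no matter how many signals arrive. createShutdown and its exit and log parameters are hypothetical names for this illustration; the only sqs-poller call it relies on is poller.stop() returning a promise, as described above.

```javascript
// Hypothetical helper: builds a signal handler that stops the poller once.
// exit and log are injectable so the behaviour can be exercised in tests.
function createShutdown (poller, { exit = process.exit, log = console.log } = {}) {
  let stopping = null

  return function shutdown (signal) {
    if (stopping === null) {
      log(`Received ${signal}, stopping poller`)
      stopping = poller.stop().then(
        () => {
          log('Poller has stopped')
          exit(0)
        },
        (err) => {
          console.error('Poller failed to stop cleanly', err)
          exit(1)
        }
      )
    }
    // Later signals reuse the same in-progress shutdown promise.
    return stopping
  }
}

// Usage sketch (poller comes from the usage example):
// const shutdown = createShutdown(poller)
// process.once('SIGTERM', shutdown)
// process.once('SIGINT', shutdown)
```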
sqs-poller supports diagnostics channels (a feature currently available only on Node.js v16+), which are the preferred way to instrument this package. Data is published to these channels only when there are subscribers listening to them. The available channels are the following:
- sqspoller:poller:eachMessage:start
  - Before an SQS message gets processed by the eachMessage handler, the SQS message gets published to this channel.
    const diagnosticsChannel = require('diagnostics_channel')

    diagnosticsChannel.channel('sqspoller:poller:eachMessage:start').subscribe(({ message }) => {
      console.log('body', message.Body)
      console.log('ReceiptHandle', message.ReceiptHandle)
    })
- sqspoller:poller:eachMessage:end
  - When the eachMessage handler has been either resolved or rejected, the SQS message gets published to this channel.
    diagnosticsChannel.channel('sqspoller:poller:eachMessage:end').subscribe(({ message }) => {
      console.log('body', message.Body)
      console.log('ReceiptHandle', message.ReceiptHandle)
    })
- sqspoller:poller:eachMessage:error
  - When the eachMessage handler rejects, the SQS message and the error get published to this channel.
    diagnosticsChannel.channel('sqspoller:poller:eachMessage:error').subscribe(({ message, error }) => {
      console.log('body', message.Body)
      console.log('ReceiptHandle', message.ReceiptHandle)
      console.log('error', error)
    })
- sqspoller:poller:eachBatch:start
  - Before an SQS message batch gets processed by the eachBatch handler, the batch (messages) gets published to this channel.
- sqspoller:poller:eachBatch:end
  - When the eachBatch handler resolves or rejects, the message batch (messages) gets published to this channel.
- sqspoller:poller:eachBatch:error
  - When the eachBatch handler rejects, the message batch (messages) and error get published to this channel.
- sqspoller:poller:deleteMessage:start
  - Before deleting a message from SQS, the message gets published to this channel.
- sqspoller:poller:deleteMessage:end
  - When the deleteMessage call resolves or rejects, the message gets published to this channel.
- sqspoller:poller:deleteMessage:error
  - When the deleteMessage call rejects, the message and error get published to this channel.
- sqspoller:poller:deleteBatch:start
  - Before deleting a message batch from SQS, the batch (messages) gets published to this channel.
- sqspoller:poller:deleteBatch:end
  - When the deleteBatch call resolves or rejects, the batch (messages) gets published to this channel.
- sqspoller:poller:deleteBatch:error
  - When the deleteBatch call rejects, the batch (messages) and error get published to this channel.
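Beyond logging, the channels can be combined to collect simple metrics. The following sketch pairs the eachMessage start and end channels to measure handler duration and counts failures from the error channel. The stats object and the use of ReceiptHandle as a correlation key are assumptions made for this illustration; the channel names and payload fields are the ones listed above.

```javascript
const diagnosticsChannel = require('diagnostics_channel')

// Keep references to the channel objects for as long as the subscriptions are needed.
const startChannel = diagnosticsChannel.channel('sqspoller:poller:eachMessage:start')
const endChannel = diagnosticsChannel.channel('sqspoller:poller:eachMessage:end')
const errorChannel = diagnosticsChannel.channel('sqspoller:poller:eachMessage:error')

// Assumption: ReceiptHandle is unique among in-flight messages.
const startedAt = new Map() // ReceiptHandle -> start timestamp (ms)
const stats = { finished: 0, failed: 0, totalMs: 0 }

startChannel.subscribe(({ message }) => {
  startedAt.set(message.ReceiptHandle, Date.now())
})

errorChannel.subscribe(() => {
  stats.failed += 1
})

// The end channel fires for both resolved and rejected handlers,
// so finished counts every handled message, including failures.
endChannel.subscribe(({ message }) => {
  const started = startedAt.get(message.ReceiptHandle)
  if (started === undefined) return
  startedAt.delete(message.ReceiptHandle)
  stats.finished += 1
  stats.totalMs += Date.now() - started
})
```

The average handler duration is then stats.totalMs / stats.finished whenever at least one message has finished.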