Skip to content

Commit

Permalink
Merge pull request #4 from Sagacify/feature/streams
Browse files Browse the repository at this point in the history
Streams
  • Loading branch information
nekuz0r committed Jul 23, 2021
2 parents 46af692 + f0a8110 commit c4842b7
Show file tree
Hide file tree
Showing 14 changed files with 6,942 additions and 1,719 deletions.
77 changes: 77 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,64 @@ await sqsHandler.destroyBatch([
]);
```

### Readable Stream Usage

```js
const AWS = require('aws-sdk');
const { SqsHandler } = require('@sagacify/sqs-handler');

const sqsInstance = new AWS.SQS({
accessKeyId: 'your_access_key_id',
secretAccessKey: 'your_secret_access_key',
region: 'an_aws_region'
})

const sqsHandler = new SqsHandler(
sqsInstance,
'https://sqs.eu-west-1.amazonaws.com/23452042942/some-sqs-queue', {
VisibilityTimeout: 120,
WaitTimeSeconds: 0
}
);

const readable = sqsHandler.readableStream();

readable.on('data', (message) => {
console.log(message);
sqsHandler.destroy(message.receiptHandle);
});

const autoDestroyReadable = sqsHandeler.readableStream({ autoDestroy: true });
autoDestroyReadable.on('data', (message) => console.log(message));
```

### Writable Stream Usage

```js
const AWS = require('aws-sdk');
const { SqsHandler } = require('@sagacify/sqs-handler');

const sqsInstance = new AWS.SQS({
accessKeyId: 'your_access_key_id',
secretAccessKey: 'your_secret_access_key',
region: 'an_aws_region'
})

const sqsHandler = new SqsHandler(
sqsInstance,
'https://sqs.eu-west-1.amazonaws.com/23452042942/some-sqs-queue', {
VisibilityTimeout: 120,
WaitTimeSeconds: 0
}
);

const writable = sqsHandler.writableStream();

writable.write({
{ data: 'value' }
});
```

### API

**constructor(sqs, queueUrl, options)**
Expand Down Expand Up @@ -208,6 +266,25 @@ Equivalent of [AWS.SQS.deleteMessageBatch](https://docs.aws.amazon.com/AWSJavaSc
Only message related operations have been implemented.
For queue related operations use directly the SQS instance.

**readableStream(options)**

returns a readable stream from the SQS queue.
Each message received from the queue will trigger the `data` event.

*Options:*

- autoDestroy: automatically destroy received message from the queue once pushed to the stream buffer, if set to false you will have to destroy the message yourself otherwise it will be available to be consumed after the *visibilityTimeout* (default: false)
- autoClose: automatically close the stream when no more message are received from the queue (default: false)

see **receive** for other options details.

**writableStream(options)**

returns a writable stream to the SQS queue.
Each message written to this stream will be pushed to the queue with the specified options.

see **send** for options details.

## Npm scripts

### Running code formating
Expand Down

0 comments on commit c4842b7

Please sign in to comment.