Skip to content

Commit

Permalink
allow to set min size to upload to s3 #33 (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed May 18, 2019
1 parent 3329e5b commit 5cd3e98
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 14 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
This project adheres to [Semantic Versioning](http://semver.org/).

## Development
- nothing yet

## [v3.0.1]
### Fixed
- Fixed documentation [#31](https://github.com/PruvoNet/squiss-ts/issues/31)
- Added `minS3Size` option to set the min size that will cause upload to S3 [#33](https://github.com/PruvoNet/squiss-ts/issues/33)

## [v3.0.0]
### Added
Expand Down Expand Up @@ -106,3 +110,4 @@ Marking the library as stable after stress usage in a full blown production envi
[v1.5.2]: https://github.com/PruvoNet/squiss-ts/compare/v1.5.1...v1.5.2
[v2.0.0]: https://github.com/PruvoNet/squiss-ts/compare/v1.5.2...v2.0.0
[v3.0.0]: https://github.com/PruvoNet/squiss-ts/compare/v2.0.0...v3.0.0
[v3.0.1]: https://github.com/PruvoNet/squiss-ts/compare/v3.0.0...v3.0.1
40 changes: 31 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,41 @@ The library is production ready and is being stress used in a full blown product
## Quick example
```typescript
import {Squiss, Message} from 'squiss-ts';
const poller = new Squiss({

const awsConfig = {
accessKeyId: '<accessKeyId>',
secretAccessKey: '<secretAccessKey>',
region: '<region>',
};

const squiss = new Squiss({
awsConfig,
queueName: 'my-sqs-queue',
bodyFormat: 'json',
unwrapSns: true,
maxInFlight: 500
maxInFlight: 15
});

poller.on('message', (msg: Message) => {
console.log('%s says: %s', msg.body.name, msg.body.message);
squiss.on('message', (msg: Message) => {
console.log('%s says: %s', msg.body.name, JSON.stringify(msg.body.message), msg.attributes.p1);
msg.del();
});

poller.start();
squiss.start();

const messageToSend = {
name: 'messageName',
message: {
a: 1,
b: 2,
},
}

const propsToSend = {
p1: 1,
p2: 2,
}

squiss.sendMessage(messageToSend, 0, propsToSend);
```

## Install
Expand Down Expand Up @@ -74,9 +95,10 @@ Squiss's defaults are great out of the box for most use cases, but you can use t
- **opts.bodyFormat** _Default "plain"._ The format of the incoming message. Set to "json" to automatically call `JSON.parse()` on each incoming message.
- **opts.gzip** _Default "false"._ Auto gzip messages to reduce message size.
- **opts.minGzipSize** _Default 0._ The min message size to gzip (in bytes) when `gzip` is set to `true`.
- **opts.s3Fallback** _Default "false"._ Upload messages bigger than `maxMessageBytes` or queue default `maxMessageBytes` to s3, and retrieve it from there when message is received.
- **opts.s3Fallback** _Default "false"._ Upload messages bigger than `minS3Size` or queue default `maxMessageBytes` to s3, and retrieve it from there when message is received.
- **opts.s3Bucket** if `s3Fallback` is true, upload message to this s3 bucket.
- **opts.s3Retain** _Default "false"._ if `s3Fallback` is true, do not delete blob on message delete.
- **opts.minS3Size** _Defaults to queue max message size._ The min message size to send to S3 (in bytes) when `s3Fallback` is set to `true`.
- **opts.s3Prefix** _Default ""._ if `s3Fallback` is true, set this prefix to uploaded s3 blobs.
- **opts.deleteBatchSize** _Default 10._ The number of messages to delete at one time. Squiss will trigger a batch delete when this limit is reached, or when `deleteWaitMs` milliseconds have passed since the first queued delete in the batch, whichever comes first. Set to 1 to make all deletes immediate. Maximum 10.
- **opts.deleteWaitMs** _Default 2000._ The number of milliseconds to wait after the first queued message deletion before deleting the message(s) from SQS.
Expand All @@ -87,7 +109,7 @@ Squiss's defaults are great out of the box for most use cases, but you can use t
- **opts.minReceiveBatchSize** _Default 1._ The minimum number of available message slots that will initiate a call to get the next batch. Maximum 10 or `maxInFlight`, whichever is lower.
- **opts.receiveWaitTimeSecs** _Default 20._ The number of seconds for which to hold open the SQS call to receive messages, when no message is currently available. It is recommended to set this high, as Squiss will re-open the receiveMessage HTTP request as soon as the last one ends. If this needs to be set low, consider setting `activePollIntervalMs` to space out calls to SQS. Maximum 20.
- **opts.unwrapSns** _Default false._ Set to `true` to denote that Squiss should treat each message as though it comes from a queue subscribed to an SNS endpoint, and automatically extract the message from the SNS metadata wrapper.
- **opts.visibilityTimeoutSecs** _Defaults to queue setting on read, or 30 seconds for createQueue._ The amount of time, in seconds, that received messages will be unavailable to other pollers without being deleted.
- **opts.visibilityTimeoutSecs** _Defaults to queue setting on read, or 30 seconds for createQueue._ The amount of time, in seconds, that received messages will be unavailable to other squiss instances without being deleted.
- **opts.receiveAttributes** _Defaults to `['All']`._ An an array of strings with attribute names (e.g. `myAttribute`) to request along with the `receiveMessage` call. The attributes will be accessible via `message.attributes.<attributeName>`.
- **opts.receiveSqsAttributes** _Defaults to `['All']`._ An an array of strings with attribute names (e.g. `ApproximateReceiveCount`) to request along with the `receiveMessage` call. The attributes will be accessible via `message.sqsAttributes.<attributeName>`.

Expand Down Expand Up @@ -203,7 +225,7 @@ Emitted when a message reaches it's timeout limit, including any extensions made
Example at the Squiss level:

```javascript
poller.on('timeoutReached', (msg) => {
squiss.on('timeoutReached', (msg) => {
console.log(msg,'timeout was reached!');
});
```
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "squiss-ts",
"version": "3.0.0",
"version": "3.0.1",
"description": "High-volume SQS poller",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
15 changes: 11 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export interface ISquissOptions {
s3Bucket?: string;
s3Retain?: boolean;
s3Prefix?: string;
minS3Size?: number;
}

interface IDeleteQueueItem {
Expand Down Expand Up @@ -208,11 +209,13 @@ export class Squiss extends EventEmitter implements EventEmitterOverride<ISquiss
* against a stubbed SQS service, such as ElasticMQ.
* @param {boolean} [opts.gzip=false] Auto gzip messages to reduce message size.
* @param {number} [opts.minGzipSize=0] The min message size to gzip (in bytes) when `gzip` is set to `true`.
* @param {boolean} [opts.s3Fallback=false] Upload messages bigger than `maxMessageBytes` or queue default to s3,
* @param {boolean} [opts.s3Fallback=false] Upload messages bigger than `minS3Size` or queue default to s3,
* and retrieve it from there when message is received.
* @param {string} [opts.s3Bucket] if `s3Fallback` is true, upload message to this s3 bucket.
* @param {boolean} [opts.s3Retain=false] if `s3Fallback` is true, do not delete blob on message delete.
* @param {string} [opts.s3Prefix] if `s3Fallback` is true, set this prefix to uploaded s3 blobs.
* @param {number} [opts.minS3Size=queueMaxMessageSize] The min message size to send to S3 (in bytes) when
* `s3Fallback` is set to `true`.
* @param {number} [opts.deleteBatchSize=10] The number of messages to delete at one time. Squiss will trigger a
* batch delete when this limit is reached, or when deleteWaitMs milliseconds have passed since the first queued
* delete in the batch; whichever comes first. Set to 1 to make all deletes immediate. Maximum 10.
Expand Down Expand Up @@ -924,10 +927,14 @@ export class Squiss extends EventEmitter implements EventEmitterOverride<ISquiss
return this._s3;
}

private isLargeMessage(message: ISendMessageRequest): Promise<boolean> {
private isLargeMessage(message: ISendMessageRequest, minSize?: number): Promise<boolean> {
const messageSize = getMessageSize(message);
if (minSize) {
return Promise.resolve(messageSize > minSize);
}
return this.getQueueMaximumMessageSize()
.then((queueMaximumMessageSize) => {
return getMessageSize(message) >= queueMaximumMessageSize;
return messageSize >= queueMaximumMessageSize;
});
}

Expand Down Expand Up @@ -981,7 +988,7 @@ export class Squiss extends EventEmitter implements EventEmitterOverride<ISquiss
if (!this._opts.s3Fallback) {
return Promise.resolve(params);
}
return this.isLargeMessage(params)
return this.isLargeMessage(params, this._opts.minS3Size)
.then((isLarge) => {
if (!isLarge) {
return Promise.resolve(params);
Expand Down
35 changes: 35 additions & 0 deletions test/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,41 @@ describe('index', () => {
});
});
});
it('sends a S3 message if it is bigger than minS3Size', () => {
const blobs: Blobs = {};
const s3Stub = getS3Stub(blobs);
inst = new SquissPatched({S3: s3Stub, queueUrl: 'foo', s3Fallback: true, s3Bucket: 'my_bucket', minS3Size: 250});
inst!.sqs = new SQSStub() as any as SQS;
const spy = sinon.spy(inst!.sqs, 'sendMessage');
const largeMessage = generateLargeMessage(300);
return inst!.sendMessage(largeMessage, 10).then(() => {
blobs.my_bucket!.my_uuid.should.be.eq(largeMessage);
spy.should.be.calledWith({
QueueUrl: 'foo',
MessageBody: '{"uploadSize":300,"bucket":"my_bucket","key":"my_uuid"}',
DelaySeconds: 10,
MessageAttributes: {
__SQS_S3__: {DataType: 'Number', StringValue: '300'},
},
});
});
});
it('does not send a S3 message if it is smaller than minS3Size', () => {
const blobs: Blobs = {};
const s3Stub = getS3Stub(blobs);
inst = new SquissPatched({S3: s3Stub, queueUrl: 'foo', s3Fallback: true, s3Bucket: 'my_bucket', minS3Size: 500});
inst!.sqs = new SQSStub() as any as SQS;
const spy = sinon.spy(inst!.sqs, 'sendMessage');
const largeMessage = generateLargeMessage(300);
return inst!.sendMessage(largeMessage, 10).then(() => {
Object.keys(blobs).length.should.be.eq(0);
spy.should.be.calledWith({
QueueUrl: 'foo',
MessageBody: largeMessage,
DelaySeconds: 10,
});
});
});
it('sends a skipped S3 message with a delay and attributes', () => {
const blobs: Blobs = {};
const s3Stub = getS3Stub(blobs);
Expand Down

0 comments on commit 5cd3e98

Please sign in to comment.