Skip to content

Commit

Permalink
Merge 15610e2 into 498d201
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Mar 17, 2019
2 parents 498d201 + 15610e2 commit 291d40f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
### Added
- Optionally retain s3 blobs on message delete [#16](https://github.com/PruvoNet/squiss-ts/issues/16)
- Optionally set prefix for s3 blob names [#15](https://github.com/PruvoNet/squiss-ts/issues/15)
### Fixed
- Batch messaging to create batches by max of 10 messages, or by max message size [#14](https://github.com/PruvoNet/squiss-ts/issues/14)

## [v1.4.0]
### Added
Expand Down
22 changes: 16 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,14 +559,24 @@ export class Squiss extends EventEmitter {
}
promises.push(this.perpareMessageRequest(msg, delay, currentAttributes));
});
return Promise.all(promises)
.then((messageRequests) => {
for (let i = 0; i < messageRequests.length; i++) {
if (i % AWS_MAX_SEND_BATCH === 0) {
return Promise.all([this.getQueueMaximumMessageSize(), Promise.all(promises)])
.then((results) => {
const queueMaximumMessageSize = results[0];
const messageRequests = results[1];
let currentBatchSize = 0;
let currentBatchLength = 0;
messageRequests.forEach((message) => {
const messageSize = getMessageSize(message);
if (currentBatchLength % AWS_MAX_SEND_BATCH === 0 ||
currentBatchSize + messageSize >= queueMaximumMessageSize) {
currentBatchLength = 0;
currentBatchSize = 0;
batches.push([]);
}
batches[batches.length - 1].push(messageRequests[i]);
}
currentBatchSize += messageSize;
currentBatchLength++;
batches[batches.length - 1].push(message);
});
return Promise.all(batches.map((batch, idx) => {
return this._sendMessageBatch(batch, delay, idx * AWS_MAX_SEND_BATCH);
}));
Expand Down
13 changes: 13 additions & 0 deletions test/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,19 @@ describe('index', () => {
res.should.have.property('Failed').with.length(0);
});
});
it('sends multiple batches of messages and merges successes with batch size is too big', () => {
inst = new SquissPatched({queueUrl: 'foo'});
inst!.sqs = new SQSStub() as any as SQS;
const spy = sinon.spy(inst!.sqs, 'sendMessageBatch');
const msgs = 'a.b.c.d.e.f.g.h.i.j.k.l.m.n.o'.split('.');
msgs.unshift(generateLargeMessage(300));
return inst!.sendMessages(msgs).then((res: SQS.Types.SendMessageBatchResult) => {
spy.should.be.calledThrice();
(inst!.sqs as any as SQSStub).msgs.length.should.equal(16);
res.should.have.property('Successful').with.length(16);
res.should.have.property('Failed').with.length(0);
});
});
it('sends multiple batches of messages and merges failures', () => {
inst = new SquissPatched({queueUrl: 'foo'});
inst!.sqs = new SQSStub() as any as SQS;
Expand Down

0 comments on commit 291d40f

Please sign in to comment.