Skip to content

Commit

Permalink
Fix ordering when flush and multiple batches
Browse files Browse the repository at this point in the history
  • Loading branch information
kimjoar committed Mar 28, 2023
1 parent fa3f83c commit 20018b6
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,12 @@ export class OrderedQueue extends MessageQueue {
/**
* Starts a timeout to publish any pending messages.
*/
beginNextPublish(): void {
beginNextPublish(callback?: PublishDone): void {
const maxMilliseconds = this.batchOptions.maxMilliseconds!;
const timeWaiting = Date.now() - this.currentBatch.created;
const delay = Math.max(0, maxMilliseconds - timeWaiting);

this.pending = setTimeout(() => this.publish(), delay);
this.pending = setTimeout(() => this.publish(callback), delay);
}
/**
* Creates a new {@link MessageBatch} instance.
Expand Down Expand Up @@ -361,7 +361,7 @@ export class OrderedQueue extends MessageQueue {
this.handlePublishFailure(err);
definedCallback(err);
} else if (this.batches.length) {
this.beginNextPublish();
this.beginNextPublish(callback);
} else {
this.emit('drain');
definedCallback(null);
Expand Down

0 comments on commit 20018b6

Please sign in to comment.