Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
dermasmid committed Mar 10, 2024
1 parent b2a7980 commit e18df90
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export interface BatchOptions {
callOptions?: CallOptions;
maxMessages?: number;
maxMilliseconds?: number;
maxBytes?: number;
}

/**
Expand Down Expand Up @@ -93,6 +94,7 @@ export class BatchError extends DebugMessage {
* @property {number} [maxMilliseconds=100] Maximum duration to wait before
* sending a batch. Batches can be sent earlier if the maxMessages option
* is met before the configured duration has passed.
* @property {number} [maxBytes=512000] Maximum number of bytes to allow in
*/
/**
* Class for buffering ack/modAck requests.
Expand All @@ -107,6 +109,7 @@ export abstract class MessageQueue {
numPendingRequests: number;
numInFlightRequests: number;
numInRetryRequests: number;
bytes: number;
protected _onFlush?: defer.DeferredPromise<void>;
protected _onDrain?: defer.DeferredPromise<void>;
protected _options!: BatchOptions;
Expand All @@ -121,6 +124,7 @@ export abstract class MessageQueue {
this.numPendingRequests = 0;
this.numInFlightRequests = 0;
this.numInRetryRequests = 0;
this.bytes = 0;
this._requests = [];
this._subscriber = sub;
this._retrier = new ExponentialRetry<QueuedMessage>(
Expand Down Expand Up @@ -188,7 +192,7 @@ export abstract class MessageQueue {
}
}

const {maxMessages, maxMilliseconds} = this._options;
const {maxMessages, maxMilliseconds, maxBytes} = this._options;

const responsePromise = defer<void>();
this._requests.push({
Expand All @@ -199,8 +203,9 @@ export abstract class MessageQueue {
});
this.numPendingRequests++;
this.numInFlightRequests++;
this.bytes += Buffer.byteLength(ackId, 'utf8');

if (this._requests.length >= maxMessages!) {
if (this._requests.length >= maxMessages! || this.bytes >= maxBytes!) {
this.flush();
} else if (!this._timer) {
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
Expand Down Expand Up @@ -264,6 +269,7 @@ export abstract class MessageQueue {
const deferred = this._onFlush;

this._requests = [];
this.bytes = 0;
this.numPendingRequests -= batchSize;
delete this._onFlush;

Expand Down Expand Up @@ -330,7 +336,11 @@ export abstract class MessageQueue {
* @private
*/
setOptions(options: BatchOptions): void {
const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100};
const defaults: BatchOptions = {
maxMessages: 3000,
maxMilliseconds: 100,
maxBytes: 512000,
};

this._options = Object.assign(defaults, options);
}
Expand Down

0 comments on commit e18df90

Please sign in to comment.