Skip to content

Commit

Permalink
feat: add minimum lease extensions for exactly-once
Browse files Browse the repository at this point in the history
  • Loading branch information
feywind committed May 30, 2022
1 parent 461b58b commit 0ca513a
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import {Subscription} from './subscription';
import {defaultOptions} from './default-options';
import {SubscriberClient} from './v1';
import {createSpan} from './opentelemetry-tracing';
import {Throttler} from './util';

export type PullResponse = google.pubsub.v1.IStreamingPullResponse;
export type SubscriptionProperties =
Expand Down Expand Up @@ -218,6 +217,8 @@ export interface SubscriberOptions {
enableOpenTelemetryTracing?: boolean;
}

const minAckSecondsForExactlyOnce = 60;

/**
* @typedef {object} SubscriberOptions
* @property {number} [ackDeadline=10] Acknowledge deadline in seconds. If left
Expand Down Expand Up @@ -256,7 +257,6 @@ export class Subscriber extends EventEmitter {
private _options!: SubscriberOptions;
private _stream!: MessageStream;
private _subscription: Subscription;
private _errorLog: Throttler;

subscriptionProperties?: SubscriptionProperties;

Expand All @@ -273,7 +273,6 @@ export class Subscriber extends EventEmitter {
this._histogram = new Histogram({min: 10, max: 600});
this._latencies = new Histogram();
this._subscription = subscription;
this._errorLog = new Throttler(60 * 1000);

this.setOptions(options);
}
Expand All @@ -287,14 +286,15 @@ export class Subscriber extends EventEmitter {
setSubscriptionProperties(subscriptionProperties: SubscriptionProperties) {
this.subscriptionProperties = subscriptionProperties;

// If this is an exactly-once subscription, warn the user that they may have difficulty.
if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) {
this._errorLog.doMaybe(() =>
console.error(
'WARNING: Exactly-once subscriptions are not yet supported ' +
'by the Node client library. This feature will be added ' +
'in a future release.'
)
// If this is an exactly-once subscription, and the user didn't set their
// own minimum ack periods, set it to the default for exactly-once.
if (
this.subscriptionProperties.exactlyOnceDeliveryEnabled &&
!this._isUserSetDeadline
) {
this.ackDeadline = Math.max(
this.ackDeadline,
minAckSecondsForExactlyOnce
);
}
}
Expand All @@ -313,7 +313,12 @@ export class Subscriber extends EventEmitter {
bufferTime = this._modAcks.maxMilliseconds;
}

return latency * 1000 + bufferTime;
const calcedLatency = latency * 1000 + bufferTime;
if (this.subscriptionProperties?.exactlyOnceDeliveryEnabled) {
return Math.max(calcedLatency, minAckSecondsForExactlyOnce);
} else {
return calcedLatency;
}
}
/**
* The full name of the Subscription.
Expand Down

0 comments on commit 0ca513a

Please sign in to comment.