Skip to content

Commit

Permalink
fix: move emitter wrapper out one level, to catch user functions prop…
Browse files Browse the repository at this point in the history
…erly
  • Loading branch information
feywind committed Dec 12, 2022
1 parent 5cddaed commit d523b1f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 38 deletions.
37 changes: 2 additions & 35 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {defaultOptions} from './default-options';
import {SubscriberClient} from './v1';
import * as otel from './opentelemetry-tracing';
import {Duration} from './temporal';
import {EmitterCallback, WrappingEmitter} from './wrapping-emitter';
import {EventEmitter} from 'events';

export type PullResponse = google.pubsub.v1.IStreamingPullResponse;
export type SubscriptionProperties =
Expand Down Expand Up @@ -443,7 +443,7 @@ const minAckDeadlineForExactlyOnceDelivery = Duration.from({seconds: 60});
* @param {Subscription} subscription The corresponding subscription.
* @param {SubscriberOptions} options The subscriber options.
*/
export class Subscriber extends WrappingEmitter {
export class Subscriber extends EventEmitter {
ackDeadline: number;
maxMessages: number;
maxBytes: number;
Expand All @@ -464,7 +464,6 @@ export class Subscriber extends WrappingEmitter {

constructor(subscription: Subscription, options = {}) {
super();
this.setEmitterWrapper(this.listenerWrapper.bind(this));

this.ackDeadline = defaultOptions.subscription.ackDeadline;
this.maxMessages = defaultOptions.subscription.maxOutstandingMessages;
Expand All @@ -479,38 +478,6 @@ export class Subscriber extends WrappingEmitter {
this.setOptions(options);
}

/**
* This wrapper will be called as part of the emit() process. This lets
* us capture the full time span of processing even if the user is using
* async callbacks.
*
* @private
*/
private listenerWrapper(
eventName: string | symbol,
listener: EmitterCallback,
args: unknown[]
) {
if (eventName !== 'message') {
return listener(...args);
} else {
const span = otel.SpanMaker.createReceiveProcessSpan(
args[0] as Message,
this.name
);

// If the user returned a Promise, that means they used an async handler.
// In that case, we need to tag on to their Promise to end the span.
// Otherwise, the listener chain is sync, and we can close out sync.
const result = listener(...args) as unknown as Promise<void>;
if (!!result && typeof result.then === 'function') {
result.then(() => span?.end());
} else {
span?.end();
}
}
}

/**
* Update our ack extension time that will be used by the lease manager
* for sending modAcks.
Expand Down
42 changes: 39 additions & 3 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

import {EventEmitter} from 'events';
import * as extend from 'extend';
import {CallOptions} from 'google-gax';
import snakeCase = require('lodash.snakecase');
Expand Down Expand Up @@ -42,12 +41,15 @@ import {
SeekResponse,
Snapshot,
} from './snapshot';
import {Subscriber, SubscriberOptions} from './subscriber';
import {Message, Subscriber, SubscriberOptions} from './subscriber';
import {Topic} from './topic';
import {promisifySome} from './util';

export {AckError, AckResponse, AckResponses} from './subscriber';

import {EmitterCallback, WrappingEmitter} from './wrapping-emitter';
import * as otel from './opentelemetry-tracing';

export type PushConfig = google.pubsub.v1.IPushConfig;
export type OidcToken = google.pubsub.v1.PushConfig.IOidcToken;

Expand Down Expand Up @@ -266,7 +268,7 @@ export type DetachSubscriptionResponse = EmptyResponse;
* });
* ```
*/
export class Subscription extends EventEmitter {
export class Subscription extends WrappingEmitter {
pubsub: PubSub;
iam: IAM;
name: string;
Expand All @@ -277,6 +279,8 @@ export class Subscription extends EventEmitter {
constructor(pubsub: PubSub, name: string, options?: SubscriptionOptions) {
super();

this.setEmitterWrapper(this.listenerWrapper.bind(this));

options = options || {};

this.pubsub = pubsub;
Expand Down Expand Up @@ -334,6 +338,38 @@ export class Subscription extends EventEmitter {
this._listen();
}

/**
* This wrapper will be called as part of the emit() process. This lets
* us capture the full time span of processing even if the user is using
* async callbacks.
*
* @private
*/
private listenerWrapper(
eventName: string | symbol,
listener: EmitterCallback,
args: unknown[]
) {
if (eventName !== 'message') {
return listener(...args);
} else {
const span = otel.SpanMaker.createReceiveProcessSpan(
args[0] as Message,
this.name
);

// If the user returned a Promise, that means they used an async handler.
// In that case, we need to tag on to their Promise to end the span.
// Otherwise, the listener chain is sync, and we can close out sync.
const result = listener(...args) as unknown as Promise<void>;
if (!!result && typeof result.then === 'function') {
result.then(() => span?.end());
} else {
span?.end();
}
}
}

/**
* Indicates if the Subscription is open and receiving messages.
*
Expand Down

0 comments on commit d523b1f

Please sign in to comment.