From abd10cc0c9956256bd06e5b48a412ec0af6dd086 Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Wed, 6 Apr 2022 15:56:53 -0400 Subject: [PATCH] fix: update grpc.max_metadata_size to 4MiB for exactly-once, and shift ack/modack errors to 'debug' stream channel (#1505) * fix: update grpc.max_metadata_size to 4MiB for exactly-once * fix: change ack and modAck errors to be optional debug warnings * tests: update testing to match earlier changes * fix: check against undefined, not falsey --- src/message-queues.ts | 5 ++++- src/pubsub.ts | 12 +++++++++++- src/subscriber.ts | 1 + src/subscription.ts | 8 ++++++++ test/message-queues.ts | 12 ++++++------ test/pubsub.ts | 16 ++++++++++++++++ test/subscription.ts | 19 +++++++++++++++++++ 7 files changed, 65 insertions(+), 8 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index 82493a4d2..a46e71927 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -141,7 +141,10 @@ export abstract class MessageQueue { try { await this._sendBatch(batch); } catch (e) { - this._subscriber.emit('error', e); + // These queues are used for ack and modAck messages, which should + // never surface an error to the user level. However, we'll emit + // them onto this debug channel in case debug info is needed. + this._subscriber.emit('debug', e); } this.numInFlightRequests -= batchSize; diff --git a/src/pubsub.ts b/src/pubsub.ts index beb59ae6a..e2bb34594 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -272,7 +272,17 @@ export class PubSub { private schemaClient?: SchemaServiceClient; constructor(options?: ClientConfig) { - options = options || {}; + options = Object.assign({}, options || {}); + + // Needed for potentially large responses that may come from using exactly-once delivery. + // This will get passed down to grpc client objects. + const maxMetadataSize = 'grpc.max_metadata_size'; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const optionsAny = options as any; + if (optionsAny[maxMetadataSize] === undefined) { + optionsAny[maxMetadataSize] = 4 * 1024 * 1024; // 4 MiB + } + // Determine what scopes are needed. // It is the union of the scopes on both clients. const clientClasses = [v1.SubscriberClient, v1.PublisherClient]; diff --git a/src/subscriber.ts b/src/subscriber.ts index 46c1ebe60..ee29bdb8d 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -393,6 +393,7 @@ export class Subscriber extends EventEmitter { this._stream .on('error', err => this.emit('error', err)) + .on('debug', err => this.emit('debug', err)) .on('data', (data: PullResponse) => this._onData(data)) .once('close', () => this.close()); diff --git a/src/subscription.ts b/src/subscription.ts index 87773958c..6accd5adf 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -104,6 +104,7 @@ export type DetachSubscriptionResponse = EmptyResponse; listener: (error: StatusError) => void ): this; on(event: 'close', listener: () => void): this; + on(event: 'debug', listener: (error: StatusError) => void); this; // Only used internally. on(event: 'newListener', listener: Function): this; @@ -154,6 +155,9 @@ export type DetachSubscriptionResponse = EmptyResponse; * Upon receipt of an error: * on(event: 'error', listener: (error: Error) => void): this; * + * Upon receipt of a (non-fatal) debug warning: + * on(event: 'debug', listener: (error: Error) => void): this; + * * Upon the closing of the subscriber: * on(event: 'close', listener: Function): this; * @@ -220,6 +224,9 @@ export type DetachSubscriptionResponse = EmptyResponse; * // Register an error handler. * subscription.on('error', (err) => {}); * + * // Register a debug handler, to catch non-fatal errors. + * subscription.on('debug', (err) => { console.error(err); }); + * * // Register a close handler in case the subscriber closes unexpectedly * subscription.on('close', () => {}); * @@ -318,6 +325,7 @@ export class Subscription extends EventEmitter { this._subscriber = new Subscriber(this, options); this._subscriber .on('error', err => this.emit('error', err)) + .on('debug', err => this.emit('debug', err)) .on('message', message => this.emit('message', message)) .on('close', () => this.emit('close')); diff --git a/test/message-queues.ts b/test/message-queues.ts index 43bf46fdc..d60073cce 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -195,12 +195,12 @@ describe('MessageQueues', () => { assert.deepStrictEqual(batch, expectedBatch); }); - it('should emit any errors', done => { + it('should emit any errors as debug events', done => { const fakeError = new Error('err'); sandbox.stub(messageQueue.batches, 'push').throws(fakeError); - subscriber.on('error', err => { + subscriber.on('debug', err => { assert.strictEqual(err, fakeError); done(); }); @@ -362,7 +362,7 @@ describe('MessageQueues', () => { assert.strictEqual(callOptions, fakeCallOptions); }); - it('should throw a BatchError if unable to ack', done => { + it('should throw a BatchError on "debug" if unable to ack', done => { const messages = [ new FakeMessage(), new FakeMessage(), @@ -380,7 +380,7 @@ describe('MessageQueues', () => { sandbox.stub(subscriber.client, 'acknowledge').rejects(fakeError); - subscriber.on('error', (err: BatchError) => { + subscriber.on('debug', (err: BatchError) => { assert.strictEqual(err.message, expectedMessage); assert.deepStrictEqual(err.ackIds, ackIds); assert.strictEqual(err.code, fakeError.code); @@ -487,7 +487,7 @@ describe('MessageQueues', () => { assert.strictEqual(callOptions, fakeCallOptions); }); - it('should throw a BatchError if unable to modAck', done => { + it('should throw a BatchError on "debug" if unable to modAck', done => { const messages = [ new FakeMessage(), new FakeMessage(), @@ -505,7 +505,7 @@ describe('MessageQueues', () => { sandbox.stub(subscriber.client, 'modifyAckDeadline').rejects(fakeError); - subscriber.on('error', (err: BatchError) => { + subscriber.on('debug', (err: BatchError) => { assert.strictEqual(err.message, expectedMessage); assert.deepStrictEqual(err.ackIds, ackIds); assert.strictEqual(err.code, fakeError.code); diff --git a/test/pubsub.ts b/test/pubsub.ts index 03fd44a28..d908973d5 100644 --- a/test/pubsub.ts +++ b/test/pubsub.ts @@ -196,10 +196,12 @@ describe('PubSub', () => { }); describe('instantiation', () => { + const maxMetadataSizeKey = 'grpc.max_metadata_size'; const DEFAULT_OPTIONS = { libName: 'gccl', libVersion: PKG.version, scopes: [], + [maxMetadataSizeKey]: 4 * 1024 * 1024, }; it('should extend the correct methods', () => { @@ -220,6 +222,20 @@ describe('PubSub', () => { assert(new PubSub() instanceof PubSub); }); + it('should augment the gRPC options for metadata size', () => { + let pubsub = new PubSub(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let optionsAny: any = pubsub.options; + assert.strictEqual(optionsAny[maxMetadataSizeKey], 4 * 1024 * 1024); + + optionsAny = { + [maxMetadataSizeKey]: 1 * 1024 * 1024, + }; + pubsub = new PubSub(optionsAny); + optionsAny = pubsub.options; + assert.strictEqual(optionsAny[maxMetadataSizeKey], 1 * 1024 * 1024); + }); + it('should combine all required scopes', () => { v1ClientOverrides.SubscriberClient = {}; v1ClientOverrides.SubscriberClient.scopes = ['a', 'b', 'c']; diff --git a/test/subscription.ts b/test/subscription.ts index 860cded5c..a5699b8a1 100644 --- a/test/subscription.ts +++ b/test/subscription.ts @@ -509,6 +509,25 @@ describe('Subscription', () => { }); }); + describe('debug', () => { + const error = new Error('err') as ServiceError; + + beforeEach(() => { + subscription.request = (config, callback) => { + callback(error); + }; + }); + + it('should return the debug events to the callback', done => { + subscription.on('debug', err => { + assert.strictEqual(err, error); + done(); + }); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (subscription as any)._subscriber.emit('debug', error); + }); + }); + describe('delete', () => { beforeEach(() => { sandbox.stub(subscription, 'removeAllListeners').yields(util.noop);