Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update grpc.max_metadata_size to 4MiB for exactly-once, and shift ack/modack errors to 'debug' stream channel #1505

Merged
merged 8 commits into from
Apr 6, 2022
5 changes: 4 additions & 1 deletion src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ export abstract class MessageQueue {
try {
await this._sendBatch(batch);
} catch (e) {
this._subscriber.emit('error', e);
// These queues are used for ack and modAck messages, which should
// never surface an error to the user level. However, we'll emit
// them onto this debug channel in case debug info is needed.
this._subscriber.emit('debug', e);
}

this.numInFlightRequests -= batchSize;
Expand Down
12 changes: 11 additions & 1 deletion src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,17 @@ export class PubSub {
private schemaClient?: SchemaServiceClient;

constructor(options?: ClientConfig) {
options = options || {};
options = Object.assign({}, options || {});

// Needed for potentially large responses that may come from using exactly-once delivery.
// This will get passed down to grpc client objects.
const maxMetadataSize = 'grpc.max_metadata_size';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const optionsAny = options as any;
if (optionsAny[maxMetadataSize] === undefined) {
optionsAny[maxMetadataSize] = 4 * 1024 * 1024; // 4 MiB
}

// Determine what scopes are needed.
// It is the union of the scopes on both clients.
const clientClasses = [v1.SubscriberClient, v1.PublisherClient];
Expand Down
1 change: 1 addition & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ export class Subscriber extends EventEmitter {

this._stream
.on('error', err => this.emit('error', err))
.on('debug', err => this.emit('debug', err))
.on('data', (data: PullResponse) => this._onData(data))
.once('close', () => this.close());

Expand Down
8 changes: 8 additions & 0 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export type DetachSubscriptionResponse = EmptyResponse;
listener: (error: StatusError) => void
): this;
on(event: 'close', listener: () => void): this;
on(event: 'debug', listener: (error: StatusError) => void); this;

// Only used internally.
on(event: 'newListener', listener: Function): this;
Expand Down Expand Up @@ -154,6 +155,9 @@ export type DetachSubscriptionResponse = EmptyResponse;
* Upon receipt of an error:
* on(event: 'error', listener: (error: Error) => void): this;
*
* Upon receipt of a (non-fatal) debug warning:
* on(event: 'debug', listener: (error: Error) => void): this;
*
* Upon the closing of the subscriber:
* on(event: 'close', listener: Function): this;
*
Expand Down Expand Up @@ -220,6 +224,9 @@ export type DetachSubscriptionResponse = EmptyResponse;
* // Register an error handler.
* subscription.on('error', (err) => {});
*
* // Register a debug handler, to catch non-fatal errors.
* subscription.on('debug', (err) => { console.error(err); });
*
* // Register a close handler in case the subscriber closes unexpectedly
* subscription.on('close', () => {});
*
Expand Down Expand Up @@ -318,6 +325,7 @@ export class Subscription extends EventEmitter {
this._subscriber = new Subscriber(this, options);
this._subscriber
.on('error', err => this.emit('error', err))
.on('debug', err => this.emit('debug', err))
.on('message', message => this.emit('message', message))
.on('close', () => this.emit('close'));

Expand Down
12 changes: 6 additions & 6 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ describe('MessageQueues', () => {
assert.deepStrictEqual(batch, expectedBatch);
});

it('should emit any errors', done => {
it('should emit any errors as debug events', done => {
const fakeError = new Error('err');

sandbox.stub(messageQueue.batches, 'push').throws(fakeError);

subscriber.on('error', err => {
subscriber.on('debug', err => {
assert.strictEqual(err, fakeError);
done();
});
Expand Down Expand Up @@ -362,7 +362,7 @@ describe('MessageQueues', () => {
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to ack', done => {
it('should throw a BatchError on "debug" if unable to ack', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
Expand All @@ -380,7 +380,7 @@ describe('MessageQueues', () => {

sandbox.stub(subscriber.client, 'acknowledge').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
subscriber.on('debug', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
Expand Down Expand Up @@ -487,7 +487,7 @@ describe('MessageQueues', () => {
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to modAck', done => {
it('should throw a BatchError on "debug" if unable to modAck', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
Expand All @@ -505,7 +505,7 @@ describe('MessageQueues', () => {

sandbox.stub(subscriber.client, 'modifyAckDeadline').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
subscriber.on('debug', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
Expand Down
16 changes: 16 additions & 0 deletions test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,12 @@ describe('PubSub', () => {
});

describe('instantiation', () => {
const maxMetadataSizeKey = 'grpc.max_metadata_size';
const DEFAULT_OPTIONS = {
libName: 'gccl',
libVersion: PKG.version,
scopes: [],
[maxMetadataSizeKey]: 4 * 1024 * 1024,
};

it('should extend the correct methods', () => {
Expand All @@ -220,6 +222,20 @@ describe('PubSub', () => {
assert(new PubSub() instanceof PubSub);
});

it('should augment the gRPC options for metadata size', () => {
let pubsub = new PubSub();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let optionsAny: any = pubsub.options;
assert.strictEqual(optionsAny[maxMetadataSizeKey], 4 * 1024 * 1024);

optionsAny = {
[maxMetadataSizeKey]: 1 * 1024 * 1024,
};
pubsub = new PubSub(optionsAny);
optionsAny = pubsub.options;
assert.strictEqual(optionsAny[maxMetadataSizeKey], 1 * 1024 * 1024);
});

it('should combine all required scopes', () => {
v1ClientOverrides.SubscriberClient = {};
v1ClientOverrides.SubscriberClient.scopes = ['a', 'b', 'c'];
Expand Down
19 changes: 19 additions & 0 deletions test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,25 @@ describe('Subscription', () => {
});
});

describe('debug', () => {
const error = new Error('err') as ServiceError;

beforeEach(() => {
subscription.request = (config, callback) => {
callback(error);
};
});

it('should return the debug events to the callback', done => {
subscription.on('debug', err => {
assert.strictEqual(err, error);
done();
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(subscription as any)._subscriber.emit('debug', error);
});
});

describe('delete', () => {
beforeEach(() => {
sandbox.stub(subscription, 'removeAllListeners').yields(util.noop);
Expand Down