diff --git a/README.md b/README.md index f77054073..a1821be38 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree | Create Push Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createPushSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createPushSubscription.js,samples/README.md) | | Create Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscription.js,samples/README.md) | | Create Subscription With Dead Letter Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithDeadLetterPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithDeadLetterPolicy.js,samples/README.md) | +| Create an exactly-once delivery subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md) | | Create Subscription With Filtering | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithFiltering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithFiltering.js,samples/README.md) | | Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) | | Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) | @@ -148,6 +149,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree | Listen For Avro Records | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForAvroRecords.js,samples/README.md) | | Listen For Errors | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForErrors.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForErrors.js,samples/README.md) | | Listen For Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessages.js,samples/README.md) | +| Listen with exactly-once delivery | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md) | | Listen For Protobuf Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForProtobufMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForProtobufMessages.js,samples/README.md) | | Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) | | Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) | diff --git a/package.json b/package.json index 5d136193c..463bf2932 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "extend": "^3.0.2", "google-auth-library": "^8.0.2", "google-gax": "^3.3.0", + "heap-js": "^2.2.0", "is-stream-ended": "^0.1.4", "lodash.snakecase": "^4.1.1", "p-defer": "^3.0.0" diff --git a/samples/README.md b/samples/README.md index 30dd790c9..fd833884c 100644 --- a/samples/README.md +++ b/samples/README.md @@ -26,6 +26,7 @@ guides. * [Create Push Subscription](#create-push-subscription) * [Create Subscription](#create-subscription) * [Create Subscription With Dead Letter Policy](#create-subscription-with-dead-letter-policy) + * [Create an exactly-once delivery subscription](#create-an-exactly-once-delivery-subscription) * [Create Subscription With Filtering](#create-subscription-with-filtering) * [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled) * [Create Topic](#create-topic) @@ -45,6 +46,7 @@ guides. * [Listen For Avro Records](#listen-for-avro-records) * [Listen For Errors](#listen-for-errors) * [Listen For Messages](#listen-for-messages) + * [Listen with exactly-once delivery](#listen-with-exactly-once-delivery) * [Listen For Protobuf Messages](#listen-for-protobuf-messages) * [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes) * [Modify Push Configuration](#modify-push-configuration) @@ -199,6 +201,25 @@ __Usage:__ +### Create an exactly-once delivery subscription + +Demonstrates how to create a subscription for exactly-once delivery. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md) + +__Usage:__ + + +`node createSubscriptionWithExactlyOnceDelivery.js ` + + +----- + + + + ### Create Subscription With Filtering Creates a new subscription with filtering. @@ -560,6 +581,25 @@ __Usage:__ +### Listen with exactly-once delivery + +Listen for messages on an exactly-once delivery subscription. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md) + +__Usage:__ + + +`node listenForMessagesWithExactlyOnceDelivery.js ` + + +----- + + + + ### Listen For Protobuf Messages Listens for messages in protobuf encoding from a subscription. diff --git a/samples/createSubscriptionWithExactlyOnceDelivery.js b/samples/createSubscriptionWithExactlyOnceDelivery.js new file mode 100644 index 000000000..886d0ffd8 --- /dev/null +++ b/samples/createSubscriptionWithExactlyOnceDelivery.js @@ -0,0 +1,77 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Create an exactly-once delivery subscription +// description: Demonstrates how to create a subscription for exactly-once delivery. +// usage: node createSubscriptionWithExactlyOnceDelivery.js + +// [START pubsub_create_subscription_with_exactly_once_delivery] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createSubscriptionWithExactlyOnceDelivery( + topicNameOrId, + subscriptionNameOrId +) { + // Creates a new subscription + await pubSubClient + .topic(topicNameOrId) + .createSubscription(subscriptionNameOrId, { + enableExactlyOnceDelivery: true, + }); + console.log( + `Created subscription ${subscriptionNameOrId} with exactly-once delivery.` + ); + console.log( + 'To process messages, remember to check the return value of ackWithResponse().' + ); +} +// [END pubsub_create_subscription_with_exactly_once_delivery] + +function main( + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID' +) { + createSubscriptionWithExactlyOnceDelivery( + topicNameOrId, + subscriptionNameOrId + ).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/listenForMessagesWithExactlyOnceDelivery.js b/samples/listenForMessagesWithExactlyOnceDelivery.js new file mode 100644 index 000000000..c38ff7dd6 --- /dev/null +++ b/samples/listenForMessagesWithExactlyOnceDelivery.js @@ -0,0 +1,103 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// This is a generated sample. Please see typescript/README.md for more info. + +'use strict'; + +// sample-metadata: +// title: Listen with exactly-once delivery +// description: Listen for messages on an exactly-once delivery subscription. +// usage: node listenForMessagesWithExactlyOnceDelivery.js + +// [START pubsub_subscriber_exactly_once] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function listenForMessagesWithExactlyOnceDelivery( + subscriptionNameOrId, + timeout +) { + // References an existing subscription + const subscription = pubSubClient.subscription(subscriptionNameOrId); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = async message => { + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${message.data}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount++; + + // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks + // the result of the acknowledge call. When exactly-once delivery is enabled + // on the subscription, the message is guaranteed not to be delivered again + // if the ack Promise resolves. + try { + // When the Promise resolves, the value is always AckResponses.Success, + // signaling that the ack was accepted. Note that you may call this + // method on a subscription without exactly-once delivery, but it will + // always return AckResponses.Success. + await message.ackWithResponse(); + console.log(`Ack for message ${message.id} successful.`); + } catch (e) { + // In all other cases, the error passed on reject will explain why. This + // is only for permanent failures; transient errors are retried automatically. + const ackError = e; + console.log( + `Ack for message ${message.id} failed with error: ${ackError.errorCode}` + ); + } + }; + + // Listen for new messages until timeout is hit + subscription.on('message', messageHandler); + + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_subscriber_exactly_once] + +function main( + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', + timeout = 60 +) { + listenForMessagesWithExactlyOnceDelivery( + subscriptionNameOrId, + Number(timeout) + ).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/system-test/subscriptions.test.ts b/samples/system-test/subscriptions.test.ts index 5861c6dd2..b20d95a1c 100644 --- a/samples/system-test/subscriptions.test.ts +++ b/samples/system-test/subscriptions.test.ts @@ -505,4 +505,43 @@ describe('subscriptions', () => { .get(); assert.strictEqual(subscription.metadata?.enableMessageOrdering, true); }); + + it('should create an exactly-once delivery sub and listen on it.', async () => { + const testId = 'eos'; + const topic = await createTopic(testId); + const subName = reserveSub(testId); + const output = execSync( + `${commandFor('createSubscriptionWithExactlyOnceDelivery')} ${ + topic.name + } ${subName}` + ); + assert.include( + output, + `Created subscription ${subName} with exactly-once delivery.` + ); + + const [subscription] = await pubsub + .topic(topic.name) + .subscription(subName) + .get(); + assert.strictEqual(subscription.metadata?.enableExactlyOnceDelivery, true); + + const message = Buffer.from('test message'); + const messageIds = [ + await topic.publishMessage({ + data: message, + }), + await topic.publishMessage({ + data: message, + }), + ]; + + const output2 = execSync( + `${commandFor('listenForMessagesWithExactlyOnceDelivery')} ${subName} 15` + ); + + for (const id of messageIds) { + assert.include(output2, `Ack for message ${id} successful`); + } + }); }); diff --git a/samples/typescript/createSubscriptionWithExactlyOnceDelivery.ts b/samples/typescript/createSubscriptionWithExactlyOnceDelivery.ts new file mode 100644 index 000000000..472006fea --- /dev/null +++ b/samples/typescript/createSubscriptionWithExactlyOnceDelivery.ts @@ -0,0 +1,73 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Create an exactly-once delivery subscription +// description: Demonstrates how to create a subscription for exactly-once delivery. +// usage: node createSubscriptionWithExactlyOnceDelivery.js + +// [START pubsub_create_subscription_with_exactly_once_delivery] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; + +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function createSubscriptionWithExactlyOnceDelivery( + topicNameOrId: string, + subscriptionNameOrId: string +) { + // Creates a new subscription + await pubSubClient + .topic(topicNameOrId) + .createSubscription(subscriptionNameOrId, { + enableExactlyOnceDelivery: true, + }); + console.log( + `Created subscription ${subscriptionNameOrId} with exactly-once delivery.` + ); + console.log( + 'To process messages, remember to check the return value of ackWithResponse().' + ); +} +// [END pubsub_create_subscription_with_exactly_once_delivery] + +function main( + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID' +) { + createSubscriptionWithExactlyOnceDelivery( + topicNameOrId, + subscriptionNameOrId + ).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/listenForMessagesWithExactlyOnceDelivery.ts b/samples/typescript/listenForMessagesWithExactlyOnceDelivery.ts new file mode 100644 index 000000000..618ba9a30 --- /dev/null +++ b/samples/typescript/listenForMessagesWithExactlyOnceDelivery.ts @@ -0,0 +1,99 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This application demonstrates how to perform basic operations on + * schemas with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Listen with exactly-once delivery +// description: Listen for messages on an exactly-once delivery subscription. +// usage: node listenForMessagesWithExactlyOnceDelivery.js + +// [START pubsub_subscriber_exactly_once] +/** + * TODO(developer): Uncomment this variable before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; + +// Imports the Google Cloud client library +import {Message, PubSub, AckError} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +async function listenForMessagesWithExactlyOnceDelivery( + subscriptionNameOrId: string, + timeout: number +) { + // References an existing subscription + const subscription = pubSubClient.subscription(subscriptionNameOrId); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = async (message: Message) => { + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${message.data}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount++; + + // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks + // the result of the acknowledge call. When exactly-once delivery is enabled + // on the subscription, the message is guaranteed not to be delivered again + // if the ack Promise resolves. + try { + // When the Promise resolves, the value is always AckResponses.Success, + // signaling that the ack was accepted. Note that you may call this + // method on a subscription without exactly-once delivery, but it will + // always return AckResponses.Success. + await message.ackWithResponse(); + console.log(`Ack for message ${message.id} successful.`); + } catch (e) { + // In all other cases, the error passed on reject will explain why. This + // is only for permanent failures; transient errors are retried automatically. + const ackError = e as AckError; + console.log( + `Ack for message ${message.id} failed with error: ${ackError.errorCode}` + ); + } + }; + + // Listen for new messages until timeout is hit + subscription.on('message', messageHandler); + + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_subscriber_exactly_once] + +function main( + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', + timeout = 60 +) { + listenForMessagesWithExactlyOnceDelivery( + subscriptionNameOrId, + Number(timeout) + ).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/src/ack-metadata.ts b/src/ack-metadata.ts new file mode 100644 index 000000000..a26bf23d8 --- /dev/null +++ b/src/ack-metadata.ts @@ -0,0 +1,118 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {GoogleError, Status} from 'google-gax'; +import {AckResponse, AckResponses} from './subscriber'; + +const permanentFailureInvalidAckId = 'PERMANENT_FAILURE_INVALID_ACK_ID'; +const transientFailurePrefix = 'TRANSIENT_'; + +// If we get these as RPC errors, they will trigger a retry. +const exactlyOnceDeliveryTemporaryRetryErrors = [ + Status.DEADLINE_EXCEEDED, + Status.RESOURCE_EXHAUSTED, + Status.ABORTED, + Status.INTERNAL, + Status.UNAVAILABLE, +]; + +/** + * @private + */ +interface StringToString { + [propname: string]: string; +} + +/** + * Contains information about ack responses that may be used to build + * responses to user ack calls. + * + * @private + */ +export interface AckErrorInfo { + transient: boolean; + response?: AckResponse; + rawErrorCode?: string; + grpcErrorCode?: Status; +} + +export type AckErrorCodes = Map; + +/** + * Processes the raw RPC information when sending a batch of acks + * to the Pub/Sub service. + * + * @private + */ +export function processAckErrorInfo(rpcError: GoogleError): AckErrorCodes { + const ret = new Map(); + + if (!rpcError.errorInfoMetadata) { + return ret; + } + + // The typing for errorInfoMetadata is currently incorrect. + const metadata = rpcError.errorInfoMetadata as StringToString; + + for (const ackId of Object.getOwnPropertyNames(metadata)) { + const code = metadata[ackId]; + + if (code === permanentFailureInvalidAckId) { + ret.set(ackId, { + transient: false, + response: AckResponses.Invalid, + rawErrorCode: code, + }); + } else if (code.startsWith(transientFailurePrefix)) { + ret.set(ackId, { + transient: true, + rawErrorCode: code, + }); + } else { + ret.set(ackId, { + transient: false, + response: AckResponses.Other, + rawErrorCode: code, + }); + } + } + + return ret; +} + +/** + * For a completely failed RPC call, this will find the appropriate + * error information to return to an ack() caller. + * + * @private + */ +export function processAckRpcError(grpcCode: Status): AckErrorInfo { + const ackError: AckErrorInfo = { + transient: exactlyOnceDeliveryTemporaryRetryErrors.includes(grpcCode), + grpcErrorCode: grpcCode, + }; + switch (grpcCode) { + case Status.PERMISSION_DENIED: + ackError.response = AckResponses.PermissionDenied; + break; + case Status.FAILED_PRECONDITION: + ackError.response = AckResponses.FailedPrecondition; + break; + default: + ackError.response = AckResponses.Other; + break; + } + + return ackError; +} diff --git a/src/exponential-retry.ts b/src/exponential-retry.ts new file mode 100644 index 000000000..9187559a3 --- /dev/null +++ b/src/exponential-retry.ts @@ -0,0 +1,193 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Heap from 'heap-js'; +import {Duration} from './temporal'; + +/** + * This interface specifies what we'll add to retried items in order + * to track them through the exponential backoff. + * + * @private + */ +export interface RetriedItem { + retryInfo?: RetryInfo; +} + +/** + * These items will go inside the added retry metadata. + * + * @private + */ +export interface RetryInfo { + // Date.now() value of when we first started retrying. + firstRetry: number; + + // Date.now() value of when we will next retry. + nextRetry: number; + + // The current multiplier for exponential backoff. + multiplier: number; + + // The user callback to call when it next comes off retry wait. + callback: RetryCallback; +} + +// Compare function for Heap so we can use it as a priority queue. +function comparator(a: RetriedItem, b: RetriedItem) { + return a.retryInfo!.nextRetry - b.retryInfo!.nextRetry; +} + +/** + * Users of this class will pass in a callback in this form when + * an item is ready to be retried. The item must be placed + * back on the queue if it needs to be retried again. + * + * @private + */ +export interface RetryCallback { + (item: T, totalTime: Duration): void; +} + +/** + * Provides a helper that will manage your retries using the "truncated + * exponential backoff" strategy. + * + * Most of the pieces of this library are doing retries via gax, but for + * exactly-once delivery, we have some things where gRPC failures won't + * take care of it. + * + * @private + */ +export class ExponentialRetry { + private _items = new Heap>(comparator); + private _backoffMs: number; + private _maxBackoffMs: number; + private _timer?: NodeJS.Timeout; + + constructor(backoff: Duration, maxBackoff: Duration) { + this._backoffMs = backoff.totalOf('millisecond'); + this._maxBackoffMs = maxBackoff.totalOf('millisecond'); + } + + /** + * Shut down all operations/timers/etc and return a list of + * items that were still pending retry. + * + * @private + */ + close(): T[] { + if (this._timer) { + clearTimeout(this._timer); + } + + const leftovers = this._items.toArray(); + this._items.clear(); + return leftovers as T[]; + } + + /** + * Place an item on the retry queue. It's important that it's the + * same exact item that was already on the queue, if it's being retried + * more than once. + * + * @private + */ + retryLater(item: T, callback: RetryCallback) { + const retried = item as RetriedItem; + const retryInfo = retried.retryInfo; + + if (!retryInfo) { + // This item's first time through. + retried.retryInfo = { + firstRetry: Date.now(), + nextRetry: Date.now() + this.randomizeDelta(this._backoffMs), + multiplier: 1, + callback, + }; + } else { + // Not the first time - handle backoff. + const nextMultiplier = retryInfo.multiplier * 2; + let delta = this.randomizeDelta(nextMultiplier * this._backoffMs); + if (delta > this._maxBackoffMs) { + delta = this.randomizeDelta(this._maxBackoffMs); + } else { + retryInfo.multiplier = nextMultiplier; + } + retryInfo.nextRetry = Date.now() + delta; + } + + // Re-sort it into the heap with the correct position. + // It's my assumption here that any item that is being retried is + // very likely near or at the top. + this._items.remove(retried); + this._items.push(retried); + + // Schedule the next retry. + this.scheduleRetry(); + } + + // Takes a time delta and adds fuzz. + private randomizeDelta(durationMs: number): number { + // The fuzz distance should never exceed one second, but in the + // case of smaller things, don't end up with a negative delta. + const magnitude = durationMs < 1000 ? durationMs : 1000; + const offset = Math.random() * magnitude - magnitude / 2.0; + return durationMs + offset; + } + + // Looks through the queue to see if there's anything to handle. + private doRetries() { + const now = Date.now(); + while (!this._items.isEmpty()) { + const next = this._items.peek()!; + + // Within 10msec is close enough. + if (next.retryInfo!.nextRetry - now < 10) { + this._items.pop(); + + next.retryInfo!.callback( + next as unknown as T, + Duration.from({millis: now - next.retryInfo!.firstRetry}) + ); + } else { + break; + } + } + + // Is there stuff to retry still? + if (!this._items.isEmpty()) { + this.scheduleRetry(); + } + } + + // If there are items to retry, schedule the next timer event. + private scheduleRetry() { + // What's next? + const next = this._items.peek(); + if (next) { + let delta = next.retryInfo!.nextRetry - Date.now(); + if (delta < 0) { + delta = 0; + } + + if (this._timer) { + clearTimeout(this._timer); + } + this._timer = setTimeout(() => { + this.doRetries(); + }, delta); + } + } +} diff --git a/src/index.ts b/src/index.ts index b4684e998..787b126ee 100644 --- a/src/index.ts +++ b/src/index.ts @@ -154,6 +154,9 @@ export { SetSubscriptionMetadataCallback, SetSubscriptionMetadataResponse, Subscription, + AckError, + AckResponse, + AckResponses, } from './subscription'; export { CreateTopicCallback, diff --git a/src/message-queues.ts b/src/message-queues.ts index e8f431720..9d87980b3 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -14,12 +14,39 @@ * limitations under the License. */ -import {CallOptions, grpc, ServiceError} from 'google-gax'; +import {CallOptions, GoogleError, grpc, RetryOptions} from 'google-gax'; import defer = require('p-defer'); +import { + AckErrorInfo, + AckErrorCodes, + processAckErrorInfo, + processAckRpcError, +} from './ack-metadata'; +import {ExponentialRetry} from './exponential-retry'; +import { + AckError, + AckResponse, + AckResponses, + Message, + Subscriber, +} from './subscriber'; +import {Duration} from './temporal'; +import {addToBucket} from './util'; -import {Message, Subscriber} from './subscriber'; +/** + * @private + */ +export interface QueuedMessage { + ackId: string; + deadline?: number; + responsePromise?: defer.DeferredPromise; + retryCount: number; +} -type QueuedMessages = Array<[string, number?]>; +/** + * @private + */ +export type QueuedMessages = Array; export interface BatchOptions { callOptions?: CallOptions; @@ -30,17 +57,19 @@ export interface BatchOptions { /** * Error class used to signal a batch failure. * + * Now that we have exactly-once delivery subscriptions, we'll only + * throw one of these if there was an unknown error. + * * @class * * @param {string} message The error message. - * @param {ServiceError} err The grpc service error. + * @param {GoogleError} err The grpc error. */ -export class BatchError extends Error implements grpc.ServiceError { +export class BatchError extends Error { ackIds: string[]; code: grpc.status; details: string; - metadata: grpc.Metadata; - constructor(err: grpc.ServiceError, ackIds: string[], rpc: string) { + constructor(err: GoogleError, ackIds: string[], rpc: string) { super( `Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${ process.env.DEBUG_GRPC ? err.stack : err.message @@ -48,9 +77,8 @@ export class BatchError extends Error implements grpc.ServiceError { ); this.ackIds = ackIds; - this.code = err.code; - this.details = err.details; - this.metadata = err.metadata; + this.code = err.code!; + this.details = err.message; } } @@ -76,21 +104,62 @@ export class BatchError extends Error implements grpc.ServiceError { export abstract class MessageQueue { numPendingRequests: number; numInFlightRequests: number; + numInRetryRequests: number; protected _onFlush?: defer.DeferredPromise; protected _onDrain?: defer.DeferredPromise; protected _options!: BatchOptions; protected _requests: QueuedMessages; protected _subscriber: Subscriber; protected _timer?: NodeJS.Timer; - protected abstract _sendBatch(batch: QueuedMessages): Promise; + protected _retrier: ExponentialRetry; + protected _closed = false; + protected abstract _sendBatch(batch: QueuedMessages): Promise; + constructor(sub: Subscriber, options = {} as BatchOptions) { this.numPendingRequests = 0; this.numInFlightRequests = 0; + this.numInRetryRequests = 0; this._requests = []; this._subscriber = sub; + this._retrier = new ExponentialRetry( + Duration.from({seconds: 1}), + Duration.from({seconds: 64}) + ); this.setOptions(options); } + + /** + * Shuts down this message queue gracefully. Any acks/modAcks pending in + * the queue or waiting for retry will be removed. If exactly-once delivery + * is enabled on the subscription, we'll send permanent failures to + * anyone waiting on completions; otherwise we'll send successes. + * + * If a flush is desired first, do it before calling close(). + * + * @private + */ + close() { + let requests = this._requests; + this._requests = []; + this.numInFlightRequests = this.numPendingRequests = 0; + requests = requests.concat(this._retrier.close()); + const isExactlyOnceDelivery = this._subscriber.isExactlyOnceDelivery; + requests.forEach(r => { + if (r.responsePromise) { + if (isExactlyOnceDelivery) { + r.responsePromise.reject( + new AckError(AckResponses.Invalid, 'Subscriber closed') + ); + } else { + r.responsePromise.resolve(); + } + } + }); + + this._closed = true; + } + /** * Gets the default buffer time in ms. * @@ -100,6 +169,7 @@ export abstract class MessageQueue { get maxMilliseconds(): number { return this._options!.maxMilliseconds!; } + /** * Adds a message to the queue. * @@ -107,19 +177,76 @@ export abstract class MessageQueue { * @param {number} [deadline] The deadline. * @private */ - add({ackId}: Message, deadline?: number): void { + add({ackId}: Message, deadline?: number): Promise { + if (this._closed) { + if (this._subscriber.isExactlyOnceDelivery) { + throw new AckError(AckResponses.Invalid, 'Subscriber closed'); + } else { + return Promise.resolve(); + } + } + const {maxMessages, maxMilliseconds} = this._options; - this._requests.push([ackId, deadline]); - this.numPendingRequests += 1; - this.numInFlightRequests += 1; + const responsePromise = defer(); + this._requests.push({ + ackId, + deadline, + responsePromise, + retryCount: 0, + }); + this.numPendingRequests++; + this.numInFlightRequests++; if (this._requests.length >= maxMessages!) { this.flush(); } else if (!this._timer) { this._timer = setTimeout(() => this.flush(), maxMilliseconds!); } + + return responsePromise.promise; } + + /** + * Retry handler for acks/modacks that have transient failures. Unless + * it's passed the final deadline, we will just re-queue it for sending. + * + * @private + */ + private handleRetry(message: QueuedMessage, totalTime: Duration) { + // Has it been too long? + if (totalTime.totalOf('minute') >= 10 || this.shouldFailEarly(message)) { + message.responsePromise?.reject( + new AckError(AckResponses.Invalid, 'Retried for too long') + ); + return; + } + + // Just throw it in for another round of processing on the next batch. + this._requests.push(message); + this.numPendingRequests++; + this.numInFlightRequests++; + this.numInRetryRequests--; + + // Make sure we actually do have another batch scheduled. + if (!this._timer) { + this._timer = setTimeout( + () => this.flush(), + this._options.maxMilliseconds! + ); + } + } + + /** + * This hook lets a subclass tell the retry handler to go ahead and fail early. + * + * @private + */ + protected shouldFailEarly(message: QueuedMessage): boolean { + message; + return false; + } + /** * Sends a batch of messages. * @private @@ -139,7 +266,14 @@ export abstract class MessageQueue { delete this._onFlush; try { - await this._sendBatch(batch); + const toRetry = await this._sendBatch(batch); + + // We'll get back anything that needs a retry for transient errors. + for (const m of toRetry) { + this.numInRetryRequests++; + m.retryCount++; + this._retrier.retryLater(m, this.handleRetry.bind(this)); + } } catch (e) { // These queues are used for ack and modAck messages, which should // never surface an error to the user level. However, we'll emit @@ -152,11 +286,16 @@ export abstract class MessageQueue { deferred.resolve(); } - if (this.numInFlightRequests <= 0 && this._onDrain) { + if ( + this.numInFlightRequests <= 0 && + this.numInRetryRequests <= 0 && + this._onDrain + ) { this._onDrain.resolve(); delete this._onDrain; } } + /** * Returns a promise that resolves after the next flush occurs. * @@ -169,6 +308,7 @@ export abstract class MessageQueue { } return this._onFlush.promise; } + /** * Returns a promise that resolves when all in-flight messages have settled. */ @@ -178,6 +318,7 @@ export abstract class MessageQueue { } return this._onDrain.promise; } + /** * Set the batching options. * @@ -189,6 +330,124 @@ export abstract class MessageQueue { this._options = Object.assign(defaults, options); } + + /** + * Succeed a whole batch of Acks/Modacks for an OK RPC response. + * + * @private + */ + handleAckSuccesses(batch: QueuedMessages) { + // Everyone gets a resolve! + batch.forEach(({responsePromise}) => { + responsePromise?.resolve(); + }); + } + + /** + * If we get an RPC failure of any kind, this will take care of deciding + * what to do for each related ack/modAck. Successful ones will have their + * Promises resolved, permanent errors will have their Promises rejected, + * and transients will be returned for retry. + * + * Note that this is only used for subscriptions with exactly-once + * delivery enabled, so _sendBatch() in the classes below take care of + * resolving errors to success; they don't make it here. + * + * @private + */ + handleAckFailures( + operation: string, + batch: QueuedMessages, + rpcError: GoogleError + ) { + const toSucceed: QueuedMessages = []; + const toRetry: QueuedMessages = []; + const toError = new Map([ + [AckResponses.PermissionDenied, []], + [AckResponses.FailedPrecondition, []], + [AckResponses.Other, []], + ]); + + // Parse any error codes, both for the RPC call and the ErrorInfo. + const error: AckErrorInfo | undefined = rpcError.code + ? processAckRpcError(rpcError.code) + : undefined; + const codes: AckErrorCodes = processAckErrorInfo(rpcError); + + for (const m of batch) { + if (codes.has(m.ackId)) { + // This ack has an ErrorInfo entry, so use that to route it. + const code = codes.get(m.ackId)!; + if (code.transient) { + // Transient errors get retried. + toRetry.push(m); + } else { + // It's a permanent error. + addToBucket(toError, code.response!, m); + } + } else if (error !== undefined) { + // This ack doesn't have an ErrorInfo entry, but we do have an RPC + // error, so use that to route it. + if (error.transient) { + toRetry.push(m); + } else { + addToBucket(toError, error.response!, m); + } + } else { + // Looks like this one worked out. + toSucceed.push(m); + } + } + + // To remain consistent with previous behaviour, we will push a debug + // stream message if an unknown error happens during ack. + const others = toError.get(AckResponses.Other); + if (others?.length) { + const otherIds = others.map(e => e.ackId); + this._subscriber.emit( + 'debug', + new BatchError(rpcError, otherIds, operation) + ); + } + + // Take care of following up on all the Promises. + toSucceed.forEach(m => { + m.responsePromise?.resolve(); + }); + for (const e of toError.entries()) { + e[1].forEach(m => { + const exc = new AckError(e[0], rpcError.message); + m.responsePromise?.reject(exc); + }); + } + return { + toError, + toRetry, + }; + } + + /** + * Since we handle our own retries for ack/modAck calls when exactly-once + * delivery is enabled on a subscription, we conditionally need to disable + * the gax retries. This returns an appropriate CallOptions for the + * subclasses to pass down. + * + * @private + */ + protected getCallOptions(): CallOptions | undefined { + let callOptions = this._options.callOptions; + if (this._subscriber.isExactlyOnceDelivery) { + // If exactly-once-delivery is enabled, tell gax not to do retries for us. + callOptions = Object.assign({}, callOptions ?? {}); + callOptions.retry = new RetryOptions([], { + initialRetryDelayMillis: 0, + retryDelayMultiplier: 0, + maxRetryDelayMillis: 0, + }); + } + + return callOptions; + } } /** @@ -206,15 +465,41 @@ export class AckQueue extends MessageQueue { * @param {Array.>} batch Array of ackIds and deadlines. * @return {Promise} */ - protected async _sendBatch(batch: QueuedMessages): Promise { + protected async _sendBatch(batch: QueuedMessages): Promise { const client = await this._subscriber.getClient(); - const ackIds = batch.map(([ackId]) => ackId); + const ackIds = batch.map(({ackId}) => ackId); const reqOpts = {subscription: this._subscriber.name, ackIds}; try { - await client.acknowledge(reqOpts, this._options.callOptions!); + await client.acknowledge(reqOpts, this.getCallOptions()); + + // It's okay if these pass through since they're successful anyway. + this.handleAckSuccesses(batch); + return []; } catch (e) { - throw new BatchError(e as ServiceError, ackIds, 'acknowledge'); + // If exactly-once delivery isn't enabled, don't do error processing. We'll + // emulate previous behaviour by resolving all pending Promises with + // a success status, and then throwing a BatchError for debug logging. + if (!this._subscriber.isExactlyOnceDelivery) { + batch.forEach(m => { + m.responsePromise?.resolve(); + }); + throw new BatchError(e as GoogleError, ackIds, 'ack'); + } else { + const grpcError = e as GoogleError; + try { + const results = this.handleAckFailures('ack', batch, grpcError); + return results.toRetry; + } catch (e) { + // This should only ever happen if there's a code failure. + this._subscriber.emit('debug', e); + const exc = new AckError(AckResponses.Other, 'Code error'); + batch.forEach(m => { + m.responsePromise?.reject(exc); + }); + return []; + } + } } } } @@ -235,33 +520,66 @@ export class ModAckQueue extends MessageQueue { * @param {Array.>} batch Array of ackIds and deadlines. * @return {Promise} */ - protected async _sendBatch(batch: QueuedMessages): Promise { + protected async _sendBatch(batch: QueuedMessages): Promise { const client = await this._subscriber.getClient(); const subscription = this._subscriber.name; - const modAckTable: {[index: string]: string[]} = batch.reduce( - (table: {[index: string]: string[]}, [ackId, deadline]) => { - if (!table[deadline!]) { - table[deadline!] = []; + const modAckTable: {[index: string]: QueuedMessages} = batch.reduce( + (table: {[index: string]: QueuedMessages}, message) => { + if (!table[message.deadline!]) { + table[message.deadline!] = []; } - table[deadline!].push(ackId); + table[message.deadline!].push(message); return table; }, {} ); + const callOptions = this.getCallOptions(); const modAckRequests = Object.keys(modAckTable).map(async deadline => { - const ackIds = modAckTable[deadline]; + const messages = modAckTable[deadline]; + const ackIds = messages.map(m => m.ackId); const ackDeadlineSeconds = Number(deadline); const reqOpts = {subscription, ackIds, ackDeadlineSeconds}; try { - await client.modifyAckDeadline(reqOpts, this._options.callOptions!); + await client.modifyAckDeadline(reqOpts, callOptions); + + // It's okay if these pass through since they're successful anyway. + this.handleAckSuccesses(messages); + return []; } catch (e) { - throw new BatchError(e as ServiceError, ackIds, 'modifyAckDeadline'); + // If exactly-once delivery isn't enabled, don't do error processing. We'll + // emulate previous behaviour by resolving all pending Promises with + // a success status, and then throwing a BatchError for debug logging. + if (!this._subscriber.isExactlyOnceDelivery) { + batch.forEach(m => { + m.responsePromise?.resolve(); + }); + throw new BatchError(e as GoogleError, ackIds, 'modAck'); + } else { + const grpcError = e as GoogleError; + + const newBatch = this.handleAckFailures( + 'modAck', + messages, + grpcError + ); + return newBatch.toRetry; + } } }); - await Promise.all(modAckRequests); + // This catches the sub-failures and bubbles up anything we need to bubble. + const allNewBatches: QueuedMessages[] = await Promise.all(modAckRequests); + return allNewBatches.reduce((p: QueuedMessage[], c: QueuedMessage[]) => [ + ...(p ?? []), + ...c, + ]); + } + + // For modacks only, we'll stop retrying after 3 tries. + protected shouldFailEarly(message: QueuedMessage): boolean { + return message.retryCount >= 3; } } diff --git a/src/message-stream.ts b/src/message-stream.ts index 8eddf364a..0e8e2017d 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -23,6 +23,7 @@ import {PullRetry} from './pull-retry'; import {Subscriber} from './subscriber'; import {google} from '../protos/protos'; import {defaultOptions} from './default-options'; +import {Duration} from './temporal'; /*! * Frequency to ping streams. @@ -151,6 +152,23 @@ export class MessageStream extends PassThrough { ); this._keepAliveHandle.unref(); } + + /** + * Updates the stream ack deadline with the server. + * + * @param {Duration} deadline The new deadline value to set. + */ + setStreamAckDeadline(deadline: Duration) { + const request: StreamingPullRequest = { + streamAckDeadlineSeconds: deadline.totalOf('second'), + }; + + for (const stream of this._streams.keys()) { + // We don't need a callback on this one, it's advisory. + stream.write(request); + } + } + /** * Destroys the stream and any underlying streams. * diff --git a/src/subscriber.ts b/src/subscriber.ts index fe9cf0bc3..2011aad63 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -30,13 +30,40 @@ import {Subscription} from './subscription'; import {defaultOptions} from './default-options'; import {SubscriberClient} from './v1'; import {createSpan} from './opentelemetry-tracing'; -import {Throttler} from './util'; import {Duration} from './temporal'; export type PullResponse = google.pubsub.v1.IStreamingPullResponse; export type SubscriptionProperties = google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties; +type ValueOf = T[keyof T]; +export const AckResponses = { + PermissionDenied: 'PERMISSION_DENIED' as const, + FailedPrecondition: 'FAILED_PRECONDITION' as const, + Success: 'SUCCESS' as const, + Invalid: 'INVALID' as const, + Other: 'OTHER' as const, +}; +export type AckResponse = ValueOf; + +/** + * Thrown when an error is detected in an ack/nack/modack call, when + * exactly-once delivery is enabled on the subscription. This will + * only be thrown for actual errors that can't be retried. + */ +export class AckError extends Error { + errorCode: AckResponse; + + constructor(errorCode: AckResponse, message?: string) { + let finalMessage = `${errorCode}`; + if (message) { + finalMessage += ` : ${message}`; + } + super(finalMessage); + this.errorCode = errorCode; + } +} + /** * Date object with nanosecond precision. Supports all standard Date arguments * in addition to several custom types. @@ -184,6 +211,31 @@ export class Message { } } + /** + * Acknowledges the message, expecting a response (for exactly-once delivery subscriptions). + * If exactly-once delivery is not enabled, this will immediately resolve successfully. + * + * @example + * ``` + * subscription.on('message', async (message) => { + * const response = await message.ackWithResponse(); + * }); + * ``` + */ + async ackWithResponse(): Promise { + if (!this._subscriber.isExactlyOnceDelivery) { + this.ack(); + return AckResponses.Success; + } + + if (!this._handled) { + this._handled = true; + return await this._subscriber.ackWithResponse(this); + } else { + return AckResponses.Invalid; + } + } + /** * Modifies the ack deadline. * @@ -196,6 +248,26 @@ export class Message { } } + /** + * Modifies the ack deadline, expecting a response (for exactly-once delivery subscriptions). + * If exactly-once delivery is not enabled, this will immediately resolve successfully. + * + * @param {number} deadline The number of seconds to extend the deadline. + * @private + */ + async modAckWithResponse(deadline: number): Promise { + if (!this._subscriber.isExactlyOnceDelivery) { + this.modAck(deadline); + return AckResponses.Success; + } + + if (!this._handled) { + return await this._subscriber.modAckWithResponse(this, deadline); + } else { + return AckResponses.Invalid; + } + } + /** * Removes the message from our inventory and schedules it to be redelivered. * @@ -212,6 +284,32 @@ export class Message { this._subscriber.nack(this); } } + + /** + * Removes the message from our inventory and schedules it to be redelivered, + * with the modAck response being returned (for exactly-once delivery subscriptions). + * If exactly-once delivery is not enabled, this will immediately resolve successfully. + * + * @example + * ``` + * subscription.on('message', async (message) => { + * const response = await message.nackWithResponse(); + * }); + * ``` + */ + async nackWithResponse(): Promise { + if (!this._subscriber.isExactlyOnceDelivery) { + this.nack(); + return AckResponses.Success; + } + + if (!this._handled) { + this._handled = true; + return await this._subscriber.nackWithResponse(this); + } else { + return AckResponses.Invalid; + } + } } /** @@ -246,7 +344,7 @@ export interface SubscriberOptions { enableOpenTelemetryTracing?: boolean; } -const minAckDeadlineForExactlyOnce = Duration.from({seconds: 60}); +const minAckDeadlineForExactlyOnceDelivery = Duration.from({seconds: 60}); /** * Subscriber class is used to manage all message related functionality. @@ -273,7 +371,6 @@ export class Subscriber extends EventEmitter { private _options!: SubscriberOptions; private _stream!: MessageStream; private _subscription: Subscription; - private _errorLog: Throttler; subscriptionProperties?: SubscriptionProperties; @@ -289,7 +386,6 @@ export class Subscriber extends EventEmitter { this._histogram = new Histogram({min: 10, max: 600}); this._latencies = new Histogram(); this._subscription = subscription; - this._errorLog = new Throttler(60 * 1000); this.setOptions(options); } @@ -318,7 +414,7 @@ export class Subscriber extends EventEmitter { } // Grab our current min/max deadline values, based on whether exactly-once - // is enabled, and the defaults. + // delivery is enabled, and the defaults. const [minDeadline, maxDeadline] = this.getMinMaxDeadlines(); if (minDeadline) { @@ -333,10 +429,11 @@ export class Subscriber extends EventEmitter { } private getMinMaxDeadlines(): [Duration?, Duration?] { - // If this is an exactly-once subscription, and the user didn't set their - // own minimum ack periods, set it to the default for exactly-once. - const defaultMinDeadline = this.isExactlyOnce - ? minAckDeadlineForExactlyOnce + // If this is an exactly-once delivery subscription, and the user + // didn't set their own minimum ack periods, set it to the default + // for exactly-once delivery. + const defaultMinDeadline = this.isExactlyOnceDelivery + ? minAckDeadlineForExactlyOnceDelivery : defaultOptions.subscription.minAckDeadline; const defaultMaxDeadline = defaultOptions.subscription.maxAckDeadline; @@ -348,11 +445,11 @@ export class Subscriber extends EventEmitter { } /** - * Returns true if an exactly once subscription has been detected. + * Returns true if an exactly-once delivery subscription has been detected. * * @private */ - get isExactlyOnce(): boolean { + get isExactlyOnceDelivery(): boolean { if (!this.subscriptionProperties) { return false; } @@ -367,25 +464,22 @@ export class Subscriber extends EventEmitter { * @private */ setSubscriptionProperties(subscriptionProperties: SubscriptionProperties) { - const previouslyEnabled = this.isExactlyOnce; + const previouslyEnabled = this.isExactlyOnceDelivery; this.subscriptionProperties = subscriptionProperties; - // If this is an exactly-once subscription... - if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) { - // Warn the user that they may have difficulty. - this._errorLog.doMaybe(() => - console.error( - 'WARNING: Exactly-once subscriptions are not yet supported ' + - 'by the Node client library. This feature will be added ' + - 'in a future release.' - ) - ); - } - // Update ackDeadline in case the flag switched. - if (previouslyEnabled !== this.isExactlyOnce) { + if (previouslyEnabled !== this.isExactlyOnceDelivery) { this.updateAckDeadline(); + + // For exactly-once delivery, make sure the subscription ack deadline is 60. + // (Otherwise fall back to the default of 10 seconds.) + const subscriptionAckDeadlineSeconds = this.isExactlyOnceDelivery + ? 60 + : 10; + this._stream.setStreamAckDeadline( + Duration.from({seconds: subscriptionAckDeadlineSeconds}) + ); } } @@ -425,18 +519,42 @@ export class Subscriber extends EventEmitter { * Acknowledges the supplied message. * * @param {Message} message The message to acknowledge. - * @returns {Promise} + * @returns {Promise} * @private */ async ack(message: Message): Promise { const ackTimeSeconds = (Date.now() - message.received) / 1000; this.updateAckDeadline(ackTimeSeconds); - this._acks.add(message); + // Ignore this in this version of the method (but hook then/catch + // to avoid unhandled exceptions). + const resultPromise = this._acks.add(message); + resultPromise.then(() => {}); + resultPromise.catch(() => {}); + await this._acks.onFlush(); this._inventory.remove(message); } + /** + * Acknowledges the supplied message, expecting a response (for exactly + * once subscriptions). + * + * @param {Message} message The message to acknowledge. + * @returns {Promise} + * @private + */ + async ackWithResponse(message: Message): Promise { + const ackTimeSeconds = (Date.now() - message.received) / 1000; + this.updateAckDeadline(ackTimeSeconds); + + await this._acks.add(message); + this._inventory.remove(message); + + // No exception means Success. + return AckResponses.Success; + } + /** * Closes the subscriber. The returned promise will resolve once any pending * acks/modAcks are finished. @@ -456,6 +574,9 @@ export class Subscriber extends EventEmitter { await this._waitForFlush(); this.emit('close'); + + this._acks.close(); + this._modAcks.close(); } /** @@ -478,25 +599,52 @@ export class Subscriber extends EventEmitter { * * @param {Message} message The message to modify. * @param {number} deadline The deadline. - * @returns {Promise} + * @returns {Promise} * @private */ async modAck(message: Message, deadline: number): Promise { const startTime = Date.now(); - this._modAcks.add(message, deadline); + const responsePromise = this._modAcks.add(message, deadline); + responsePromise.then(() => {}); + responsePromise.catch(() => {}); + await this._modAcks.onFlush(); const latency = (Date.now() - startTime) / 1000; this._latencies.add(latency); } + /** + * Modifies the acknowledge deadline for the provided message, expecting + * a reply (for exactly-once delivery subscriptions). + * + * @param {Message} message The message to modify. + * @param {number} deadline The deadline. + * @returns {Promise} + * @private + */ + async modAckWithResponse( + message: Message, + deadline: number + ): Promise { + const startTime = Date.now(); + + await this._modAcks.add(message, deadline); + + const latency = (Date.now() - startTime) / 1000; + this._latencies.add(latency); + + // No exception means Success. + return AckResponses.Success; + } + /** * Modfies the acknowledge deadline for the provided message and then removes * it from our inventory. * * @param {Message} message The message. - * @return {Promise} + * @return {Promise} * @private */ async nack(message: Message): Promise { @@ -504,6 +652,19 @@ export class Subscriber extends EventEmitter { this._inventory.remove(message); } + /** + * Modfies the acknowledge deadline for the provided message and then removes + * it from our inventory, expecting a response from modAck (for + * exactly-once delivery subscriptions). + * + * @param {Message} message The message. + * @return {Promise} + * @private + */ + async nackWithResponse(message: Message): Promise { + return await this.modAckWithResponse(message, 0); + } + /** * Starts pulling messages. * @private @@ -648,7 +809,7 @@ export class Subscriber extends EventEmitter { * @private */ private _onData(response: PullResponse): void { - // Grab the subscription properties for exactly once and ordering flags. + // Grab the subscription properties for exactly-once delivery and ordering flags. if (response.subscriptionProperties) { this.setSubscriptionProperties(response.subscriptionProperties); } diff --git a/src/subscription.ts b/src/subscription.ts index 6accd5adf..29567ca12 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -46,6 +46,8 @@ import {Subscriber, SubscriberOptions} from './subscriber'; import {Topic} from './topic'; import {promisifySome} from './util'; +export {AckError, AckResponse, AckResponses} from './subscriber'; + export type PushConfig = google.pubsub.v1.IPushConfig; export type OidcToken = google.pubsub.v1.PushConfig.IOidcToken; diff --git a/src/util.ts b/src/util.ts index 753191c05..60cbdc205 100644 --- a/src/util.ts +++ b/src/util.ts @@ -72,3 +72,14 @@ export class Throttler { } } } + +/** + * Takes care of managing a Map of buckets to the bucket arrays themselves. + * + * @private + */ +export function addToBucket(map: Map, bucket: T, item: U) { + const items = map.get(bucket) ?? []; + items.push(item); + map.set(bucket, items); +} diff --git a/test/ack-metadata.ts b/test/ack-metadata.ts new file mode 100644 index 000000000..1b01faf33 --- /dev/null +++ b/test/ack-metadata.ts @@ -0,0 +1,225 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe, it} from 'mocha'; +import * as assert from 'assert'; +import { + AckErrorInfo, + processAckErrorInfo, + processAckRpcError, +} from '../src/ack-metadata'; +import {GoogleError, Status} from 'google-gax'; +import {AckResponses} from '../src/subscriber'; + +describe('ack-metadata', () => { + it('deals with no ErrorInfo', () => { + const error = {} as GoogleError; + const results = processAckErrorInfo(error); + assert.strictEqual(results.size, 0); + }); + + it('handles permanent errors', () => { + const ackId = '12345'; + const errorCode = 'PERMANENT_FAILURE_INVALID_ACK_ID'; + const error = { + errorInfoMetadata: { + [ackId]: errorCode, + }, + } as unknown as GoogleError; + + const results = processAckErrorInfo(error); + + assert.deepStrictEqual(Array.from(results.entries()), [ + [ + ackId, + { + transient: false, + response: AckResponses.Invalid, + rawErrorCode: errorCode, + }, + ], + ]); + }); + + it('handles transient errors', () => { + const ackId = '12345'; + const errorCode = 'TRANSIENT_FAILURE_ESPRESSO_BAR_CLOSED'; + const error = { + errorInfoMetadata: { + [ackId]: errorCode, + }, + } as unknown as GoogleError; + + const results = processAckErrorInfo(error); + + assert.deepStrictEqual(Array.from(results.entries()), [ + [ + ackId, + { + transient: true, + rawErrorCode: errorCode, + }, + ], + ]); + }); + + it('handles other errors', () => { + const ackId = '12345'; + const errorCode = 'NO_IDEA_ERROR'; + const error = { + errorInfoMetadata: { + [ackId]: errorCode, + }, + } as unknown as GoogleError; + + const results = processAckErrorInfo(error); + + assert.deepStrictEqual(Array.from(results.entries()), [ + [ + ackId, + { + transient: false, + response: AckResponses.Other, + rawErrorCode: errorCode, + }, + ], + ]); + }); + + it('handles multiple responses', () => { + const ackIds = ['12345', '23456', '34567']; + const errorCodes = [ + 'PERMANENT_FAILURE_INVALID_ACK_ID', + 'TRANSIENT_FAILURE_ESPRESSO_BAR_CLOSED', + 'NO_IDEA_ERROR', + ]; + const expectedResults = new Map([ + [ + ackIds[0], + { + transient: false, + response: AckResponses.Invalid, + rawErrorCode: errorCodes[0], + }, + ], + [ + ackIds[1], + { + transient: true, + rawErrorCode: errorCodes[1], + }, + ], + [ + ackIds[2], + { + transient: false, + response: AckResponses.Other, + rawErrorCode: errorCodes[2], + }, + ], + ]); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const metaData: any = {}; + for (let i = 0; i < ackIds.length; i++) { + metaData[ackIds[i]] = errorCodes[i]; + } + + const error = { + errorInfoMetadata: metaData, + } as unknown as GoogleError; + + const results = processAckErrorInfo(error); + + ackIds.forEach(id => { + const ackError = results.get(id); + const expected = expectedResults.get(id); + assert.deepStrictEqual(ackError, expected); + }); + }); + + it('handles gRPC errors', () => { + const testTable = [ + { + code: Status.DEADLINE_EXCEEDED, + result: { + transient: true, + grpcErrorCode: Status.DEADLINE_EXCEEDED, + response: AckResponses.Other, + }, + }, + { + code: Status.RESOURCE_EXHAUSTED, + result: { + transient: true, + grpcErrorCode: Status.RESOURCE_EXHAUSTED, + response: AckResponses.Other, + }, + }, + { + code: Status.ABORTED, + result: { + transient: true, + grpcErrorCode: Status.ABORTED, + response: AckResponses.Other, + }, + }, + { + code: Status.INTERNAL, + result: { + transient: true, + grpcErrorCode: Status.INTERNAL, + response: AckResponses.Other, + }, + }, + { + code: Status.UNAVAILABLE, + result: { + transient: true, + grpcErrorCode: Status.UNAVAILABLE, + response: AckResponses.Other, + }, + }, + { + code: Status.PERMISSION_DENIED, + result: { + transient: false, + grpcErrorCode: Status.PERMISSION_DENIED, + response: AckResponses.PermissionDenied, + }, + }, + { + code: Status.FAILED_PRECONDITION, + result: { + transient: false, + grpcErrorCode: Status.FAILED_PRECONDITION, + response: AckResponses.FailedPrecondition, + }, + }, + { + code: Status.UNIMPLEMENTED, + result: { + transient: false, + grpcErrorCode: Status.UNIMPLEMENTED, + response: AckResponses.Other, + }, + }, + ]; + + for (const t of testTable) { + const result = processAckRpcError(t.code); + assert.deepStrictEqual(result, t.result); + } + }); +}); diff --git a/test/exponential-retry.ts b/test/exponential-retry.ts new file mode 100644 index 000000000..f96689165 --- /dev/null +++ b/test/exponential-retry.ts @@ -0,0 +1,230 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe, it} from 'mocha'; +import * as assert from 'assert'; +import * as sinon from 'sinon'; + +import {ExponentialRetry} from '../src/exponential-retry'; +import {Duration} from '../src/temporal'; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function introspect(obj: unknown): any { + return obj; +} + +interface TestItem { + foo: string; +} + +function makeItem() { + return { + foo: 'an item', + }; +} + +describe('exponential retry class', () => { + const sandbox = sinon.createSandbox(); + afterEach(() => sandbox.restore()); + + it('initializes correctly', () => { + // This one is honestly not terribly interesting except that the + // class is storing the durations as numbers internally. + const er = new ExponentialRetry( + Duration.from({seconds: 1}), + Duration.from({seconds: 2}) + ); + + const eri = introspect(er); + assert.strictEqual(eri._backoffMs, 1000); + assert.strictEqual(eri._maxBackoffMs, 2000); + }); + + it('makes the first callback', () => { + const clock = sandbox.useFakeTimers(); + const er = new ExponentialRetry( + Duration.from({millis: 100}), + Duration.from({millis: 1000}) + ); + sandbox.stub(global.Math, 'random').returns(0.75); + + const item = makeItem(); + er.retryLater(item, (s: typeof item, t: Duration) => { + assert.strictEqual(s, item); + assert.strictEqual(t.totalOf('millisecond'), 125); + }); + + clock.tick(125); + + const leftovers = er.close(); + assert.strictEqual(leftovers.length, 0); + }); + + it('closes gracefully', () => { + const clock = sandbox.useFakeTimers(); + const er = new ExponentialRetry( + Duration.from({millis: 100}), + Duration.from({millis: 1000}) + ); + sandbox.stub(global.Math, 'random').returns(0.75); + + let called = false; + const item = makeItem(); + er.retryLater(item, (s: typeof item, t: Duration) => { + assert.strictEqual(s, item); + assert.strictEqual(t.totalOf('millisecond'), 125); + called = true; + }); + + clock.tick(5); + + const leftovers = er.close(); + + clock.tick(125); + + assert.strictEqual(called, false); + + const eri = introspect(er); + assert.strictEqual(eri._items.isEmpty(), true); + + assert.strictEqual(leftovers.length, 1); + }); + + it('backs off exponentially', () => { + const clock = sandbox.useFakeTimers(); + const er = new ExponentialRetry( + Duration.from({millis: 100}), + Duration.from({millis: 1000}) + ); + sandbox.stub(global.Math, 'random').returns(0.75); + + let callbackCount = 0; + let callbackTime: Duration = Duration.from({millis: 0}); + + const item = makeItem(); + const callback = (s: TestItem, t: Duration) => { + assert.strictEqual(s, item); + callbackTime = t; + callbackCount++; + if (callbackCount === 1) { + er.retryLater(item, callback); + } + }; + er.retryLater(item, callback); + + clock.tick(125); + assert.strictEqual(callbackCount, 1); + assert.strictEqual(callbackTime.totalOf('millisecond'), 125); + + clock.tick(400); + assert.strictEqual(callbackCount, 2); + assert.strictEqual(callbackTime.totalOf('millisecond'), 375); + + const leftovers = er.close(); + assert.strictEqual(leftovers.length, 0); + }); + + it('backs off exponentially until the max backoff', () => { + const clock = sandbox.useFakeTimers(); + const item = makeItem(); + const er = new ExponentialRetry( + Duration.from({millis: 100}), + Duration.from({millis: 150}) + ); + sandbox.stub(global.Math, 'random').returns(0.75); + + let callbackCount = 0; + let callbackTime: Duration = Duration.from({millis: 0}); + + const callback = (s: TestItem, t: Duration) => { + assert.strictEqual(s, item); + callbackTime = t; + callbackCount++; + if (callbackCount === 1) { + er.retryLater(item, callback); + } + }; + er.retryLater(item, callback); + + clock.tick(125); + assert.strictEqual(callbackCount, 1); + assert.strictEqual(callbackTime.totalOf('millisecond'), 125); + + clock.tick(400); + assert.strictEqual(callbackCount, 2); + assert.strictEqual(callbackTime.totalOf('millisecond'), 312); + + const leftovers = er.close(); + assert.strictEqual(leftovers.length, 0); + }); + + it('calls retries in the right order', () => { + const clock = sandbox.useFakeTimers(); + const items = [makeItem(), makeItem()]; + + const er = new ExponentialRetry( + Duration.from({millis: 100}), + Duration.from({millis: 1000}) + ); + + // Just disable the fuzz for this test. + sandbox.stub(global.Math, 'random').returns(0.5); + + const callbackCounts = [0, 0]; + const callbackTimes: Duration[] = [ + Duration.from({millis: 0}), + Duration.from({millis: 0}), + ]; + + const callback = (s: TestItem, t: Duration) => { + const idx = s === items[0] ? 0 : 1; + callbackCounts[idx]++; + callbackTimes[idx] = t; + + if (callbackCounts[idx] < 2) { + er.retryLater(items[idx], callback); + } + }; + + // Load in the first item and get it retrying. + er.retryLater(items[0], callback); + + clock.tick(300); + assert.deepStrictEqual(callbackCounts, [2, 0]); + assert.deepStrictEqual( + callbackTimes.map(d => d.totalOf('millisecond')), + [300, 0] + ); + + // Load in the second item and get it retrying. + er.retryLater(items[1], callback); + + clock.tick(125); + + // The first item should've retried twice and still be in the queue, + // while the second item should've retried once and quit. + assert.deepStrictEqual(callbackCounts, [2, 1]); + assert.deepStrictEqual( + callbackTimes.map(d => d.totalOf('millisecond')), + [300, 100] + ); + + // Make sure that we did in fact set another timer for the next event. + const eri = introspect(er); + assert.ok(eri._timer); + + const leftovers = er.close(); + assert.strictEqual(leftovers.length, 1); + }); +}); diff --git a/test/message-queues.ts b/test/message-queues.ts index d60073cce..8cde622bf 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -17,15 +17,14 @@ import * as assert from 'assert'; import {describe, it, before, beforeEach, afterEach} from 'mocha'; import {EventEmitter} from 'events'; -import {CallOptions, grpc} from 'google-gax'; -import * as proxyquire from 'proxyquire'; +import {CallOptions, GoogleError, Status} from 'google-gax'; import * as sinon from 'sinon'; import * as uuid from 'uuid'; import defer = require('p-defer'); import * as messageTypes from '../src/message-queues'; import {BatchError} from '../src/message-queues'; -import {Message, Subscriber} from '../src/subscriber'; +import {AckError, Message, Subscriber} from '../src/subscriber'; class FakeClient { async acknowledge( @@ -49,15 +48,21 @@ class FakeClient { class FakeSubscriber extends EventEmitter { name: string; client: FakeClient; + iEOS: boolean; + constructor() { super(); this.name = uuid.v4(); this.client = new FakeClient(); + this.iEOS = false; } async getClient(): Promise { return this.client; } + get isExactlyOnceDelivery(): boolean { + return this.iEOS; + } } class FakeMessage { @@ -67,42 +72,79 @@ class FakeMessage { } } -describe('MessageQueues', () => { - const sandbox = sinon.createSandbox(); +function fakeMessage() { + return new FakeMessage() as unknown as Message; +} - let subscriber: FakeSubscriber; +class MessageQueue extends messageTypes.MessageQueue { + batches: messageTypes.QueuedMessages[] = []; + async _sendBatch( + batch: messageTypes.QueuedMessages + ): Promise { + this.batches.push(batch); + return []; + } +} - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let MessageQueue: any; - // tslint:disable-next-line variable-name - let AckQueue: typeof messageTypes.AckQueue; - // tslint:disable-next-line variable-name - let ModAckQueue: typeof messageTypes.ModAckQueue; +class AckQueue extends messageTypes.AckQueue { + get requests() { + return this._requests; + } +} - type QueuedMessages = Array<[string, number?]>; +class ModAckQueue extends messageTypes.ModAckQueue { + get requests() { + return this._requests; + } +} - before(() => { - const queues = proxyquire('../src/message-queues.js', {}); +// This discount polyfill for Promise.allSettled can be removed after we drop Node 12. +type AllSettledResult = { + status: 'fulfilled' | 'rejected'; + value?: T; + reason?: U; +}; +function allSettled( + proms: Promise[] +): Promise[]> { + const checkedProms = proms.map((r: Promise) => + r + .then( + (value: T) => + ({ + status: 'fulfilled', + value, + } as AllSettledResult) + ) + .catch( + (error: U) => + ({ + status: 'rejected', + reason: error, + } as AllSettledResult) + ) + ); + + return Promise.all(checkedProms); +} - AckQueue = queues.AckQueue; - ModAckQueue = queues.ModAckQueue; +describe('MessageQueues', () => { + const sandbox = sinon.createSandbox(); - MessageQueue = class MessageQueue extends queues.MessageQueue { - batches = [] as QueuedMessages[]; - protected async _sendBatch(batch: QueuedMessages): Promise { - this.batches.push(batch); - } - }; - }); + let fakeSubscriber: FakeSubscriber; + let subscriber: Subscriber; + + before(() => {}); beforeEach(() => { - subscriber = new FakeSubscriber(); + fakeSubscriber = new FakeSubscriber(); + subscriber = fakeSubscriber as unknown as Subscriber; }); afterEach(() => sandbox.restore()); describe('MessageQueue', () => { - let messageQueue: typeof MessageQueue; + let messageQueue: MessageQueue; beforeEach(() => { messageQueue = new MessageQueue(subscriber); @@ -159,6 +201,25 @@ describe('MessageQueues', () => { clock.tick(delay); assert.strictEqual(stub.callCount, 1); }); + + it('should return a Promise that resolves when the ack is sent', async () => { + const clock = sandbox.useFakeTimers(); + const delay = 1000; + messageQueue.setOptions({maxMilliseconds: delay}); + + sandbox + .stub(messageQueue, '_sendBatch') + .callsFake((batch: messageTypes.QueuedMessages) => { + batch.forEach(m => { + m.responsePromise?.resolve(); + }); + return Promise.resolve([]); + }); + + const completion = messageQueue.add(new FakeMessage() as Message); + clock.tick(delay); + await completion; + }); }); describe('flush', () => { @@ -189,10 +250,10 @@ describe('MessageQueues', () => { messageQueue.add(message as Message, deadline); messageQueue.flush(); - const expectedBatch = [[message.ackId, deadline]]; const [batch] = messageQueue.batches; - - assert.deepStrictEqual(batch, expectedBatch); + assert.strictEqual(batch[0].ackId, message.ackId); + assert.strictEqual(batch[0].deadline, deadline); + assert.ok(batch[0].responsePromise?.resolve); }); it('should emit any errors as debug events', done => { @@ -221,6 +282,7 @@ describe('MessageQueues', () => { log.push('send:start'); await sendDone.promise; log.push('send:end'); + return []; }); const message = new FakeMessage(); @@ -280,7 +342,7 @@ describe('MessageQueues', () => { for (let i = 0; i < 3000; i++) { assert.strictEqual(stub.callCount, 0); - messageQueue.add(new FakeMessage()); + messageQueue.add(fakeMessage()); } assert.strictEqual(stub.callCount, 1); @@ -294,7 +356,7 @@ describe('MessageQueues', () => { for (let i = 0; i < maxMessages; i++) { assert.strictEqual(stub.callCount, 0); - messageQueue.add(new FakeMessage()); + messageQueue.add(fakeMessage()); } assert.strictEqual(stub.callCount, 1); @@ -304,7 +366,7 @@ describe('MessageQueues', () => { const clock = sandbox.useFakeTimers(); const stub = sandbox.stub(messageQueue, 'flush'); - messageQueue.add(new FakeMessage()); + messageQueue.add(fakeMessage()); clock.tick(100); assert.strictEqual(stub.callCount, 1); @@ -316,7 +378,7 @@ describe('MessageQueues', () => { const maxMilliseconds = 10000; messageQueue.setOptions({maxMilliseconds}); - messageQueue.add(new FakeMessage()); + messageQueue.add(fakeMessage()); clock.tick(maxMilliseconds); assert.strictEqual(stub.callCount, 1); @@ -325,10 +387,10 @@ describe('MessageQueues', () => { }); describe('AckQueue', () => { - let ackQueue: messageTypes.AckQueue; + let ackQueue: AckQueue; beforeEach(() => { - ackQueue = new AckQueue(subscriber as {} as Subscriber); + ackQueue = new AckQueue(subscriber); }); it('should send batches via Client#acknowledge', async () => { @@ -338,7 +400,9 @@ describe('MessageQueues', () => { new FakeMessage(), ]; - const stub = sandbox.stub(subscriber.client, 'acknowledge').resolves(); + const stub = sandbox + .stub(fakeSubscriber.client, 'acknowledge') + .resolves(); const expectedReqOpts = { subscription: subscriber.name, ackIds: messages.map(({ackId}) => ackId), @@ -353,7 +417,9 @@ describe('MessageQueues', () => { it('should send call options', async () => { const fakeCallOptions = {timeout: 10000}; - const stub = sandbox.stub(subscriber.client, 'acknowledge').resolves(); + const stub = sandbox + .stub(fakeSubscriber.client, 'acknowledge') + .resolves(); ackQueue.setOptions({callOptions: fakeCallOptions}); await ackQueue.flush(); @@ -362,7 +428,7 @@ describe('MessageQueues', () => { assert.strictEqual(callOptions, fakeCallOptions); }); - it('should throw a BatchError on "debug" if unable to ack', done => { + it('should throw a BatchError on "debug" if unable to ack due to grpc error', done => { const messages = [ new FakeMessage(), new FakeMessage(), @@ -371,33 +437,195 @@ describe('MessageQueues', () => { const ackIds = messages.map(message => message.ackId); - const fakeError = new Error('Err.') as grpc.ServiceError; - fakeError.code = 2; - fakeError.metadata = new grpc.Metadata(); + const fakeError = new Error('Err.') as GoogleError; + fakeError.code = Status.DATA_LOSS; - const expectedMessage = - 'Failed to "acknowledge" for 3 message(s). Reason: Err.'; + // Since this runs without EOS enabled, we should get the old error handling. + const expectedMessage = 'Failed to "ack" for 3 message(s). Reason: Err.'; - sandbox.stub(subscriber.client, 'acknowledge').rejects(fakeError); + sandbox.stub(fakeSubscriber.client, 'acknowledge').rejects(fakeError); subscriber.on('debug', (err: BatchError) => { - assert.strictEqual(err.message, expectedMessage); - assert.deepStrictEqual(err.ackIds, ackIds); - assert.strictEqual(err.code, fakeError.code); - assert.strictEqual(err.metadata, fakeError.metadata); - done(); + try { + assert.strictEqual(err.message, expectedMessage); + assert.deepStrictEqual(err.ackIds, ackIds); + assert.strictEqual(err.code, fakeError.code); + done(); + } catch (e) { + // I'm unsure why Mocha's regular handler doesn't work here, + // but manually throw the exception from asserts. + done(e); + } }); messages.forEach(message => ackQueue.add(message as Message)); ackQueue.flush(); }); + + // The analogous modAck version is very similar, so please sync changes. + describe('handle ack responses when !isExactlyOnceDelivery', () => { + it('should appropriately resolve result promises when !isExactlyOnceDelivery', async () => { + const fakeError = new Error('Err.') as GoogleError; + fakeError.code = Status.DATA_LOSS; + + const stub = sandbox + .stub(fakeSubscriber.client, 'acknowledge') + .rejects(fakeError); + + const message = new FakeMessage() as Message; + const completion = ackQueue.add(message); + await ackQueue.flush(); + assert.strictEqual(stub.callCount, 1); + await assert.doesNotReject(completion); + }); + }); + + // The analogous modAck version is very similar, so please sync changes. + describe('handle ack responses for exactly-once delivery', () => { + beforeEach(() => { + fakeSubscriber.iEOS = true; + }); + + it('should trigger Promise resolves on no errors', async () => { + const messages = [fakeMessage(), fakeMessage(), fakeMessage()]; + messages.forEach(m => ackQueue.add(m)); + + sandbox.stub(fakeSubscriber.client, 'acknowledge').resolves(); + const proms = ackQueue.requests.map( + (r: messageTypes.QueuedMessage) => r.responsePromise!.promise + ); + await ackQueue.flush(); + const results = await allSettled(proms); + const oneSuccess = {status: 'fulfilled', value: undefined}; + assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]); + }); + + it('should trigger Promise failures on grpc errors', async () => { + const messages = [fakeMessage(), fakeMessage(), fakeMessage()]; + + const fakeError = new Error('Err.') as GoogleError; + fakeError.code = Status.DATA_LOSS; + fakeError.errorInfoMetadata = { + // These should be routed by the errorInfo resolver. + [messages[0].ackId]: 'TRANSIENT_CAT_ATE_HOMEWORK', + }; + + messages.forEach(m => ackQueue.add(m)); + + sandbox.stub(fakeSubscriber.client, 'acknowledge').rejects(fakeError); + const proms = ackQueue.requests.map( + (r: messageTypes.QueuedMessage) => r.responsePromise!.promise + ); + proms.shift(); + await ackQueue.flush(); + + const results = await allSettled(proms); + assert.strictEqual(results[0].status, 'rejected'); + assert.strictEqual(results[0].reason?.errorCode, 'OTHER'); + assert.strictEqual(results[1].status, 'rejected'); + assert.strictEqual(results[1].reason?.errorCode, 'OTHER'); + + // Make sure the one handled by errorInfo was retried. + assert.strictEqual(ackQueue.numInRetryRequests, 1); + }); + + it('should correctly handle a mix of errors and successes', async () => { + const messages = [fakeMessage(), fakeMessage(), fakeMessage()]; + + const fakeError = new Error('Err.') as GoogleError; + delete fakeError.code; + fakeError.errorInfoMetadata = { + [messages[0].ackId]: 'PERMANENT_FAILURE_INVALID_ACK_ID', + [messages[1].ackId]: 'TRANSIENT_CAT_ATE_HOMEWORK', + }; + + messages.forEach(m => ackQueue.add(m)); + + sandbox.stub(fakeSubscriber.client, 'acknowledge').rejects(fakeError); + + const proms = [ + ackQueue.requests[0].responsePromise!.promise, + ackQueue.requests[2].responsePromise!.promise, + ]; + await ackQueue.flush(); + + const results = await allSettled(proms); + assert.strictEqual(results[0].status, 'rejected'); + assert.strictEqual(results[0].reason?.errorCode, 'INVALID'); + + // Since there's no RPC error, the last one should've succeeded. + const oneSuccess = {status: 'fulfilled', value: undefined}; + assert.deepStrictEqual(results[1], oneSuccess); + + // Make sure the transient one was retried. + assert.strictEqual(ackQueue.numInRetryRequests, 1); + }); + + // This is separate because the retry mechanism itself could fail, and + // we want to make sure that transients actually make it back into the + // queue for retry. + // + // This doesn't need to be duplicated down to modAck because it's just + // testing common code. + it('should retry transient failures', async () => { + const clock = sandbox.useFakeTimers(); + sandbox.stub(global.Math, 'random').returns(0.5); + + const message = fakeMessage(); + const fakeError = new Error('Err.') as GoogleError; + fakeError.code = Status.DATA_LOSS; + fakeError.errorInfoMetadata = { + // These should be routed by the errorInfo resolver. + [message.ackId]: 'TRANSIENT_CAT_ATE_HOMEWORK', + }; + + sandbox.stub(fakeSubscriber.client, 'acknowledge').rejects(fakeError); + ackQueue.add(message); + await ackQueue.flush(); + + // Make sure the one handled by errorInfo was retried. + assert.strictEqual(ackQueue.numInRetryRequests, 1); + + // And wait for a second attempt. + clock.tick(1000); + + assert.strictEqual(ackQueue.requests.length, 1); + assert.strictEqual(ackQueue.requests[0].ackId, message.ackId); + assert.strictEqual(ackQueue.numInRetryRequests, 0); + assert.strictEqual(ackQueue.numPendingRequests, 1); + }); + }); + + it('should appropriately resolve result promises', async () => { + const stub = sandbox + .stub(fakeSubscriber.client, 'acknowledge') + .resolves(); + + const message = new FakeMessage() as Message; + const completion = ackQueue.add(message); + await ackQueue.flush(); + assert.strictEqual(stub.callCount, 1); + await completion; + }); + + it('should appropriately reject result promises', async () => { + const stub = sandbox + .stub(fakeSubscriber.client, 'acknowledge') + .resolves(); + + const message = new FakeMessage() as Message; + const completion = ackQueue.add(message); + await ackQueue.flush(); + assert.strictEqual(stub.callCount, 1); + await completion; + }); }); describe('ModAckQueue', () => { - let modAckQueue: messageTypes.ModAckQueue; + let modAckQueue: ModAckQueue; beforeEach(() => { - modAckQueue = new ModAckQueue(subscriber as {} as Subscriber); + modAckQueue = new ModAckQueue(subscriber); }); it('should send batches via Client#modifyAckDeadline', async () => { @@ -409,7 +637,7 @@ describe('MessageQueues', () => { ]; const stub = sandbox - .stub(subscriber.client, 'modifyAckDeadline') + .stub(fakeSubscriber.client, 'modifyAckDeadline') .resolves(); const expectedReqOpts = { @@ -443,7 +671,7 @@ describe('MessageQueues', () => { ]; const stub = sandbox - .stub(subscriber.client, 'modifyAckDeadline') + .stub(fakeSubscriber.client, 'modifyAckDeadline') .resolves(); const expectedReqOpts1 = { @@ -476,7 +704,7 @@ describe('MessageQueues', () => { it('should send call options', async () => { const fakeCallOptions = {timeout: 10000}; const stub = sandbox - .stub(subscriber.client, 'modifyAckDeadline') + .stub(fakeSubscriber.client, 'modifyAckDeadline') .resolves(); modAckQueue.setOptions({callOptions: fakeCallOptions}); @@ -487,7 +715,7 @@ describe('MessageQueues', () => { assert.strictEqual(callOptions, fakeCallOptions); }); - it('should throw a BatchError on "debug" if unable to modAck', done => { + it('should throw a BatchError on "debug" if unable to modAck due to gRPC error', done => { const messages = [ new FakeMessage(), new FakeMessage(), @@ -496,25 +724,159 @@ describe('MessageQueues', () => { const ackIds = messages.map(message => message.ackId); - const fakeError = new Error('Err.') as grpc.ServiceError; - fakeError.code = 2; - fakeError.metadata = new grpc.Metadata(); + const fakeError = new Error('Err.') as GoogleError; + fakeError.code = Status.DATA_LOSS; + // Since this runs without EOS enabled, we should get the old error handling. const expectedMessage = - 'Failed to "modifyAckDeadline" for 3 message(s). Reason: Err.'; + 'Failed to "modAck" for 3 message(s). Reason: Err.'; - sandbox.stub(subscriber.client, 'modifyAckDeadline').rejects(fakeError); + sandbox + .stub(fakeSubscriber.client, 'modifyAckDeadline') + .rejects(fakeError); subscriber.on('debug', (err: BatchError) => { - assert.strictEqual(err.message, expectedMessage); - assert.deepStrictEqual(err.ackIds, ackIds); - assert.strictEqual(err.code, fakeError.code); - assert.strictEqual(err.metadata, fakeError.metadata); - done(); + try { + assert.strictEqual(err.message, expectedMessage); + assert.deepStrictEqual(err.ackIds, ackIds); + assert.strictEqual(err.code, fakeError.code); + done(); + } catch (e) { + // I'm unsure why Mocha's regular handler doesn't work here, + // but manually throw the exception from asserts. + done(e); + } }); messages.forEach(message => modAckQueue.add(message as Message)); modAckQueue.flush(); }); + + describe('handle modAck responses when !isExactlyOnceDelivery', () => { + it('should appropriately resolve result promises when !isExactlyOnceDelivery', async () => { + const fakeError = new Error('Err.') as GoogleError; + fakeError.code = Status.DATA_LOSS; + + const stub = sandbox + .stub(fakeSubscriber.client, 'modifyAckDeadline') + .rejects(fakeError); + + const message = new FakeMessage() as Message; + const completion = modAckQueue.add(message); + await modAckQueue.flush(); + assert.strictEqual(stub.callCount, 1); + await assert.doesNotReject(completion); + }); + }); + + // The analogous ack version is very similar, so please sync changes. + describe('handle modAck responses for exactly-once delivery', () => { + beforeEach(() => { + fakeSubscriber.iEOS = true; + }); + + it('should trigger Promise resolves on no errors', async () => { + const messages = [fakeMessage(), fakeMessage(), fakeMessage()]; + messages.forEach(m => modAckQueue.add(m)); + + sandbox.stub(fakeSubscriber.client, 'modifyAckDeadline').resolves(); + const proms = modAckQueue.requests.map( + (r: messageTypes.QueuedMessage) => r.responsePromise!.promise + ); + await modAckQueue.flush(); + const results = await allSettled(proms); + const oneSuccess = {status: 'fulfilled', value: undefined}; + assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]); + }); + + it('should trigger Promise failures on grpc errors', async () => { + const messages = [fakeMessage(), fakeMessage(), fakeMessage()]; + + const fakeError = new Error('Err.') as GoogleError; + fakeError.code = Status.DATA_LOSS; + fakeError.errorInfoMetadata = { + // These should be routed by the errorInfo resolver. + [messages[0].ackId]: 'TRANSIENT_CAT_ATE_HOMEWORK', + }; + + messages.forEach(m => modAckQueue.add(m)); + + sandbox + .stub(fakeSubscriber.client, 'modifyAckDeadline') + .rejects(fakeError); + const proms = modAckQueue.requests.map( + (r: messageTypes.QueuedMessage) => r.responsePromise!.promise + ); + proms.shift(); + await modAckQueue.flush(); + + const results = await allSettled(proms); + assert.strictEqual(results[0].status, 'rejected'); + assert.strictEqual(results[0].reason?.errorCode, 'OTHER'); + assert.strictEqual(results[1].status, 'rejected'); + assert.strictEqual(results[1].reason?.errorCode, 'OTHER'); + + // Make sure the one handled by errorInfo was retried. + assert.strictEqual(modAckQueue.numInRetryRequests, 1); + }); + + it('should correctly handle a mix of errors and successes', async () => { + const messages = [fakeMessage(), fakeMessage(), fakeMessage()]; + + const fakeError = new Error('Err.') as GoogleError; + delete fakeError.code; + fakeError.errorInfoMetadata = { + [messages[0].ackId]: 'PERMANENT_FAILURE_INVALID_ACK_ID', + [messages[1].ackId]: 'TRANSIENT_CAT_ATE_HOMEWORK', + }; + + messages.forEach(m => modAckQueue.add(m)); + + sandbox + .stub(fakeSubscriber.client, 'modifyAckDeadline') + .rejects(fakeError); + + const proms = [ + modAckQueue.requests[0].responsePromise!.promise, + modAckQueue.requests[2].responsePromise!.promise, + ]; + await modAckQueue.flush(); + + const results = await allSettled(proms); + assert.strictEqual(results[0].status, 'rejected'); + assert.strictEqual(results[0].reason?.errorCode, 'INVALID'); + + // Since there's no RPC error, the last one should've succeeded. + const oneSuccess = {status: 'fulfilled', value: undefined}; + assert.deepStrictEqual(results[1], oneSuccess); + + // Make sure the transient one was retried. + assert.strictEqual(modAckQueue.numInRetryRequests, 1); + }); + }); + + it('should appropriately resolve result promises', async () => { + const stub = sandbox + .stub(fakeSubscriber.client, 'modifyAckDeadline') + .resolves(); + + const message = new FakeMessage() as Message; + const completion = modAckQueue.add(message); + await modAckQueue.flush(); + assert.strictEqual(stub.callCount, 1); + await completion; + }); + + it('should appropriately reject result promises', async () => { + const stub = sandbox + .stub(fakeSubscriber.client, 'modifyAckDeadline') + .resolves(); + + const message = new FakeMessage() as Message; + const completion = modAckQueue.add(message); + await modAckQueue.flush(); + assert.strictEqual(stub.callCount, 1); + await completion; + }); }); }); diff --git a/test/message-stream.ts b/test/message-stream.ts index fc831f81f..9fb3eb331 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -24,6 +24,7 @@ import * as uuid from 'uuid'; import * as messageTypes from '../src/message-stream'; import {Subscriber} from '../src/subscriber'; import {defaultOptions} from '../src/default-options'; +import {Duration} from '../src/temporal'; const FAKE_STREAMING_PULL_TIMEOUT = 123456789; const FAKE_CLIENT_CONFIG = { @@ -513,5 +514,22 @@ describe('MessageStream', () => { }); }); }); + + it('should allow updating the ack deadline', async () => { + const stubs = client.streams.map(stream => { + return sandbox.stub(stream, 'write'); + }); + + messageStream.setStreamAckDeadline(Duration.from({seconds: 10})); + + const expected = { + streamAckDeadlineSeconds: 10, + }; + + stubs.forEach(stub => { + const [data] = stub.lastCall.args; + assert.deepStrictEqual(data, expected); + }); + }); }); }); diff --git a/test/subscriber.ts b/test/subscriber.ts index d7ed05c55..2e31c2d05 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -98,8 +98,11 @@ class FakeQueue { constructor(sub: s.Subscriber, options: BatchOptions) { this.options = options; } + close() {} // eslint-disable-next-line @typescript-eslint/no-unused-vars - add(message: s.Message, deadline?: number): void {} + async add(message: s.Message, deadline?: number): Promise { + return s.AckResponses.Success; + } async flush(): Promise {} async onFlush(): Promise {} async onDrain(): Promise {} @@ -324,7 +327,7 @@ describe('Subscriber', () => { assert.strictEqual(subscriber.ackDeadline, 10); }); - it('should default to 60s min for exactly-once subscriptions', () => { + it('should default to 60s min for exactly-once delivery subscriptions', () => { subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; const histogram: FakeHistogram = stubs.get('histogram'); diff --git a/test/util.ts b/test/util.ts index 80fe5690e..6dbdd746c 100644 --- a/test/util.ts +++ b/test/util.ts @@ -13,7 +13,7 @@ // limitations under the License. import {describe, it} from 'mocha'; -import {Throttler} from '../src/util'; +import {addToBucket, Throttler} from '../src/util'; import * as assert from 'assert'; describe('utils', () => { @@ -43,4 +43,19 @@ describe('utils', () => { assert.strictEqual(totalCalls, 'FIRSTTHIRD'); }); }); + + describe('addToBucket', () => { + it('adds to a non-existent bucket', () => { + const map = new Map(); + addToBucket(map, 'a', 'b'); + assert.deepStrictEqual(map.get('a'), ['b']); + }); + + it('adds to an existent bucket', () => { + const map = new Map(); + map.set('a', ['c']); + addToBucket(map, 'a', 'b'); + assert.deepStrictEqual(map.get('a'), ['c', 'b']); + }); + }); });