Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for exactly once subscriptions #1572

Merged
merged 36 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0ca513a
feat: add minimum lease extensions for exactly-once
feywind May 30, 2022
3ffea4d
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Jun 6, 2022
2e5fb61
feat: put back warning about exactly once support
feywind Jun 6, 2022
15b4a37
Merge branch 'main' into exactly-once-1
feywind Jun 8, 2022
ec84cf9
feat: add minExtension setting and plug it into EOS and ackDeadline s…
feywind Jun 9, 2022
7816c1b
Merge branch 'main' into exactly-once-1
feywind Jun 9, 2022
406ad78
feat: bring back Duration, and put min/max in the right places
feywind Jun 13, 2022
913d79c
tests: add unit test for Duration
feywind Jun 13, 2022
9204194
tests: add deadline tests and fix a few bugs
feywind Jun 13, 2022
b5a95a5
chore: revert partially completed incorrect change
feywind Jun 13, 2022
dd6a073
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Jun 13, 2022
0c4ffe0
docs: work around jsdoc linking bug
feywind Jun 14, 2022
23727bc
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Jun 14, 2022
6102095
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Jul 5, 2022
bdc91aa
chore: split up updateAckDeadline for readability
feywind Jul 6, 2022
09b8509
fix: clean up min/max deadline extension handling to match other lang…
feywind Jul 7, 2022
9095ba0
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Jul 7, 2022
ad8080a
tests: update the histogram test to work within the new default max e…
feywind Jul 7, 2022
c9180de
feat: add "with response" versions of ack/nack/modack (#1588)
feywind Aug 5, 2022
ae7359b
feat: add processAckError to parse ErrorInfo (#1589)
feywind Aug 5, 2022
44a8623
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Aug 16, 2022
5bf18b9
Merge branch 'main' into exactly-once-main
feywind Sep 6, 2022
27356ef
feat: work in progress on piping ack errors back to callers (#1590)
feywind Sep 13, 2022
719cb82
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Sep 13, 2022
91f4ebf
samples: add samples for and tests for exactly-once (#1621)
feywind Sep 14, 2022
8bd9747
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 14, 2022
daa08fe
chore: merge remote-tracking branch 'remotes/origin/main' into exactl…
feywind Sep 15, 2022
95ff663
chore: merge branch 'exactly-once-main' of https://github.com/googlea…
feywind Sep 15, 2022
9cdc3a2
fix: review changes branch for work in progress on exactly-once deliv…
feywind Sep 21, 2022
8490539
Merge branch 'main' into exactly-once-main
feywind Sep 21, 2022
4c121f5
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 21, 2022
819c7ee
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 21, 2022
b24cf4b
Merge branch 'exactly-once-main' of https://github.com/googleapis/nod…
gcf-owl-bot[bot] Sep 21, 2022
01b3833
samples: clarify retry wording in samples
feywind Sep 22, 2022
85dd774
docs: update comment about short-circuiting handleAckFailures()
feywind Sep 22, 2022
a35e1a8
Merge branch 'main' into exactly-once-main
feywind Sep 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Create Push Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createPushSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createPushSubscription.js,samples/README.md) |
| Create Subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscription.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscription.js,samples/README.md) |
| Create Subscription With Dead Letter Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithDeadLetterPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithDeadLetterPolicy.js,samples/README.md) |
| Create an exactly-once delivery subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md) |
| Create Subscription With Filtering | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithFiltering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithFiltering.js,samples/README.md) |
| Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) |
| Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) |
Expand All @@ -148,6 +149,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Listen For Avro Records | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForAvroRecords.js,samples/README.md) |
| Listen For Errors | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForErrors.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForErrors.js,samples/README.md) |
| Listen For Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessages.js,samples/README.md) |
| Listen with exactly-once delivery | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md) |
| Listen For Protobuf Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForProtobufMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForProtobufMessages.js,samples/README.md) |
| Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) |
| Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) |
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"extend": "^3.0.2",
"google-auth-library": "^8.0.2",
"google-gax": "^3.3.0",
"heap-js": "^2.2.0",
"is-stream-ended": "^0.1.4",
"lodash.snakecase": "^4.1.1",
"p-defer": "^3.0.0"
Expand Down
40 changes: 40 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ guides.
* [Create Push Subscription](#create-push-subscription)
* [Create Subscription](#create-subscription)
* [Create Subscription With Dead Letter Policy](#create-subscription-with-dead-letter-policy)
* [Create an exactly-once delivery subscription](#create-an-exactly-once-delivery-subscription)
* [Create Subscription With Filtering](#create-subscription-with-filtering)
* [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled)
* [Create Topic](#create-topic)
Expand All @@ -45,6 +46,7 @@ guides.
* [Listen For Avro Records](#listen-for-avro-records)
* [Listen For Errors](#listen-for-errors)
* [Listen For Messages](#listen-for-messages)
* [Listen with exactly-once delivery](#listen-with-exactly-once-delivery)
* [Listen For Protobuf Messages](#listen-for-protobuf-messages)
* [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes)
* [Modify Push Configuration](#modify-push-configuration)
Expand Down Expand Up @@ -199,6 +201,25 @@ __Usage:__



### Create an exactly-once delivery subscription

Demonstrates how to create a subscription for exactly-once delivery.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md)

__Usage:__


`node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>`


-----




### Create Subscription With Filtering

Creates a new subscription with filtering.
Expand Down Expand Up @@ -560,6 +581,25 @@ __Usage:__



### Listen with exactly-once delivery

Listen for messages on an exactly-once delivery subscription.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md)

__Usage:__


`node listenForMessagesWithExactlyOnceDelivery.js <subscription-name-or-id>`


-----




### Listen For Protobuf Messages

Listens for messages in protobuf encoding from a subscription.
Expand Down
77 changes: 77 additions & 0 deletions samples/createSubscriptionWithExactlyOnceDelivery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This application demonstrates how to perform basic operations on
* schemas with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// This is a generated sample. Please see typescript/README.md for more info.

'use strict';

// sample-metadata:
// title: Create an exactly-once delivery subscription
// description: Demonstrates how to create a subscription for exactly-once delivery.
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>

// [START pubsub_create_subscription_with_exactly_once_delivery]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
topicNameOrId,
subscriptionNameOrId
) {
// Creates a new subscription
await pubSubClient
.topic(topicNameOrId)
.createSubscription(subscriptionNameOrId, {
enableExactlyOnceDelivery: true,
});
console.log(
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
);
console.log(
'To process messages, remember to check the return value of ackWithResponse().'
);
}
// [END pubsub_create_subscription_with_exactly_once_delivery]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'
) {
createSubscriptionWithExactlyOnceDelivery(
topicNameOrId,
subscriptionNameOrId
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
103 changes: 103 additions & 0 deletions samples/listenForMessagesWithExactlyOnceDelivery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This application demonstrates how to perform basic operations on
* schemas with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// This is a generated sample. Please see typescript/README.md for more info.

'use strict';

// sample-metadata:
// title: Listen with exactly-once delivery
// description: Listen for messages on an exactly-once delivery subscription.
// usage: node listenForMessagesWithExactlyOnceDelivery.js <subscription-name-or-id>

// [START pubsub_subscriber_exactly_once]
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId,
timeout
) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount++;

// Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
// the result of the acknowledge call. When exactly-once delivery is enabled
// on the subscription, the message is guaranteed not to be delivered again
// if the ack Promise resolves.
try {
// When the Promise resolves, the value is always AckResponses.Success,
// signaling that the ack was accepted. Note that you may call this
// method on a subscription without exactly-once delivery, but it will
// always return AckResponses.Success.
await message.ackWithResponse();
console.log(`Ack for message ${message.id} successful.`);
} catch (e) {
// In all other cases, the error passed on reject will explain why. This
// is only for permanent failures; transient errors are retried for a while.
feywind marked this conversation as resolved.
Show resolved Hide resolved
const ackError = e;
console.log(
`Ack for message ${message.id} failed with error: ${ackError.errorCode}`
);
}
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_subscriber_exactly_once]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
timeout = 60
) {
listenForMessagesWithExactlyOnceDelivery(
subscriptionNameOrId,
Number(timeout)
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
39 changes: 39 additions & 0 deletions samples/system-test/subscriptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,43 @@ describe('subscriptions', () => {
.get();
assert.strictEqual(subscription.metadata?.enableMessageOrdering, true);
});

it('should create an exactly-once delivery sub and listen on it.', async () => {
const testId = 'eos';
const topic = await createTopic(testId);
const subName = reserveSub(testId);
const output = execSync(
`${commandFor('createSubscriptionWithExactlyOnceDelivery')} ${
topic.name
} ${subName}`
);
assert.include(
output,
`Created subscription ${subName} with exactly-once delivery.`
);

const [subscription] = await pubsub
.topic(topic.name)
.subscription(subName)
.get();
assert.strictEqual(subscription.metadata?.enableExactlyOnceDelivery, true);

const message = Buffer.from('test message');
const messageIds = [
await topic.publishMessage({
data: message,
}),
await topic.publishMessage({
data: message,
}),
];

const output2 = execSync(
`${commandFor('listenForMessagesWithExactlyOnceDelivery')} ${subName} 15`
);

for (const id of messageIds) {
assert.include(output2, `Ack for message ${id} successful`);
}
});
});
73 changes: 73 additions & 0 deletions samples/typescript/createSubscriptionWithExactlyOnceDelivery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This application demonstrates how to perform basic operations on
* schemas with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create an exactly-once delivery subscription
// description: Demonstrates how to create a subscription for exactly-once delivery.
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id>

// [START pubsub_create_subscription_with_exactly_once_delivery]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
topicNameOrId: string,
subscriptionNameOrId: string
) {
// Creates a new subscription
await pubSubClient
.topic(topicNameOrId)
.createSubscription(subscriptionNameOrId, {
enableExactlyOnceDelivery: true,
});
console.log(
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
);
console.log(
'To process messages, remember to check the return value of ackWithResponse().'
);
}
// [END pubsub_create_subscription_with_exactly_once_delivery]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'
) {
createSubscriptionWithExactlyOnceDelivery(
topicNameOrId,
subscriptionNameOrId
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
Loading