-
Notifications
You must be signed in to change notification settings - Fork 232
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add support for exactly once subscriptions (#1572)
Adds support for exactly-once delivery subscriptions. Please see the samples for information on how to interact with exactly-once subscriptions properly (specifically, using the `*WithResponse()` methods). Other client library folks - Mahesh needs to review this, so please don't merge until that happens. Fixes #1571 🦕
- Loading branch information
Showing
22 changed files
with
2,241 additions
and
130 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// Copyright 2022 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
/** | ||
* This application demonstrates how to perform basic operations on | ||
* schemas with the Google Cloud Pub/Sub API. | ||
* | ||
* For more information, see the README.md under /pubsub and the documentation | ||
* at https://cloud.google.com/pubsub/docs. | ||
*/ | ||
|
||
// This is a generated sample. Please see typescript/README.md for more info. | ||
|
||
'use strict'; | ||
|
||
// sample-metadata: | ||
// title: Create an exactly-once delivery subscription | ||
// description: Demonstrates how to create a subscription for exactly-once delivery. | ||
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id> | ||
|
||
// [START pubsub_create_subscription_with_exactly_once_delivery] | ||
/** | ||
* TODO(developer): Uncomment these variables before running the sample. | ||
*/ | ||
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; | ||
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; | ||
|
||
// Imports the Google Cloud client library | ||
const {PubSub} = require('@google-cloud/pubsub'); | ||
|
||
// Creates a client; cache this for further use | ||
const pubSubClient = new PubSub(); | ||
|
||
async function createSubscriptionWithExactlyOnceDelivery( | ||
topicNameOrId, | ||
subscriptionNameOrId | ||
) { | ||
// Creates a new subscription | ||
await pubSubClient | ||
.topic(topicNameOrId) | ||
.createSubscription(subscriptionNameOrId, { | ||
enableExactlyOnceDelivery: true, | ||
}); | ||
console.log( | ||
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.` | ||
); | ||
console.log( | ||
'To process messages, remember to check the return value of ackWithResponse().' | ||
); | ||
} | ||
// [END pubsub_create_subscription_with_exactly_once_delivery] | ||
|
||
function main( | ||
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', | ||
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID' | ||
) { | ||
createSubscriptionWithExactlyOnceDelivery( | ||
topicNameOrId, | ||
subscriptionNameOrId | ||
).catch(err => { | ||
console.error(err.message); | ||
process.exitCode = 1; | ||
}); | ||
} | ||
|
||
main(...process.argv.slice(2)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
// Copyright 2022 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
/** | ||
* This application demonstrates how to perform basic operations on | ||
* schemas with the Google Cloud Pub/Sub API. | ||
* | ||
* For more information, see the README.md under /pubsub and the documentation | ||
* at https://cloud.google.com/pubsub/docs. | ||
*/ | ||
|
||
// This is a generated sample. Please see typescript/README.md for more info. | ||
|
||
'use strict'; | ||
|
||
// sample-metadata: | ||
// title: Listen with exactly-once delivery | ||
// description: Listen for messages on an exactly-once delivery subscription. | ||
// usage: node listenForMessagesWithExactlyOnceDelivery.js <subscription-name-or-id> | ||
|
||
// [START pubsub_subscriber_exactly_once] | ||
/** | ||
* TODO(developer): Uncomment this variable before running the sample. | ||
*/ | ||
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; | ||
|
||
// Imports the Google Cloud client library | ||
const {PubSub} = require('@google-cloud/pubsub'); | ||
|
||
// Creates a client; cache this for further use | ||
const pubSubClient = new PubSub(); | ||
|
||
async function listenForMessagesWithExactlyOnceDelivery( | ||
subscriptionNameOrId, | ||
timeout | ||
) { | ||
// References an existing subscription | ||
const subscription = pubSubClient.subscription(subscriptionNameOrId); | ||
|
||
// Create an event handler to handle messages | ||
let messageCount = 0; | ||
const messageHandler = async message => { | ||
console.log(`Received message ${message.id}:`); | ||
console.log(`\tData: ${message.data}`); | ||
console.log(`\tAttributes: ${message.attributes}`); | ||
messageCount++; | ||
|
||
// Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks | ||
// the result of the acknowledge call. When exactly-once delivery is enabled | ||
// on the subscription, the message is guaranteed not to be delivered again | ||
// if the ack Promise resolves. | ||
try { | ||
// When the Promise resolves, the value is always AckResponses.Success, | ||
// signaling that the ack was accepted. Note that you may call this | ||
// method on a subscription without exactly-once delivery, but it will | ||
// always return AckResponses.Success. | ||
await message.ackWithResponse(); | ||
console.log(`Ack for message ${message.id} successful.`); | ||
} catch (e) { | ||
// In all other cases, the error passed on reject will explain why. This | ||
// is only for permanent failures; transient errors are retried automatically. | ||
const ackError = e; | ||
console.log( | ||
`Ack for message ${message.id} failed with error: ${ackError.errorCode}` | ||
); | ||
} | ||
}; | ||
|
||
// Listen for new messages until timeout is hit | ||
subscription.on('message', messageHandler); | ||
|
||
setTimeout(() => { | ||
subscription.removeListener('message', messageHandler); | ||
console.log(`${messageCount} message(s) received.`); | ||
}, timeout * 1000); | ||
} | ||
// [END pubsub_subscriber_exactly_once] | ||
|
||
function main( | ||
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', | ||
timeout = 60 | ||
) { | ||
listenForMessagesWithExactlyOnceDelivery( | ||
subscriptionNameOrId, | ||
Number(timeout) | ||
).catch(err => { | ||
console.error(err.message); | ||
process.exitCode = 1; | ||
}); | ||
} | ||
|
||
main(...process.argv.slice(2)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
samples/typescript/createSubscriptionWithExactlyOnceDelivery.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
// Copyright 2022 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
/** | ||
* This application demonstrates how to perform basic operations on | ||
* schemas with the Google Cloud Pub/Sub API. | ||
* | ||
* For more information, see the README.md under /pubsub and the documentation | ||
* at https://cloud.google.com/pubsub/docs. | ||
*/ | ||
|
||
// sample-metadata: | ||
// title: Create an exactly-once delivery subscription | ||
// description: Demonstrates how to create a subscription for exactly-once delivery. | ||
// usage: node createSubscriptionWithExactlyOnceDelivery.js <topic-name-or-id> <subscription-name-or-id> | ||
|
||
// [START pubsub_create_subscription_with_exactly_once_delivery] | ||
/** | ||
* TODO(developer): Uncomment these variables before running the sample. | ||
*/ | ||
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; | ||
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; | ||
|
||
// Imports the Google Cloud client library | ||
import {PubSub} from '@google-cloud/pubsub'; | ||
|
||
// Creates a client; cache this for further use | ||
const pubSubClient = new PubSub(); | ||
|
||
async function createSubscriptionWithExactlyOnceDelivery( | ||
topicNameOrId: string, | ||
subscriptionNameOrId: string | ||
) { | ||
// Creates a new subscription | ||
await pubSubClient | ||
.topic(topicNameOrId) | ||
.createSubscription(subscriptionNameOrId, { | ||
enableExactlyOnceDelivery: true, | ||
}); | ||
console.log( | ||
`Created subscription ${subscriptionNameOrId} with exactly-once delivery.` | ||
); | ||
console.log( | ||
'To process messages, remember to check the return value of ackWithResponse().' | ||
); | ||
} | ||
// [END pubsub_create_subscription_with_exactly_once_delivery] | ||
|
||
function main( | ||
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', | ||
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID' | ||
) { | ||
createSubscriptionWithExactlyOnceDelivery( | ||
topicNameOrId, | ||
subscriptionNameOrId | ||
).catch(err => { | ||
console.error(err.message); | ||
process.exitCode = 1; | ||
}); | ||
} | ||
|
||
main(...process.argv.slice(2)); |
Oops, something went wrong.