Break out of PubSub Subscription.Receive #881

Closed
mbinette opened this issue Feb 2, 2018 · 3 comments
@mbinette

mbinette commented Feb 2, 2018

I've followed the basic examples for PubSub Subscription.Receive and they work for me, except that I can't figure out how to get Receive to exit after all messages have been received.

I am writing a simple CLI application that should connect, receive and process all messages, then exit when no messages are left. I've seen samples that exit after receiving a fixed number of messages, say 10, but I have no idea how many will be waiting when I call Receive. I want to process all of them and then exit.

I tried context.WithTimeout, but I can't see a way to reset the timeout each time my Receive callback processes a message. If I could, I would reset it to something short, like 30 seconds, on each call, so that the application exits 30 seconds after the last message is processed.

I really don't want to drop down to the apiv1 code if I don't have to.

@jba
Contributor

jba commented Feb 2, 2018

As you suggest with your discussion of timeout, there's no clear meaning to "all the messages." As long as the subscription exists, it could receive messages.

You could also structure your system to publish an "I'm done" message, but the problem there is that PubSub doesn't guarantee delivery order, so it gets complicated.

I think your timeout idea is a good one. Here's some sample code (untested). The idea is that we write to a channel (non-blocking) if we've seen something, and another goroutine receives from that channel. If it doesn't hear anything after the timeout, it cancels the Receive's context.

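func receiveWithTimeout(ctx context.Context, timeout time.Duration) error {
	// seen is buffered so the callback never blocks when signaling activity.
	seen := make(chan int, 1)
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		for {
			select {
			case <-seen:
				// A message arrived; loop to start a fresh timeout.
			case <-time.After(timeout):
				// Nothing arrived within the timeout; stop Receive.
				cancel()
				return
			case <-ctx2.Done():
				// Receive has already returned; don't leave this goroutine running.
				return
			}
		}
	}()
	// mySubscription is assumed to be a *pubsub.Subscription in scope.
	return mySubscription.Receive(ctx2, func(c context.Context, m *pubsub.Message) {
		select {
		case seen <- 1:
		default:
		}
		// do work
	})
}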

You can call that in a loop.

@mbinette
Author

mbinette commented Feb 2, 2018

That works great. Thank you!

I'm trying to make sure I can handle errors in processing my messages as well. So instead of calling message.Nack() on error, I simply return, so that the message times out later, after MaxExtension, rather than immediately. The purpose is to process all messages, leave any message that caused an error in the subscription, then exit the app at the end of processing.

Your code lets me do that as long as my timeout value is less than my MaxExtension. Messages are only added once a day on a schedule and I'll be able to monitor errors separately. I can't lose the message.

I will most likely need to implement separate logic that moves the message to another queue on error at some point.

@mbinette mbinette closed this as completed Feb 2, 2018
@jba
Contributor

jba commented Feb 2, 2018

Glad that works.

But due to a recent change, your strategy of ignoring the message will reduce your throughput. The reason is that until you ack or nack, our flow controller counts the message as active, and you can only have MaxOutstandingMessages messages (or MaxOutstandingBytes bytes) active at once.

See #870 for a solution.
