pubsub: new Subscription.Receive methods and problems processing one message at a time #569

Closed
bradleyfalzon opened this issue Mar 20, 2017 · 4 comments

@bradleyfalzon

bradleyfalzon commented Mar 20, 2017

I've updated to the pubsub library's new Subscription.Receive method, discussed in https://groups.google.com/forum/#!topic/google-api-go-announce/8pt6oetAdKc/discussion.

My use case is basically that I use Pub/Sub as a queue, where each instance processes one message at a time (no concurrency).

Where previously I used MessageIterator to block on each message, switching to the Subscription.Receive callback while still limiting concurrency to 1 has me baffled.

I limit concurrency by setting subscription.ReceiveSettings.MaxOutstandingMessages = 1, as per https://godoc.org/cloud.google.com/go/pubsub#Subscription.Receive.
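For reference, a minimal sketch of that setup (this is not the gist itself; it assumes application default credentials, and "my-project" / "my-sub" are placeholder names):

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatal(err)
	}
	sub := client.Subscription("my-sub") // placeholder subscription ID

	// Limit the client to one outstanding message at a time, per the
	// Subscription.Receive godoc linked above.
	sub.ReceiveSettings.MaxOutstandingMessages = 1

	// Receive blocks, invoking the callback for each message.
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		log.Printf("processing %s", m.ID)
		// ... do the work for this message ...
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}
```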

When I receive a single message on the queue, the process works fine, but when I receive two messages, the last message (ignoring order) is always duplicated.

I'm either doing something awfully wrong, or there may be a bug somewhere.

I've tried to create a minimal example: https://gist.github.com/bradleyfalzon/a3e88d33e2597c834d83ba33865c6aa0

  • You'll need gcloud application credentials already set
  • Change the project name
  • Run it as is and it should add two messages to the queue
  • I expect "GCPPubSubQueue: I am doing the work for" to be printed twice
  • I see "GCPPubSubQueue: I am doing the work for" printed 3 times
  • If I push just one message to the queue (remove line 32) it works fine (obviously)
  • If I don't set MaxOutstandingMessages, it works fine
  • No matter when I acknowledge the message, the issue still happens
  • If there is a bug, I think it's related to how these messages are acknowledged internally; when I add debug logging throughout the pubsub library, I see the duplicate is delivered before the message is actually acknowledged.
  • This is run from servers in Australia, so if there is a timing-related bug here, it may be isolated to connections with high latency.
  • I understand there's no exactly-once delivery, but these are being duplicated every single time.

Am I limiting concurrency incorrectly in some way? My issue may be somewhat related to #566, as I just want to process one message at a time (per server), achieving concurrency by running multiple servers/instances.

2017/03/20 17:05:24 NewGCPPubSubQueue: creating topic
2017/03/20 17:05:31 NewGCPPubSubQueue: creating subscription
2017/03/20 17:05:32 Failed to decode ("google.rpc.debuginfo-bin", "Q3FzQlkyOXRMbWR2YjJkc1pTNWpiRzkxWkM1d2RXSnpkV0l1YzJWeWRtVnlMbUZqZEdsdmJuTXVkakZpWlhSaE1TNVRkV0p6WTNKcFltVnlRM0psWVhSbFUzVmljMk55YVhCMGFXOXVRV04wYVc5dUpFRmpkR2x2YmxCeWIyUjFZMlZ5VFc5a2RXeGxMbkJ5YjJSMVkyVk5ZWGxVYUhKdmQxSmxjM0J2Ym5ObEtGTjFZbk5qY21saVpYSkRjbVZoZEdWVGRXSnpZM0pwY0hScGIyNUJZM1JwYjI0dWFtRjJZVG96TURRcEVoSkRiMjVtYVdjZ2FYUmxiU0JsZUdsemRITT0"): illegal base64 data at input byte 344
2017/03/20 17:05:33 GCPPubSubQueue: published a message with a message ID: 88545717747656
2017/03/20 17:05:34 GCPPubSubQueue: processing ID 88545717747656, published at 2017-03-20 06:35:33.169 +0000 UTC
2017/03/20 17:05:34 GCPPubSubQueue: ack'd ID 88545717747656
2017/03/20 17:05:34 GCPPubSubQueue: I am doing the work for 88545717747656
2017/03/20 17:05:38 GCPPubSubQueue: published a message with a message ID: 88546446760100
2017/03/20 17:05:49 GCPPubSubQueue: successfully processed ID 88545717747656
2017/03/20 17:05:49 GCPPubSubQueue: processing ID 88546446760100, published at 2017-03-20 06:35:38.531 +0000 UTC
2017/03/20 17:05:49 GCPPubSubQueue: ack'd ID 88546446760100
2017/03/20 17:05:49 GCPPubSubQueue: I am doing the work for 88546446760100
2017/03/20 17:06:04 GCPPubSubQueue: successfully processed ID 88546446760100
2017/03/20 17:06:04 GCPPubSubQueue: processing ID 88546446760100, published at 2017-03-20 06:35:38.531 +0000 UTC
2017/03/20 17:06:04 GCPPubSubQueue: ack'd ID 88546446760100
2017/03/20 17:06:04 GCPPubSubQueue: I am doing the work for 88546446760100
2017/03/20 17:06:19 GCPPubSubQueue: successfully processed ID 88546446760100

Note the duplicate "GCPPubSubQueue: I am doing the work for 88546446760100".

@pongad
Contributor

pongad commented Mar 20, 2017

I think I know what's going on. IIUC, the MaxOutstandingMessages option gets passed to two places: the puller and the flow controller. The puller says "don't pull more than this many" while the flow controller says "don't run the callback on more than this many at once". So what's happening is that while the callback is running, the puller pulls another message. The second message won't get processed for about 15 seconds (line 91) and so it gets nacked.

I think the fix is to move the fc.acquire above iter.Next. @jba, am I understanding this right?

@jba
Contributor

jba commented Mar 20, 2017

Thanks for the excellent bug report and minimal case. They made finding this bug easy. I literally cut'n'pasted your gist and added logging to the client until I figured it out.

We weren't doing deadline extension correctly, so, as @pongad said, the message was implicitly nacked and retransmitted.

The fix is in https://code-review.googlesource.com/11525.

I should point out that your goal of limiting throughput runs counter to our efforts to maximize it. I'm afraid things will only get worse when the StreamingPull API becomes available. Then our client will switch to using that, and there will be no way to prevent getting multiple messages from the server. We will hold them and extend their deadlines for you, and you can always increase MaxExtension from its default of 10 minutes, but you will have perhaps 100 messages sitting in memory while you process them one by one.
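For reference, the extension window mentioned here is configurable on the subscription before calling Receive (a fragment, not a full program; `sub` is assumed to be a *pubsub.Subscription):

```go
// The client holds pulled messages and extends their ack deadlines for
// up to MaxExtension while they wait for, or are being processed by,
// the callback. Raising it gives slow one-at-a-time consumers more room.
sub.ReceiveSettings.MaxExtension = 30 * time.Minute // default is 10 minutes
```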

Can I ask why you don't want a single process to handle messages concurrently?

@jba
Contributor

jba commented Mar 20, 2017

Another alternative, once streaming pull is turned on, is to use the lower-level client at cloud.google.com/go/pubsub/apiv1. Then you will have much better control over your messages.
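For anyone landing here later, a rough sketch of single-message consumption with the lower-level client (untested against a live project; the subscription path is a placeholder):

```go
package main

import (
	"context"
	"log"

	pubsub "cloud.google.com/go/pubsub/apiv1"
	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewSubscriberClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	subName := "projects/my-project/subscriptions/my-sub" // placeholder

	// Pull at most one message; nothing else is buffered client-side.
	resp, err := client.Pull(ctx, &pubsubpb.PullRequest{
		Subscription: subName,
		MaxMessages:  1,
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range resp.ReceivedMessages {
		log.Printf("processing %s", m.Message.MessageId)
		// ... do the work, then ack explicitly ...
		if err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{
			Subscription: subName,
			AckIds:       []string{m.AckId},
		}); err != nil {
			log.Fatal(err)
		}
	}
}
```

The trade-off is that deadline extension, retries, and flow control all become the caller's responsibility.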

@bradleyfalzon
Author

bradleyfalzon commented Mar 20, 2017

Thanks @jba for the quick response. I've added my use case to #566 (comment), as that seemed like a good place to centralise them for the moment. Both holding messages and processing them concurrently would be a problem for me, and I try to discuss those points in the other issue.

As for using the lower-level APIs, my short-term solution may be to vendor the older commit, but I may simply be misusing what Pub/Sub is designed for.

bradleyfalzon added a commit to bradleyfalzon/gopherci that referenced this issue Mar 22, 2017
Google have recently made some breaking changes to their Pub/Sub
client API to support new high-performance streaming methods.

https://groups.google.com/forum/#!topic/google-api-go-announce/aaqRDIQ3rvU/discussion
https://groups.google.com/forum/#!topic/google-api-go-announce/8pt6oetAdKc/discussion

The main change is that the new methods don't easily support blocking
operations or one-message-at-a-time use cases. This is being
discussed: googleapis/google-cloud-go#566

This change attempts to use the new API in a blocking manner, as
indirectly discussed in: googleapis/google-cloud-go#569

Since finishing and testing this approach (successfully), it appears
Google is discussing this use case further internally, so this may
not be the final solution, but it gets us through for the moment.

If we're required to stop using the Pub/Sub client and instead use
the APIv1 client, the issue does contain a gist of how that could
work, but it hasn't been tested in various failure modes, as it's a
lower-level API - I'm confident it just requires more testing and
likely no further changes.

Further, these changes necessitated some refactoring of the internal
APIs; this was mostly opportunistic but made the changes simpler.

These refactors were essentially: use a channel to push messages onto
the queue (previously this was done via an interface called Queuer),
and have each type of queuer take a callback and execute it with the
job as the only parameter (previously new jobs to be executed were
sent on a channel).

GCPPubSubQueue was tested to ensure that only one message is removed
from the queue at any time, allowing other instances to consume the
remaining messages, and that on shutdown the executing job is allowed
to finish in full before the process exits.
bradleyfalzon added a commit to bradleyfalzon/gopherci that referenced this issue Mar 22, 2017
Google have recently made some breaking changes to their Pub/Sub
client API to support new high-performance streaming methods.

https://groups.google.com/forum/#!topic/google-api-go-announce/aaqRDIQ3rvU/discussion
https://groups.google.com/forum/#!topic/google-api-go-announce/8pt6oetAdKc/discussion

The main change is that the new methods don't easily support blocking
operations or one-message-at-a-time use cases. This is being
discussed: googleapis/google-cloud-go#566

This change attempts to use the new API in a blocking manner, as
indirectly discussed in: googleapis/google-cloud-go#569

Since finishing and testing this approach (successfully), it appears
Google is discussing this use case further internally, so this may
not be the final solution, but it gets us through for the moment.

If we're required to stop using the Pub/Sub client and instead use
the APIv1 client, the issue does contain a gist of how that could
work, but it hasn't been tested in various failure modes, as it's a
lower-level API - I'm confident it just requires more testing and
likely no further changes.

Further, these changes necessitated some refactoring of the internal
APIs; this was mostly opportunistic but made the changes simpler.

These refactors were essentially: use a channel to push messages onto
the queue (previously this was done via an interface called Queuer),
and have each type of queuer take a callback and execute it with the
job as the only parameter (previously new jobs to be executed were
sent on a channel).

A callback was chosen instead of another channel because I wanted to
ensure only one message was consumed at a time: the new API's Receive
method has only one instance running, and it calls the callback,
blocking until finished. When using a synchronous channel, the Receive
method became asynchronous, as it would unblock as soon as the message
on the channel was received, allowing the Receive callback to fetch
another message - which would then block, as there would be no other
listener on the channel.

GCPPubSubQueue was tested to ensure that only one message is removed
from the queue at any time, allowing other instances to consume the
remaining messages, and that on shutdown the executing job is allowed
to finish in full before the process exits.