
pubsub: Receive doesn't call the callback function upon new unacknowledged messages #740

Closed
aseure opened this issue Aug 25, 2017 · 20 comments
Assignees: jba
Labels: api: pubsub · priority: p1 · type: bug

Comments

@aseure

aseure commented Aug 25, 2017

Version: v0.12.0

We have discovered that one of our Go programs, which should consume ~800 messages once every hour, is not correctly receiving Pub/Sub messages: they are never delivered to the callback function passed to Receive. At startup, it correctly gets the messages that are already available and unacknowledged. However, one hour later, when the ~800 new messages arrive, nothing happens. Restarting the instance makes it work again, and then it blocks again an hour later when the next batch of messages arrives, and so on.

By adding debug Print* statements, we discovered that it's actually blocked at this select from iterator.go.

What could be the issue?
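
For context, the consumption pattern in question is the standard Receive callback loop; a minimal sketch (project and subscription names are placeholders, not our real ones):

package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatal(err)
	}
	sub := client.Subscription("my-sub") // placeholder subscription ID
	// Receive blocks, invoking the callback for each message until ctx is
	// cancelled or an unrecoverable error occurs. In the failure described
	// above, it simply stops delivering messages without returning.
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		log.Printf("got message: %s", m.Data)
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}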

@jba jba self-assigned this Aug 25, 2017
@jba jba added type: bug, priority: p0, and api: pubsub labels Aug 25, 2017
@jba
Contributor

jba commented Aug 25, 2017

Please roll back to v0.11.0 while we investigate.

@aseure
Author

aseure commented Aug 25, 2017

Rolling back to v0.11.0 seems to have fixed the issue: two consecutive hours of messages have been processed. I'll let it run for a few more hours to make sure it's working. If so, I'll try to git bisect between v0.11.0 and v0.12.0 and keep you updated.

Thank you for the quick answer.

@jba
Contributor

jba commented Aug 25, 2017

Unless it gives you pleasure, I wouldn't worry about the git bisect. It's almost certainly the change from Pull to StreamingPull that causes this. I will try to reproduce.

@jba
Contributor

jba commented Aug 28, 2017

I suspect there is some network issue that is disconnecting your stream, and the client is not properly recovering from that. But I can't reproduce. I ran this program over the weekend. I published 800 messages at 1-hour intervals and received them in a separate process. I also tried a 2-hour interval. In both cases I was still receiving messages after many hours.

My 2-hour run looked like:

go run pubsub.go publish -project MY-PROJECT-ID -topic test-topic -count 800 -interval 2h

and in another shell:

go run pubsub.go receive -project MY-PROJECT-ID -topic test-topic -sub test-sub -interval 2h

Can you try to reproduce with my code? Also, I would like to see if any errors are being returned from the low-level stream calls. Maybe you could instrument your system to log them? The relevant places in the v0.12.0 code are:

Thanks in advance for your help.
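
The pubsub.go test program itself isn't included in this thread; a minimal sketch of what the publish half might look like (flag parsing elided, placeholder names, and the actual program may differ):

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "MY-PROJECT-ID") // placeholder
	if err != nil {
		log.Fatal(err)
	}
	topic := client.Topic("test-topic")
	const count = 800
	const interval = 2 * time.Hour

	for {
		for i := 0; i < count; i++ {
			res := topic.Publish(ctx, &pubsub.Message{Data: []byte(fmt.Sprintf("msg-%d", i))})
			// Block until the server acknowledges the publish.
			if _, err := res.Get(ctx); err != nil {
				log.Fatal(err)
			}
		}
		log.Printf("published %d messages; sleeping %s", count, interval)
		time.Sleep(interval)
	}
}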

@yanickbelanger

yanickbelanger commented Aug 30, 2017

It looks like we are also hitting this with our setup: one of our GKE containers stops receiving Pub/Sub messages after a while, and a restart of the container resumes message processing, until it stops again. We suspected that the Receive function was returning with no error, but it does not return at all; it just stops receiving messages. We will probably roll back to the previous version of the client.

@jba
Contributor

jba commented Aug 30, 2017

@yanickbelanger It would be great if you could log the low-level Send and Recv calls as I described above.

@yanickbelanger

@jba I've just pushed to our cluster a test service that has some fmt.Printf calls around the lines you mentioned above. It should receive a message every minute from another service. We'll see how it goes.

@yanickbelanger

We cannot reproduce the issue in our test. We suspect the relatively short interval between the test messages may prevent our service from reaching the conditions that lead to the issue. We've added a subscription to another topic that publishes messages every 60-180 minutes and will report back later.

BTW, it's probably normal, but even though our test went well, we still see errors from the logs I added as you suggested above:

  • *status.statusError, rpc error: code = Internal desc = stream terminated by RST_STREAM with error code: INTERNAL_ERROR
  • *status.statusError, rpc error: code = Unavailable desc = The service was unable to fulfill your request. Please try again. [code=8a75]
  • *status.statusError, rpc error: code = Unavailable desc = transport is closing

@dansiemon

I see the same with a service that pulls from many subscriptions. The ones with frequent messages keep going fine, but the subscriptions that only get messages every ~15 minutes stop receiving after they initially drain the queue.

@yanickbelanger

We've seen the issue with our latest test. The service successfully received a few messages within a couple of hours, then stopped receiving the subsequent ones. Once restarted, the service processed the pending messages. Between the last received message and the service restart, we got the following logs (added in service.go after our discussion above):

--- Streaming pull send (send) begin for projects/[project]/subscriptions/[subscription]
--- !!! Streaming pull recv (fetchMessages) error for projects/[project]/subscriptions/[subscription]: *status.statusError, rpc error: code = Internal desc = stream terminated by RST_STREAM with error code: INTERNAL_ERROR
--- Streaming pull send (openLocked) for projects/[project]/subscriptions/[subscription]
--- Streaming pull recv (fetchMessages) for projects/[project]/subscriptions/[subscription]
--- !!! Streaming pull recv (fetchMessages) error for projects/[project]/subscriptions/[subscription]: *status.statusError, rpc error: code = Unavailable desc = The service was unable to fulfill your request. Please try again. [code=8a75]
--- Streaming pull send (openLocked) for projects/[project]/subscriptions/[subscription]
--- Streaming pull recv (fetchMessages) for projects/[project]/subscriptions/[subscription]

@pongad
Contributor

pongad commented Sep 2, 2017

@yanickbelanger Just to make sure I understand the logs correctly, are the sends and recvs succeeding? Or is it that the sends and recvs themselves never return?

@pongad
Contributor

pongad commented Sep 2, 2017

@jba I think I know what's going on. On L427 we unlock before Send or Recv so that both can happen at the same time. gRPC allows one Send and one Recv to happen concurrently, but not multiple Sends or multiple Recvs. This is usually OK, since each stream only has one goroutine pulling messages and one sending acks.

Looking at the logs, I think it's possible that a Recv fails, so we attempt to reopen the stream (L433). But reopening a stream causes a Send (L392). Now we have two Sends, one from reopening and one from acking. The two can happen concurrently since the latter is out of lock.

Please let me know if you think I'm on the right track.

EDIT:
I take that back; it doesn't make sense. L427 shouldn't be able to observe the stream doing the send on L392.
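
For reference, the gRPC contract mentioned above allows at most one in-flight Send and one in-flight Recv per stream. A minimal sketch of the allowed split (not the client's actual code): Recv stays on a single goroutine, and any additional Send paths (acks, stream reopening) are serialized with a mutex.

package streamdemo

import (
	"sync"

	pb "google.golang.org/genproto/googleapis/pubsub/v1"
)

type streamer struct {
	mu  sync.Mutex // serializes every Send on spc
	spc pb.Subscriber_StreamingPullClient
}

// sendAcks may be called from multiple goroutines; the mutex guarantees that
// only one Send is in flight, while Recv can proceed concurrently elsewhere.
func (s *streamer) sendAcks(ackIDs []string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.spc.Send(&pb.StreamingPullRequest{AckIds: ackIDs})
}

// recvLoop runs in its own goroutine and is the only caller of Recv.
func (s *streamer) recvLoop(handle func(*pb.StreamingPullResponse)) error {
	for {
		res, err := s.spc.Recv()
		if err != nil {
			return err
		}
		handle(res)
	}
}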

@pongad
Contributor

pongad commented Sep 4, 2017

I bring good news! I'm finally able to repro.
@jba I reproduced this with your test program, but I had to run the receiver side on a GCE instance. Perhaps there's something about the Google-internal network that prevents this from triggering...?

I modified fetchMessages like this:

	log.Println("fetch calling")
	err := p.call(func(spc pb.Subscriber_StreamingPullClient) error {
		var err error
		log.Println("inner fetch calling")
		res, err = spc.Recv()
		log.Println("inner fetch returned", res, err)
		return err
	})
	log.Println("fetch returned", err)

and modified send similarly.
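
The send-side instrumentation isn't shown; assuming it also goes through p.call and sends a *pb.StreamingPullRequest (named req here, which may not match the actual variable name), it presumably looked roughly like:

	// Hypothetical reconstruction of the send instrumentation, mirroring the
	// fetchMessages snippet above.
	log.Println("send calling")
	err := p.call(func(spc pb.Subscriber_StreamingPullClient) error {
		log.Println("inner send calling", req.AckIds)
		err := spc.Send(req)
		log.Println("inner send returned", err)
		return err
	})
	log.Println("send returned", err)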

I made the publishing half publish 2 messages every 15 minutes. I got these logs:

2017/09/04 07:53:36 fetch calling
2017/09/04 07:53:36 inner fetch calling
2017/09/04 07:53:46 received 0 messages
2017/09/04 07:53:49 inner fetch returned (messages elided)  <nil>
2017/09/04 07:53:49 fetch returned <nil>
2017/09/04 07:53:49 fetch calling
2017/09/04 07:53:49 inner fetch calling
2017/09/04 07:53:49 send calling
2017/09/04 07:53:49 inner send calling (ack IDs elided)
2017/09/04 07:53:49 inner send returned <nil>
2017/09/04 07:53:49 send returned <nil>
2017/09/04 07:58:46 received 2 messages
2017/09/04 08:03:46 received 2 messages
2017/09/04 08:08:46 received 2 messages
2017/09/04 08:13:46 received 2 messages
2017/09/04 08:18:46 received 2 messages

The call to spc.Recv simply never returns, despite new messages being published around 08:08.
I suspect that this is a bug in gRPC. I used gRPC v1.5.0 for testing. Tomorrow I'll test again with v1.6.0 to see whether it still happens.

@yanickbelanger @dansiemon ~~Could you let us know which gRPC version you're on?~~ Are you running on GCE as well?

EDIT: Tested with v1.6.0 and encountered the same problem. I'll try to see if I can repro this from raw gRPC client as well.

@dansiemon

Yes, I was running in GCE (GKE) w/ gRPC 1.6.

@pongad
Contributor

pongad commented Sep 5, 2017

I can repro this in both Go and Java using only gRPC-generated code. I think the root cause is probably on the server rather than in the client, especially because the Java and Go gRPC implementations are independent.

@jba I'll open an internal bug for this and CC you.
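
A minimal generated-code repro along those lines might look like this sketch (assuming application default credentials and the v1 Subscriber API; the subscription name is a placeholder, and the actual repro program may differ):

package main

import (
	"context"
	"log"

	"golang.org/x/oauth2/google"
	pb "google.golang.org/genproto/googleapis/pubsub/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
)

func main() {
	ctx := context.Background()
	// Application default credentials with the Pub/Sub scope.
	ts, err := google.DefaultTokenSource(ctx, "https://www.googleapis.com/auth/pubsub")
	if err != nil {
		log.Fatal(err)
	}
	conn, err := grpc.Dial("pubsub.googleapis.com:443",
		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
		grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	stream, err := pb.NewSubscriberClient(conn).StreamingPull(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// The first request on the stream names the subscription (placeholder here).
	if err := stream.Send(&pb.StreamingPullRequest{
		Subscription:             "projects/MY-PROJECT-ID/subscriptions/test-sub",
		StreamAckDeadlineSeconds: 60,
	}); err != nil {
		log.Fatal(err)
	}
	for {
		res, err := stream.Recv() // the call observed to hang above
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("received %d messages", len(res.ReceivedMessages))
		var ackIDs []string
		for _, m := range res.ReceivedMessages {
			ackIDs = append(ackIDs, m.AckId)
		}
		// Ack on the same stream; a real client would serialize these Sends
		// against any other senders on the stream.
		if err := stream.Send(&pb.StreamingPullRequest{AckIds: ackIDs}); err != nil {
			log.Fatal(err)
		}
	}
}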

@yanickbelanger

@pongad Yes I'm running on GKE.

@jonbretman

@pongad what is the best way to track progress of the internal bug?

@jba
Contributor

jba commented Sep 7, 2017

We'll update this issue when we have more information.

@purohit

purohit commented Sep 15, 2017

I inadvertently updated to v0.12.0 as well and hit the same problem, although our queue backpressure was much greater (30,000+ messages). We were using queue-based autoscaling (an alpha feature on GCE VM instances), which completely stopped working until we rolled back to v0.11.0 as suggested. Then everything started working again.

As a quick fix, I changed the clusters to CPU-based autoscaling, which worked for a few hours and then broke again, as predicted by a previous comment in this thread (i.e., the issue is with consuming queued messages at all after a few hours, not with the type of autoscaling).

If any future problems crop up, I will post them here.

@junghoahnsc

We hit the exact same issue in production.
After we upgraded to v0.12.0, streaming pull from a subscription with periodic publishing stopped after one or two bulk pulls, while pulling from subscriptions with continuous publishing kept working. v0.11.0 works fine in both cases.

@jba jba added priority: p1 and removed priority: p0 labels Nov 28, 2017