
pubsub: Many acks/modacks could cause other acks/modacks to be delayed #9727

Closed
jameshartig opened this issue Apr 8, 2024 · 47 comments · Fixed by #10238
Labels
api: pubsub (Issues related to the Pub/Sub API) · status: investigating (The issue is under investigation, which is determined to be non-trivial)

Comments

@jameshartig
Contributor

Client

PubSub

Environment

CentOS on GCE (t2d-standard-4)

Go Environment

NumGoroutines=40
MaxExtension=15s
MaxOutstandingMessages=40000
MaxOutstandingBytes=-1
Synchronous=false
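
For reference, a minimal sketch of how we apply these settings (the client/subscription variables and IDs here are illustrative, not our actual wrapper code):

sub := client.Subscription("my-subscription") // *pubsub.Client created elsewhere
sub.ReceiveSettings.NumGoroutines = 40
sub.ReceiveSettings.MaxExtension = 15 * time.Second
sub.ReceiveSettings.MaxOutstandingMessages = 40000
sub.ReceiveSettings.MaxOutstandingBytes = -1 // negative disables the byte limit
sub.ReceiveSettings.Synchronous = false
err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
	// ... process the message ...
	m.Ack()
})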

Expected behavior

I expect to see 0 in subscription/expired_ack_deadlines_count, assuming that my AckCount matches my PullCount.

Actual behavior

We periodically see a huge rate of expired acks as high as 15k/s. We are currently acknowledging 20k messages per second across 2 GCE instances, ~10k/s per instance, and pulling 20k messages per second across those instances as well. I would expect then that we shouldn't really be seeing any expired Acks.

I don't know the actual distribution of messages across the 40 goroutines, but if some of them are getting most of the messages then it's possible for the ackIDBatchSize to be exceeded. When it's exceeded, sendModAck and sendAck both loop internally until all of the ackIDs have been sent. We don't have visibility into the distribution of time it takes to Acknowledge 2500 ackIDs, but we can see from the GCP Console that the Acknowledge method has a 95th percentile latency of over 100ms. Separately, we are calling ModifyAckDeadline (which has a 95th percentile latency of 40ms) with 16k IDs per second, which needs 7 calls per instance and could take more than 250ms.

Either of those would end up delaying the other since there's only a single sender goroutine which could be contributing to our expired acks issue.

Additionally, since we aren't using exactly-once-delivery, there's no way for us to measure how long it took from when we called Ack to when the request was sent to Pub/Sub. One way to fix that would be for the *AckResult returned from AckWithResult to actually become Ready once the message is sent, even if you're not using exactly-once delivery.
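
For example, this is roughly what we'd want to be able to measure (a sketch; ackSendLatency is a placeholder for one of our own metrics, and today, without exactly-once delivery, the result is reported ready immediately rather than when the ack is actually sent):

start := time.Now()
res := m.AckWithResult() // instead of m.Ack()
go func() {
	// If Ready meant "the Acknowledge request was sent", this would
	// measure the client-side ack delay we're trying to observe.
	if _, err := res.Get(ctx); err == nil { // ctx is the Receive callback context
		ackSendLatency.Observe(time.Since(start).Seconds()) // placeholder metric
	}
}()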

Screenshots

[screenshots]

Something else that's interesting is that the latencies shown in the GCP Console do not match our application-level metrics (which measure from start of Receive callback to Ack/Nack function call) at all:
[screenshot: GCP Console ack latency]
vs
[screenshot: application-level ack latency]

This is what led us to investigate if there was some sort of delay between when we Ack a message and when the underlying Acknowledge is sent by the client.

ModAckCount
[screenshot]
(the reason for the increase at 19:30 UTC is because we increased MaxOutstandingMessages from 25000 to 40000)

Finally, the increase in expired acks happened after a sharp decrease in the StreamingPull response for which I have no explanation unless some change was made on Pub/Sub's side. It's not clear if this might mean that there's a higher concentration of messages in individual goroutines.
[screenshot]

Additional context

We don't have any visibility into the 99th percentile modack extension being used and that would have been helpful in debugging.

@jameshartig jameshartig added the triage me I really want to be triaged. label Apr 8, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Apr 8, 2024
@hongalex hongalex added status: investigating The issue is under investigation, which is determined to be non-trivial. and removed triage me I really want to be triaged. labels Apr 8, 2024
@hongalex
Member

hongalex commented Apr 8, 2024

I have some high level thoughts before doing a thorough investigation.

How large are your messages on average? StreamingPull streams support up to 10 MB/s each, so having 40 goroutines (the equivalent of 40 open streams) might be overkill. If you suspect network bottleneck issues, reducing this could help.

the increase in expired acks happened after a sharp decrease in the StreamingPull response for which I have no explanation unless some change was made on Pub/Sub's side. It's not clear if this might mean that there's a higher concentration of messages in individual goroutines

It's possible the decrease comes from streams restarting, as streams are periodically terminated, but you would need to open a ticket to provide project ID / subscription ID for me to look into this.

Additionally, since we aren't using exactly-once-delivery, there's no way for us to measure how long it took from when we called Ack to when the request was sent to Pub/Sub

We currently have a beta release of the library that includes OpenTelemetry tracing that would potentially provide insight into this. Given your very high throughput, you would have to significantly reduce the sampling rate to make sure you don't hit the tracing quota if using Cloud Trace. This might not be an issue if you use your own backend. Instructions to enable tracing live here for now.
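
Roughly, the setup looks like this (a sketch; the sampling ratio is just an example, exporter wiring is omitted, and the exact client config flag is described in the linked instructions):

import (
	"context"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func newTracedClient(ctx context.Context) (*pubsub.Client, error) {
	// Sample a small fraction of traces to stay under Cloud Trace quotas at high throughput.
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.0001))),
	)
	otel.SetTracerProvider(tp)

	return pubsub.NewClientWithConfig(ctx, "my-project", &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true, // per the preview docs; the field name may differ in the final release
	})
}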

@jameshartig
Contributor Author

How large are your messages on average? StreamingPull streams support up to 10 MB/s each, so having 40 goroutines (the equivalent of 40 open streams) might be overkill. If you suspect network bottleneck issues, reducing this could help.

Mean is 768b and 95th percentile is 1kb. 40 is derived from 10 x NumCPU. We will try lowering that.

We currently have a beta release of the library that includes OpenTelemetry tracing that would potentially provide insight into this.

We will try that out and report back.

@jameshartig
Contributor Author

jameshartig commented Apr 9, 2024

@hongalex Here's a trace where we called Ack after 1.7s, but the ack wasn't started for another 6s and then the ack itself took 8s:
[screenshot]

Here's one where the ack took 12s to start (maybe because the modack took 10s to end?):
[screenshot]

Another where the ack took 10s to start and I'm not sure why:
[screenshot]

There are already a lot of these cases, and we only have sampling set to record a few per minute.

This trace is a bit confusing since we set MinExtensionPeriod=30s (to work around this issue) but I see 2 modacks within 100ms of each other. I'm guessing that's because the iterator doesn't check the expiration when it adds to pendingModAcks and just modacks everything that hasn't expired:
[screenshot]

@jameshartig
Contributor Author

@hongalex Also for added context, our handler batches messages in-memory and uploads batches to GCS in chunks of ~25k. So we are acking ~25k messages at once which might exacerbate this problem.

@hongalex
Member

hongalex commented Apr 9, 2024

Thanks for the screenshots, this helps a lot with understanding the problem. Are these values already including the change to lower NumGoroutines?

So I think the delayed acks are a result of what you wrote in the first comment: when batching large groups of ackIDs, any delay will result in a lot of further delay for messages later in the batch. Could you confirm what message batch size you're seeing? This can be seen on the "modack start" and "ack end" events' attributes.

This trace is a bit confusing since we set MinExtensionPeriod=30s (to work around this issue) but I see 2 modacks within 100ms of each other. I'm guessing that's because the iterator doesn't check the expiration when it adds to pendingModAcks and just modacks everything that hasn't expired:

That's correct. The first modack is the receipt modack that happens regardless, and subsequent modacks happen on a regular schedule.

@jameshartig
Contributor Author

Are these values already including the change to lower NumGoroutines?
Yes, it was lowered to 20. I can lower it further if you think that would help?

Could you confirm what message batch size you're seeing? This can be seen on the "modack start" and "ack end" events' attributes.

[screenshot]

The batch size for the ack was 2500, but the batch size for the modack was only 2 since the StreamingPull response only contained 2 messages, I assume. I'm not sure why the modack took 44 seconds to complete, though. That's longer than the Acknowledge deadline. We might need traces into the grpc library to understand if there's contention on the number of streams or connections, since the number of StreamingPull responses and modacks are back up to their normal levels.
[screenshot]

@hongalex
Member

hongalex commented Apr 9, 2024

Interesting, so maybe the bottleneck isn't the number of streams opened but the number of modacks sent. If the incoming batches are small, then a large number of modacks will be sent relative to acks.

Can you explain how your Mod Ack Count graph is being populated?

@jameshartig
Contributor Author

We might need traces into the grpc library to understand if there's contention on the number of streams or connections, since the number of StreamingPull responses and modacks are back up to their normal levels.

I'm not sure why there aren't any grpc traces. We aren't setting WithTelemetryDisabled and from what I can tell from the code it looks like by default it calls addOpenTelemetryStatsHandler which uses otelgrpc.NewClientHandler(). Since we are only allowing pubsub-related spans (or children of those spans) in the sampler, I wonder if the context is getting lost somehow when the modack or ack happens.

Can you explain how your Mod Ack Count graph is being populated?

Both of those graphs are from the OpenCensus data from the pubsub client. We have this function:

func registerPubSubOpenCensusViews() {
	registerPubSubOpenCensusViewsOnce.Do(func() {
		for _, v := range pubsub.DefaultPublishViews {
			lmetric.RegisterOpenCensusView("", v)
		}
		for _, v := range pubsub.DefaultSubscribeViews {
			lmetric.RegisterOpenCensusView("", v)
		}
	})
}

This registers each of those views with our own lmetric package, which exposes them to Prometheus, the source of those graphs.
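
(Without a wrapper like lmetric, the equivalent direct OpenCensus registration would look roughly like this; the Prometheus exporter wiring is omitted.)

import (
	"log"

	"cloud.google.com/go/pubsub"
	"go.opencensus.io/stats/view"
)

func registerPubSubViews() {
	// Registers the client's default publish/subscribe views (AckCount, ModAckCount, etc.).
	if err := view.Register(pubsub.DefaultPublishViews...); err != nil {
		log.Fatal(err)
	}
	if err := view.Register(pubsub.DefaultSubscribeViews...); err != nil {
		log.Fatal(err)
	}
}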

@hongalex
Member

hongalex commented Apr 9, 2024

I'm not sure why there aren't any grpc traces. We aren't setting WithTelemetryDisabled and from what I can tell from the code it looks like by default it calls addOpenTelemetryStatsHandler which uses otelgrpc.NewClientHandler()

So the generated gRPC traces are children of batch ack/modack spans that live in their own trace, outside of the message trace spans you've been screenshotting. The reason is a bit complicated but it has to do with how parent/child relationships work in trace views. You should be able to see spans named <subscription ID> ack and <subscription ID> modack, where the gRPC traces should exist below that. However, if your sampling rate is too low, it's possible these are not being sampled.

StreamingPull traces should also be generated, but they're not very useful, since they only represent the duration of the stream being open and not more specific information.

@jameshartig
Contributor Author

So the generated gRPC traces are children of batch ack/modack spans that live in their own trace, outside of the message trace spans you've been screenshotting.

Ah, the reason our sampler was not catching these is that I thought they'd have semconv.MessagingSystemKey.String("pubsub") set on them, but the batch spans don't have any of these generic attributes set on them. Would they be able to get semconv.MessagingSystemKey.String("pubsub") and semconv.MessagingDestinationName(sub) set on them?

I'll make some changes to the sampler to include these other spans.
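
For context, the filter is roughly shaped like this (a simplified sketch, not our exact sampler, which also rate-limits; sdktrace = go.opentelemetry.io/otel/sdk/trace, trace = go.opentelemetry.io/otel/trace, semconv = go.opentelemetry.io/otel/semconv/v1.21.0):

type pubsubOnlySampler struct {
	inner sdktrace.Sampler // e.g. a ratio or rate-limiting sampler
}

func (s pubsubOnlySampler) ShouldSample(p sdktrace.SamplingParameters) sdktrace.SamplingResult {
	// Keep children of spans that were already sampled.
	if trace.SpanContextFromContext(p.ParentContext).IsSampled() {
		return s.inner.ShouldSample(p)
	}
	// Otherwise only consider spans tagged as Pub/Sub messaging spans.
	for _, attr := range p.Attributes {
		if attr.Key == semconv.MessagingSystemKey && attr.Value.AsString() == "pubsub" {
			return s.inner.ShouldSample(p)
		}
	}
	return sdktrace.SamplingResult{Decision: sdktrace.Drop}
}

func (s pubsubOnlySampler) Description() string { return "pubsub-only sampler" }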

@hongalex
Member

hongalex commented Apr 9, 2024

Would they be able to get semconv.MessagingSystemKey.String("pubsub") and semconv.MessagingDestinationName(sub) set on them?

That's good feedback, let me get these added.

@jameshartig
Contributor Author

I fixed our sampler to include the grpc traces and interestingly all of the spans >20s are from ModifyAckDeadline:
[screenshot]

Unfortunately the spans generated by grpc aren't very helpful:
[screenshot]

@hongalex
Member

I think the next thing is to figure out whether Pub/Sub is seeing the same high modack latencies on the server side. I suspect this isn't a bug with the server, and is rather either a throttled network or, perhaps less likely, a bug with the client library, but it wouldn't hurt to check if you can open a support ticket.

@jameshartig
Contributor Author

@hongalex The case number is 50592102.

Also, there are a few spans I found where the grpc is significantly delayed:
[screenshot]
The following attributes were on that span:

messaging.batch.message_count	1
messaging.gcp_pubsub.ack_deadline_seconds	30
messaging.gcp_pubsub.is_receipt_modack	true

Then there are some like:
[screenshot]
Where the first grpc call returned status_code: 14 and then it retried.
That one also had:

messaging.batch.message_count	1
messaging.gcp_pubsub.ack_deadline_seconds	30
messaging.gcp_pubsub.is_receipt_modack	true

I'm not sure if that indicates some kind of client-side throttling is happening.

@hongalex
Member

So I understand that my colleagues are investigating the bug with the message IDs you provided. In the meantime, I'm still trying to rule out what's going on from the client library perspective.

From your latest message (and also earlier messages), it is a bit concerning that the batch is comprised of only a single message (ack ID). I was able to confirm that Pub/Sub should return, on a best-effort basis, as many messages as are available in a StreamingPull response.

Some open questions I have

  1. Why is the modack latency significantly higher than ack latency?
  2. Why are you receiving batches of 1 or 2 messages from the stream?

I plan to create some kind of reproduction with the numbers you gave me to see if I can repro the behavior above.

@hongalex
Member

Some more investigations

Why are you receiving batches of 1 or 2 messages from the stream?

So I realized this might be happening because of how flow control works. It seems that maybe one or two messages are acked at a time, which allows a few more messages to come through the stream (which issues a modack RPC). I still need to do more investigation to see if this can be mitigated.

@jameshartig
Contributor Author

It's hard for me to connect a subscribe span to a modack/ack span but it's easy to go the other way because there's a link. Is there anything we can do to make it easier to navigate both ways (subscribe -> modack/ack and modack/ack -> subscribe) with links?

So I realized this might be happening because of how flow control works.

I'm not sure if this is the reason, because we are typically acking in batches of ~25,000, which should translate to ~9 Acknowledge calls containing 2,500 IDs each plus a final one with the leftover. The GCP Console's trace explorer isn't sophisticated enough to let me graph the messaging.batch.message_count field, but looking at a 4-minute window there were 795 acknowledge spans, over 300 of which had exactly 2,500 IDs. I couldn't find a single one with fewer than 10 IDs.

I think the reason we're getting batches of 1-2 messages is that we're publishing in 1-2 message batches. My naive understanding of the server architecture is that the streaming subscribe batches mirror the publish batches if our subscribers are keeping up. We have 2 subscribe VMs, but we have >50 publishing VMs, each publishing messages that come from end-user HTTP requests which typically only contain a few messages.

We also have the following settings on the publishing side:

DelayThreshold = time.Millisecond
CountThreshold = pubsub.MaxPublishRequestCount
Timeout = 15 * time.Second
BufferedByteLimit = 100 * pubsub.MaxPublishRequestBytes
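
(Applied to the topic, that's roughly the following; the topic variable/ID is illustrative and our wrapper sets these slightly differently.)

topic := client.Topic("my-topic")
topic.PublishSettings.DelayThreshold = time.Millisecond
topic.PublishSettings.CountThreshold = pubsub.MaxPublishRequestCount
topic.PublishSettings.Timeout = 15 * time.Second
topic.PublishSettings.BufferedByteLimit = 100 * pubsub.MaxPublishRequestBytes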

I don't have the tracing beta version on the publishing side, since I can't put beta code in that application, so I can't tell you exactly what the batches look like there. Depending on the region, the servers are publishing between 150 and 250 messages per second.

One thing we could try is raising the DelayThreshold to 10ms on the publishing side which should allow more messages to be bundled and see what effect that has on the number of StreamingPull responses on the subscriber side. That would meaningfully increase our end-user request latency, since publishes could now take up to 10ms, so I'm not sure how easily I could get that change in.

Also I noticed an odd pattern several times with the ModAckDeadline spans:

[screenshots]

It looks like the latencies start to increase until they plateau at 60 seconds (the client-side timeout) and then recover later. Almost like there's something happening on the server where it starts to respond slower and slower and then either crashes or recovers somehow.

@hongalex
Member

hongalex commented Apr 12, 2024

I'm not sure if this is the reason, because we are typically acking in batches of ~25,000, which should translate to ~9 Acknowledge calls containing 2,500 IDs each plus a final one with the leftover

Hm, that's fair. I had contrived a toy example where processing time was randomized between 5 and 9 seconds, which resulted in acks completing randomly within that interval and less batching.

One thing we could try is raising the DelayThreshold to 10ms on the publishing side which should allow more messages to be bundled and see what effect that has on the number of StreamingPull responses on the subscriber side

I would hold off on making publisher-side changes for now. I was recently told that Pub/Sub servers will try to deliver as many messages as possible in a single StreamingPull response, even if the messages were published in small batches. It used to be that publish batches were more tightly coupled with subscribe batches, but that was before server-side flow control was introduced.

I'll try reproducing with a large number of smaller publishers to see if that changes anything on my end, but aside from that I wasn't able to get my modack latency high.

Edit: forgot to respond

It's hard for me to connect a subscribe span to a modack/ack span but it's easy to go the other way because there's a link. Is there anything we can do to make it easier to navigate both ways (subscribe -> modack/ack and modack/ack -> subscribe) with links?

The OpenTelemetry spec only recently started allowing links to be added after span creation, so this bidirectional linking wasn't previously possible. I'm looking into adding it now.

@hongalex
Member

hongalex commented May 2, 2024

@jameshartig apologies for the delay here. Would you be willing to try this fix that batches the initial receipt modacks? In cases where you're receiving a large number of small batched StreamingPull responses, this should reduce the number of modack RPCs being issued and hopefully lower the amount of network overhead those RPCs induce. Note, this branch is separate from the OpenTelemetry preview and builds upon the 1.37.1 release of the library.

go get cloud.google.com/go/pubsub@pubsub-batch-receipts

@jameshartig
Contributor Author

@hongalex

I'm willing to test it, but I think modacks need to be in their own goroutine, because even with your proposed change they could still be held up by a long-running ack. If we had the acknowledge deadline at 10 seconds, then we'd have only 10 seconds to send the receipt modack, but if a large ack took 10+ seconds we would miss all of those receipt modacks. Even a separate goroutine isn't a perfect solution, because we've already found modack calls that take 60+ seconds.

However if you think the underlying delay is a network/grpc contention issue and not something on the server-side then this could help.

@jameshartig
Contributor Author

jameshartig commented May 3, 2024

@hongalex The ModAckCountView just counts total modacks, not the number of requests, so I can't use that metric to observe the new count of modack requests. But if I look at our GCP project metrics I do see a sharp decline in ModifyAckDeadline API calls:
[screenshot]

Update:
Supporting the contention theory, there was a significant drop in CPU after deploying that branch:
[screenshot]

@hongalex
Member

hongalex commented May 3, 2024

Yeah so the biggest pain point I saw was the number of receipt modacks that were being issued since they aren't batched.

The reason I came back to this from a client library perspective is that I was working with someone on the backend side and couldn't see the acks/modacks actually reach the server, hinting at network/CPU contention.

I'm curious to see if the fix holds stable. Your previous two graphs are great insights (thanks for sharing those) but let me know if anything changes. I'll start working on getting this fix merged.

Also, batching of receipts only happens for non-exactly-once-delivery subscriptions. I don't think you're using exactly-once delivery, but I thought I'd let you know anyway.

@jameshartig
Contributor Author

jameshartig commented May 3, 2024

To best test it under load, I stopped our subscribers for a few minutes to let ~5 million messages accumulate in the backlog. Then I started the subscribers again.
[screenshot]
It's definitely better than before. The 99th percentile for ack latency (on the Google side) is 1.5 min, the 95th is 1.25 min, and the 50th is 30 seconds. But that's still different from our internal metrics for the same period, which were 8 seconds, 4 seconds, and 3 seconds respectively, so I believe there are still some delayed acks happening. Since this build doesn't include tracing, I couldn't dig into them specifically.

@jameshartig
Contributor Author

The reason I came back to this from a client library perspective is that I was working with someone on the backend side and couldn't see the acks/modacks actually reach the server, hinting at network/CPU contention.

That makes a lot of sense. Do you have any thoughts on whether it's a Linux kernel TCP contention issue, a Go scheduling issue, or something in the Go grpc library? I'm not familiar enough with grpc to know if there's a max-outstanding-in-flight limit or something similar that we might have been hitting.

@hongalex
Member

hongalex commented May 3, 2024

To be honest, I'm not confident where the bottleneck is. I suspect it's a gRPC issue, but I don't have the data to back that up.

I remember a while back, I had suggested increasing the WithGRPCConnectionPool option. What value is that set at now?

@jameshartig
Contributor Author

I remember a while back, I had suggested increasing the WithGRPCConnectionPool option. What value is that set at now?

I missed that update to the docs so we just have it at the default which would be 4 since we're using t2d-standard-4 instances.

I can bump it up to 8 and see how that affects the ack latencies.

@jameshartig
Contributor Author

jameshartig commented May 7, 2024

Raised the pool to 8 but the service was actually doing 50% more messages per second than earlier.
[screenshot]
The latencies on Google's side were better: 99th percentile for Ack latency was 0.8min, 95th is .68min and 50th is 25 seconds.

Raising the pool to 16:
[screenshot]
Pretty much the same as 8. 99th percentile for Ack latency was 0.8min, 95th is .7min and 50th is 30 seconds.

Though these numbers are slightly better than the default (4 connections), I'm not sure why grpc would be the bottleneck, since each connection should have 100 streams and with your new batching changes there's at most one modack/ack/receipt request in flight, so it shouldn't be more than a handful of streams at any given time.

I might need to wait until your batching is merged and combined with the otel tracing changes so I can do more digging with tracing. There's still an issue where we might ack 25k messages at a time, and if it takes 10 seconds (for example) to ack 2,500 messages then we'll end up spending 100 seconds acking those 25k messages, since it's still a single goroutine serially acking them.

There was a spike in latency (according to the Google API Dashboard) for the Acknowledge call right before and during the spikes in expired acks:
[screenshot]

That indicates it's taking at most ~200ms; however, we are acking ~13k messages per second, which means we need to be able to make 5-6 Acknowledge calls per second. If we cannot make Acknowledge calls as fast as we are acking messages, the client will fall behind.

@hongalex
Member

There's still an issue where we might ack 25k messages at a time, and if it takes 10 seconds (for example) to ack 2,500 messages then we'll end up spending 100 seconds acking those 25k messages, since it's still a single goroutine serially acking them.

Yeah, so I'm inclined to make all of the ack/modack RPCs concurrent (within a single invocation of sendAck and sendModAck). That should, in theory, improve the latency you are seeing here. However, 10 seconds per ack/modack RPC is a bit strange, which is why I would want to investigate that more.
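
Conceptually the pattern would be something like this (a sketch only, not the library's actual sendAck/sendModAck code; sendAckChunk stands in for the single Acknowledge RPC issued per batch of up to 2,500 ack IDs):

// Send the ack ID chunks concurrently instead of looping serially.
func sendAcksConcurrently(ctx context.Context, ackIDs []string, sendAckChunk func(context.Context, []string) error) error {
	const maxPerRequest = 2500
	g, ctx := errgroup.WithContext(ctx) // golang.org/x/sync/errgroup
	for len(ackIDs) > 0 {
		n := min(maxPerRequest, len(ackIDs))
		chunk := ackIDs[:n]
		ackIDs = ackIDs[n:]
		g.Go(func() error { return sendAckChunk(ctx, chunk) })
	}
	return g.Wait() // waits for all chunks and returns the first error, if any
}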

@jameshartig
Contributor Author

Let me know what's next on my end for debugging. Happy to help. We might need the tracing changes though which means maybe we need to wait until your batching branch is merged?

@hongalex
Member

@jameshartig Sorry for the delay. Can you try pulling from go get cloud.google.com/go/pubsub@pubsub-batch-receipts again? I added the change that makes the ack/modack RPCs concurrent (called in their own goroutine), so this should improve performance somewhat.

@jameshartig
Contributor Author

Running the latest pubsub-batch-receipts code with the default grpc pool seemed to make things worse:
[screenshot]
The latencies on Google's side: the 99th percentile for ack latency was 0.92-1.39 min, the 95th was 0.8 min, and the 50th was 0.58 min.

Raising the grpc pool to 8:
[screenshot]
The latencies on Google's side: the 99th percentile for ack latency was 0.94 min, the 95th was 0.86 min, and the 50th was 0.59 min.
Here's the Acknowledge latency for this workload from the Console:
[screenshot]

Raising the grpc pool to 16:
[screenshot]
The latencies on Google's side: the 99th percentile for ack latency was 0.94 min, the 95th was 0.84 min, and the 50th was 0.59 min.
and the acknowledge latency:
[screenshot]

Overall I didn't see much change which is a bit confusing.

@hongalex
Member

Thanks for reporting back. I am a bit confused as to why increasing the number of RPCs in flight does not decrease latency. Something I was hoping you could try is lowering MaxOutstandingMessages. I suggest maybe cutting it in half to 20k to see if that results in any improvement in expired messages. In addition, you could consider further lowering NumGoroutines to 5, since your throughput is still rather low and doesn't need that many streams open.

You can just use the grpc connection pool = 8 for testing this.

@jameshartig
Contributor Author

In addition, you could consider further lowering NumGoroutines to 5, since your throughput is still rather low and doesn't need that many streams open.

We have the following code in our Pub/Sub library:

sub.ReceiveSettings.NumGoroutines = runtime.NumCPU() // used to be runtime.NumCPU() * 5
for sub.ReceiveSettings.NumGoroutines > 1 {
	if sub.ReceiveSettings.NumGoroutines*100 <= sub.ReceiveSettings.MaxOutstandingMessages {
		break
	}
	sub.ReceiveSettings.NumGoroutines--
}

What do you think a good ratio is? The intent is to not have NumGoroutines set to something like 4 when the MaxOutstandingMessages is something like 100.

Also, apologies, my latest comment did not actually reflect any changes to the default grpc connection pool size because of a configuration bug.

Here are the following tests:
NumGoroutines=20, MaxOutstandingMessages=60000, grpcConnectionPool=8
[screenshot]
Already a decent improvement here. Latencies started off at .37min (99th), .25min (95th), 4sec (50th) and ended at .76min (99th), .59min (95th), and .22min (50th). I'm not sure why it went up and I'll have to do a longer test later to see if it would've continued or plateaued. There was one tiny blip at 25k on the expired deadlines count. I will also note that we had significantly higher throughput (~40k/sec across 2 VMs) during this test than earlier today (~20k/sec across 2 VMs).

NumGoroutines=5, MaxOutstandingMessages=60000, grpcConnectionPool=8
[screenshot]
This one was significantly better. Latencies were at 9sec (99th), 7sec (95th), 3.6sec (50th). There was a spike on the expired ack graph but it's the lowest we've seen.

I can do some more testing tomorrow.

@hongalex
Member

hongalex commented May 15, 2024

What do you think a good ratio is? The intent is to not have NumGoroutines set to something like 4 when the MaxOutstandingMessages is something like 100.

So these two values are technically separate but related in a bit confusing way. NumGoroutines refers to how many iterators we create, where each iterator creates a StreamingPull stream and handles lease management of messages for that stream. Each stream handles 10 MB/s, so given that your throughput hovers around 10-20MB/s, I think a low number of streams is sufficient.

MaxOutstandingMessages configures how many message handlers are invoked at once, but it is also passed along in the initial StreamingPullRequest to configure server-side flow control. The unfortunate thing is that this flow control is a bit flawed: to the server, the number of outstanding messages is NumGoroutines * MaxOutstandingMessages, which means that for an individual subscriber application, flow control is off by a factor of NumGoroutines. The client-side flow control then kicks in to enforce the intended behavior of MaxOutstandingMessages. We considered setting the flow control limit of each stream to MaxOutstanding / NumStreams, but this is technically a breaking change.
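
For example, with NumGoroutines=20 and MaxOutstandingMessages=40,000, the server-side limit is effectively 20 * 40,000 = 800,000 outstanding messages across the streams, while the client-side flow controller still caps callbacks at 40,000.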

On the last point, I'm going to do some testing to see if setting NumGoroutines or MaxOutstanding to be too high will result in ack expirations. I have a suspicion that excess messages could be flow controlled (corresponding to the concurrency control span) on the client side and then expire. This is the main motivation for suggesting you to decrease MaxOutstandingMessages.

@jameshartig
Contributor Author

So these two values are technically separate but related in a bit confusing way. NumGoroutines refers to how many iterators we create, where each iterator creates a StreamingPull stream and handles lease management of messages for that stream. Each stream handles 10 MB/s, so given that your throughput hovers around 10-20MB/s, I think a low number of streams is sufficient.

I think that's what I was looking for. Ideally we would not expose any knobs to our developers, but we've had a lot of use cases that need MaxOutstandingMessages exposed, so we want to automatically configure NumGoroutines for them, which is why we base it on MaxOutstandingMessages. Our message sizes are between 500 B and 4 KB, so on the high end 10 MB/s would be 2,500 messages per second. Sounds like we could even raise the ratio in our code to 1000 rather than 100.
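
i.e. something like this in our wrapper (a sketch of the adjusted heuristic):

sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()
// Roughly one stream per 1,000 outstanding messages instead of per 100,
// since each stream can carry ~2,500 msgs/sec at our message sizes.
for sub.ReceiveSettings.NumGoroutines > 1 &&
	sub.ReceiveSettings.NumGoroutines*1000 > sub.ReceiveSettings.MaxOutstandingMessages {
	sub.ReceiveSettings.NumGoroutines--
}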

The unfortunate thing is that this flow control is a bit flawed: to the server, the number of outstanding messages is NumGoroutines * MaxOutstandingMessages, which means that for an individual subscriber application, flow control is off by a factor of NumGoroutines. The client-side flow control then kicks in to enforce the intended behavior of MaxOutstandingMessages. We considered setting the flow control limit of each stream to MaxOutstanding / NumStreams, but this is technically a breaking change.

Interesting, that seems like a big flaw. I can see strictly speaking why it would be a breaking change but that also feels like an implementation detail that isn't documented either way.

I have a suspicion that excess messages could be flow controlled (corresponding to the concurrency control span) on the client side and then expire.

That's very interesting and something I hadn't considered. But isn't the client doing modacks even if they're blocked in the flow controller? I guess they could exceed MaxExtension which we have set to 30 seconds. Should we attempt to raise that? We could potentially try setting the LimitExceededBehavior to FlowControlIgnore or even FlowControlSignalError?

@jameshartig
Contributor Author

Apologies, I had a busy morning. Some more tests:

Sanity check raising NumGoroutines back up: NumGoroutines=20, MaxOutstandingMessages=60000, grpcConnectionPool=8
[screenshot]
Latencies were at .76min (99th), .56min (95th), 18sec (50th).

NumGoroutines=2, MaxOutstandingMessages=60000, grpcConnectionPool=8
[screenshot]
The expired acks were 0 🎉. Latencies were at 7sec (99th), 5sec (95th), 2sec (50th).
I ran it again just to be sure and got even better results.
[screenshot]
Latencies were at 4.2sec (99th), 3.5sec (95th), 2.4sec (50th).

NumGoroutines=5, MaxOutstandingMessages=20000, grpcConnectionPool=8
[screenshot]
I'm a bit surprised by this one. Latencies were at 40sec (99th), 30sec (95th), 14sec (50th).

The lower NumGoroutines performing better would match your theory about the flow mismatch being an issue unless there's something else going on with the goroutine count.

That makes me think we should even be more conservative with NumGoroutines and default it to 1 (or maybe 2?) in almost all cases unless we expect to see high throughput. At least until there's a change made to sync the outstanding message flow control with the server.

@hongalex
Member

Thanks for your patience with this issue. I'm glad to see that lowering NumGoroutines to 2 resulted in no expired acks.

Interesting, that seems like a big flaw. I can see strictly speaking why it would be a breaking change but that also feels like an implementation detail that isn't documented either way.

Yeah, it's something we thought about in the past but haven't finalized a concrete decision on how we want to fix it. I'll prioritize revisiting this, now that we have some stats to back up this claim.

But isn't the client doing modacks even if they're blocked in the flow controller?

Yes, modacks start happening soon after messages get received, before flow control.

I guess they could exceed MaxExtension which we have set to 30 seconds. Should we attempt to raise that?

Given that you are able to process messages in under 10s (in your most recent screenshots), this will probably just result in higher ack latency, which might be undesirable. It's better to have the client process the messages as soon as they come in.

We could potentially try setting the LimitExceededBehavior to FlowControlIgnore or even FlowControlSignalError?

Yeah, it's worth considering disabling client-side flow control (FlowControlIgnore) and just relying on the server-side flow control, understanding that the total messages outstanding will be NumGoroutines * MaxOutstandingMessages. FlowControlSignalError likely isn't what you want.

@jameshartig
Contributor Author

Yeah, it's worth considering disabling client-side flow control (FlowControlIgnore) and just relying on the server-side flow control, understanding that the total messages outstanding will be NumGoroutines * MaxOutstandingMessages. FlowControlSignalError likely isn't what you want.

That's not controllable for receiving, right? That would need to be a change in the client to make that customizable.

@jameshartig
Contributor Author

Thanks for your patience with this issue. I'm glad to see that lowering NumGoroutines to 2 resulted in no expired acks.

One feature request out of this would be to make the NumGoroutines value dynamic based on the throughput from the client's perspective. It seems like the majority of consumers (less than 10MB/s) only need 1 stream, but the client makes 10 by default. It could start with 1 and increase it based on the throughput it receives from the server. That seems like a more sensible default than 10.

@hongalex
Member

One feature request out of this would be to make the NumGoroutines value dynamic based on the throughput from the client's perspective. It seems like the majority of consumers (less than 10MB/s) only need 1 stream, but the client makes 10 by default. It could start with 1 and increase it based on the throughput it receives from the server.

A dynamic level of MaxOutstandingMessages is something that we've considered, though NumGoroutines is also interesting. I'll bring it up to folks, though in the meantime, would you be willing to create an issue for that here? This allows us to properly track requests that don't apply to just a single library.

That seems like a more sensible default than 10.

Agreed that the default of 10 doesn't make as much sense anymore and should be significantly lower. This didn't matter until server-side flow control was added. The alternative is to introduce a mechanism that fixes the outstanding-messages-per-stream behavior, making that opt-in and eventually deprecating the old behavior altogether.

That's not controllable for receiving, right? That would need to be a change in the client to make that customizable.

You're right. It's something I'll consider adding, like a sub.ReceiveSettings.FlowControlBehavior.

@hongalex
Member

hongalex commented May 17, 2024

As an addition to my previous answer on flow control, there's a potential workaround if you want to use a higher number of iterators/NumGoroutines/streams. You would set MaxOutstandingMessages = -1, which disables client & server side flow control for messages. While this would also disable the limit on the number of callbacks, you can set MaxOutstandingBytes to some value that makes sense to you. Conservatively assuming each message is 4KB and you want to process 10k messages/second across 4 streams, each stream would need to handle 2500 messages / second.

If each message takes about 5 seconds to process, you'll have a limit of 2,500 messages/second * 5 seconds/message = 12.5k outstanding messages per stream. Since we're looking for bytes instead, you would set MaxOutstandingBytes = 12.5k messages * 4KB/message = 50 MB. Initially, the client will be limited to 10 MB/s, but once you have a steady stream of messages (where the client is simultaneously processing and pulling 10 MB/s), this should be less of a problem.

Edit: Ignore the above. As I finished typing this, I realized the client would then be limited to 50 MB of messages outstanding at once rather than the 250 MB of messages you need to get 10k messages/second. The math above runs into the same issue as MaxOutstandingMessages and only deals with the limit on simultaneous callbacks. You could disable both MaxOutstandingMessages AND MaxOutstandingBytes if you're confident you can process 10 MB/s of messages per stream, though.

One of the issues you were running into is that the client library pulls too many messages at once and gets stuck holding the messages while waiting for flow control resources. Disabling both flow control settings and relying purely on the stream's limit of 10 MB/s might solve your problems temporarily until we can add in the other settings like LimitExceeded behavior + sensible flow control limits per stream.
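
A sketch of that configuration (values illustrative):

sub.ReceiveSettings.NumGoroutines = 2           // however many streams the throughput needs (~10 MB/s each)
sub.ReceiveSettings.MaxOutstandingMessages = -1 // negative disables the message-count limit (and the callback cap)
sub.ReceiveSettings.MaxOutstandingBytes = -1    // negative disables the byte limit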

@jameshartig
Contributor Author

jameshartig commented May 17, 2024

I'll bring it up to folks, though in the meantime, would you be willing to create an issue for that here? This allows us to properly track requests that don't apply to just a single library.

Does this work? https://issuetracker.google.com/issues/341124500

One of the issues you were running into is that the client library pulls too many messages at once and gets stuck holding the messages while waiting for flow control resources. Disabling both flow control settings and relying purely on the stream's limit of 10 MB/s might solve your problems temporarily until we can add in the other settings like LimitExceeded behavior + sensible flow control limits per stream.

I think that's kind of what we're doing by setting NumGoroutines=2, right? That's limiting it to 20MB/s, which is really only an issue when there's a surge in messages. While we still have MaxOutstandingMessages=60000, as long as we can churn through the messages before MaxExtension we're okay. I think it's possible we could do MaxOutstandingMessages=-1, but we'd probably have to scale other parts of our application to handle a larger number of messages at once, because we have our own version of flow control when bundling, formatting, and ultimately writing to GCS.

@hongalex
Member

hongalex commented May 17, 2024

Does this work? https://issuetracker.google.com/issues/341124500

Perfect thanks.

I think that's kind of what we're doing by setting NumGoroutines=2, right?

Since it seems you're OK with a slight increase in ack latency (up to MaxExtension), that seems like a decent tradeoff. If you want to minimize ack latency (no waiting for client-side flow control), you will probably want to play around with scaling up and disabling client-side flow control altogether.

I'll leave this current issue open until I merge in the change that batches the receipt modacks / makes modacks/acks concurrent.

@jameshartig
Contributor Author

Thanks again for all the help here @hongalex!!

@hongalex
Member

No, thank you for the detailed graphs and reproduction.

We're hoping to introduce a new setting that will allow configuring the # of callbacks independently of flow control, which should also help. I can tag you when that gets implemented.

@jameshartig
Contributor Author

We're hoping to introduce a new setting that will allow configuring the # of callbacks independently of flow control, which should also help. I can tag you when that gets implemented.

Great!

Additionally, should we store the time a message was received in the iterator so we can reject it if it's been sitting in the client for longer than MaxExtension, or alternatively expose that time to the handler in the Message struct? I'm trying to think of how we can prevent already-expired messages from being handled unnecessarily.

@hongalex
Member

Yeah it's something I'm working through. Another behavior I'm thinking about is that, currently, messages that are expired are still being processed. The advantage of this is that messages can still be "acked" after they expire, but perhaps a better solution would be to stop processing and nack the message on expiration instead.
