
Support back-pressure on reads and subscriptions. #295


Merged
2 commits merged into kurrent-io:trunk on Jan 23, 2025

Conversation

rickardoberg
Contributor

The current readStreamReactive implementation is not reactive at all, since backpressure is not sent to the server. Instead the server sends messages as fast as possible and there is a blocking loop in ReadSubscription. If you have 1000 subscriptions that are consuming slower than the server is sending, that translates into 1000 blocking threads.

This PR changes the client so that it properly applies backpressure by sending requests to the server as demand becomes available. It amortizes the cost of the request calls in batches of 512 (i.e. if the client requests Long.MAX_VALUE then only 512 are requested from the server at a time), topping up once the outstanding count goes below 512*3/4 (i.e. never letting outstanding requests reach 0, which would introduce an unnecessary delay); those numbers are just illustrative.

With this change a client that makes 1000 readStreamReactive subscriptions and is not able to consume the events fast enough will not create 1000 threads.
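A minimal sketch of that batching idea (hypothetical names and structure, not the actual PR code; the real change wires this into the gRPC call):

```java
// Hypothetical sketch of the demand batching described above; names are
// illustrative and the real PR integrates this with the gRPC request stream.
final class DemandBatcher {
    private static final int BATCH_SIZE = 512;
    private static final int TOP_UP_THRESHOLD = BATCH_SIZE * 3 / 4;

    private long subscriberDemand; // demand signalled via Subscription.request(n)
    private int outstanding;       // requests already forwarded to the server

    /** Called when the subscriber invokes request(n); returns how many messages to request from the server. */
    synchronized int onSubscriberRequest(long n) {
        subscriberDemand = addCapped(subscriberDemand, n);
        return topUp();
    }

    /** Called for every message received from the server. */
    synchronized int onServerMessage() {
        outstanding--;
        return topUp();
    }

    // Only talk to the server when the outstanding count has dropped below the
    // threshold, so the cost of request calls is amortized over batches.
    private int topUp() {
        if (outstanding >= TOP_UP_THRESHOLD) {
            return 0;
        }
        int toRequest = (int) Math.min(BATCH_SIZE - outstanding, subscriberDemand);
        if (toRequest <= 0) {
            return 0;
        }
        subscriberDemand -= toRequest;
        outstanding += toRequest;
        return toRequest;
    }

    private static long addCapped(long a, long b) {
        long sum = a + b;
        return sum < 0 ? Long.MAX_VALUE : sum; // saturate on overflow, per the Reactive Streams spec
    }
}
```

The returned counts would then be forwarded to gRPC via ClientCallStreamObserver.request(int), so that at most 512 messages are ever in flight per subscription.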

…ation where requests are sent to the server.

Amortize the cost by sending requests at most once every 512 requests, unless the requests are all small.
@CLAassistant

CLAassistant commented Dec 17, 2024

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ YoEight
❌ rickardoberg

Contributor

@YoEight left a comment


Hey @rickardoberg and thank you for that early Christmas present!

I only have a few remarks so far.

@rickardoberg
Contributor Author

You're welcome! And if you're in the mood, if there's a way to also make it so the reactive stream does not close when there are no more events (i.e. it's a catch-up subscription), that would be FANTASTIC. Then the reactive stream API becomes the perfect way to integrate with the server.

@YoEight
Contributor

YoEight commented Jan 14, 2025

After investigating further, I’ve found that this PR’s back-pressure mechanism requires the server to send the correct number of messages, which isn’t happening at the moment. Unfortunately, that means a portion of the patch is effectively wasted from a performance standpoint.

@rickardoberg
Contributor Author

Do you have any sense of what it would take to fix that on the server-side?

@YoEight
Contributor

YoEight commented Jan 15, 2025

@rickardoberg

We have no plans to support it, so it's hard to tell. We'll need time to investigate.

Also, could you expand a bit on the issue you're having?

> And if you're in the mood, if there's a way to also make it so the reactive stream does not close when there are no more events (i.e. it's a catch-up subscription), that would be FANTASTIC.

@rickardoberg
Contributor Author

@YoEight if this could get prioritized, that would be great. It would finally make EventStore scalable end-to-end, which is amazing. Right now, with blocking clients, it's like you have run a marathon and then stopped for a tea party just before the finish line. It is so bizarre to me that this has not been fixed yet.

For the second part, currently the reactive stream API can only be used to do the initial read of a stream. There is no way to use it to wait for events, as the stream is completed once there are no more events. This forces clients to use the Subscription API, which is not great since that API is also blocking. My suggestion is to add a setting to the options on connect so that you can create a reactive stream that does not complete when there are no more events, just like the Subscription API.

Once you have fixed the server issue for this PR, and added reactive streams that don't stop, only then have you finished the marathon, metaphorically speaking. Then there is at least one proper client API for both initial read and subscriptions, and it is NOT blocking like the other APIs.

Does that answer your question?

@YoEight
Contributor

YoEight commented Jan 16, 2025

> if this could get prioritized, that would be great. It would finally make EventStore scalable end-to-end, which is amazing. Right now, with blocking clients, it's like you have run a marathon and then stopped for a tea party just before the finish line. It is so bizarre to me that this has not been fixed yet.

What do you mean by blocking clients? Did you experience issues with the other clients that we have? While back-pressure is important, if your consumer code is being overwhelmed by EventStore's throughput, it likely indicates that your application is under-provisioned. Interacting with EventStore is primarily I/O bound, so performance is largely determined by the speed of your network and storage devices. If you want to manage your resource consumption effectively and maintain stability when the consumer side can't keep up, back-pressure is essential, for sure. However, I don't think it's a critical issue for EventStore, in my opinion at least.

> For the second part, currently the reactive stream API can only be used to do the initial read of a stream. There is no way to use it to wait for events, as the stream is completed once there are no more events. This forces clients to use the Subscription API, which is not great since that API is also blocking. My suggestion is to add a setting to the options on connect so that you can create a reactive stream that does not complete when there are no more events, just like the Subscription API.

I thought I understood what you meant initially, but now I'm confused. How is the reactive implementation blocking, considering it's the gRPC observer that is driving the streaming operation? The underlying gRPC implementation is non-blocking. What am I missing here?

@rickardoberg
Contributor Author

>> if this could get prioritized, that would be great. It would finally make EventStore scalable end-to-end, which is amazing. Right now, with blocking clients, it's like you have run a marathon and then stopped for a tea party just before the finish line. It is so bizarre to me that this has not been fixed yet.

> What do you mean by blocking clients? Did you experience issues with the other clients that we have? While back-pressure is important, if your consumer code is being overwhelmed by EventStore's throughput, it likely indicates that your application is under-provisioned. Interacting with EventStore is primarily I/O bound, so performance is largely determined by the speed of your network and storage devices. If you want to manage your resource consumption effectively and maintain stability when the consumer side can't keep up, back-pressure is essential, for sure. However, I don't think it's a critical issue for EventStore, in my opinion at least.

Let's take this step by step. Currently the Subscription API is blocking. Currently the reactive stream API is blocking. All other things being equal it is always faster to read an event than write one, because that's just how I/O is. Therefore, under any normal circumstances EventStore will be able to provide events faster than a consumer can handle them, if there is any kind of writing done by the consumer. Because of this you HAVE to do back pressure handling, or else there will be blocking. In the EventStore client case the Subscription API will be blocking on onEvent, as it has to wait for the consumer code to handle/write the event. In our case the consumer is a proper back pressure handling reactive stream server (necessary since yours isn't). So for each Subscription there is one thread waiting for onEvent to finish, and internally that consumer does the same wait-until-requests loop as your client does (=bad, but required). But that consumer is then a reactive stream server, so that its clients get proper backpressure handling. If EventStore had backpressure handling we would maybe not even need this service in between EventStore and the actual workload, but as it is we need it to cover for your bad implementation.

To say that "consumer code is being overwhelmed by EventStore's throughput, it likely indicates that your application is under-provisioned" is non-sensical, given the premise that reads will always be faster than writes. In the case of the EventStore reactive stream API client it is also blocking, as I've already shown. So as a user of EventStore I am forced to have one thread per stream being read, which is terrible and unnecessary and why I'm saying that you built this amazing product only to stop for a teaparty at the very end and provide blocking clients.

>> For the second part, currently the reactive stream API can only be used to do the initial read of a stream. There is no way to use it to wait for events, as the stream is completed once there are no more events. This forces clients to use the Subscription API, which is not great since that API is also blocking. My suggestion is to add a setting to the options on connect so that you can create a reactive stream that does not complete when there are no more events, just like the Subscription API.

> I thought I understood what you meant initially, but now I'm confused. How is the reactive implementation blocking, considering it's the gRPC observer that is driving the streaming operation? The underlying gRPC implementation is non-blocking. What am I missing here?

The while loop in ReadSubscription.onNext is what makes it blocking. That while loop should not exist, which is the main point of this PR. The client should request data, and then the EventStore server provides it and sends it to the final consumer through your client. But if backpressure is not handled correctly (as is currently the case) then you need that while loop, and then it becomes blocking, i.e. you get one thread per subscription (= bad!).

Your base assumption should be that the EventStore server can provide events faster than the consumer can handle them. If there are niche cases where the consumer is faster, great, but that should not be assumed to be the normal case (see the I/O argument above). And because of that, you need to design the client to cater for this, hence this PR. Until this is fixed, 1000 client subscriptions = 1000 threads -> bad! It's that simple.
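To make the contrast concrete, here is a rough, purely illustrative sketch (simplified placeholder types, not the actual ReadSubscription code):

```java
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscriber;

// Rough illustration of the two onNext patterns discussed above; types and
// names are simplified placeholders, not the actual client code.
final class OnNextPatterns<T> {
    private final AtomicLong subscriberDemand = new AtomicLong();
    private final Subscriber<? super T> subscriber;

    OnNextPatterns(Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
    }

    /** Called from Subscription.request(n) on the subscriber's side. */
    void addDemand(long n) {
        subscriberDemand.addAndGet(n);
    }

    // Blocking pattern: the gRPC delivery thread waits until the subscriber has
    // demand, so 1000 slow subscriptions pin 1000 threads.
    void onNextBlocking(T event) throws InterruptedException {
        while (subscriberDemand.get() == 0) {
            Thread.sleep(1); // the gRPC delivery thread is parked here
        }
        subscriberDemand.decrementAndGet();
        subscriber.onNext(event);
    }

    // Request-driven pattern: deliver and return immediately. The server only
    // sent this message because the client had previously requested it, so the
    // delivery thread never needs to wait.
    void onNextRequestDriven(T event, Runnable topUpServerRequests) {
        subscriber.onNext(event);
        topUpServerRequests.run(); // e.g. forward more demand via the batching sketch above
    }
}
```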

Clearer?

@rickardoberg
Contributor Author

@YoEight if you want to hop on a video call and discuss this, I'd be more than happy to. Just let me know what you need to understand the issues here.

@YoEight
Contributor

YoEight commented Jan 16, 2025

> The while loop in ReadSubscription.onNext is what makes it blocking. That while loop should not exist, which is the main point of this PR. The client should request data, and then the EventStore server provides it and sends it to the final consumer through your client. But if backpressure is not handled correctly (as is currently the case) then you need that while loop, and then it becomes blocking, i.e. you get one thread per subscription (= bad!).

I understand now: the PR requires server-side back-pressure to be effective, and since that’s not currently on the roadmap, it might not happen soon. While it’s not impossible in the future, the PR will likely remain blocked until back-pressure is introduced. The need for a dedicated thread is specific to Java, since the other languages we support provide green threads.

I’ll let the rest of the team know that we need to prioritize back-pressure so we can address your core issue.

@YoEight
Contributor

YoEight commented Jan 16, 2025

OK, after some digging, it seems most of the gRPC back-pressure should just work on the server, as it is handled at the transport layer. The Java client needed that patch of yours to leverage it. We should still provide access to the flowControlWindow channel option for finer tuning when the client starts. With your patch, we now provide window updates through the request calls we make.
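For reference, a minimal sketch of what exposing that option could look like (the endpoint and window size are placeholders; the actual client builds its channels elsewhere):

```java
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;

// Hypothetical example of tuning the HTTP/2 flow-control window at channel
// creation time; host, port and window size are illustrative assumptions.
final class ChannelFactory {
    static ManagedChannel create() {
        return NettyChannelBuilder
                .forAddress("localhost", 2113)  // assumed EventStore gRPC endpoint
                .flowControlWindow(1 << 20)     // 1 MiB window instead of the Netty default
                .usePlaintext()
                .build();
    }
}
```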

@YoEight
Contributor

YoEight commented Jan 17, 2025

I can confirm that the server does support back-pressure when a client reports HTTP/2 flow-control window updates properly. The .NET and the Rust clients do it. I need to check with the Go client. I will report my findings with this PR's patch later to see if it finally behaves properly.

@w1am self-requested a review on January 20, 2025 15:43
@YoEight
Contributor

YoEight commented Jan 20, 2025

@rickardoberg Any chance you could give the latest version of this PR a try? I addressed both regular subscriptions and reads.

@YoEight changed the title from "Make reactive stream reading non-blocking" to "Support back-pressure on reads and subscriptions." on Jan 20, 2025
@rickardoberg
Contributor Author

@YoEight I tried out the updated branch with my own test code. Unfortunately I can't actually test it properly, because of ReadResponseObserver:L86, which starts by sending out a request with the configured batch size. My test is to see what happens if the subscriber only does request(1), which simulates a really slow subscriber that never finishes handling the first message. I would suggest calling this.requestStream.disableAutoRequestWithInitial(0) so that the subscriber really can drive the flow of the stream. Then I can test this and see whether it behaves properly.
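For reference, the kind of test subscriber I mean is roughly this (an illustrative sketch, not the actual test code):

```java
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Illustrative "stalled consumer": it requests exactly one event and never asks
// for more. With working back-pressure the server should deliver one event per
// subscription and then go quiet instead of flooding the client.
final class SingleRequestSubscriber<T> implements Subscriber<T> {
    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(1); // simulate a consumer stuck on its first event
    }

    @Override
    public void onNext(T event) {
        System.out.println("received: " + event); // deliberately never requests more
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("completed");
    }
}
```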

I tried to figure out if these changes now support catchup-subscription streams, i.e. don't call onComplete when there are no more events. Is that implemented now, or do you want to do that separately?

@YoEight
Contributor

YoEight commented Jan 23, 2025

The issue is that this introduces a behavior-breaking change: it shifts the responsibility for driving the streaming process onto the subscriber, meaning they would need to explicitly call request at least once to properly initiate streaming. One potential approach is to set a very small buffer size (e.g., 1 or 2) but either never process the messages or delay processing significantly. This should allow you to observe that the server stops sending messages.

I did update the subscription implementation so it now supports back-pressure as well.

@YoEight force-pushed the batchingreactivestream branch from 1b0df79 to 693a35e on January 23, 2025 04:36
@rickardoberg
Contributor Author

In most cases where subscribers don't care about back-pressure they will do an initial request of Long.MAX_VALUE. This will create blocking code as well, but in the subscriber code. Project Reactor's Flux does this by default, for example. The API is literally defined as requiring the subscriber to call request for anything to happen.
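For illustration, that "don't care about back-pressure" case amounts to something like this (a sketch, not library code):

```java
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Illustrative subscriber that opts out of back-pressure by signalling
// unbounded demand up front, which is what Reactor-style defaults amount to.
final class UnboundedSubscriber<T> implements Subscriber<T> {
    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // "send me everything as fast as you can"
    }

    @Override
    public void onNext(T event) {
        // If handling is slower than delivery, the delivery thread now waits on
        // this method instead, i.e. the blocking moves into subscriber code.
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    @Override
    public void onComplete() { }
}
```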

@YoEight
Contributor

YoEight commented Jan 23, 2025

Why would calling request(Long.MAX_VALUE) create blocking code? It merely signals the server that it can send everything it has.

I’m not opposed to giving subscribers (in the reactive sense) more control over how the stream is consumed. As it stands, this patch doesn’t break any existing behavior.

Additionally, due to the rebranding, we’ll also be renaming the Java clients and other related components. I’d prefer to incorporate your suggestion as part of that effort. After working on back-pressure, I find myself increasingly inclined to transition everything toward a reactive approach.

@YoEight merged commit 12021e6 into kurrent-io:trunk on Jan 23, 2025
36 of 39 checks passed
@rickardoberg
Contributor Author

It goes back to how this PR started: if the subscriber cannot handle the events faster than ES can send them (the normal case if any write I/O is involved) then there will be one thread per subscriber waiting for the previous events to be handled. I.e. 1000 subscribers=1000 gRPC threads trying to push events. But instead of blocking in the loop in your client it is now blocking in a similar situation in the subscriber code. It's all the same. In our case the subscriber sends the events over a websocket to another server which projects them to a database read model, so in total the pipeline after ES client is muuuuch slower than ES can send events. Instead of having a thread blocked in the subscriber it is better to allow those threads to push events to subscribers that are ready to handle them and return immediately.

As for wanting to do everything reactive: welcome to the dark side! The full potential is only revealed once you go full reactive in all steps of the chain, like we have (ES->websocket->migration rules->projection->read model update subscribers). It is incredibly powerful and efficient.

@YoEight
Contributor

YoEight commented Jan 23, 2025

> It goes back to how this PR started: if the subscriber cannot handle the events faster than ES can send them (the normal case if any write I/O is involved) then there will be one thread per subscriber waiting for the previous events to be handled.
> I.e. 1000 subscribers=1000 gRPC threads trying to push events. But instead of blocking in the loop in your client it is now blocking in a similar situation in the subscriber code. It's all the same.

I'm confused: where in the subscriber do you see any blocking code?

> In our case the subscriber sends the events over a websocket to another server which projects them to a database read model, so in total the pipeline after ES client is muuuuch slower than ES can send events. Instead of having a thread blocked in the subscriber it is better to allow those threads to push events to subscribers that are ready to handle them and return immediately.

When you say “blocking,” it seems you’re referring to the user code being the one blocking during the onNext function call. My concern is specifically about the client itself not engaging in any kind of blocking. If you’re performing an expensive operation when processing a message, I wouldn’t classify that as a blocking issue in this context.

As for the client-server interaction, the server will pause sending new messages if the buffer is full. Your code blocking while handling a message doesn’t impact the internal behavior of the client in any way.

> As for wanting to do everything reactive: welcome to the dark side! The full potential is only revealed once you go full reactive in all steps of the chain, like we have (ES->websocket->migration rules->projection->read model update subscribers). It is incredibly powerful and efficient.

Just to clarify, I don’t think the reactive approach is inherently better, it’s simply what Java offers. Personally, I prefer how .NET or Rust implemented their gRPC libraries, where we don’t need to manually provide control flow updates to the server. Perhaps more recent versions of Java handle this differently, but our requirement to maintain Java 8 compatibility significantly limits our options.
