
client-api: Move websocket sender to its own tokio task #2906


Open

wants to merge 3 commits into master

Conversation

kim
Contributor

@kim kim commented Jun 26, 2025

Split the websocket stream into send and receive halves, and spawn a
new tokio task to handle the sending. Also move message serialization and
compression to a blocking task if the message appears to be large (a rough
sketch of this arrangement follows the list below).

This addresses two issues:

  1. The select! loop is not blocked on sending messages, and can thus
    react to auxiliary events. Namely, when a module exits, we want to
    terminate the connection as soon as possible in order to release any
    database handles.

  2. Large outgoing messages should not occupy tokio worker threads, in
    particular when there are a large number of clients receiving large
    initial updates.
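
A minimal sketch of this arrangement, using tokio-tungstenite types for illustration (the function, channel size, and ping handling below are hypothetical, not the actual client-api code):

```rust
use futures_util::{SinkExt, StreamExt};
use tokio::{net::TcpStream, sync::mpsc};
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};

/// Rough sketch: the socket is split, a dedicated tokio task owns the send
/// half, and the receive loop only ever pushes into a channel, so it stays
/// responsive to auxiliary events.
async fn run_connection(ws: WebSocketStream<TcpStream>) {
    let (mut ws_tx, mut ws_rx) = ws.split();

    // Outgoing messages are queued here; only the sender task touches the sink.
    let (out_tx, mut out_rx) = mpsc::channel::<Message>(64);

    let sender = tokio::spawn(async move {
        while let Some(msg) = out_rx.recv().await {
            if ws_tx.send(msg).await.is_err() {
                break; // connection is gone, stop draining the queue
            }
        }
    });

    // The receive/select loop never awaits a send, so it can react promptly
    // to events such as a module exit and tear the connection down.
    while let Some(Ok(msg)) = ws_rx.next().await {
        if let Message::Ping(payload) = msg {
            let _ = out_tx.send(Message::Pong(payload)).await;
        }
        // ... handle other incoming messages; replies go through `out_tx` ...
    }

    drop(out_tx); // closing the queue lets the sender task exit
    let _ = sender.await;
}
```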

Expected complexity level and risk

4 - The state transitions remain hard to follow.

Testing

  • Ran a stress test with many clients and large initial updates,
    and observed no hangs or delays (which I did observe before this patch).
    In reconnection scenarios, all clients were disconnected promptly, but
    could reconnect almost immediately.

@kim kim requested review from Centril, gefjon and jsdt June 26, 2025 18:02
Contributor

@gefjon gefjon left a comment

I'd like to figure out what's going on with the SerializeBuffer and fix it before merging, but otherwise this looks good to me.

kim added 2 commits June 27, 2025 10:07
Also close the messages queue after the close went through.
Accordingly, closed and exited are the same -- we can just drop incoming
messages when closed.
@kim
Contributor Author

kim commented Jun 27, 2025

Updated to:

  • Reclaim the serialize buffer
  • Not send any more data after sending a Close frame (as mandated by the RFC)

I think that we should also clear the message queue and cancel outstanding execution futures in the latter case, but that can be left to a future change.
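
To illustrate the second point, a hypothetical sender-task body (tokio-tungstenite's `Message` stands in for the real message type): per RFC 6455, no further data frames may be sent after our own Close frame, so anything still queued is simply dropped while we wait for the peer's Close.

```rust
use futures_util::sink::{Sink, SinkExt};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;

/// Hypothetical sender-task body: after our Close frame goes out, nothing
/// else may be sent, so queued messages are dropped until the peer closes.
async fn sender_task<S>(mut ws_tx: S, mut out_rx: mpsc::Receiver<Message>)
where
    S: Sink<Message> + Unpin,
{
    let mut close_sent = false;
    while let Some(msg) = out_rx.recv().await {
        if close_sent {
            continue; // nothing may follow our Close frame
        }
        let is_close = matches!(msg, Message::Close(_));
        if ws_tx.send(msg).await.is_err() {
            break; // connection is gone
        }
        close_sent = is_close;
    }
}
```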

Contributor

@jsdt jsdt left a comment

I looked through this for a while, and I'm still not very confident that I understand the error cases. I think we should do some bot testing with this to see what effect it has, but I'd also like to try writing some tests for this so we can trigger some of these tricky cases.

    message: impl ToProtocol<Encoded = SwitchedServerMessage> + Send + 'static,
) -> (SerializeBuffer, Result<(), WsError>) {
    let (workload, num_rows) = metrics_metadata.unzip();
    let start_serialize = Instant::now();
Contributor

It would be nice to time serialization inside the blocking task, since the time spent switching to a blocking thread can be significant (especially if we use up all of our blocking threads).
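
A minimal sketch of that suggestion (the helper below is hypothetical, not the existing `asyncify`): start the timer inside the blocking closure so that time spent waiting for a blocking thread is not attributed to serialization.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: run a CPU-bound closure on tokio's blocking pool and
/// measure only the time spent inside it, excluding the wait for a thread.
async fn run_blocking_timed<T, F>(f: F) -> (T, Duration)
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    tokio::task::spawn_blocking(move || {
        let start = Instant::now(); // started *inside* the blocking task
        let out = f();
        (out, start.elapsed())
    })
    .await
    .expect("blocking serialization task panicked")
}
```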

// as serialization and compression can take a long time.
// The threshold of 1024 rows is arbitrary, and may need to be refined.
let (msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) {
    asyncify(move || serialize(serialize_buf, message, config)).await
Contributor

Adding a blocking task here feels risky. I'd like to remove blocking tasks generally, and I think this would be the first place where the number of blocking tasks isn't tied to the number of requests per second.

What do you think about starting by adding the timing metric, and maybe a warning log message any time that serialization takes longer than some threshold?

An alternative would be sending these to rayon instead of blocking threads, which would limit the number of threads working on serialization, and put the work on pinned cores.
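
A sketch of that alternative, assuming rayon's global pool plus a tokio oneshot channel to hand the result back to the async side (the helper name is illustrative):

```rust
use tokio::sync::oneshot;

/// Run a CPU-bound closure (e.g. serialization + compression) on rayon's
/// bounded thread pool instead of tokio's unbounded blocking pool, and await
/// the result from async code via a oneshot channel.
async fn run_on_rayon<T, F>(f: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // If the receiver is gone the connection was dropped; discard the result.
        let _ = tx.send(f());
    });
    rx.await.expect("rayon worker dropped without sending a result")
}
```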

Contributor Author

The time this takes can be >1 sec in pathological cases; I only added the `asyncify` after measuring that.

Using rayon instead seems fine.

    log::warn!("error sending ping: {e:#}");
}
// If the sender is already gone,
// we'll time out the connection eventually.
Contributor

Why not break from the loop here instead of waiting for a timeout?

Contributor Author

`unordered_tx.send` fails if either the connection is bad or we have already sent a close frame ourselves. Without more rework, I can't distinguish those cases. In the latter case we need to keep polling the recv end until the other end responds with a close.
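
To illustrate the latter case (a hypothetical shape, not the actual loop): after our Close frame has gone out, the receive half still has to be polled until the peer's Close arrives, so a failed `unordered_tx.send` by itself isn't enough reason to break.

```rust
use futures_util::stream::{Stream, StreamExt};
use tokio_tungstenite::tungstenite::Message;

/// Keep polling the receive half until the peer answers our Close frame,
/// the stream ends, or an error occurs, completing the close handshake.
async fn drain_until_close<S, E>(mut ws_rx: S)
where
    S: Stream<Item = Result<Message, E>> + Unpin,
{
    while let Some(Ok(msg)) = ws_rx.next().await {
        if matches!(msg, Message::Close(_)) {
            break; // handshake complete; the connection can now be torn down
        }
        // Other frames arriving after our Close are ignored.
    }
}
```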

    .expect("should have a unique referent to `msg_alloc`");

// Ignoring send errors is apparently fine in this case.
let _ = unordered_tx.send(err.into());
Contributor

Similar question here. It feels like we should always break if this fails.

Contributor Author

This I don't know. It was like this before.

enum UnorderedWsMessage {
    Close(CloseFrame),
    Ping(Bytes),
    Error(MessageExecutionError),
Contributor

If this includes errors like a reducer failing, then we do want it to be ordered with subscription updates, since the reason for the reducer failing could be data-dependent.

Contributor Author

It is not an error result of a reducer call (that will appear on the `MeteredReceiver`), but an error calling the reducer in the first place, for example if the reducer does not exist or the arguments are wrong. In other words, the reducer wasn't actually called.

I'm less sure about the other message types, e.g. subscribe commands. But I'd like to point out that it was not ordered in the code before this patch.
