client-api: Move websocket sender to its own tokio task #2906
base: master
Conversation
Split the websocket stream into send and receive halves and spawn a new tokio task to handle the sending. Also move message serialization + compression to a blocking task if the message appears to be large. This addresses two issues:

1. The `select!` loop is not blocked on sending messages, and can thus react to auxiliary events. Namely, when a module exits, we want to terminate the connection as soon as possible in order to release any database handles.
2. Large outgoing messages should not occupy tokio worker threads, in particular when there are a large number of clients receiving large initial updates.
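For illustration only, the overall shape could look roughly like the sketch below (hypothetical code, not this PR's implementation; `run_connection`, the channel capacity, and the echo-style receive loop are placeholders):

```rust
// Hypothetical sketch: split the websocket into send/receive halves and move
// all sending into a dedicated tokio task fed by a bounded channel, so the
// receive/`select!` loop never blocks on a slow client connection.
use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};

async fn run_connection<S>(ws: WebSocketStream<S>)
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
    let (mut ws_tx, mut ws_rx) = ws.split();
    let (out_tx, mut out_rx) = mpsc::channel::<Message>(64);

    // Dedicated sender task: the only place that touches the sink half.
    let sender = tokio::spawn(async move {
        while let Some(msg) = out_rx.recv().await {
            if ws_tx.send(msg).await.is_err() {
                break; // connection gone; stop draining the queue
            }
        }
    });

    // The receive loop only enqueues outgoing messages, so it stays free to
    // react to other events (in the real code this is a `select!` that also
    // watches for module exit). Echoing is just a stand-in for real handling.
    while let Some(Ok(msg)) = ws_rx.next().await {
        if out_tx.send(msg).await.is_err() {
            break; // sender task ended
        }
    }

    drop(out_tx); // close the queue so the sender task can finish
    let _ = sender.await;
}
```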
I'd like to figure out what's going on with the `SerializeBuffer` and fix it before merging, but otherwise this looks good to me.
Also close the message queue after the close went through. Accordingly, closed and exited are the same -- we can just drop incoming messages when closed.
Updated to:
I think that we should also clear the message queue and cancel outstanding execution futures in the latter case, but that can be left to a future change.
I looked through this for a while, and I'm still not very confident that I understand the error cases. We should do some bot testing with this to see what effect it has, but I'd also like to try writing some tests for this so we can trigger some of these tricky cases.
    message: impl ToProtocol<Encoded = SwitchedServerMessage> + Send + 'static,
) -> (SerializeBuffer, Result<(), WsError>) {
    let (workload, num_rows) = metrics_metadata.unzip();
    let start_serialize = Instant::now();
It would be nice to time serialization inside the blocking task, since the time spent switching to a blocking thread can be significant (especially if we use up all of our blocking threads).
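A minimal sketch of that suggestion, reusing the names from the snippet above (`record_serialize_duration` is a hypothetical metrics hook, not an existing function):

```rust
// Sketch: start the timer inside the blocking closure, so time spent waiting
// for a free blocking thread is not attributed to serialization itself.
let (msg_alloc, msg_data) = asyncify(move || {
    let start_serialize = std::time::Instant::now();
    let out = serialize(serialize_buf, message, config);
    record_serialize_duration(start_serialize.elapsed()); // hypothetical hook
    out
})
.await;
```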
// as serialization and compression can take a long time.
// The threshold of 1024 rows is arbitrary, and may need to be refined.
let (msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) {
    asyncify(move || serialize(serialize_buf, message, config)).await
Adding a blocking task here feels risky. I'd like to remove blocking tasks generally, and I think this would be the first place where the number of blocking tasks isn't tied to the number of requests per second.
What do you think about starting by adding the timing metric, and maybe a warning log message any time that serialization takes longer than some threshold?
An alternative would be sending these to rayon instead of blocking threads, which would limit the number of threads working on serialization, and put the work on pinned cores.
The time this takes can be >1 sec for pathological cases, and I only added the `asyncify` after measuring that.
Using rayon instead seems fine.
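For reference, a rayon-based offload could look roughly like the sketch below (illustrative only, not part of this PR; `serialize_on_rayon` is a hypothetical helper):

```rust
// Sketch: run CPU-bound work on rayon's thread pool instead of tokio's
// blocking threads, and hand the result back through a oneshot channel.
// Parallelism is then capped at the size of rayon's pool.
use tokio::sync::oneshot;

async fn serialize_on_rayon<T, F>(work: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // If the receiver was dropped, nobody wants the result; ignore the error.
        let _ = tx.send(work());
    });
    rx.await.expect("rayon worker dropped the result without sending")
}
```

Calling `serialize_on_rayon(move || serialize(serialize_buf, message, config)).await` would then take the place of the `asyncify` call shown above.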
    log::warn!("error sending ping: {e:#}");
}
// If the sender is already gone,
// we'll time out the connection eventually.
Why not break from the loop here instead of waiting for a timeout?
`unordered_tx.send` fails if either the connection is bad or we already sent a close frame ourselves. Without more rework, I can't distinguish those. In the latter case we need to keep polling the recv end until the other end responds with a close.
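To illustrate the latter case: after sending our own close frame, the shutdown path has to keep draining the receive half instead of breaking immediately (a sketch only; `ws_rx`, `Message`, and `CLOSE_TIMEOUT` stand in for the actual names):

```rust
// Sketch: keep polling the receive half until the peer acknowledges the close,
// the stream ends, or an error shows the connection is actually broken,
// bounded by a timeout so a silent peer cannot hold the connection open.
let drain = async {
    while let Some(frame) = ws_rx.next().await {
        match frame {
            Ok(Message::Close(_)) | Err(_) => break, // close acknowledged or connection broken
            Ok(_) => continue,                       // ignore late messages
        }
    }
};
let _ = tokio::time::timeout(CLOSE_TIMEOUT, drain).await;
```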
    .expect("should have a unique referent to `msg_alloc`");

// Ignoring send errors is apparently fine in this case.
let _ = unordered_tx.send(err.into());
Similar question here. It feels like we should always break if this fails.
This I don't know. It was like this before.
enum UnorderedWsMessage {
    Close(CloseFrame),
    Ping(Bytes),
    Error(MessageExecutionError),
If this includes errors like a reducer failing, then we do want it to be ordered with subscription updates, since the reason for the reducer failure could be data-dependent.
It is not an error result of a reducer call (this will appear on the `MeteredReceiver`), but an error calling the reducer in the first place, for example if the reducer does not exist or the arguments are wrong. In other words, the reducer wasn't actually called.
I'm less sure about the other message types, e.g. subscribe commands. But I'd like to point out that it was not ordered in the code before this patch.
Split the websocket stream into send and receive halves and spawn a
new tokio task to handle the sending. Also move message serialization +
compression to a blocking task if the message appears to be large.

This addresses two issues:

1. The `select!` loop is not blocked on sending messages, and can thus
   react to auxiliary events. Namely, when a module exits, we want to
   terminate the connection as soon as possible in order to release any
   database handles.
2. Large outgoing messages should not occupy tokio worker threads, in
   particular when there are a large number of clients receiving large
   initial updates.
Expected complexity level and risk
4 - The state transitions remain hard to follow.
Testing
and observed no hangs / delays (which I did before this patch).
In reconnection scenarios, all clients were disconnected in a timely manner, but
could reconnect almost immediately.