Conversation
Centril
left a comment
My main comment here is that I think `ClientFrame`/`ServerFrame` don't actually need to exist and could just be `Bytes`, which would also be more efficient and avoid some temporary allocations introduced here.
```diff
 let mut in_use_bufs: Vec<ScopeGuard<InUseSerializeBuffer, _>> = Vec::with_capacity(BUF_POOL_CAPACITY);

-while let Some(message) = messages.recv().await {
+'send: while let Some(message) = messages.recv().await {
```
Is this just for added readability? (I don't mind it.)
```rust
/// Reports encode metrics for an already-encoded message and forwards all of
/// its frames to the websocket send task.
fn ws_forward_frames<I>(
    metrics: &SendMetrics,
    outgoing_frames: &mpsc::UnboundedSender<Frame>,
    workload: Option<WorkloadType>,
    num_rows: Option<usize>,
    encoded: (EncodeMetrics, InUseSerializeBuffer, I),
) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>>
where
    I: Iterator<Item = Frame>,
{
```
Suggested change:

```diff
-/// Reports encode metrics for an already-encoded message and forwards all of
-/// its frames to the websocket send task.
-fn ws_forward_frames<I>(
-    metrics: &SendMetrics,
-    outgoing_frames: &mpsc::UnboundedSender<Frame>,
-    workload: Option<WorkloadType>,
-    num_rows: Option<usize>,
-    encoded: (EncodeMetrics, InUseSerializeBuffer, I),
-) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>>
-where
-    I: Iterator<Item = Frame>,
-{
+/// Reports encode metrics for an already-encoded message
+/// and forwards all of its frames to the websocket send task.
+fn ws_forward_frames(
+    metrics: &SendMetrics,
+    outgoing_frames: &mpsc::UnboundedSender<Frame>,
+    workload: Option<WorkloadType>,
+    num_rows: Option<usize>,
+    encoded: (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame>),
+) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>> {
```
(not so important, just a nit)
```rust
    }
}

fn finalize_binary_serialize_buffer(
```
Suggested change:

```diff
+/// Finalizes a binary-encoded message from the server
+/// stored in `buffer` by potentially compressing the `buffer`
+/// depending on `uncompressed_len` and `compression`.
 fn finalize_binary_serialize_buffer(
```
```rust
// At this point, we no longer have a use for `msg`,
// so try to reclaim its buffers.
msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));

let frame = ws_v3::ServerFrame::Single(inner.freeze());
let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| {
    bsatn::to_writer(w.into_inner(), &frame).expect("should be able to bsatn encode v3 server frame");
});
let srv_msg_len = srv_msg.len();
```
Suggested change:

```diff
-// At this point, we no longer have a use for `msg`,
-// so try to reclaim its buffers.
-msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
 let frame = ws_v3::ServerFrame::Single(inner.freeze());
 let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| {
     bsatn::to_writer(w.into_inner(), &frame).expect("should be able to bsatn encode v3 server frame");
 });
 let srv_msg_len = srv_msg.len();
+// At this point, we no longer have a use for `msg` and `frame`,
+// so try to reclaim their buffers.
+msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
+frame.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
```
We should reuse frame's buffers as well.
```rust
    msg: ws_v2::ServerMessage,
    compression: ws_v1::Compression,
) -> (InUseSerializeBuffer, Bytes) {
    let mut inner = BytesMut::with_capacity(SERIALIZE_BUFFER_INIT_CAP);
```
Let's extract a method `fn take_buffer(&self) -> BytesMut` out of `BsatnRowListBuilderPool::take_row_list_builder`, which you can then use here.
Suggested change:

```diff
-let mut inner = BytesMut::with_capacity(SERIALIZE_BUFFER_INIT_CAP);
+let mut inner = bsatn_rlb_pool.take_buffer();
```
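A rough sketch of what such an extraction might look like. Everything here is illustrative: `Vec<u8>` stands in for `bytes::BytesMut`, and the pool's internals (`free`, the capacity constant's value) are invented for the sketch.

```rust
use std::sync::Mutex;

const SERIALIZE_BUFFER_INIT_CAP: usize = 4096; // illustrative value

// `Buf` stands in for `bytes::BytesMut` so the sketch is self-contained.
type Buf = Vec<u8>;

struct BsatnRowListBuilderPool {
    free: Mutex<Vec<Buf>>,
}

impl BsatnRowListBuilderPool {
    /// Takes a cleared buffer from the pool, or allocates a fresh one,
    /// so callers no longer call `BytesMut::with_capacity` directly.
    fn take_buffer(&self) -> Buf {
        self.free
            .lock()
            .unwrap()
            .pop()
            .unwrap_or_else(|| Buf::with_capacity(SERIALIZE_BUFFER_INIT_CAP))
    }

    /// Returns a buffer to the pool for reuse, clearing its contents
    /// but keeping its allocation.
    fn try_put(&self, mut buf: Buf) {
        buf.clear();
        self.free.lock().unwrap().push(buf);
    }
}
```

`take_row_list_builder` would then be rebuilt on top of `take_buffer`, and call sites like the one above reuse pooled allocations instead of allocating fresh.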
```rust
pub enum ServerFrame {
    /// A single logical server message.
    Single(Bytes),
    /// Multiple logical server messages that should be processed in-order.
```
Suggested change:

```diff
 /// Multiple logical server messages that should be processed in-order.
+///
+/// This is currently never produced.
```
```rust
/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ClientMessage`] values.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ClientFrame {
```
Hmm; both `Single` and `Batch` could actually be stored as just `Bytes`, with the logic that the host will try to deserialize as many `ClientMessage`s as it can until the read buffer has been exhausted.
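A minimal sketch of that exhaust-the-buffer loop. This is illustrative only: a length-prefixed stand-in encoding replaces BSATN, and raw byte payloads replace `ClientMessage` values.

```rust
/// Decodes as many messages as possible from `buf`, stopping only when
/// the buffer is exhausted, so no Single/Batch tag is needed: a batch is
/// just the concatenation of message encodings, and a single message is
/// a batch of one. (Stand-in framing: 4-byte little-endian length prefix.)
fn decode_all(mut buf: &[u8]) -> Result<Vec<Vec<u8>>, &'static str> {
    let mut messages = Vec::new();
    while !buf.is_empty() {
        if buf.len() < 4 {
            return Err("truncated length prefix");
        }
        let len = u32::from_le_bytes(buf[..4].try_into().unwrap()) as usize;
        buf = &buf[4..];
        if buf.len() < len {
            return Err("truncated message body");
        }
        messages.push(buf[..len].to_vec());
        buf = &buf[len..];
    }
    Ok(messages)
}
```

With a self-describing format like BSATN the length prefix would be unnecessary too; the deserializer would simply be invoked repeatedly against the remaining bytes.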
Alternatively, we could go for a more structured / typed representation here, with `Box<[super::v2::ClientMessage]>`. This would still include some of the overhead/bookkeeping bytes and allocations which Mazdak's suggestion eliminates, but it would add type safety and eliminate a lot of ser/de boilerplate code. That would look like:

```rust
enum ClientFrame {
    Single(super::v2::ClientMessage),
    Batch(Box<[super::v2::ClientMessage]>),
}
```

And, you know, doing the same thing to `ServerMessage`.
Yeah if we go for the tagged representation then a typed one seems better. However, given that the goal of this PR is perf, it seems to me that we should go for the representation with the least overhead. (This PR actually is a regression for the single case, but with my suggestion, the added overhead is a single branch rather than a full extra allocation / taking from the pool + memcpy.)
```rust
/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ServerMessage`] values.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ServerFrame {
```
`ServerFrame` could also just be `Bytes`.
```rust
let frame = match message {
    DataMessage::Binary(message_buf) => bsatn::from_slice::<ws_v3::ClientFrame>(&message_buf)?,
    DataMessage::Text(_) => {
        return Err(MessageHandleError::TextDecode(serde_json::Error::custom(
            "v3 websocket does not support text messages",
        )))
    }
};

match frame {
    ws_v3::ClientFrame::Single(message) => {
        let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?;
        super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
    }
    ws_v3::ClientFrame::Batch(messages) => {
        for message in messages {
            let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?;
            super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
        }
    }
}
```
If `ClientFrame` is just `message: Bytes` (directly from `DataMessage`), the v2 and v3 code for `handle` could be the same and we wouldn't need the temporary allocation.
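A sketch of how the two handlers might collapse into one entry point under that design. All names here are hypothetical stand-ins: `Message` replaces `ws_v2::ClientMessage`, and `decode_one` (which treats each byte as one encoded message) replaces `bsatn` deserialization.

```rust
#[derive(Debug, PartialEq)]
struct Message(u8);

/// Stand-in decoder: consumes one "message" (here, one byte) from the
/// front of `buf` and returns it along with the unconsumed remainder.
fn decode_one(buf: &[u8]) -> Result<(Message, &[u8]), &'static str> {
    match buf.split_first() {
        Some((&b, rest)) => Ok((Message(b), rest)),
        None => Err("empty buffer"),
    }
}

/// Shared handler for both protocol versions: a v2 frame carries exactly
/// one message and a v3 frame may pack several, but both reduce to
/// "decode and dispatch until the buffer is exhausted".
fn handle_frame(mut buf: &[u8], handled: &mut Vec<Message>) -> Result<(), &'static str> {
    while !buf.is_empty() {
        let (msg, rest) = decode_one(buf)?;
        handled.push(msg); // stand-in for handle_decoded_message(...)
        buf = rest;
    }
    Ok(())
}
```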
```rust
///
/// This mirrors the v2 framing by prepending the compression tag and applying
/// conditional compression when configured.
pub fn serialize_v3(
```
The comments I made here re. `take_buffer` and `consume_each_list` become moot though if `ServerFrame` becomes just `Bytes`. Then `serialize_v3 == serialize_v2` and the only difference becomes on the client side. We'll still need the v3 protocol so that newer clients know that they should deserialize multiple `ServerMessage`s.
```rust
pub mod common;
pub mod v1;
pub mod v2;
pub mod v3;
```
Why do you prefer adding a new version with a wrapper, rather than adding new message variants to v2?
```rust
pub enum ServerFrame {
    /// A single logical server message.
    Single(Bytes),
    /// Multiple logical server messages that should be processed in-order.
```
What are the intended semantics for ServerFrame::Batch? Will the server always respond ServerFrame::Single to ClientFrame::Single and ServerFrame::Batch to ClientFrame::Batch, or is the server free to re-group messages so long as order is preserved?
I was also wondering this xD
Description of Changes
The v3 WebSocket API adds a thin transport layer around the existing v2 message schema so that multiple logical `ClientMessage`s can be sent in a single WebSocket frame.

The motivation is throughput. In v2, each logical client message requires its own WebSocket frame, which adds per-frame overhead in the client runtime, the server framing/compression path, and the network stack. High-throughput clients naturally issue bursts of requests, and batching those requests into a single frame materially reduces that overhead while preserving the existing logical message model.

v3 keeps the v2 message schema intact and treats batching as a transport concern rather than a semantic protocol change. This lets the server support both protocols cleanly:

- v2 remains unchanged for existing clients
- v3 allows new clients to batch logical messages without changing reducer/procedure semantics

On the server side, this PR adds:

- `v3.bsatn.spacetimedb` protocol support
- `ClientFrame`/`ServerFrame` transport envelopes
- decoding and dispatch of batched v2 logical messages

API and ABI breaking changes

None. v2 clients continue to work unchanged.

Expected complexity level and risk

2

Testing

Testing will be included in the patches that update the SDK bindings.