
RFC: Client request pipelining #1068

Closed
schreter opened this issue Mar 19, 2024 · 9 comments · Fixed by #1092

Comments

@schreter
Collaborator

schreter commented Mar 19, 2024

Currently, client_write() sends the request to openraft via an mpsc channel, which spools (potentially many) requests. The openraft core works on batches of requests and produces batches of responses. However, on the client_write() side, each task has to explicitly await client_write().

To improve things, one could think about multiple solutions.

What immediately comes to mind is to use another mpsc channel, from which the consumer of replies can consume them at its own pace (also pipelined). However, this would probably require a major rework, and I don't know whether we can integrate it with the existing API. So I don't think this is the way to go.

What we already have is a semi-abstraction of reply handling: sending the reply via a oneshot channel. So my suggestion would be the following:

Instead of hard-coding a oneshot channel to send the reply back, RaftMsg::ClientWriteRequest would take its tx from a new associated type on the config, ReplyConsumerType: ReplyConsumer (or similar).

The ReplyConsumer would be something like this:

trait ReplyConsumer<D, R> {
    fn from_request(request: D) -> (D, Self);
    fn request_completed(self, reply: R);
}

so that it can be built from the AppData. The only additional change in openraft would be to call tx.request_completed() instead of tx.send() to send a reply to a ClientWriteRequest.
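A minimal sketch of how the proposed trait could be used, assuming a hypothetical request type `WriteRequest` that carries its reply queue (`reply_to`) next to the payload. `from_request` takes the queue handle out so that only the payload would travel on, and a hypothetical `apply_batch` stands in for the core calling `request_completed()` where it calls `tx.send()` today. Apart from `ReplyConsumer`, `from_request`, and `request_completed`, all names are illustrative assumptions, not openraft API.

```rust
use std::sync::mpsc::{channel, Sender};

/// The trait as proposed (with a parameter name so it compiles).
trait ReplyConsumer<D, R> {
    /// Split the request into the data to process and the reply consumer.
    fn from_request(request: D) -> (D, Self);
    /// Called by the core once the request has been applied.
    fn request_completed(self, reply: R);
}

/// Hypothetical application data: a payload plus the queue for the reply.
struct WriteRequest {
    payload: u64,
    reply_to: Option<Sender<u64>>,
}

/// Hypothetical consumer that posts the reply to a per-client queue.
struct QueueReplyConsumer {
    reply_to: Option<Sender<u64>>,
}

impl ReplyConsumer<WriteRequest, u64> for QueueReplyConsumer {
    fn from_request(mut request: WriteRequest) -> (WriteRequest, Self) {
        // Move the context out of D; the remaining data carries only the payload.
        let reply_to = request.reply_to.take();
        (request, QueueReplyConsumer { reply_to })
    }

    fn request_completed(self, reply: u64) {
        if let Some(tx) = self.reply_to {
            // Ignore the error if the client has already gone away.
            let _ = tx.send(reply);
        }
    }
}

/// Stand-in for the core: "apply" a batch and complete every request in it.
fn apply_batch<C: ReplyConsumer<WriteRequest, u64>>(batch: Vec<(WriteRequest, C)>) {
    for (req, consumer) in batch {
        // Pretend the state machine result is payload * 2.
        consumer.request_completed(req.payload * 2);
    }
}
```

The client can then drain its reply queue at its own pace instead of awaiting each request individually.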

The default implementation of ReplyConsumer would internally create and use a oneshot channel and implement a Future (basically, wrapping the request as is done today). This would require no change to user code (beyond specifying the default type for ReplyConsumer).

If the user uses the default implementation for ReplyConsumerType, then client_write() would simply be enabled by a where ReplyConsumerType: Future bound. There would be no change for the client. Of course, the user is also free to create a different implementation that implements Future, so client_write() can also be used with a custom implementation.

A second, synchronous API, client_write_ff() (or similar; ff for fire-and-forget), would not require implementing Future; instead, it would rely on the ReplyConsumer implementation doing the "right thing" to pass the reply on for further pipelined processing (e.g., by posting it to a user-specific mpsc channel that spools replies).

With this fairly small change, we could enable fully pipelined processing in the caller of client_write() as well, without resorting to a task-per-request workaround.
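To illustrate the pipelining this would enable, here is a std-only sketch: the client submits all requests without blocking on each one, a stand-in "core" thread drains whatever has been spooled and handles it as one batch, and replies come back on a separate queue. `run_pipelined`, the `Request` type, and the doubling "state machine" are hypothetical; replication and commit are omitted entirely.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// A request tagged with a client-side sequence number.
struct Request {
    seq: u64,
    payload: u64,
    reply_to: Sender<(u64, u64)>,
}

/// Submit `n` requests fire-and-forget style, then collect all replies.
fn run_pipelined(n: u64) -> Vec<(u64, u64)> {
    let (req_tx, req_rx) = channel::<Request>();
    let (reply_tx, reply_rx) = channel();

    // Stand-in for the core: block for one request, then drain the rest
    // that are already queued and handle them together as a batch.
    let core = thread::spawn(move || {
        while let Ok(first) = req_rx.recv() {
            let mut batch = vec![first];
            batch.extend(req_rx.try_iter());
            for req in batch {
                // Pretend "applying" the entry yields payload * 2.
                let _ = req.reply_to.send((req.seq, req.payload * 2));
            }
        }
    });

    // Client side: enqueue every request without waiting for its reply.
    for seq in 0..n {
        req_tx
            .send(Request { seq, payload: seq + 10, reply_to: reply_tx.clone() })
            .expect("core is running");
    }
    // Close both channels so the core loop and the reply iterator terminate.
    drop(req_tx);
    drop(reply_tx);

    core.join().expect("core thread panicked");
    reply_rx.iter().collect()
}
```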

Thoughts?

@drmingdrmer
Member

If the user uses the default implementation for ReplyConsumerType,

I have a general understanding of your concept.
It seems that ReplyConsumer is an internal component of Openraft.
Could you explain whether and how a user can decide to utilize the default implementation of ReplyConsumerType?

Additionally, would you be able to provide the method signatures for the revised client_write() and client_write_ff() functions?

@schreter
Copy link
Collaborator Author

No, ReplyConsumer is a public trait, which the user can implement as needed (for example, we'd push replies into a reorder buffer that orders them together with read-only requests on the same client connection). But to ease implementation, I'm suggesting we provide something like DefaultReplyConsumer, which implements ReplyConsumer and Future and internally uses a oneshot channel. The type config would specify which reply consumer to use.
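The reorder buffer mentioned above could look roughly like this, assuming every request on a connection is tagged with a sequence number: replies (from writes or from read-only requests served elsewhere) may complete out of order, and the buffer releases them strictly in sequence. `ReorderBuffer` and its methods are hypothetical, not part of openraft.

```rust
use std::collections::BTreeMap;

/// Releases replies in per-connection sequence order.
struct ReorderBuffer<R> {
    next_seq: u64,
    pending: BTreeMap<u64, R>,
}

impl<R> ReorderBuffer<R> {
    fn new() -> Self {
        ReorderBuffer { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Record a completed reply and return every reply that is now
    /// deliverable, in sequence order.
    fn complete(&mut self, seq: u64, reply: R) -> Vec<R> {
        self.pending.insert(seq, reply);
        let mut ready = Vec::new();
        // Release the contiguous run starting at the next expected sequence number.
        while let Some(reply) = self.pending.remove(&self.next_seq) {
            ready.push(reply);
            self.next_seq += 1;
        }
        ready
    }
}
```

A ReplyConsumer for this connection would call complete() from request_completed() and forward whatever it returns to the client.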

As for the signature, it would be:

    pub async fn client_write(
        &self,
        app_data: C::D,
    ) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
    where C::ReplyConsumer: Future<Output = Result<
        ClientWriteResponse<C>,
        RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>
    >>;

    pub fn client_write_ff(&self, app_data: C::D);

The only suboptimal thing is that you can also call client_write_ff() with the default reply consumer. On the other hand, it would do exactly what it says: fire and forget. The response and any potential error would be lost, so this is probably not a big deal.

@drmingdrmer
Member

  • I see that Openraft calls ReplyConsumer::request_completed() to send back the response. But why does it need a D to create a ReplyConsumer with ReplyConsumer::from_request(D)?

  • From the method signature I do not see how a user gets an instance of ReplyConsumer, and who will poll the Future of ReplyConsumer?

@schreter
Collaborator Author

But why does it need a D to create a ReplyConsumer with ReplyConsumer::from_request(D)?

The idea is to pass additional information "hidden" in D that allows us to create the consumer, for example, the queue into which the reply should be put. Otherwise the reply consumer would be stateless, which is not what we want. Alternatively, one could pass a reference to the SM or similar, where the context could be hidden as well. But some form of context for the reply consumer is needed.

From the method signature I do not see how a user gets an instance of ReplyConsumer, and who will poll the Future of ReplyConsumer?

The user won't get the instance. The Future is polled within client_write(), same as today with oneshot::Receiver, and client_write() would still return C::R. I.e., in the default case it will produce exactly the same binary code as we have today.

In the other case where the user implements ReplyConsumer, say, to post responses to a queue, the ReplyConsumer::request_completed() will be called with the response and then the user is free to do with the reply whatever he pleases (in this case, post the response to some queue, which was noted down when creating the ReplyConsumer based on the context).

@drmingdrmer
Member

Makes sense.

The abstraction for queuing replies is an elegant design. 👍

@drmingdrmer
Member

@schreter
Isn't there an issue with the Future implementation for ReplyConsumer?

The instance of ReplyConsumer will be sent to RaftCore so that RaftCore can send back the reply via it.
Thus Future should not be implemented for ReplyConsumer; instead, it should be implemented for another object, such as a receiver.

The ReplyConsumer API would be like the following: from_request builds a ReplyConsumer and a receiver Future.

pub type AppDataResult<C> = Result<(LogIdOf<C>, ROf<C>), ForwardToLeader<C>>;

#[derive(Debug, Clone, thiserror::Error)]
#[error("The response consumer has been closed.")]
pub struct Closed;

pub trait ReplyConsumer<C>: OptionalSend + 'static
where C: RaftTypeConfig
{
    fn from_request(
        app_data: C::D,
    ) -> (
        C::D,
        Self,
        Option<impl Future<Output = Result<AppDataResult<C>, Closed>>>,
    )
    where Self: Sized;

    // ...
}

Makes sense?
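The two-object split can be sketched with std only: the sending half goes to RaftCore, while the receiving half implements Future and is awaited by the caller; if the sending half is dropped without a reply, the future resolves to Err(Closed), in the spirit of the Closed error proposed above. `oneshot`, `OneshotSender`, `OneshotReceiver`, and the tiny `block_on` executor are hypothetical helpers for this illustration; a real implementation would use the async runtime's own oneshot channel.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

#[derive(Debug, PartialEq)]
struct Closed;

struct Shared<T> {
    value: Option<T>,
    sender_alive: bool,
    waker: Option<Waker>,
}

/// Half handed to the core (the "reply consumer").
struct OneshotSender<T>(Arc<Mutex<Shared<T>>>);
/// Half returned to the caller; implements Future.
struct OneshotReceiver<T>(Arc<Mutex<Shared<T>>>);

fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { value: None, sender_alive: true, waker: None }));
    (OneshotSender(shared.clone()), OneshotReceiver(shared))
}

impl<T> OneshotSender<T> {
    /// Called by the core when the request completes.
    fn send(self, value: T) {
        self.0.lock().unwrap().value = Some(value);
        // `self` is dropped at the end of this call; Drop wakes the receiver.
    }
}

impl<T> Drop for OneshotSender<T> {
    fn drop(&mut self) {
        let mut s = self.0.lock().unwrap();
        s.sender_alive = false;
        if let Some(w) = s.waker.take() {
            w.wake();
        }
    }
}

impl<T> Future for OneshotReceiver<T> {
    type Output = Result<T, Closed>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut s = self.0.lock().unwrap();
        if let Some(v) = s.value.take() {
            Poll::Ready(Ok(v))
        } else if !s.sender_alive {
            Poll::Ready(Err(Closed))
        } else {
            // Remember who to wake once the sender acts.
            s.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```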

@schreter
Collaborator Author

schreter commented Apr 3, 2024

The ReplyConsumer API would be like the following: from_request builds a ReplyConsumer and a receiver Future.

You are right, we need two objects, one that we send to consume the reply (e.g., send it over oneshot channel) and one to await in/return back to the caller. That also helps with defining the "other" API:

pub trait ReplyConsumer<C>: OptionalSend + 'static
where C: RaftTypeConfig
{
    /// The type generated for the send side, which may be a `Future`.
    /// (the name is preliminary, it's suboptimal)
    type SendResult;

    fn from_request(
        app_data: C::D,
    ) -> (
        C::D,
        Self,
        Self::SendResult,
    )
    where Self: Sized;

    // ...
}

impl<C: RaftTypeConfig> Raft<C> {
    // ...

    pub async fn client_write(
        &self,
        app_data: C::D,
    ) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
    where <C::ReplyConsumer as ReplyConsumer<C>>::SendResult: Future<Output = Result<
        ClientWriteResponse<C>,
        RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>
    >>;

    pub fn client_write_ff(&self, app_data: C::D) -> <C::ReplyConsumer as ReplyConsumer<C>>::SendResult;

    // ...
}

I.e., the client_write() version with a Future result would just await the future, but even if you used client_write_ff() with a Future-based SendResult, it would be well-formed. You could await the future yourself somewhere else (e.g., send multiple requests and then join! them, and the like).

Then, client_write() would be just a thin wrapper:

    pub async fn client_write(
        &self,
        app_data: C::D,
    ) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
    where <C::ReplyConsumer as ReplyConsumer<C>>::SendResult: Future<Output = Result<
        ClientWriteResponse<C>,
        RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>
    >> {
        self.client_write_ff(app_data).await
    }
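A synchronous, std-only analog of this design, assuming std::sync::mpsc::Receiver in place of a Future: the associated type SendResult decides what client_write_ff hands back, and the blocking client_write wrapper only compiles for consumers whose SendResult is a receiver. `ChannelConsumer`, `DropConsumer`, and the free functions `client_write_ff`/`client_write` are hypothetical, simplified stand-ins, not the openraft methods.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Synchronous analog of the proposed trait with an associated `SendResult`.
trait ReplyConsumer<D, R>: Sized {
    /// What `client_write_ff` returns to the caller.
    type SendResult;

    fn from_request(app_data: D) -> (D, Self, Self::SendResult);
    fn send(self, reply: R);
}

/// Hypothetical consumer: the reply goes over a channel the caller can wait on.
struct ChannelConsumer<R>(Sender<R>);

impl<D, R> ReplyConsumer<D, R> for ChannelConsumer<R> {
    type SendResult = Receiver<R>;

    fn from_request(app_data: D) -> (D, Self, Receiver<R>) {
        let (tx, rx) = channel();
        (app_data, ChannelConsumer(tx), rx)
    }

    fn send(self, reply: R) {
        let _ = self.0.send(reply);
    }
}

/// Hypothetical fire-and-forget consumer: nothing is returned, the reply is dropped.
struct DropConsumer;

impl<D, R> ReplyConsumer<D, R> for DropConsumer {
    type SendResult = ();

    fn from_request(app_data: D) -> (D, Self, ()) {
        (app_data, DropConsumer, ())
    }

    fn send(self, _reply: R) {}
}

/// Stand-in for `client_write_ff`: "apply" immediately (reply = length)
/// and hand back whatever the consumer produced.
fn client_write_ff<RC>(app_data: String) -> <RC as ReplyConsumer<String, usize>>::SendResult
where
    RC: ReplyConsumer<String, usize>,
{
    let (data, consumer, result) = RC::from_request(app_data);
    consumer.send(data.len());
    result
}

/// Stand-in for `client_write`: a thin wrapper that waits on the receiver.
fn client_write<RC>(app_data: String) -> Option<usize>
where
    RC: ReplyConsumer<String, usize, SendResult = Receiver<usize>>,
{
    client_write_ff::<RC>(app_data).recv().ok()
}
```

Calling client_write::<DropConsumer> would be rejected at compile time, which mirrors the where-bound on the async client_write() above.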

@drmingdrmer
Member

Makes sense.

drmingdrmer added a commit to drmingdrmer/openraft that referenced this issue Apr 7, 2024
…t write response

This commit introduces the `Responder` trait that defines the mechanism
by which `RaftCore` sends responses back to the client after processing
write requests.  Applications can now customize response handling by
implementing their own version of the `RaftTypeConfig::Responder` trait.

The `Responder::from_app_data(RaftTypeConfig::D)` method is invoked to
create a new `Responder` instance when a client write request is
received.
Once the write operation is completed within `RaftCore`,
`Responder::send(WriteResult)` is called to dispatch the result
back to the client.

By default, `RaftTypeConfig::Responder` retains the existing
functionality using a oneshot channel, ensuring backward compatibility.

This change is non-breaking, requiring no modifications to existing
applications.

- Fix: databendlabs#1068
drmingdrmer added a commit to drmingdrmer/openraft that referenced this issue Apr 8, 2024
`Raft<C>::client_write_ff() -> C::Responder::Receiver` submits a client
request to Raft to update the state machine and returns an
application-defined response receiver, `Responder::Receiver`, to
receive the response.

`_ff` means fire and forget.

It is the same as [`Raft::client_write`] but does not wait for the response.
When using this method, it is the application's responsibility to define
the mechanism for building and consuming the `Responder::Receiver`.

- Part of databendlabs#1068