
RFC LibraChannel Data Structure #1462

Closed
wants to merge 1 commit into from

Conversation

ankushagarwal
Contributor

@ankushagarwal ankushagarwal commented Oct 22, 2019

Summary

This PR is intended to foster discussion for issue #1323.

  • We introduce a new mpsc data structure called LibraChannel which has two ends, libra_channel::Receiver and libra_channel::Sender, similar to existing mpsc data structures.
  • This data structure is parametrized on:
    • The max number of messages to store per validator
    • The per-validator queue style: LIFO or FIFO
    • A generic message type T which implements a ValidatorMessage trait
  • libra_channel::Sender<T> implements a fn put(&mut self, data: T) method which inserts the message into the internal data structure. This is a non-blocking synchronous call.
  • libra_channel::Receiver<T> implements the Stream trait so that the consumer of this receiver can easily use it inside a select! block.
  • Added an example of how to use libra_channel in libra_channel_example.rs.

Notes:

  • Validator fairness is not implemented properly yet. Currently all messages are stored in a struct called PerValidatorQueue; fairness can easily be added by improving the algorithms in the add_message and get_message methods of PerValidatorQueue.
  • Since we are implementing our own poll_next(), we have complete control over the order in which messages are delivered to the consumer of this channel (FIFO, LIFO, etc.).
  • We can create one libra_channel per message type, and the consumer can do a select! over these, as illustrated in fn recv_loop.
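As a rough illustration of the pieces above, a simplified synchronous sketch of the per-validator queue might look like this. This is a sketch using only the standard library; QueueStyle and the String validator key are stand-ins (the PR moved to AccountAddress), and validator fairness is omitted, as noted:

```rust
use std::collections::{HashMap, VecDeque};

// Per-validator queue style. When a queue is full, FIFO drops the new
// message and LIFO evicts the oldest one (see the review discussion below).
#[derive(Clone, Copy)]
enum QueueStyle {
    Fifo,
    Lifo,
}

// Stand-in for the PR's ValidatorMessage trait; the real trait returns
// an AccountAddress rather than a String.
trait ValidatorMessage {
    fn get_validator(&self) -> String;
}

struct PerValidatorQueue<T: ValidatorMessage> {
    per_validator_queue: HashMap<String, VecDeque<T>>,
    queue_style: QueueStyle,
    max_queue_size: usize,
}

impl<T: ValidatorMessage> PerValidatorQueue<T> {
    fn new(queue_style: QueueStyle, max_queue_size: usize) -> Self {
        Self {
            per_validator_queue: HashMap::new(),
            queue_style,
            max_queue_size,
        }
    }

    // Non-blocking insert that enforces the per-validator capacity.
    fn add_message(&mut self, message: T) {
        let q = self
            .per_validator_queue
            .entry(message.get_validator())
            .or_default();
        if q.len() == self.max_queue_size {
            match self.queue_style {
                // Drop the new message in FIFO ordering.
                QueueStyle::Fifo => (),
                // Evict the oldest message in LIFO ordering.
                QueueStyle::Lifo => {
                    q.pop_front();
                    q.push_back(message);
                }
            }
        } else {
            q.push_back(message);
        }
    }

    // Pop one message from the first non-empty queue; round-robin fairness
    // across validators is deliberately left out of this sketch.
    fn get_message(&mut self) -> Option<T> {
        for q in self.per_validator_queue.values_mut() {
            if !q.is_empty() {
                return match self.queue_style {
                    QueueStyle::Fifo => q.pop_front(),
                    QueueStyle::Lifo => q.pop_back(),
                };
            }
        }
        None
    }
}
```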


/// The sending end of LibraChannel
#[derive(Clone)]
pub struct LibraChannelSender<T: ValidatorMessage> {
Contributor

Since the crate is named "libra-channel" you don't need to prefix every type in this crate with "LibraChannel". I would recommend dropping the prefix throughout the crate.

Contributor Author

Done

@ankushagarwal ankushagarwal marked this pull request as ready for review October 22, 2019 18:45
Contributor

@dmitri-perelman left a comment

This is going to be awesome! Some comments inline.

pub trait ValidatorMessage {
/// Extract the Validator from which this message arrived. This
/// will be used for ensuring fairness among validators.
fn get_validator(&self) -> &str;
Contributor

minor: I don't think we want a string here.
Today we're using a struct AccountAddress for peer ids. So we could either use a template parameter, which implements Eq, Hash (in this case the channel is going to provide fairness based on an abstract Key), or a specific struct that we can use for validators. @phlip9, what's the ultimate struct in libra types for the validator? Is it AccountAddress?

Contributor

It is AccountAddress

Contributor Author

Updated to use AccountAddress. Let me know if you want me to update it to a generic parameter.

fn get_validator(&self) -> &str;
}

/// Type is an enum which can be used as a configuration option for
Contributor

nit: Could you extend the comments and explain how messages are treated when the queue is full, depending on the policy? (Drop the oldest message in the queue if it's LIFO; drop the new message if it's FIFO.)

Contributor Author

Done

}

impl<T: ValidatorMessage> PerValidatorQueue<T> {
fn new(queue_type: Type, max_queue_size: usize) -> Self {
Contributor

Could you please add a TODO for GC of old validator entries?

Contributor Author

I gave this some thought and I was not completely convinced that a GC mechanism is needed. The event processor should consume the queues for validators and if a validator dies, we will just have a couple of empty queues in a HashMap, which I think is fine.

Maybe GC makes sense when the validator set is highly dynamic, with new validators constantly joining and old ones leaving.

for q in self.per_validator_queue.values_mut() {
if !q.is_empty() {
return match self.queue_type {
Type::FIFO => q.pop_back(),
Contributor

I think it should be the reverse. The insert uses push_back(), so LIFO is pop_back() and FIFO is pop_front(), isn't it?

Contributor Author

Yup. Fixed.

}
}
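The invariant behind this fix can be checked in a few lines: since inserts always use push_back, the front of the VecDeque holds the oldest message, so pop_front yields FIFO delivery and pop_back yields LIFO delivery. A tiny illustrative helper (not from the PR):

```rust
use std::collections::VecDeque;

// With push_back inserts, the front of the deque is the oldest message:
// pop_front gives FIFO delivery, pop_back gives LIFO delivery.
fn pop(q: &mut VecDeque<u32>, fifo: bool) -> Option<u32> {
    if fifo {
        q.pop_front()
    } else {
        q.pop_back()
    }
}
```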

fn add_message(&mut self, message: T) {
Contributor

nit: Could you please add a high-level comment describing the overall structure and the semantics? (Just a couple of lines.)

Contributor Author

Added

}
if let Some(q) = self.per_validator_queue.get_mut(validator) {
q.push_back(message);
if q.len() == self.max_queue_size + 1 {
Contributor

Imho a more explicit approach would be to have VecDeque of size max_queue_size, and then upon insert:

if q.len() == self.max_queue_size {
  match self.queue_type {
    // drop the new message in FIFO ordering
    Type::FIFO => (),
    // drop the oldest message in LIFO ordering
    Type::LIFO => { q.pop_front(); q.push_back(message); },  
  }
} else {
  q.push_back(message);
}

Obviously, it's a matter of taste, just saying :).

Contributor Author

This one makes more sense. Updated.

}

impl<T: ValidatorMessage> Sender<T> {
/// TODO: We can have this return the old data if the new data is replacing some existing data
Contributor

Yup, although it'd then depend on FIFO / LIFO ordering (in FIFO ordering the returned value would be the message itself?)

Contributor Author

True. Thinking about it some more, the actual message would not be of any use if we are using this for back-pressure; a boolean indicating that the queue is full is enough.

/// This puts the data into the internal data structure. This is a non-blocking synchronous call
pub fn put(&mut self, data: T) {
let mut shared_state = self.shared_state.lock().unwrap();
// TODO: Enforce maximum size
Contributor

nit: this comment is unnecessary, the data structure is taking care of it.

Contributor Author

Done

}
}

impl<T: ValidatorMessage> FusedStream for Receiver<T> {
Contributor

Could you elaborate on this one a little bit more? Why is it needed and why is it always false? :)

Contributor Author

Sorry I missed this. FusedStream is an extension trait for the Stream trait.

The Stream trait's specification leaves the behaviour of calling poll_next() after a Stream has ended (by returning Poll::Ready(None)) undefined.

FusedStream removes this undefined behaviour by providing an is_terminated() function that allows a caller (the select!() macro in this case) to decide whether to call poll_next() on the underlying Stream.

is_terminated() may also return true if a Stream has become inactive and can no longer make progress, and should be ignored or dropped rather than polled again.

In the case of LibraChannel, I don't see this channel ever ending, so I have is_terminated() return false.

If we want to provide a way to close the channel from the sender side, we can do so by adding a close_channel() method on the Sender struct which sets a boolean in the SharedState struct, but I don't see a case for that now.
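The close_channel() idea mentioned here could be sketched without the futures machinery as follows. All names (SharedState, its closed field, channel()) are illustrative, not the PR's actual code; is_terminated() here is a plain method modeling what FusedStream::is_terminated would consult:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical state shared between the two ends of the channel.
struct SharedState {
    closed: bool,
}

struct Sender {
    shared: Arc<Mutex<SharedState>>,
}

struct Receiver {
    shared: Arc<Mutex<SharedState>>,
}

impl Sender {
    // The close_channel() provision discussed above: flip a flag in the
    // shared state so the receiver can observe it.
    fn close_channel(&mut self) {
        self.shared.lock().unwrap().closed = true;
    }
}

impl Receiver {
    // What a FusedStream::is_terminated implementation could consult;
    // the PR as written simply returns false because the channel never ends.
    fn is_terminated(&self) -> bool {
        self.shared.lock().unwrap().closed
    }
}

fn channel() -> (Sender, Receiver) {
    let shared = Arc::new(Mutex::new(SharedState { closed: false }));
    (Sender { shared: shared.clone() }, Receiver { shared })
}
```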

@ankushagarwal
Contributor Author

cc @bothra90

@dmitri-perelman
Contributor

Hey, I'm sorry for asking this: could we move the "round-robin" logic to a separate PR? This PR is already loaded with important business logic. I'd love to land the first piece with the async stuff and the basics of the per-validator queue, and discuss the round robin in a separate PR / commit.

@ankushagarwal
Contributor Author

ankushagarwal commented Oct 23, 2019

Happy to do that. I plan to close this PR and open two separate PRs for the libra channel and the round-robin data structure.

Let me know if you have any other comments or concerns in this prototype before I start formalizing this.

@ankushagarwal
Contributor Author

Opened other PRs which have formal implementations.

bors-libra pushed a commit that referenced this pull request Oct 23, 2019
Summary
libra_channel provides an mpsc channel which has two ends, libra_channel::Receiver
and libra_channel::Sender, similar to existing mpsc data structures. What makes it different from existing mpsc channels is that we have full control over how the internal queueing in the channel happens and how messages are scheduled to be sent out of the channel.

Note that libra_channel does not provide an implementation of this internal queueing mechanism, but provides a trait (MessageQueue) which consumers of this channel can use to create libra_channels according to their needs.
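A rough sketch of the MessageQueue idea described in this commit message; the trait shape, associated types, and method names here are assumptions for illustration, not the landed API:

```rust
use std::collections::VecDeque;

// The channel owns the wakeup/Stream machinery; the consumer supplies the
// queueing policy through an implementation of this trait.
trait MessageQueue {
    type Key;
    type Message;
    /// Called on the sender side; decides where (and whether) to store the message.
    fn push(&mut self, key: Self::Key, message: Self::Message);
    /// Called on the receiver side; decides which message to deliver next.
    fn pop(&mut self) -> Option<Self::Message>;
}

// A trivial FIFO policy that ignores the key.
struct Fifo<T> {
    inner: VecDeque<T>,
}

impl<T> MessageQueue for Fifo<T> {
    type Key = ();
    type Message = T;

    fn push(&mut self, _key: (), message: T) {
        self.inner.push_back(message);
    }

    fn pop(&mut self) -> Option<T> {
        self.inner.pop_front()
    }
}
```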

Related issue #1323
Related RFC PR : #1462

Test Plan
Added unit tests

Closes: #1483
Approved by: dmitri-perelman
@ankushagarwal ankushagarwal deleted the libra-channel branch October 24, 2019 19:13