-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[enhancement] Create libra_channel module which provides an mpsc channel
Summary libra_channel provides an mpsc channel which has two ends libra_channel::Receiver and libra_channel::Sender similar to existing mpsc data structures. What makes it different from existing mpsc channels is that we have full control over how the internal queueing in the channel happens and how we schedule messages to be sent out from this channel. Note that libra_channel does not provide an implementation of this internal queueing mechanism, but provides a trait (MessageQueue) which the consumers of this channel can use to create libra_channels' according to their needs. Related issue #1323 Related RFC PR : #1462 Test Plan Added unit tests
- Loading branch information
1 parent
dbddb57
commit c36eb91
Showing
5 changed files
with
198 additions
and
1 deletion.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
//! libra_channel provides an mpsc channel which has two ends `libra_channel::Receiver` | ||
//! and `libra_channel::Sender` similar to existing mpsc data structures. | ||
//! What makes it different from existing mpsc channels is that we have full control | ||
//! over how the internal queueing in the channel happens and how we schedule messages | ||
//! to be sent out from this channel. | ||
//! Note that libra_channel does not provide an implementation of this internal queueing | ||
//! mechanism, but provides a trait (`MessageQueue`) which the consumers of this channel | ||
//! can use to create libra_channels' according to their needs. | ||
|
||
use std::pin::Pin; | ||
use std::sync::{Arc, Mutex}; | ||
use std::task::Waker; | ||
|
||
use futures::async_await::FusedStream; | ||
use futures::stream::Stream; | ||
use futures::task::Context; | ||
use futures::Poll; | ||
|
||
/// MessageQueue is a trait which provides a very simple set of methods for implementing | ||
/// a queue which will be used as the internal queue libra_channel. | ||
pub trait MessageQueue { | ||
/// The actual type of the messages stored in this MessageQueue | ||
type Message; | ||
|
||
/// Push a message to this queue | ||
fn push(&mut self, message: Self::Message); | ||
|
||
/// Pop a message from this queue | ||
fn pop(&mut self) -> Option<Self::Message>; | ||
} | ||
|
||
/// SharedState is a data structure private to this module which is | ||
/// shared by the sender and receiver. | ||
struct SharedState<T> { | ||
/// The internal queue of messages in this Channel | ||
internal_queue: T, | ||
|
||
/// Waker is needed so that the Sender can notify the task executor/scheduler | ||
/// that something has been pushed to the internal_queue and it ready for | ||
/// consumption by the Receiver and then the executor/scheduler will wake up | ||
/// the Receiver task to process the next item. | ||
waker: Option<Waker>, | ||
} | ||
|
||
/// The sending end of the libra_channel. | ||
#[derive(Clone)] | ||
pub struct Sender<T> { | ||
shared_state: Arc<Mutex<SharedState<T>>>, | ||
} | ||
|
||
impl<T: MessageQueue> Sender<T> { | ||
/// This adds the message into the internal queue data structure. This is a non-blocking | ||
/// synchronous call. | ||
/// TODO: We can have this return a boolean if the queue of a validator is capacity | ||
pub fn put(&mut self, message: <T as MessageQueue>::Message) { | ||
let mut shared_state = self.shared_state.lock().unwrap(); | ||
shared_state.internal_queue.push(message); | ||
if let Some(w) = shared_state.waker.take() { | ||
w.wake(); | ||
} | ||
} | ||
} | ||
|
||
/// The receiving end of the libra_channel. | ||
pub struct Receiver<T> { | ||
shared_state: Arc<Mutex<SharedState<T>>>, | ||
} | ||
|
||
impl<T: MessageQueue> Stream for Receiver<T> { | ||
type Item = <T as MessageQueue>::Message; | ||
|
||
/// poll_next checks whether there is something ready for consumption from the internal | ||
/// queue. If there is, then it returns immediately. If the internal_queue is empty, | ||
/// it sets the waker passed to it by the scheduler/executor and returns Pending | ||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let mut shared_state = self.shared_state.lock().unwrap(); | ||
if let Some(val) = shared_state.internal_queue.pop() { | ||
Poll::Ready(Some(val)) | ||
} else { | ||
shared_state.waker = Some(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
} | ||
} | ||
|
||
impl<T: MessageQueue> FusedStream for Receiver<T> { | ||
fn is_terminated(&self) -> bool { | ||
false | ||
} | ||
} | ||
|
||
/// Create a new Libra Channel and returns the two ends of the channel. The sender end can | ||
/// be cloned making it an mpsc. | ||
pub fn new<T>(queue: T) -> (Sender<T>, Receiver<T>) | ||
where | ||
T: MessageQueue, | ||
{ | ||
let shared_state = Arc::new(Mutex::new(SharedState { | ||
internal_queue: queue, | ||
waker: None, | ||
})); | ||
let shared_state_clone = Arc::clone(&shared_state); | ||
( | ||
Sender { shared_state }, | ||
Receiver { | ||
shared_state: shared_state_clone, | ||
}, | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
use crate::libra_channel; | ||
use crate::libra_channel::MessageQueue; | ||
use futures::{executor::block_on, FutureExt, StreamExt}; | ||
use std::collections::VecDeque; | ||
use std::thread; | ||
use std::time::Duration; | ||
|
||
struct TestMessageQueue { | ||
queue: VecDeque<u8>, | ||
} | ||
|
||
impl MessageQueue for TestMessageQueue { | ||
type Message = u8; | ||
|
||
fn push(&mut self, message: Self::Message) { | ||
self.queue.push_back(message); | ||
} | ||
|
||
fn pop(&mut self) -> Option<Self::Message> { | ||
self.queue.pop_front() | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_send_recv_order() { | ||
let mq = TestMessageQueue { | ||
queue: VecDeque::new(), | ||
}; | ||
let (mut sender, mut receiver) = libra_channel::new(mq); | ||
sender.put(0); | ||
sender.put(1); | ||
sender.put(2); | ||
sender.put(3); | ||
let task = async move { | ||
// Ensure that messages are received in order | ||
assert_eq!(receiver.select_next_some().await, 0); | ||
assert_eq!(receiver.select_next_some().await, 1); | ||
assert_eq!(receiver.select_next_some().await, 2); | ||
assert_eq!(receiver.select_next_some().await, 3); | ||
// Ensures that there is no other value which is ready | ||
assert_eq!(receiver.select_next_some().now_or_never(), None); | ||
}; | ||
block_on(task); | ||
} | ||
|
||
#[test] | ||
fn test_empty() { | ||
let mq = TestMessageQueue { | ||
queue: VecDeque::new(), | ||
}; | ||
let (_, mut receiver) = libra_channel::new(mq); | ||
// Ensures that there is no other value which is ready | ||
assert_eq!(receiver.select_next_some().now_or_never(), None); | ||
} | ||
|
||
#[test] | ||
fn test_waker() { | ||
let mq = TestMessageQueue { | ||
queue: VecDeque::new(), | ||
}; | ||
let (mut sender, mut receiver) = libra_channel::new(mq); | ||
// Ensures that there is no other value which is ready | ||
assert_eq!(receiver.select_next_some().now_or_never(), None); | ||
let join_handle = thread::spawn(move || { | ||
block_on(async { | ||
assert_eq!(receiver.select_next_some().await, 0); | ||
}); | ||
block_on(async { | ||
assert_eq!(receiver.select_next_some().await, 1); | ||
}); | ||
block_on(async { | ||
assert_eq!(receiver.select_next_some().await, 2); | ||
}); | ||
}); | ||
thread::sleep(Duration::from_millis(100)); | ||
sender.put(0); | ||
thread::sleep(Duration::from_millis(100)); | ||
sender.put(1); | ||
thread::sleep(Duration::from_millis(100)); | ||
sender.put(2); | ||
join_handle.join().unwrap(); | ||
} |