Skip to content

Commit

Permalink
[enhancement] Create libra_channel module which provides an mpsc channel
Browse files Browse the repository at this point in the history
Summary
libra_channel provides an mpsc channel which has two ends libra_channel::Receiver
and libra_channel::Sender similar to existing mpsc data structures. What makes it different from existing mpsc channels is that we have full control over how the internal queueing in the channel happens and how we schedule messages to be sent out from this channel.

Note that libra_channel does not provide an implementation of this internal queueing mechanism, but provides a trait (MessageQueue) which the consumers of this channel can use to create libra_channels' according to their needs.

Related issue #1323
Related RFC PR : #1462

Test Plan
Added unit tests

Closes: #1483
Approved by: dmitri-perelman
  • Loading branch information
ankushagarwal authored and bors-libra committed Oct 23, 2019
1 parent f367100 commit ae372ef
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion common/channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ publish = false
edition = "2018"

[dependencies]
futures = { version = "=0.3.0-alpha.19", package = "futures-preview" }
futures = { version = "=0.3.0-alpha.19", package = "futures-preview", features = ["async-await"]}
lazy_static = "1.3.0"
libra-logger = { path = "../logger", version = "0.1.0" }
libra-metrics = { path = "../metrics", version = "0.1.0" }
libra-types = { path = "../../types", version = "0.1.0" }

[dev-dependencies]
rusty-fork = "0.2.1"
4 changes: 4 additions & 0 deletions common/channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use std::{
#[cfg(test)]
mod test;

pub mod libra_channel;
#[cfg(test)]
mod libra_channel_test;

const MAX_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);

/// Wrapper around a value with an entry timestamp
Expand Down
109 changes: 109 additions & 0 deletions common/channel/src/libra_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//! libra_channel provides an mpsc channel which has two ends `libra_channel::Receiver`
//! and `libra_channel::Sender` similar to existing mpsc data structures.
//! What makes it different from existing mpsc channels is that we have full control
//! over how the internal queueing in the channel happens and how we schedule messages
//! to be sent out from this channel.
//! Note that libra_channel does not provide an implementation of this internal queueing
//! mechanism, but provides a trait (`MessageQueue`) which the consumers of this channel
//! can use to create libra_channels' according to their needs.

use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Waker;

use futures::async_await::FusedStream;
use futures::stream::Stream;
use futures::task::Context;
use futures::Poll;

/// MessageQueue is a trait which provides a very simple set of methods for implementing
/// a queue which will be used as the internal queue libra_channel.
pub trait MessageQueue {
/// The actual type of the messages stored in this MessageQueue
type Message;

/// Push a message to this queue
fn push(&mut self, message: Self::Message);

/// Pop a message from this queue
fn pop(&mut self) -> Option<Self::Message>;
}

/// SharedState is a data structure private to this module which is
/// shared by the sender and receiver.
struct SharedState<T> {
/// The internal queue of messages in this Channel
internal_queue: T,

/// Waker is needed so that the Sender can notify the task executor/scheduler
/// that something has been pushed to the internal_queue and it ready for
/// consumption by the Receiver and then the executor/scheduler will wake up
/// the Receiver task to process the next item.
waker: Option<Waker>,
}

/// The sending end of the libra_channel.
#[derive(Clone)]
pub struct Sender<T> {
shared_state: Arc<Mutex<SharedState<T>>>,
}

impl<T: MessageQueue> Sender<T> {
/// This adds the message into the internal queue data structure. This is a non-blocking
/// synchronous call.
/// TODO: We can have this return a boolean if the queue of a validator is capacity
pub fn put(&mut self, message: <T as MessageQueue>::Message) {
let mut shared_state = self.shared_state.lock().unwrap();
shared_state.internal_queue.push(message);
if let Some(w) = shared_state.waker.take() {
w.wake();
}
}
}

/// The receiving end of the libra_channel.
pub struct Receiver<T> {
shared_state: Arc<Mutex<SharedState<T>>>,
}

impl<T: MessageQueue> Stream for Receiver<T> {
type Item = <T as MessageQueue>::Message;

/// poll_next checks whether there is something ready for consumption from the internal
/// queue. If there is, then it returns immediately. If the internal_queue is empty,
/// it sets the waker passed to it by the scheduler/executor and returns Pending
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut shared_state = self.shared_state.lock().unwrap();
if let Some(val) = shared_state.internal_queue.pop() {
Poll::Ready(Some(val))
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}

impl<T: MessageQueue> FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
false
}
}

/// Create a new Libra Channel and returns the two ends of the channel. The sender end can
/// be cloned making it an mpsc.
pub fn new<T>(queue: T) -> (Sender<T>, Receiver<T>)
where
T: MessageQueue,
{
let shared_state = Arc::new(Mutex::new(SharedState {
internal_queue: queue,
waker: None,
}));
let shared_state_clone = Arc::clone(&shared_state);
(
Sender { shared_state },
Receiver {
shared_state: shared_state_clone,
},
)
}
82 changes: 82 additions & 0 deletions common/channel/src/libra_channel_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::libra_channel;
use crate::libra_channel::MessageQueue;
use futures::{executor::block_on, FutureExt, StreamExt};
use std::collections::VecDeque;
use std::thread;
use std::time::Duration;

struct TestMessageQueue {
queue: VecDeque<u8>,
}

impl MessageQueue for TestMessageQueue {
type Message = u8;

fn push(&mut self, message: Self::Message) {
self.queue.push_back(message);
}

fn pop(&mut self) -> Option<Self::Message> {
self.queue.pop_front()
}
}

#[test]
fn test_send_recv_order() {
let mq = TestMessageQueue {
queue: VecDeque::new(),
};
let (mut sender, mut receiver) = libra_channel::new(mq);
sender.put(0);
sender.put(1);
sender.put(2);
sender.put(3);
let task = async move {
// Ensure that messages are received in order
assert_eq!(receiver.select_next_some().await, 0);
assert_eq!(receiver.select_next_some().await, 1);
assert_eq!(receiver.select_next_some().await, 2);
assert_eq!(receiver.select_next_some().await, 3);
// Ensures that there is no other value which is ready
assert_eq!(receiver.select_next_some().now_or_never(), None);
};
block_on(task);
}

#[test]
fn test_empty() {
let mq = TestMessageQueue {
queue: VecDeque::new(),
};
let (_, mut receiver) = libra_channel::new(mq);
// Ensures that there is no other value which is ready
assert_eq!(receiver.select_next_some().now_or_never(), None);
}

#[test]
fn test_waker() {
let mq = TestMessageQueue {
queue: VecDeque::new(),
};
let (mut sender, mut receiver) = libra_channel::new(mq);
// Ensures that there is no other value which is ready
assert_eq!(receiver.select_next_some().now_or_never(), None);
let join_handle = thread::spawn(move || {
block_on(async {
assert_eq!(receiver.select_next_some().await, 0);
});
block_on(async {
assert_eq!(receiver.select_next_some().await, 1);
});
block_on(async {
assert_eq!(receiver.select_next_some().await, 2);
});
});
thread::sleep(Duration::from_millis(100));
sender.put(0);
thread::sleep(Duration::from_millis(100));
sender.put(1);
thread::sleep(Duration::from_millis(100));
sender.put(2);
join_handle.join().unwrap();
}

0 comments on commit ae372ef

Please sign in to comment.