Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better workflow #34

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ edition = "2018"
[dependencies]
arc-swap = "0.4.6"
futures = "0.3.4"
crossbeam-channel = "0.4.2"
piper = {git = "https://github.com/stjepang/piper.git"}

[dev-dependencies]
futures-test = "0.3.4"

[[example]]
name = "raw-simple"
Expand Down
2 changes: 1 addition & 1 deletion src/atomic_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl AtomicCounter {
}
#[inline]
pub fn inc(&self) {
if self.count.fetch_add(1, Ordering::AcqRel) == usize::MAX {
if self.count.fetch_add(1, Ordering::AcqRel) == usize::max_value() {
panic!("usize overflow in AtomicCounter!");
}
}
Expand Down
265 changes: 163 additions & 102 deletions src/bus.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use crate::channel::{bounded as raw_bounded, Receiver, SendError, Sender, TryRecvError};
use crossbeam_channel as mpsc;
use futures::task::AtomicWaker;
use futures::{
task::{self, Poll},
Sink, Stream,
};
use futures::{task::{self, Poll}, Sink, Stream};
use futures::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use piper::{Event, EventListener};


pub fn bounded<T>(size: usize) -> (Publisher<T>, Subscriber<T>) {
let (sender, receiver) = raw_bounded(size);
let (waker, sleeper) = alarm();
let event = Arc::new(Event::new());
(
Publisher { sender, waker },
Subscriber { receiver, sleeper },
Publisher { sender, event: event.clone() },
Subscriber { receiver, event, listener: None },
)
}

#[derive(Debug)]
pub struct Publisher<T> {
sender: Sender<T>,
waker: Waker,
event: Arc<Event>,
}

impl<T> Sink<T> for Publisher<T> {
Expand All @@ -33,10 +30,8 @@ impl<T> Sink<T> for Publisher<T> {
Poll::Ready(Ok(()))
}

fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.waker.collect_new_wakers();
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.sender.broadcast(item).and_then(|_| {
self.waker.wake_all();
Ok(())
})
}
Expand All @@ -45,14 +40,16 @@ impl<T> Sink<T> for Publisher<T> {
self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.event.notify_all();
Poll::Ready(Ok(()))
}

fn poll_close(
self: Pin<&mut Self>,
_: &mut task::Context<'_>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
self.sender.close();
self.poll_flush(cx)
}
}

Expand All @@ -62,11 +59,26 @@ impl<T> PartialEq for Publisher<T> {
}
}

impl<T> Drop for Publisher<T> {
fn drop(&mut self) {
self.sender.close();
self.event.notify_all();
}
}

impl<T> Eq for Publisher<T> {}

pub struct Subscriber<T> {
receiver: Receiver<T>,
sleeper: Sleeper,
event: Arc<Event>,
listener: Option<EventListener>,
}

impl<T> std::fmt::Debug for Subscriber<T>{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result{
f.debug_struct("Subscriber")
.finish()
}
}

impl<T> Subscriber<T> {
Expand All @@ -78,14 +90,40 @@ impl<T> Subscriber<T> {
impl<T> Stream for Subscriber<T> {
type Item = Arc<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
self.sleeper.register(cx.waker());
match self.receiver.try_recv() {
Ok(item) => Poll::Ready(Some(item)),
Err(error) => match error {
TryRecvError::Empty => Poll::Pending,
TryRecvError::Disconnected => Poll::Ready(None),
},
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is blocked on an event, first make sure it is unblocked.
if let Some(listener) = self.listener.as_mut() {
futures::ready!(Pin::new(listener).poll(cx));
self.listener = None;
}
loop {
// Attempt to receive a message.
match self.receiver.try_recv() {
Ok(item) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(item));
},
Err(TryRecvError::Disconnected) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
},
Err(TryRecvError::Empty) => {},
}
// Listen for a send event.
match self.listener.as_mut() {
None => {
// Store a listener and try sending the message again.
self.listener = Some(self.event.listen())
},
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}
Expand All @@ -94,7 +132,8 @@ impl<T> Clone for Subscriber<T> {
fn clone(&self) -> Self {
Self {
receiver: self.receiver.clone(),
sleeper: self.sleeper.clone(),
event: self.event.clone(),
listener: None,
}
}
}
Expand All @@ -107,94 +146,116 @@ impl<T: Send> PartialEq for Subscriber<T> {

impl<T: Send> Eq for Subscriber<T> {}

// Helper struct used by sync and async implementations to wake Tasks / Threads
#[derive(Debug)]
pub struct Waker {
/// Vector of Wakers to use to wake up subscribers.
wakers: Vec<Arc<AtomicWaker>>,
/// A mpsc Receiver used to receive Wakers
receiver: mpsc::Receiver<Arc<AtomicWaker>>,
}

impl Waker {
fn wake_all(&self) {
for waker in &self.wakers {
waker.wake();
}
#[cfg(test)]
mod test {
use futures_test::{
assert_stream_pending, assert_stream_next, assert_stream_done,
};
use futures_test::task::noop_context;
use futures::pin_mut;
use futures::SinkExt;
use futures::task::Poll;
use std::sync::Arc;
use futures::future::FutureExt;

#[test]
fn subscriber_is_in_pending_state_before_first_data_is_published() {
let (_publisher, subscriber) = super::bounded::<usize>(1);
pin_mut!(subscriber);

// Assert that subscriber stream is pending before the publisher publishes.
assert_stream_pending!(subscriber);
}

/// Receive any new Wakers and add them to the wakers Vec. These will be used to wake up the
/// subscribers when a message is published
fn collect_new_wakers(&mut self) {
while let Ok(receiver) = self.receiver.try_recv() {
self.wakers.push(receiver);
}
#[test]
fn subscriber_receives_an_item_after_it_is_published() {
let mut cx = noop_context();
let (publisher, subscriber) = super::bounded::<usize>(1);
pin_mut!(subscriber);
pin_mut!(publisher);

// Publish one item (1).
assert_eq!(publisher.send(1).poll_unpin(&mut cx), Poll::Ready(Ok(())));

// Assert that the subscriber can receive item (1).
assert_stream_next!(subscriber, Arc::new(1));
}
}

/// Helper struct used by sync and async implementations to register Tasks / Threads to
/// be woken up.
#[derive(Debug)]
pub struct Sleeper {
/// Current Waker to be woken up
waker: Arc<AtomicWaker>,
/// mpsc Sender used to send Wakers to the Publisher
sender: mpsc::Sender<Arc<AtomicWaker>>,
}
#[test]
fn subscriber_recieves_an_item_after_publisher_overflowed() {
let mut cx = noop_context();
let (publisher, subscriber) = super::bounded::<usize>(1);
pin_mut!(subscriber);
pin_mut!(publisher);

// Publish item (1).
assert_eq!(publisher.send(1).poll_unpin(&mut cx), Poll::Ready(Ok(())));

impl Sleeper {
fn register(&self, waker: &task::Waker) {
self.waker.register(waker);
// Assert that the publisher is not blocked even when overflowed
// by publishing another item (2) while queue size is 1
assert_eq!(publisher.send(2).poll_unpin(&mut cx), Poll::Ready(Ok(())));

// Assert that the subscriber receives the second item (2),
// since the first one (1) was dropped
assert_stream_next!(subscriber, Arc::new(2));
}
}
#[test]
fn subscriber_is_done_after_publisher_closes() {
let mut cx = noop_context();
let (publisher, subscriber) = super::bounded::<usize>(1);
pin_mut!(subscriber);
pin_mut!(publisher);

impl Clone for Sleeper {
fn clone(&self) -> Self {
let waker = Arc::new(AtomicWaker::new());
// Send the new waker to the publisher.
// If this fails (Receiver disconnected), presumably the Publisher
// has dropped and when this is polled for the first time, the
// Stream will end.
let _ = self.sender.send(Arc::clone(&waker));
Self {
waker,
sender: self.sender.clone(),
}
// Close Publisher.
assert_eq!(publisher.close().poll_unpin(&mut cx), Poll::Ready(Ok(())));

// Assert that the subscriber is done..
assert_stream_done!(subscriber);
}
}

/// Function used to create a ( Waker, Sleeper ) tuple.
pub fn alarm() -> (Waker, Sleeper) {
let (sender, receiver) = mpsc::unbounded();
let waker = Arc::new(AtomicWaker::new());
let wakers = vec![Arc::clone(&waker)];
(Waker { wakers, receiver }, Sleeper { waker, sender })
}
#[test]
fn subscriber_is_done_after_publisher_drop() {
let (publisher, subscriber) = super::bounded::<usize>(1);
pin_mut!(subscriber);

#[cfg(test)]
mod test {
use futures::executor::block_on;
use futures::stream::{self};
use futures::StreamExt;
// Drop Publisher
drop(publisher);


// Assert that the subscriber is done.
assert_stream_done!(subscriber);
}

#[test]
fn channel() {
let (publisher, subscriber1) = super::bounded(10);
let subscriber2 = subscriber1.clone();

block_on(async move {
stream::iter(1..15)
.map(|i| Ok(i))
.forward(publisher)
.await
.unwrap();
});

let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
// Test that only the last 10 elements are in the received list.
let expected = (5..15).collect::<Vec<u32>>();
assert_eq!(received1, expected);
assert_eq!(received2, expected);
fn notify() {
let (publisher, subscriber) = super::bounded::<usize>(1);
pin_mut!(subscriber);
pin_mut!(publisher);

// Assert that subscriber stream is pending before the publisher publishes.
assert_stream_pending!(subscriber);

// Publish one item (1).
let mut cx = noop_context();
assert_eq!(publisher.send(1).poll_unpin(&mut cx), Poll::Ready(Ok(())));

// Assert that the subscriber can receive item (1).
assert_stream_next!(subscriber, Arc::new(1));

// Publish one more item (2).
assert_eq!(publisher.send(2).poll_unpin(&mut cx), Poll::Ready(Ok(())));

// Assert that the subscriber can receive item (2).
assert_stream_next!(subscriber, Arc::new(2));

// Assert that the subscirber is pending of another item to be published.
assert_stream_pending!(subscriber);

// Close publisher.
assert_eq!(publisher.close().poll_unpin(&mut cx), Poll::Ready(Ok(())));

// Assert that subscriber is done.
assert_stream_done!(subscriber);
}
}
9 changes: 7 additions & 2 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,17 @@ impl<T> Sender<T> {
pub fn len(&self) -> usize {
self.size - 1
}

/// Closes the Sender
pub fn close(&self) {
self.is_available.store(false, Ordering::Relaxed);
}
}

/// Drop trait is used to let subscribers know that publisher is no longer available.
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.is_available.store(false, Ordering::Relaxed);
self.close();
}
}

Expand Down Expand Up @@ -344,7 +349,7 @@ mod test {
{
let (sender, _receiver) = bounded(3);
// set Sender wi index to usize::MAX
sender.wi.set(usize::MAX);
sender.wi.set(usize::max_value());
sender.broadcast(1).unwrap();
}
}