
I think there's an error in impl<T> Stream for RingReceiver<T> #27

Closed
najamelan opened this issue Sep 19, 2019 · 23 comments · Fixed by #30

Comments

@najamelan

```rust
Err(RecvError::Empty) => {
    // Keep polling thread awake.
    ctx.waker().wake_by_ref();
    Poll::Pending
}
```

AFAICT you should just return Pending here and store the waker. When new data comes in, then you call wake (I suppose that means your ringbuffer needs to wake the waker).
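To make the "store the waker, wake on new data" suggestion concrete, here is a minimal std-only sketch. `Slot`, `send` and `poll_recv` are hypothetical names, not ring-channel's API. The slot keeps only the latest message (roughly a ring buffer of capacity 1), and closing the channel is left out.

```rust
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Hypothetical shared state: the latest message plus the waker of the waiting task.
struct Inner<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

// Hypothetical single-consumer slot; clones share the same state.
pub struct Slot<T> {
    inner: Arc<Mutex<Inner<T>>>,
}

impl<T> Clone for Slot<T> {
    fn clone(&self) -> Self {
        Slot { inner: Arc::clone(&self.inner) }
    }
}

impl<T> Slot<T> {
    pub fn new() -> Self {
        Slot { inner: Arc::new(Mutex::new(Inner { value: None, waker: None })) }
    }

    /// Overwrites any unread message, then wakes the stored waker (if any).
    pub fn send(&self, msg: T) {
        let waker = {
            let mut inner = self.inner.lock().unwrap();
            inner.value = Some(msg);
            inner.waker.take()
        };
        // Wake after releasing the lock.
        if let Some(w) = waker {
            w.wake();
        }
    }

    /// Returns the message if there is one; otherwise stores the waker and
    /// returns Pending instead of waking itself.
    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<T> {
        let mut inner = self.inner.lock().unwrap();
        match inner.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                inner.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```

Waking after the lock is released is a design choice: the woken task may be polled on another thread right away and would otherwise contend on the same mutex.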

@brunocodutra
Owner

brunocodutra commented Sep 19, 2019

> AFAICT you should just return Pending here and store the waker. When new data comes in, then you call wake (I suppose that means your ringbuffer needs to wake the waker).

Generally speaking, your assessment is correct. It was a conscious design decision to keep the polling thread awake, to avoid having to store an unbounded number of wakers (remember it's an MPMC channel) and to avoid taxing producers with the unconditional overhead of checking whether there are wakers to wake. Considering ring-channel is throughput-oriented, chances are there will always be work to do, so there's little point in letting the polling thread sleep anyway. You can run the benchmarks to see that ring-channel achieves very high throughput compared with regular channels.

Does that sound reasonable?

@najamelan
Author

Hmm, this is async code, so there aren't necessarily any threads involved. But basically what you are doing here is running code in an endless loop consuming CPU when no data is present. The whole point of how async in Rust is designed is to avoid having to loop. Think about it: what's the advantage of this over using the sync API and just doing something like:

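```rust
loop {
    match rx.recv() {
        Ok(data) => { /* do something useful with `data` */ }
        // Nothing to read yet: go around the loop and try again.
        Err(RecvError::Empty) => {}
        // All senders are gone: stop.
        Err(RecvError::Disconnected) => break,
    }
}
```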

Now, if this is single-threaded, you can't actually do anything else here, because this loop will never exit. In async code, however, the thread is shared with other async tasks, so just running the CPU in a loop is a real problem.

Sure, it requires storing an unknown number of wakers, but it's up to the user not to go insane and have a trillion tasks polling this channel, unless of course they have the memory to back it up. In any case it's consuming some memory to do something useful, whereas this impl is just endlessly burning CPU cycles doing nothing. I haven't looked at how wake works; have you tested this? Because strictly speaking the context is still awake here, so maybe it's just a no-op and the task never wakes up?

I think it's worth considering that the library could be useful for much more than just high throughput. I just advised someone on Reddit to use it: they need to communicate between tasks and only care about the last item sent. That could be done with a ring_channel of size 1. I personally am interested in this for use cases where you don't want back pressure, but not unbounded channels either. That's an interesting property on its own, regardless of throughput characteristics.

All it takes to do this correctly is to keep a Vec<Waker>: when data comes in, and when the channel gets closed by the sender, you just drain it, wake them all and then drop them.
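A sketch of that `Vec<Waker>` approach for an MPMC channel, assuming a plain `std::sync::Mutex` guards the list. `WakerList` and its methods are hypothetical. Senders would call `wake_all` after pushing a message and once more when the channel is closed.

```rust
use std::sync::Mutex;
use std::task::Waker;

/// Hypothetical registry of receivers waiting for data.
#[derive(Default)]
pub struct WakerList {
    wakers: Mutex<Vec<Waker>>,
}

impl WakerList {
    /// Called by a receiver that found the buffer empty, before returning Pending.
    pub fn register(&self, waker: &Waker) {
        let mut wakers = self.wakers.lock().unwrap();
        // Skip duplicates when the same task polls repeatedly.
        if !wakers.iter().any(|w| w.will_wake(waker)) {
            wakers.push(waker.clone());
        }
    }

    /// Called on send and on close: drain the list, wake everyone, drop the wakers.
    pub fn wake_all(&self) {
        // The lock guard is dropped at the end of this statement.
        let drained: Vec<Waker> = std::mem::take(&mut *self.wakers.lock().unwrap());
        for w in drained {
            w.wake(); // consumes the waker
        }
    }

    pub fn len(&self) -> usize {
        self.wakers.lock().unwrap().len()
    }
}
```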

I wouldn't use the implementation for async as it's written right now, which is a pity.

@brunocodutra
Owner

brunocodutra commented Sep 21, 2019

> Hmm, this is async code, so there aren't necessarily any threads involved. But basically what you are doing here is running code in an endless loop consuming CPU when no data is present.

It is true that the polling thread will never sleep and our future will spin the CPU even when there are no messages to process, but in general the executor has a backlog of futures to process, and calling wake() immediately does not prevent the other futures from making progress. Behind the scenes, the future is put back on the queue, and all other futures that have called wake() are polled before ours is polled again.
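To illustrate the claim, here is a toy single-threaded executor with a FIFO run queue (an assumption; real executors such as tokio have their own scheduling) and a hypothetical `SelfWaking` future that mimics the busy-wake branch: each `wake_by_ref` pushes the task to the back of the queue, so other woken tasks get polled in between, yet the spinning task never goes idle.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Returns Pending `remaining` times, waking itself every time.
pub struct SelfWaking {
    pub remaining: u32,
}

impl Future for SelfWaking {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again right away
        Poll::Pending
    }
}

/// Waking a task pushes its id onto the back of the run queue.
struct TaskWaker {
    id: usize,
    queue: Arc<Mutex<VecDeque<usize>>>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.queue.lock().unwrap().push_back(self.id);
    }
}

pub type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Polls woken tasks in FIFO order; returns the order in which tasks were polled.
pub fn run(tasks: Vec<Task>) -> Vec<usize> {
    let mut tasks: Vec<Option<Task>> = tasks.into_iter().map(Some).collect();
    let queue = Arc::new(Mutex::new((0..tasks.len()).collect::<VecDeque<_>>()));
    let mut order = Vec::new();
    loop {
        // Separate statement so the lock is released before polling.
        let next = queue.lock().unwrap().pop_front();
        let id = match next {
            Some(id) => id,
            None => break, // nothing was woken
        };
        let waker = Waker::from(Arc::new(TaskWaker { id, queue: Arc::clone(&queue) }));
        let mut cx = Context::from_waker(&waker);
        if let Some(fut) = tasks[id].as_mut() {
            order.push(id);
            if fut.as_mut().poll(&mut cx).is_ready() {
                tasks[id] = None;
            }
        }
    }
    order
}
```

With `SelfWaking { remaining: 3 }` as task 0 and `SelfWaking { remaining: 1 }` as task 1, `run` polls them in the order `[0, 1, 0, 1, 0, 0]`: task 1 is not starved, but task 0 is polled on every turn until it finishes.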

> Sure, it requires storing an unknown number of wakers, but it's up to the user not to go insane and have a trillion tasks polling this channel, unless of course they have the memory to back it up. In any case it's consuming some memory to do something useful, whereas this impl is just endlessly burning CPU cycles doing nothing.

The problem is not related to memory consumption at all; the real issue is the concurrent reads and writes to the shared Vec<Waker>. Every time a writer sends a message or a reader finds an empty buffer, this shared Vec<Waker> has to be accessed. What's more, lock-free programming is extremely tricky, and I have yet to see a design that does not require a spin-lock to avoid deadlocks. If you're not familiar with it, a spin-lock is essentially a loop on an atomic variable that keeps the CPU busy without doing useful work, which is exactly what we're trying to avoid.
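For reference, a minimal spin-lock looks roughly like this (a generic textbook sketch, not code from ring-channel): `lock` loops on `compare_exchange_weak` until it wins, keeping the core busy the whole time instead of putting the thread to sleep.

```rust
use std::cell::UnsafeCell;
use std::hint;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};

pub struct SpinLock<T> {
    locked: AtomicBool,
    value: UnsafeCell<T>,
}

// Safety: access to `value` is serialized by `locked`.
unsafe impl<T: Send> Sync for SpinLock<T> {}

pub struct Guard<'a, T> {
    lock: &'a SpinLock<T>,
}

impl<T> SpinLock<T> {
    pub const fn new(value: T) -> Self {
        SpinLock { locked: AtomicBool::new(false), value: UnsafeCell::new(value) }
    }

    pub fn lock(&self) -> Guard<'_, T> {
        // Keep trying to flip `locked` from false to true.
        while self
            .locked
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            hint::spin_loop(); // the CPU stays busy: no parking, no sleeping
        }
        Guard { lock: self }
    }
}

impl<T> Deref for Guard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &*self.lock.value.get() }
    }
}

impl<T> DerefMut for Guard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.lock.value.get() }
    }
}

impl<T> Drop for Guard<'_, T> {
    fn drop(&mut self) {
        // Release: publish writes made while holding the lock.
        self.lock.locked.store(false, Ordering::Release);
    }
}
```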

> I haven't looked at how wake works; have you tested this? Because strictly speaking the context is still awake here, so maybe it's just a no-op and the task never wakes up?

The call to wake() is probably very cheap indeed, but it's the bottleneck created by the shared Vec<Waker> that kills throughput. If I recall correctly, it dropped by orders of magnitude.

> I personally am interested in this for use cases where you don't want back pressure, but not unbounded channels either. That's an interesting property on its own, regardless of throughput characteristics. [...] I wouldn't use the implementation for async as it's written right now, which is a pity.

Don't get me wrong, I'm absolutely interested in making ring-channel fit your use-case. It's very unlikely that we can make it a silver bullet that serves all purposes, but we can always consider providing different flavors for different use-cases.

I'll revive an old unpublished branch where I explored alternative implementations of this shared Vec<Waker> and hopefully get a working prototype soon. Meanwhile, would you be able to share more details about your use-case? Did you actually observe high CPU usage in practice?

@najamelan
Author

Ok, I think I see what you mean. Do we agree this only concerns the async API? The sync API could just ignore the vector.

So yes, every read/write would have to check the vector even if there were no wakers in it. It could surely be improved by finding a better implementation, like setting an AtomicBool when a waker gets added and unsetting it on write; that way, when no wakers are present, you would only need to check the bool.

I haven't actually done any benchmarks yet; I just looked at the code and found it suspicious to wake in a loop. I will try to play around a bit with it soon and see what effect it has on CPU usage for infrequent events.

@brunocodutra
Owner

brunocodutra commented Sep 23, 2019

> Do we agree this only concerns the async API?

Yes.

Actually no, there could be a receiver waiting on a Waker through the async API, while messages are pushed through the sync API. The receiver would never poll again despite the fact the buffer would be constantly full if the sync API were to ignore the wakers.

> So yes, every read/write would have to check the vector even if there were no wakers in it. It could surely be improved by finding a better implementation, like setting an AtomicBool when a waker gets added and unsetting it on write; that way, when no wakers are present, you would only need to check the bool.

It would rather have to be an AtomicUsize that counts how many are waiting, but yes, that makes sense. Another thing I realized is that the list of Wakers is not completely unbounded. Well, it is, since the number of receivers is unbounded, but each receiver can only be associated with one task at a time, which means there's at most one Waker per receiver. I'm not sure whether this actually helps, but it's something I hadn't realized before.
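Combining the two observations above, a waker registry could hold at most one slot per receiver and keep an `AtomicUsize` of waiting receivers, so senders pay only an atomic load when nobody is waiting. This is a hypothetical sketch (`WakerSlots`, receiver ids used as indices). To avoid lost wake-ups, a receiver would still have to re-check the buffer after `register` and before returning `Pending`, and a sender would have to publish its message before calling `wake_all`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::Waker;

/// Hypothetical registry: one waker slot per receiver plus a waiting counter.
pub struct WakerSlots {
    waiting: AtomicUsize,
    slots: Mutex<Vec<Option<Waker>>>,
}

impl WakerSlots {
    pub fn new(receivers: usize) -> Self {
        WakerSlots {
            waiting: AtomicUsize::new(0),
            slots: Mutex::new(vec![None; receivers]),
        }
    }

    /// Receiver `id` found the buffer empty: store (or replace) its single waker.
    pub fn register(&self, id: usize, waker: &Waker) {
        let mut slots = self.slots.lock().unwrap();
        if slots[id].replace(waker.clone()).is_none() {
            // The counter is only modified while holding the lock.
            self.waiting.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Sender side: an atomic load on the fast path, the lock only if needed.
    pub fn wake_all(&self) {
        if self.waiting.load(Ordering::SeqCst) == 0 {
            return;
        }
        let woken: Vec<Waker> = {
            let mut slots = self.slots.lock().unwrap();
            self.waiting.store(0, Ordering::SeqCst);
            slots.iter_mut().filter_map(|slot| slot.take()).collect()
        };
        for w in woken {
            w.wake();
        }
    }

    pub fn waiting(&self) -> usize {
        self.waiting.load(Ordering::SeqCst)
    }
}
```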

> I will try to play around a bit with it soon and see what effect it has on CPU usage for infrequent events.

That would be awesome!

@mcseemk

mcseemk commented Sep 24, 2019

Actually, the ring_channel async receiver always consumes 100% CPU, no matter whether messages are frequent or infrequent.

Example code (change FREQ_MS to try different event frequencies):

```rust
use futures::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::timer::Interval;
use tokio::future::FutureExt;
use futures::future::ready;
use ring_channel::*;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
use std::sync::atomic::{AtomicU64, Ordering};

// Exponential moving averages of latency and jitter, in nanoseconds.
static ELAPSED_EMA: AtomicU64 = AtomicU64::new(0);
static JITTER_EMA: AtomicU64 = AtomicU64::new(0);

// Interval between messages sent by the producer.
const FREQ_MS: u64 = 1_000;

fn main() {

    let rt = match Runtime::new() {
        Ok(x) => x,
        Err(e) => {
            eprintln!("{}: cannot create tokio runtime, error: {}", line!(), e);
            return;
        }
    };

    // Capacity 1: only the most recent timestamp is kept.
    let (tx, rx) = ring_channel(NonZeroUsize::new(1).unwrap());

    // Send the current time every FREQ_MS milliseconds.
    let producer = Interval::new(Instant::now(), Duration::from_millis(FREQ_MS))
        .for_each(move |_| {
            tx.send(Instant::now()).unwrap();
            ready(())
        });

    // Measure how long each timestamp took to arrive.
    let consumer = rx.for_each(move |instant| {
        let elapsed = instant.elapsed().as_nanos() as u64;
        let mut elapsed_ema = ELAPSED_EMA.load(Ordering::Relaxed);
        if elapsed_ema != 0 {
            elapsed_ema = (9_999 * elapsed_ema + elapsed) / 10_000;
        } else {
            elapsed_ema = elapsed;
        }
        ELAPSED_EMA.store(elapsed_ema, Ordering::Relaxed);
        let jitter =
            if elapsed > elapsed_ema { elapsed - elapsed_ema }
                else { elapsed_ema - elapsed };
        let mut jitter_ema = JITTER_EMA.load(Ordering::Relaxed);
        if jitter_ema != 0 {
            jitter_ema = (9_999 * jitter_ema + jitter) / 10_000;
        } else {
            jitter_ema = jitter;
        }
        JITTER_EMA.store(jitter_ema, Ordering::Relaxed);
        println!("Elapsed {}ns, avg {}ns, jitter {}ns", elapsed, elapsed_ema, jitter_ema);
        ready(())
    });

    rt.spawn(consumer);
    rt.block_on(producer);
}
```

@najamelan
Author

@mcseemk thanks for chipping in. I'm very busy right now, so I haven't found time to run tests.

If you put your code in a fenced block that specifies the language (open it with `` ```rust ``), it will have syntax highlighting and be more readable.

@mcseemk

mcseemk commented Sep 24, 2019

> @mcseemk thanks for chipping in. I'm very busy right now, so I haven't found time to run tests.
>
> If you put your code in a fenced block that specifies the language (open it with `` ```rust ``), it will have syntax highlighting and be more readable.

Great, thanks for the hint!

@najamelan
Author

> Actually no, there could be a receiver waiting on a Waker through the async API, while messages are pushed through the sync API. The receiver would never poll again despite the fact the buffer would be constantly full if the sync API were to ignore the wakers.

Yeah, the question was more whether you want to support mixing the APIs. I can imagine it can be useful. However, adding overhead for someone who doesn't use async (in either the reader or the writer) is probably not justified, especially if you care about maximizing throughput.

On the other hand, if the current impl really just goes to 100% CPU, I doubt anyone will want to use that. It seems to confirm there really is a problem.

@mcseemk

mcseemk commented Sep 24, 2019

Yeah, I would love to use ring_channel in my async programs, but 100% CPU is a show-stopper at the moment.

@brunocodutra
Owner

brunocodutra commented Sep 24, 2019

> Yeah, the question was more whether you want to support mixing the APIs. I can imagine it can be useful. However, adding overhead for someone who doesn't use async (in either the reader or the writer) is probably not justified, especially if you care about maximizing throughput.

I think a reasonable way out would be to provide both RingReceiver and, say, BlockingRingReceiver, such that the former can be converted to the latter and only the latter implements Stream. The first RingReceiver to be converted would upgrade the shared control block to a flavor that keeps track of wakers. RingSenders do not need to be converted and would seamlessly start waking pending receivers once the shared control block gets upgraded. The remaining RingReceivers would still work fine after the conversion and could also be converted to BlockingRingReceivers independently.

@najamelan, @mcseemk how does that sound for a design?
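A rough sketch of the proposed upgrade, under assumptions: the control block lazily allocates its waker list in a `std::sync::OnceLock` (a std type that postdates this discussion), the first `into_blocking` call initializes it, and senders only touch the list once it exists. `ControlBlock`, `channel`, `notify`, `into_blocking` and `register` are hypothetical names, and the ring buffer itself is omitted.

```rust
use std::sync::{Arc, Mutex, OnceLock};
use std::task::Waker;

/// Hypothetical shared control block; waker tracking is allocated on first upgrade.
#[derive(Default)]
pub struct ControlBlock {
    wakers: OnceLock<Mutex<Vec<Waker>>>,
}

pub struct RingSender { shared: Arc<ControlBlock> }
pub struct RingReceiver { shared: Arc<ControlBlock> }
pub struct BlockingRingReceiver { shared: Arc<ControlBlock> }

pub fn channel() -> (RingSender, RingReceiver) {
    let shared = Arc::new(ControlBlock::default());
    (RingSender { shared: Arc::clone(&shared) }, RingReceiver { shared })
}

impl RingSender {
    /// After publishing a message: a cheap no-op until the block is upgraded.
    pub fn notify(&self) {
        if let Some(list) = self.shared.wakers.get() {
            // The lock guard is dropped at the end of this statement.
            let drained = std::mem::take(&mut *list.lock().unwrap());
            for w in drained {
                w.wake();
            }
        }
    }
}

impl RingReceiver {
    /// Upgrades the shared block (only the first call allocates) and changes flavor.
    pub fn into_blocking(self) -> BlockingRingReceiver {
        self.shared.wakers.get_or_init(|| Mutex::new(Vec::new()));
        BlockingRingReceiver { shared: self.shared }
    }
}

impl BlockingRingReceiver {
    /// What poll_next would do on an empty buffer before returning Pending.
    pub fn register(&self, waker: &Waker) {
        let list = self.shared.wakers.get().expect("block upgraded by into_blocking");
        list.lock().unwrap().push(waker.clone());
    }
}
```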

@najamelan
Author

najamelan commented Sep 24, 2019

It sounds like a nifty design, except:

> RingReceiver and, say, BlockingRingReceiver, such that the former can be converted to the latter and only the latter implements Stream

I suppose you accidentally reversed the names?

It just leaves the performance issue for async to be solved.

@mcseemk

mcseemk commented Sep 24, 2019

Sounds good to me. I wouldn't mind also having an mpsc blocking/async ring receiver. A single-consumer version should cover 80% of use cases and would potentially be more performant and easier to implement for async polling.

@brunocodutra
Owner

> I suppose you accidentally reversed the names?

@najamelan actually no: my reasoning is that, from the point of view of recv, the call blocks by parking the thread until the Waker is woken. BlockingRingReceiver is thus a blocking version of RingReceiver, whose recv returns even if the buffer is empty.

I can see now this naming convention may be confusing, what would you call them instead?

> I wouldn't mind also having an mpsc blocking/async ring receiver

@mcseemk sounds like a great idea, especially because it can be built incrementally later with no impact on existing code.

@najamelan
Author

> @najamelan actually no: my reasoning is that, from the point of view of recv, the call blocks by parking the thread until the Waker is woken. BlockingRingReceiver is thus a blocking version of RingReceiver, whose recv returns even if the buffer is empty.

I'm a bit confused now. In principle, if the receiver implements Stream, it shouldn't block threads, because it's async, so it blocks the task awaiting it, but never blocks the thread. Synchronous interfaces either block the thread until new data is available (at which point the thread gets woken up) or just return an error like WouldBlock or Empty; either way they don't need wakers, since wakers are for async. They also need neither executors nor tasks. That said, I haven't had time to look closely enough at the sync implementation of ring_channel, so I'm probably missing something crucial.

@brunocodutra
Owner

brunocodutra commented Sep 28, 2019

> I'm a bit confused now. In principle, if the receiver implements Stream, it shouldn't block threads, because it's async, so it blocks the task awaiting it, but never blocks the thread.

That's correct, through the Stream interface BlockingRingReceiver::poll_next would return Poll::Pending and it's up to the executor what to do with it. In my comment above I was referring to BlockingRingReceiver::recv, which parks (thus blocks) the calling thread without spinning the CPU. This is exactly how std::sync::mpsc::Receiver::recv behaves: "This function will always block the current thread if there is no data available and it's possible for more data to be sent."

Does that clarify it?

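EDIT: In fact, recv can be trivially implemented in terms of StreamExt::next:

```rust
use futures::executor::block_on;
use futures::stream::StreamExt;

impl<T> BlockingRingReceiver<T> {
    /// Parks the calling thread until a message arrives; returns
    /// Err(RecvError::Disconnected) once the stream is exhausted.
    pub fn recv(&mut self) -> Result<T, RecvError> {
        block_on(self.next()).ok_or(RecvError::Disconnected)
    }
}
```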
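The `block_on` in that snippet is presumably `futures::executor::block_on`. As a rough, std-only approximation of what such a function does (not the futures crate's actual implementation), the waker can simply unpark the blocked thread, so the thread sleeps instead of spinning while the future is pending.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waking unparks the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal block_on: poll, and park the thread (no spinning) while Pending.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // park() may return spuriously; the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}
```

If the wake happens before the thread parks, `park` returns immediately because `unpark` leaves a token behind, so no wake-up is lost.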

@brunocodutra
Owner

@najamelan @mcseemk I got a working implementation of the core logic that keeps track of Wakers to avoid spinning the CPU through the Stream interface. There's still some work to do, but I'd love some early feedback if you're able to find the time to have a look.

@mcseemk

mcseemk commented Sep 29, 2019

@najamelan @brunocodutra I've just done some tests, and all looks good to me. The new version of ring-channel is virtually indistinguishable from futures::channel::mpsc in terms of async performance and CPU consumption. The latency per message is just under 0.1 ms, which I presume is as good as it gets with async anyway.

@brunocodutra
Owner

Fantastic, thanks for checking @mcseemk!
I presume you used the code you posted above to get this number?

@najamelan
Author

I ran the benchmarks. I haven't had time to really understand the code, but at least these are the results:

Benchmark results
```
     Running target/release/deps/concurrency-9f82666c638de450
concurrency/10000       time:   [320.70 us 326.18 us 331.33 us]
                        thrpt:  [30.181 Melem/s 30.657 Melem/s 31.181 Melem/s]
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) low mild
  1 (1.00%) high mild

     Running target/release/deps/futures-2e521ff894685f8b
futures/mpmc/4x4x1000/1 time:   [390.92 us 412.37 us 436.43 us]
                        thrpt:  [2.2913 Melem/s 2.4250 Melem/s 2.5580 Melem/s]
Found 14 outliers among 100 measurements (14.00%)
  6 (6.00%) low mild
  1 (1.00%) high mild
  7 (7.00%) high severe
futures/mpmc/4x4x1000/8 time:   [197.32 us 202.58 us 208.95 us]
                        thrpt:  [4.7857 Melem/s 4.9363 Melem/s 5.0678 Melem/s]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  2 (2.00%) high severe
futures/mpmc/4x4x1000/1000
                        time:   [182.16 us 190.89 us 201.06 us]
                        thrpt:  [4.9736 Melem/s 5.2385 Melem/s 5.4897 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

futures/mpsc/7x1x1000/1 time:   [61.782 us 63.231 us 64.660 us]
                        thrpt:  [15.465 Melem/s 15.815 Melem/s 16.186 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
futures/mpsc/7x1x1000/8 time:   [243.27 us 252.28 us 260.98 us]
                        thrpt:  [3.8317 Melem/s 3.9639 Melem/s 4.1107 Melem/s]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
futures/mpsc/7x1x1000/1000
                        time:   [249.98 us 254.53 us 258.78 us]
                        thrpt:  [3.8643 Melem/s 3.9288 Melem/s 4.0003 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe

Benchmarking futures/spmc/1x7x1000/1: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 10.5s or reduce sample count to 40
futures/spmc/1x7x1000/1 time:   [3.1005 ms 3.3259 ms 3.5215 ms]
                        thrpt:  [283.97 Kelem/s 300.67 Kelem/s 322.53 Kelem/s]
Found 9 outliers among 100 measurements (9.00%)
  6 (6.00%) low mild
  2 (2.00%) high mild
  1 (1.00%) high severe
Benchmarking futures/spmc/1x7x1000/8: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.5s or reduce sample count to 50
futures/spmc/1x7x1000/8 time:   [2.3968 ms 2.4472 ms 2.4980 ms]
                        thrpt:  [400.31 Kelem/s 408.63 Kelem/s 417.22 Kelem/s]
Found 11 outliers among 100 measurements (11.00%)
  5 (5.00%) low severe
  4 (4.00%) low mild
  1 (1.00%) high mild
  1 (1.00%) high severe
Benchmarking futures/spmc/1x7x1000/1000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.9s or reduce sample count to 60
futures/spmc/1x7x1000/1000
                        time:   [962.04 us 1.0386 ms 1.1237 ms]
                        thrpt:  [889.89 Kelem/s 962.83 Kelem/s 1.0395 Melem/s]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

futures/spsc/1x1x1000/1 time:   [95.765 us 97.763 us 100.03 us]
                        thrpt:  [9.9973 Melem/s 10.229 Melem/s 10.442 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) low mild
  1 (1.00%) high mild
  1 (1.00%) high severe
futures/spsc/1x1x1000/2 time:   [171.25 us 173.43 us 175.69 us]
                        thrpt:  [5.6918 Melem/s 5.7661 Melem/s 5.8393 Melem/s]
Found 8 outliers among 100 measurements (8.00%)
  2 (2.00%) low mild
  2 (2.00%) high mild
  4 (4.00%) high severe
futures/spsc/1x1x1000/1000
                        time:   [228.00 us 231.37 us 235.06 us]
                        thrpt:  [4.2542 Melem/s 4.3221 Melem/s 4.3859 Melem/s]
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

     Running target/release/deps/throughput-2a1729882c69d15b
mpmc/4x4x1000/1         time:   [103.44 us 105.95 us 108.55 us]
                        thrpt:  [9.2120 Melem/s 9.4383 Melem/s 9.6670 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) low mild
  2 (2.00%) high mild
mpmc/4x4x1000/8         time:   [147.77 us 151.37 us 154.95 us]
                        thrpt:  [6.4535 Melem/s 6.6063 Melem/s 6.7672 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) low mild
  3 (3.00%) high mild
mpmc/4x4x1000/1000      time:   [125.40 us 130.34 us 135.26 us]
                        thrpt:  [7.3932 Melem/s 7.6724 Melem/s 7.9744 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high mild

mpsc/7x1x1000/1         time:   [74.924 us 76.430 us 77.822 us]
                        thrpt:  [12.850 Melem/s 13.084 Melem/s 13.347 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
mpsc/7x1x1000/8         time:   [149.11 us 151.93 us 154.89 us]
                        thrpt:  [6.4563 Melem/s 6.5819 Melem/s 6.7065 Melem/s]
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe
mpsc/7x1x1000/1000      time:   [128.19 us 130.20 us 132.30 us]
                        thrpt:  [7.5588 Melem/s 7.6807 Melem/s 7.8011 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

spmc/1x7x1000/1         time:   [119.72 us 123.17 us 126.56 us]
                        thrpt:  [7.9011 Melem/s 8.1186 Melem/s 8.3530 Melem/s]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe
spmc/1x7x1000/8         time:   [127.61 us 130.02 us 132.32 us]
                        thrpt:  [7.5572 Melem/s 7.6912 Melem/s 7.8362 Melem/s]
Found 6 outliers among 100 measurements (6.00%)
  5 (5.00%) high mild
  1 (1.00%) high severe
spmc/1x7x1000/1000      time:   [106.28 us 108.36 us 110.72 us]
                        thrpt:  [9.0315 Melem/s 9.2287 Melem/s 9.4095 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low mild
  4 (4.00%) high mild

spsc/1x1x1000/1         time:   [76.011 us 77.322 us 78.670 us]
                        thrpt:  [12.711 Melem/s 12.933 Melem/s 13.156 Melem/s]
spsc/1x1x1000/2         time:   [133.79 us 136.34 us 138.79 us]
                        thrpt:  [7.2053 Melem/s 7.3346 Melem/s 7.4742 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
spsc/1x1x1000/1000      time:   [93.918 us 95.640 us 97.428 us]
                        thrpt:  [10.264 Melem/s 10.456 Melem/s 10.648 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
```

@mcseemk

mcseemk commented Sep 29, 2019

> Fantastic, thanks for checking @mcseemk!
> I presume you used the code you posted above to get this number?

Yes, a slightly modified version of the code above. I'm mostly interested in latency/jitter rather than throughput, so I just sent 100k messages 1 ms apart and measured the average latency and its deviation. On my PC I couldn't find any statistically meaningful difference between futures mpsc and ring-channel.
Interestingly enough, for mpsc the latency drops significantly (almost by half) when using the async-std runtime instead of the tokio runtime, but I couldn't run ring-channel with async-std because of dependency hell.
Anyway, the latency seems to be determined by the runtime. For the spinning version of ring-channel the latency is between 100 and 200 ns at the cost of 100% CPU; for the properly async one it immediately jumps to about 90,000 ns, but the CPU is much happier.

@najamelan
Author

@ALL, thanks for looking into it! I look forward to playing with ring-channel, and when I get round to it, I will review the code and try some benchmarks. 90 us seems like a lot, but then it depends on the hardware and the exact benchmark.

@brunocodutra
Owner

3 participants