
Shared channel benchmark fails/hangs with probability #261

Closed
thirstycrow opened this issue Jan 21, 2021 · 13 comments · Fixed by #263

Comments

@thirstycrow
Contributor

I wrote a benchmark for shared channels, and it sometimes fails with free(): invalid pointer:

$ cargo +nightly bench --bench shared_channel
    Finished bench [optimized] target(s) in 0.14s
     Running target/release/deps/shared_channel-013a11e62adf9291
Shared channel (size: 10): 877.422145ms, 877.4221ns
Shared channel (size: 100): 387.61481ms, 387.6148ns
free(): invalid pointer
error: process didn't exit successfully: `/project/target/release/deps/shared_channel-013a11e62adf9291 --bench` (signal: 6, SIGABRT: process abort signal)

It also sometimes hangs (less often than the invalid free):

$ cargo +nightly bench --bench shared_channel
    Finished bench [optimized] target(s) in 0.02s
     Running target/release/deps/shared_channel-013a11e62adf9291
Shared channel (size: 10): 815.309948ms, 815.30994ns
Shared channel (size: 100): 384.314961ms, 384.31497ns
prod waiting
recv waiting
recv waiting
prod waiting
^C

Code for the benchmark:

use std::time::{Duration, Instant};

use glommio::channels::shared_channel;
use glommio::prelude::*;
use glommio::timer::sleep;

fn main() {
    const RUNS: usize = 1_000_000;

    bench_shared_channel(RUNS, 10);
    bench_shared_channel(RUNS, 100);
    bench_shared_channel(RUNS, 1000);
    bench_shared_channel(RUNS, 10000);
}

fn bench_shared_channel(runs: usize, channel_size: usize) {
    let (sender, receiver) = shared_channel::new_bounded(channel_size);
    let t = Instant::now();

    let producer = LocalExecutorBuilder::new()
        .pin_to_cpu(1)
        .spawn(move || async move {
            let sender = sender.connect();

            // Watchdog: prints "prod waiting" if the producer stalls for ~2s (re-armed every 10,000 sends below)
            let mut timeout = Local::local(async {
                sleep(Duration::from_secs(2)).await;
                println!("prod waiting")
            })
            .detach();

            for i in 0..runs {
                if (i + 1) % 10000 == 0 {
                    timeout.cancel();
                    timeout = Local::local(async {
                        sleep(Duration::from_secs(2)).await;
                        println!("prod waiting")
                    })
                    .detach();
                }
                sender.send(1).await.unwrap();
            }

            timeout.cancel();
        });

    let consumer = LocalExecutorBuilder::new()
        .pin_to_cpu(2)
        .spawn(move || async move {
            let receiver = receiver.connect();

            // Watchdog: prints "recv waiting" if the consumer stalls for ~2s (re-armed every 10,000 receives below)
            let mut timeout = Local::local(async {
                sleep(Duration::from_secs(2)).await;
                println!("recv waiting")
            })
            .detach();

            for i in 0..runs {
                if (i + 1) % 10000 == 0 {
                    timeout.cancel();
                    timeout = Local::local(async {
                        sleep(Duration::from_secs(2)).await;
                        println!("recv waiting")
                    })
                    .detach();
                }
                receiver.recv().await.unwrap();
            }

            timeout.cancel();
        });

    producer.unwrap().join().unwrap();
    consumer.unwrap().join().unwrap();

    let label = format!("Shared channel (size: {})", channel_size);
    print_elapsed(label.as_ref(), t, runs);
}

fn print_elapsed(label: &str, start: Instant, runs: usize) {
    let elapsed = start.elapsed();
    let avg = elapsed.as_nanos() as f32 / runs as f32;
    println!("{}: {:?}, {:?}ns", label, elapsed, avg);
}
thirstycrow changed the title from "Shared channel benchmark fails with probability" to "Shared channel benchmark fails/hangs with probability" on Jan 21, 2021
@glommer
Collaborator

glommer commented Jan 21, 2021

Thanks for the reproducer! I will take a look soon.

@glommer
Collaborator

glommer commented Jan 21, 2021

Is the code above the version with the higher or the lower probability of reproducing?

@glommer
Collaborator

glommer commented Jan 21, 2021

I ran this with the LLVM thread sanitizer and the output is not very useful. It claims there are data races inside the buffer accesses, but the places it complains about are properly wrapped in atomics.

@zserik - you touched the shared channels recently, so tagging you just in case you have any ideas.

glommer pushed a commit to glommer/glommio that referenced this issue Jan 21, 2021
While searching for another issue (DataDog#261) I realized that we never unregister
channels on drop.

That has, unfortunately, nothing to do with the issue but that's a leak
so let's fix it.
@glommer
Collaborator

glommer commented Jan 21, 2021

Hi - this is to let you know that I don't yet have a solution for this but I am working on it.
I am convinced that the problem lies with the eventfd registration. This code in BufferHalf::must_notify isn't really thread-safe. Here's an (unconfirmed) theory of how it might go wrong:

        let eventfd = self.opposite_eventfd();  <== grabs a valid eventfd memory pointer
        let mem = eventfd.take(); <== removes the eventfd memory pointer
        let ret = mem.as_ref().map(|x| x.load(Ordering::Acquire) as _); <== finds the eventfd number
        // == the remote reactor may have finished, called disconnect, and its eventfd memory is no longer valid
        eventfd.set(mem); <== reinstall the invalid eventfd
        match ret {
            None | Some(0) => None,
            Some(x) => Some(x),
        }

If I remove this code and just return None, then I can no longer reproduce the memory issue.
There aren't any notifications anymore either, so the code won't work unless you add .spin_before_park to the reactor builder to avoid sleeping.
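
For reference, the stop-gap described above amounts to something like this (a debugging sketch only, not a fix):

    fn must_notify(&self) -> Option<RawFd> {
        // Debug-only stop-gap: never notify the peer. The invalid free goes
        // away, but the peer only makes progress if its reactor never sleeps,
        // hence the need for .spin_before_park on the builders.
        None
    }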

I'll keep you posted.

glommer pushed a commit that referenced this issue Jan 21, 2021
While searching for another issue (#261) I realized that we never unregister
channels on drop.

That has, unfortunately, nothing to do with the issue but that's a leak
so let's fix it.
@thirstycrow
Contributor Author

thirstycrow commented Jan 22, 2021

This code in BufferHalf::must_notify isn't really thread-safe. Here's an (unconfirmed) theory of how it might go wrong:

        let eventfd = self.opposite_eventfd();  <== grabs a valid eventfd memory pointer
        let mem = eventfd.take(); <== removes the eventfd memory pointer
        let ret = mem.as_ref().map(|x| x.load(Ordering::Acquire) as _); <== finds the eventfd number
        // == the remote reactor may have finished, called disconnect, and its eventfd memory is no longer valid
        eventfd.set(mem); <== reinstall the invalid eventfd
        match ret {
            None | Some(0) => None,
            Some(x) => Some(x),
        }

It seems to be the exact cause of the memory issues. I'm now working around it by commenting out the eventfd-clearing statement in the disconnect methods, and it works without spin_before_park.

impl<T: Copy> Producer<T> {
    ...
    pub(crate) fn disconnect(&self) -> bool {
        // (*self.buffer).producer_eventfd.set(None);
        (*self.buffer).disconnect_producer()
    }
}

impl<T: Copy> Consumer<T> {
    ...
    pub(crate) fn disconnect(&self) -> bool {
        // (*self.buffer).consumer_eventfd.set(None);
        (*self.buffer).disconnect_consumer()
    }
}

However, the hanging problem is still there, with a very low probability of occurring; I have to reproduce it with a loop. It also seems to be reproducible only when the receiving executor is built without spin_before_park.

use glommio::channels::shared_channel;
use glommio::prelude::*;
use std::sync::mpsc::sync_channel;
use std::time::{Duration, Instant};

fn test_spsc(capacity: usize) {
    let runs: u32 = 10_000_000;
    let (sender, receiver) = shared_channel::new_bounded(capacity);

    let sender = LocalExecutorBuilder::new()
        .pin_to_cpu(0)
        //.spin_before_park(Duration::from_millis(10))
        .spawn(move || async move {
            let sender = sender.connect();
            // let t = Instant::now();
            for _ in 0..runs {
                sender.send(1).await.unwrap();
            }
            // println!(
            //     "cost of sending shared channel {:#?}, capacity: {}",
            //     t.elapsed() / runs,
            //     capacity
            // );
            drop(sender);
        })
        .unwrap();

    let receiver = LocalExecutorBuilder::new()
        //.spin_before_park(Duration::from_millis(10))
        .pin_to_cpu(1)
        .spawn(move || async move {
            let receiver = receiver.connect();
            // let t = Instant::now();
            for _ in 0..runs {
                receiver.recv().await.unwrap();
            }
            // println!(
            //     "cost of receiving shared channel: {:#?}, capacity {}",
            //     t.elapsed() / runs,
            //     capacity
            // );
        })
        .unwrap();

    sender.join().unwrap();
    receiver.join().unwrap();
}

fn main() {
    // test_rust_std(1024);
    // test_spsc(1024);
    for i in 0..10000 {
        println!("==========");
        println!("Round {}", i);
        //test_spsc(10);
        test_spsc(100);
        test_spsc(1000);
        test_spsc(10000);
    }
}

@thirstycrow
Contributor Author

thirstycrow commented Jan 22, 2021

I added some logging to ConnectedSender and ConnectedReceiver:

    pub fn try_send(&self, item: T) -> Result<(), T> {
        ...
        if self.state.buffer.consumer_disconnected() {
            return Err(GlommioError::Closed(ResourceType::Channel(item)));
        }
        match self.state.buffer.try_push(item) {
            None => {
                if let Some(fd) = self.state.buffer.must_notify() {
                    self.reactor.upgrade().unwrap().notify(fd);
                    println!("SEND: pushed and notified");
                } else {
                    println!("SEND: pushed without notifying");
                }
                Ok(())
            }
            // (remaining match arm for the "buffer full" case elided)
        }
    }

    fn wait_for_room(&self, cx: &mut Context<'_>) -> Poll<()> {
        match self.state.buffer.free_space() > 0 {
            true => Poll::Ready(()),
            false => {
                self.reactor
                    .upgrade()
                    .unwrap()
                    .add_shared_channel_waker(self.id, cx.waker().clone());
                println!(
                    "SEND: wait for room! free space: {}",
                    self.state.buffer.free_space()
                );
                Poll::Pending
            }
        }
    }
    fn recv_one(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.state.buffer.try_pop() {
            None if !self.state.buffer.producer_disconnected() => {
                self.reactor
                    .upgrade()
                    .unwrap()
                    .add_shared_channel_waker(self.id, cx.waker().clone());
                println!("RECV: wait for message! size: {}", self.state.buffer.size());
                Poll::Pending
            }
            res => {
                if let Some(fd) = self.state.buffer.must_notify() {
                    self.reactor.upgrade().unwrap().notify(fd);
                    println!("RECV: notify room");
                }
                Poll::Ready(res)
            }
        }
    }

Here are the last lines printed before the benchmark hangs:

RECV: notify room
RECV: wait for message! size: 0
SEND: pushed and notified
SEND: wait for room! free space: 0
RECV: wait for message! size: 0 <== The receiver is waiting
SEND: pushed without notifying  <== But the sender does not find an eventfd to notify. There was one before
SEND: pushed without notifying
SEND: wait for room! free space: 0
RECV: notify room               <== However the receiver is still scheduled later. Why?
RECV: notify room
RECV: wait for message! size: 0 <== The receiver is waiting again
SEND: pushed without notifying  <== The sender still did not notify
SEND: wait for room! free space: 0
                                <== No luck this time, the program hangs here

@thirstycrow
Contributor Author

thirstycrow commented Jan 22, 2021

From the log, I found that ret in must_notify changes from time to time. But it should always be the same, right, since there is only one sender and one receiver?

    fn must_notify(&self) -> Option<RawFd> {
        let eventfd = self.opposite_eventfd();
        let mem = eventfd.take();
        let ret = mem.as_ref().map(|x| x.load(Ordering::Acquire) as _);  <== this value changes from time to time
        eventfd.set(mem);
        println!("executor {}: {:?}", Local::id(), ret);
        match ret {
            None | Some(0) => None,
            Some(x) => Some(x),
        }
    }
RECV 13: notify room
RECV 13: wait for message! size: 0
executor 12: Some(10)
SEND 12: pushed and notified
SEND: wait for room! free space: 0
executor 13: Some(8)
RECV 13: notify room
RECV 13: wait for message! size: 0
executor 12: Some(10)
SEND 12: pushed and notified
SEND: wait for room! free space: 0
executor 13: Some(0) <==
executor 13: Some(8) <==
RECV 13: notify room
RECV 13: wait for message! size: 0
executor 12: Some(0) <==
SEND 12: pushed without notifying
executor 12: Some(10) <==
SEND 12: pushed and notified
SEND: wait for room! free space: 0
executor 13: Some(8)
RECV 13: notify room
RECV 13: wait for message! size: 0
executor 12: Some(0)
SEND 12: pushed without notifying
SEND: wait for room! free space: 0

@glommer
Collaborator

glommer commented Jan 22, 2021

Right, so if you don't use spin_before_park, the hangs are expected. That is because the notifier system is what wakes up the reactor if it goes to sleep. Without it, the reactor may sleep, and nobody will notify it.

I have a sketch of a solution to this problem. Unfortunately it will require an API change to the connect function, which will now return a future: we'll have to wait for the other side to connect (or give up connecting), and then store an Arc for the peer's memory area on our side of the channel. That means the notification memory stays valid even if the peer reactor has already exited.
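
Roughly, the plan is for the notification word to live in a reference-counted allocation that the connected endpoint holds onto. A minimal sketch of that idea (hypothetical names, not the actual glommio types):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustration only: the eventfd word lives in a reference-counted allocation
// owned by the executor, so a connected channel end can keep it alive even
// after the peer reactor has exited.
struct SleepNotifier {
    // 0 => the peer is awake and needs no notification; otherwise the eventfd number.
    eventfd: AtomicUsize,
}

struct ConnectedEnd {
    peer: Arc<SleepNotifier>, // cloned when connect() resolves
}

impl ConnectedEnd {
    fn must_notify(&self) -> Option<usize> {
        // Unlike the Cell take()/set() dance above, this load can never touch
        // freed memory: the Arc keeps the allocation alive.
        match self.peer.eventfd.load(Ordering::Acquire) {
            0 => None,
            fd => Some(fd),
        }
    }
}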

Stay tuned, and thanks for verifying!

glommer pushed a commit to glommer/glommio that referenced this issue Jan 22, 2021
It will unfortunately not be possible to fix DataDog#261 without an API change.
This is because the channel needs to keep the remote end - or at least
part of its state - alive to avoid a data race.

We don't know when - or if - the remote end will connect. Therefore this
needs to be made into a future that can resolve when the connection happens
(or when the endpoint disconnects)

This will be a bit complex, so it pays to add the API change separately
to reduce the change.

Note that some of the examples as-is won't work anymore: that is because
they were connecting both ends in the same thread. One can make that work
by spawning tasks, but the whole point of shared channels is to connect
different executors so the examples are changed to reflect that.
glommer pushed a commit to glommer/glommio that referenced this issue Jan 22, 2021
Issue DataDog#261 exposed a serious weakness in how we're handling remote
notifications: the lifetimes of the eventfd and of the memory area for
notifications are disconnected from each other, and the remote end
of the channel has no way of keeping them alive.

To pave the way for solving this problem, this patch introduces the
SleepNotifier. Wrapped in an Arc, it makes it easier for the peer to keep
the channel alive. Plus, the lifetimes of all entities
involved in the notification are kept together.
glommer pushed a commit to glommer/glommio that referenced this issue Jan 22, 2021
When an executor goes to sleep, it writes into a memory area that is
stored in the shared buffer to signal to the other side of the shared
channel that it needs to be notified in order to wake up.

This works well with long-lived executors for which the shutdown process
is well behaved so that the channels will always be empty. While that is
still a good practice (we are adding that to the docs!) we don't want to
force that or depend on that for correctness.

In this patch we create a global table that maps executor IDs to their
SleepNotifier. Because the SleepNotifier is wrapped in an Arc, a connected
channel can keep its peer alive by holding onto that Arc. Even if the reactor
dies, the memory will still be valid and so will the eventfd.

Fixes DataDog#261
glommer mentioned this issue Jan 22, 2021
@glommer
Collaborator

glommer commented Jan 22, 2021

I just sent a PR for this.
Notice that, as I said, you now need to connect().await your channels instead of connect(). I really don't see another way
to fix this while keeping the overall architecture sane.
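
For example, the producer side of the first reproducer changes only in the connect call (sketch; imports and the rest of the loop stay as above):

let producer = LocalExecutorBuilder::new()
    .pin_to_cpu(1)
    .spawn(move || async move {
        // connect() is now a future: it resolves once the receiving side has
        // connected (or given up), instead of returning immediately.
        let sender = sender.connect().await;
        for _ in 0..runs {
            sender.send(1).await.unwrap();
        }
    })
    .unwrap();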

Because I haven't released the current version on crates yet (waiting for a liburing release, which is a semi-blocker), I haven't bumped the semver for this.

It should be merged soon; it passes my local tests now.

@glommer
Collaborator

glommer commented Jan 22, 2021

One comment about your benchmark, btw: creating and destroying an executor is very expensive! You are measuring that as well in the cost of your shared channel.
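
One way to exclude that (a sketch, mirroring the timing already commented out in the reproducer above) is to start the clock inside each executor, only after connect():

    .spawn(move || async move {
        let sender = sender.connect();
        // Time only the send loop: executor creation, CPU pinning and
        // teardown stay outside the measured region.
        let t = Instant::now();
        for _ in 0..runs {
            sender.send(1).await.unwrap();
        }
        println!("send cost, capacity {}: {:#?}/msg", capacity, t.elapsed() / runs);
    })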

@thirstycrow
Contributor Author

thirstycrow commented Jan 25, 2021

Is it still necessary to use spin_before_park with #263? There's still a chance of a hang without it.

@glommer
Collaborator

glommer commented Jan 25, 2021

No, spin_before_park should not be necessary. How often can you reproduce it? Locally all my attempts passed.
I will consider adding debug code in that branch, then.

@glommer
Collaborator

glommer commented Jan 25, 2021

spin_before_park is an optimization: because sleeping is expensive, you may choose to spin for a bit, waiting for new work to arrive, before declaring your reactor idle. It's good practice to use it (although it will increase your CPU utilization), but it should never be necessary (unless there is a bug, like in this case).
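
In builder terms (the same API the reproducers above already use, just with the commented-out line enabled), opting in looks like this:

use std::time::Duration;

use glommio::prelude::*;

fn main() {
    let ex = LocalExecutorBuilder::new()
        .pin_to_cpu(1)
        // Busy-wait up to 10ms for new work before letting the reactor sleep:
        // lower wake-up latency at the cost of some extra CPU.
        .spin_before_park(Duration::from_millis(10))
        .spawn(|| async move {
            // ... tasks ...
        })
        .unwrap();
    ex.join().unwrap();
}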

glommer pushed a commit to glommer/glommio that referenced this issue Jan 25, 2021
There are some problems in our current spsc implementation that are related to
memory order. Those problems are fixed (in particular a Relaxed load from the
real tail), but none of them are really the root cause of what we are seeing.

The real root cause is that because of the possibility of reordering at the CPU
level, it could be that we marked ourselves as going to sleep and checked if new
work has happened, but our peer did those things in the opposite order.

We were missing a needed barrier. Barriers are needed on both the producer
and the receiver side, which can be quite expensive. To avoid that, we are using
a technique much like the one described at
https://www.scylladb.com/2018/02/15/memory-barriers-seastar-linux/

Fully Fixes DataDog#261
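
For context, the lost-wakeup pattern this commit message describes looks roughly like the following sketch (illustration only, not glommio's actual code): the sleeper must announce its intention to sleep before re-checking for work, the notifier must publish work before checking the sleep flag, and each side needs a full barrier between its two steps so the CPU cannot reorder them.

use std::sync::atomic::{fence, AtomicBool, AtomicUsize, Ordering};

// Stand-ins for the shared state: a "going to sleep" flag and the queue size.
static SLEEPING: AtomicBool = AtomicBool::new(false);
static QUEUE_LEN: AtomicUsize = AtomicUsize::new(0);

// Consumer: announce the intention to sleep, then re-check the queue.
// The fence keeps the store and the following load from being reordered.
fn consumer_may_sleep() -> bool {
    SLEEPING.store(true, Ordering::Relaxed);
    fence(Ordering::SeqCst);
    QUEUE_LEN.load(Ordering::Relaxed) == 0 // only park if still empty here
}

// Producer: publish the item first, then check whether the peer needs a wakeup.
fn producer_must_notify() -> bool {
    QUEUE_LEN.fetch_add(1, Ordering::Relaxed);
    fence(Ordering::SeqCst);
    SLEEPING.load(Ordering::Relaxed) // true => write the peer's eventfd
}

fn main() {
    // With both fences, at least one side observes the other: either the
    // producer sees SLEEPING and notifies, or the consumer sees the new item
    // and does not park. Without them, both checks can miss and the system hangs.
    assert!(consumer_may_sleep());
    assert!(producer_must_notify());
}

The linked Scylla/Seastar post describes how to avoid paying for one of these barriers on the fast path, which is the technique the commit refers to.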