Skip to content

Commit

Permalink
WIP: Fix test_waker_threaded
Browse files Browse the repository at this point in the history
This fixes a few race conditions in `test_waker_threaded()` and adds a
timeout to ensure that it doesn’t hang forever if something goes wrong.

Fixes: cloudhead#19 — test_waker_threaded fails or hangs in Rust 1.70
  • Loading branch information
danielparks committed Jul 8, 2023
1 parent 77d3926 commit 2363ef0
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 22 deletions.
1 change: 0 additions & 1 deletion rust-toolchain

This file was deleted.

69 changes: 48 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ impl Waker {
match (&self.writer).write_all(&[0x1]) {
Ok(_) => Ok(()),
Err(e) if e.kind() == WouldBlock => {
// Could this cause a race condition?
Waker::reset(self.reader.as_raw_fd())?;
self.wake()
}
Expand Down Expand Up @@ -978,39 +979,65 @@ mod tests {
let mut wakes = 0;
let mut received = 0;

while !handle.is_finished() {
loop {
events.clear();

let count = sources.poll(&mut events, Timeout::Never).unwrap();
if count > 0 {
let event = events.pop().unwrap();
assert_eq!(event.key, "waker");
assert!(events.is_empty());
match sources.poll(&mut events, Timeout::from_millis(1)) {
Ok(0) => {
panic!("Got Ok(0) from sources.poll()");
}
Ok(_) => {
let event = events.pop().unwrap();
assert_eq!(event.key, "waker");
assert!(events.is_empty());

// * If the waker has already been dropped in the other
// thread, we'll get a "bad file descriptor" error.
// * If the waker hasn't been dropped, we'll get `Ok(())`.
if let Err(e) = Waker::reset(event.source) {
// Currently a "bad file descriptor" error uses
// ErrorKind::Uncategorized, which is undocumented and
// isn’t supposed to be matched against.
assert_eq!(Some(9), e.raw_os_error());
}

// There's always a message on the channel if we got woken up.
rx.recv().unwrap();
received += 1;
// It is possible to get 0 or more messages on the channel
// after waking, since the last time we woke we might have
// gotten the the message that we just received the wake
// for. Or, we might get multiple messages if the sender is
// fast enough.
while rx.try_recv().is_ok() {
received += 1;
}

// We may get additional messages on the channel, if the sending is
// faster than the waking.
while rx.try_recv().is_ok() {
received += 1;
wakes += 1;
}
Err(e) if e.kind() == io::ErrorKind::TimedOut => {
// The other thread has probably finished and dropped the
// waker, so there was nothing left to wake the poll() call.
if handle.is_finished() {
break;
}
// Nope, still going, try again.

if received == iterations {
// Error: "bad file descriptor", as the waker handle gets
// dropped by the other thread.
Waker::reset(event.source).unwrap_err();
break;
// FIXME? limit this somehow? Maybe the thread limits it
// enough already?
}
Err(e) => {
panic!("Error {e:?} in sources.poll()");
}

Waker::reset(event.source).ok(); // We might get the "bad file descriptor" error here.
wakes += 1;
}
}
eprintln!("out of loop");
handle.join().unwrap();
eprintln!("joined");
std::mem::drop(rx);
eprintln!("dropped rx");
std::mem::drop(sources);
eprintln!("dropped sources");

assert_eq!(received, iterations);
assert!(wakes <= received);
println!("done");
}
}

0 comments on commit 2363ef0

Please sign in to comment.