Skip to content

Commit

Permalink
Refine WaitQueue with fast path to return early
Browse files Browse the repository at this point in the history
  • Loading branch information
liqinggd authored and tatetian committed May 11, 2024
1 parent 035e12a commit 81cca42
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions framework/aster-frame/src/sync/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use alloc::{collections::VecDeque, sync::Arc};
use core::{
sync::atomic::{AtomicBool, Ordering},
sync::atomic::{AtomicBool, AtomicU32, Ordering},
time::Duration,
};

Expand All @@ -19,12 +19,15 @@ use crate::{
/// Other threads may invoke the `wake`-family methods of a wait queue to
/// wake up one or many waiter threads.
pub struct WaitQueue {
// A copy of `wakers.len()`, used for the lock-free fast path in `wake_one` and `wake_all`.
num_wakers: AtomicU32,
wakers: SpinLock<VecDeque<Arc<Waker>>>,
}

impl WaitQueue {
pub const fn new() -> Self {
WaitQueue {
num_wakers: AtomicU32::new(0),
wakers: SpinLock::new(VecDeque::new()),
}
}
Expand Down Expand Up @@ -111,8 +114,20 @@ impl WaitQueue {

/// Wake up one waiting thread.
pub fn wake_one(&self) {
while let Some(waker) = self.wakers.lock_irq_disabled().pop_front() {
// Fast path
if self.is_empty() {
return;
}

loop {
let mut wakers = self.wakers.lock_irq_disabled();
let Some(waker) = wakers.pop_front() else {
break;
};
self.num_wakers.fetch_sub(1, Ordering::Release);
// Avoid holding lock when calling `wake_up`
drop(wakers);

if waker.wake_up() {
return;
}
Expand All @@ -121,18 +136,32 @@ impl WaitQueue {

/// Wake up all waiting threads.
pub fn wake_all(&self) {
while let Some(waker) = self.wakers.lock_irq_disabled().pop_front() {
// Fast path
if self.is_empty() {
return;
}

loop {
let mut wakers = self.wakers.lock_irq_disabled();
let Some(waker) = wakers.pop_front() else {
break;
};
self.num_wakers.fetch_sub(1, Ordering::Release);
// Avoid holding lock when calling `wake_up`
drop(wakers);

waker.wake_up();
}
}

pub fn is_empty(&self) -> bool {
self.wakers.lock_irq_disabled().is_empty()
self.num_wakers.load(Ordering::Acquire) == 0
}

fn enqueue(&self, waker: Arc<Waker>) {
self.wakers.lock_irq_disabled().push_back(waker);
let mut wakers = self.wakers.lock_irq_disabled();
wakers.push_back(waker);
self.num_wakers.fetch_add(1, Ordering::Release);
}
}

Expand Down

0 comments on commit 81cca42

Please sign in to comment.