Skip to content

Commit

Permalink
use box waker
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay committed Sep 22, 2022
1 parent 4c2bab5 commit d418f60
Showing 1 changed file with 44 additions and 21 deletions.
65 changes: 44 additions & 21 deletions components/tikv_util/src/mpsc/future.rs
Expand Up @@ -4,8 +4,9 @@

use std::{
pin::Pin,
sync::atomic::{self, AtomicUsize, Ordering},
task::{Context, Poll},
ptr,
sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering},
task::{Context, Poll, Waker},
};

use crossbeam::{
Expand All @@ -14,8 +15,6 @@ use crossbeam::{
};
use futures::{Stream, StreamExt};

use crate::future::FastWaker;

#[derive(Clone, Copy)]
pub enum WakePolicy {
Immediately,
Expand All @@ -24,21 +23,49 @@ pub enum WakePolicy {

struct Queue<T> {
queue: SegQueue<T>,
waker: FastWaker,
waker: AtomicPtr<Waker>,
liveness: AtomicUsize,
policy: WakePolicy,
}

impl<T> Queue<T> {
#[inline]
fn wake(&self, policy: WakePolicy) {
match policy {
WakePolicy::Immediately => self.waker.wake(),
WakePolicy::TillReach(n) => {
if self.queue.len() < n {
return;
}
self.waker.wake();
if let WakePolicy::TillReach(n) = policy {
if self.queue.len() < n {
return;
}
}
let ptr = self.waker.swap(ptr::null_mut(), Ordering::AcqRel);
unsafe {
if !ptr.is_null() {
Box::from_raw(ptr).wake();
}
}
}

// If there is already a waker, true is returned.
fn register_waker(&self, waker: &Waker) -> bool {
let w = Box::new(waker.clone());
let ptr = self.waker.swap(Box::into_raw(w), Ordering::AcqRel);
unsafe {
if ptr.is_null() {
false
} else {
drop(Box::from_raw(ptr));
true
}
}
}
}

impl<T> Drop for Queue<T> {
#[inline]
fn drop(&mut self) {
let ptr = self.waker.swap(ptr::null_mut(), Ordering::SeqCst);
unsafe {
if !ptr.is_null() {
drop(Box::from_raw(ptr));
}
}
}
Expand Down Expand Up @@ -91,7 +118,7 @@ impl<T> Drop for Sender<T> {
.fetch_sub(SENDER_COUNT_BASE, Ordering::Release);
if previous == SENDER_COUNT_BASE | RECEIVER_COUNT_BASE {
// The last sender is dropped, we need to wake up the receiver.
queue.waker.wake();
queue.wake(WakePolicy::Immediately);
} else if previous == SENDER_COUNT_BASE {
atomic::fence(Ordering::Acquire);
drop(unsafe { Box::from_raw(self.queue) });
Expand All @@ -115,17 +142,13 @@ impl<T: Send> Stream for Receiver<T> {
if let Some(t) = queue.queue.pop() {
return Poll::Ready(Some(t));
}
if unsafe { !queue.waker.register(cx.waker()) } {
// If there is no previous waker, we still need to poll again in case some
// task is pushed before registering current waker.
if !queue.register_waker(cx.waker()) {
// In case the message is pushed right before registering waker.
if let Some(t) = queue.queue.pop() {
return Poll::Ready(Some(t));
}
// It may be just a stale wake. So register it again.
if unsafe { !queue.waker.register(cx.waker()) } {
// It must be some as stale wake is cleared at the first register.
let t = queue.queue.pop().unwrap();
return Poll::Ready(Some(t));
}
}
if queue.liveness.load(Ordering::Acquire) & !RECEIVER_COUNT_BASE != 0 {
return Poll::Pending;
Expand Down Expand Up @@ -168,7 +191,7 @@ unsafe impl<T: Send> Send for Receiver<T> {}
pub fn unbounded<T>(policy: WakePolicy) -> (Sender<T>, Receiver<T>) {
let queue = Box::into_raw(Box::new(Queue {
queue: SegQueue::new(),
waker: FastWaker::default(),
waker: AtomicPtr::default(),
liveness: AtomicUsize::new(SENDER_COUNT_BASE | RECEIVER_COUNT_BASE),
policy,
}));
Expand Down

0 comments on commit d418f60

Please sign in to comment.