Skip to content
Permalink
Browse files

simplifed FutWait to ignore spins_first

and go to parking_lot directly
clippy
  • Loading branch information...
abbychau committed May 20, 2019
1 parent 103bf7a commit f311c7c02c392a656df85d662f8b3c1048536457
Showing with 62 additions and 52 deletions.
  1. +1 −1 src/broadcast.rs
  2. +1 −1 src/memory.rs
  3. +3 −3 src/mpmc.rs
  4. +48 −39 src/multiqueue.rs
  5. +9 −8 src/wait.rs
@@ -607,7 +607,7 @@ impl<T: Clone + Sync> BroadcastFutReceiver<T> {
Ok(sreceiver) => Ok(BroadcastFutUniReceiver {
receiver: sreceiver,
}),
Err((o, receiver)) => Err((o, BroadcastFutReceiver { receiver: receiver })),
Err((o, receiver)) => Err((o, BroadcastFutReceiver { receiver })),
}
}
}
@@ -34,7 +34,7 @@ pub struct MemoryManager {
impl ToFree {
pub fn new<T>(val: *mut T, num: usize) -> ToFree {
unsafe fn do_free<F>(pt: *mut u8, num: usize) {
let to_free: *mut F = mem::transmute(pt as *mut F);
let to_free: *mut F = pt as *mut F;
for i in 0..num as isize {
ptr::read(to_free.offset(i));
}
@@ -401,7 +401,7 @@ impl<T> MPMCUniReceiver<T> {
/// }
/// ```
pub fn iter_with<R, F: FnMut(&T) -> R>(self, op: F) -> MPMCUniIter<R, F, T> {
MPMCUniIter { recv: self, op: op }
MPMCUniIter { recv: self, op }
}

/// Returns a non-owning iterator that iterates over the queue
@@ -423,7 +423,7 @@ impl<T> MPMCUniReceiver<T> {
/// }
/// ```
pub fn try_iter_with<'a, R, F: FnMut(&T) -> R>(&'a self, op: F) -> MPMCUniRefIter<'a, R, F, T> {
MPMCUniRefIter { recv: self, op: op }
MPMCUniRefIter { recv: self, op }
}
}

@@ -469,7 +469,7 @@ impl<T> MPMCFutReceiver<T> {
Ok(sreceiver) => Ok(MPMCFutUniReceiver {
receiver: sreceiver,
}),
Err((o, receiver)) => Err((o, MPMCFutReceiver { receiver: receiver })),
Err((o, receiver)) => Err((o, MPMCFutReceiver { receiver })),
}
}
}
@@ -12,7 +12,9 @@ use std::thread::yield_now;

use crate::alloc;
use crate::atomicsignal::LoadedSignal;
use crate::countedindex::{get_valid_wrap, is_tagged, rm_tag, CountedIndex, Index, INITIAL_QUEUE_FLAG};
use crate::countedindex::{
get_valid_wrap, is_tagged, rm_tag, CountedIndex, Index, INITIAL_QUEUE_FLAG,
};
use crate::memory::{MemToken, MemoryManager};
use crate::wait::*;

@@ -202,7 +204,7 @@ pub struct FutInnerUniRecv<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> {
}

struct FutWait {
spins_first: usize,
// spins_first: usize,
spins_yield: usize,
parked: parking_lot::Mutex<VecDeque<Task>>,
}
@@ -221,8 +223,8 @@ impl<RW: QueueRW<T>, T> MultiQueue<RW, T> {

fn new_internal(_capacity: Index, wait: Arc<Wait>) -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
let capacity = get_valid_wrap(_capacity);
let queuedat : *mut QueueEntry<T> = alloc::allocate(capacity as usize);
let refdat : *mut RefCnt = alloc::allocate(capacity as usize);
let queuedat: *mut QueueEntry<T> = alloc::allocate(capacity as usize);
let refdat: *mut RefCnt = alloc::allocate(capacity as usize);
unsafe {
for i in 0..capacity as isize {
let elem: &QueueEntry<T> = &*queuedat.offset(i);
@@ -248,7 +250,7 @@ impl<RW: QueueRW<T>, T> MultiQueue<RW, T> {
refs: refdat,
capacity: capacity as isize,
waiter: wait,
needs_notify: needs_notify,
needs_notify,
mk: PhantomData,
d3: unsafe { mem::uninitialized() },

@@ -267,7 +269,7 @@ impl<RW: QueueRW<T>, T> MultiQueue<RW, T> {

let mreader = InnerRecv {
queue: qarc.clone(),
reader: reader,
reader,
token: qarc.manager.get_token(),
alive: true,
};
@@ -659,7 +661,7 @@ impl<RW: QueueRW<T>, T> FutInnerRecv<RW, T> {
reader: new_mreader,
wait: new_wait,
prod_wait: new_pwait,
op: op,
op,
})
} else {
Err((
@@ -710,7 +712,7 @@ impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> FutInnerUniRecv<RW, R, F, T> {
reader: rx,
wait: self.wait.clone(),
prod_wait: self.prod_wait.clone(),
op: op,
op,
}
}

@@ -815,36 +817,43 @@ impl<RW: QueueRW<T>, R, F: for<'r> FnMut(&T) -> R, T> Stream for FutInnerUniRecv

impl FutWait {
pub fn new() -> FutWait {
FutWait::with_spins(DEFAULT_TRY_SPINS, DEFAULT_YIELD_SPINS)
}

pub fn with_spins(spins_first: usize, spins_yield: usize) -> FutWait {
//FutWait::with_spins(DEFAULT_TRY_SPINS, DEFAULT_YIELD_SPINS)
FutWait {
spins_first: spins_first,
spins_yield,
// spins_first: spins_first,
spins_yield: DEFAULT_YIELD_SPINS,
parked: parking_lot::Mutex::new(VecDeque::new()),
}
}

pub fn fut_wait(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
self.spin(seq, at, wc) && self.park(seq, at, wc)
}

pub fn spin(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
for _ in 0..self.spins_first {
if check(seq, at, wc) {
return false;
}
}
// pub fn with_spins(spins_first: usize, spins_yield: usize) -> FutWait {
// FutWait {
// // spins_first: spins_first,
// // spins_yield,
// parked: parking_lot::Mutex::new(VecDeque::new()),
// }
// }

for _ in 0..self.spins_yield {
yield_now();
if check(seq, at, wc) {
return false;
}
}
true
}
pub fn fut_wait(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
//self.spin(seq, at, wc) &&
self.park(seq, at, wc)
}

// #[allow(dead_code)]
// pub fn spin(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
// for _ in 0..self.spins_first {
// if check(seq, at, wc) {
// return false;
// }
// }

// for _ in 0..self.spins_yield {
// yield_now();
// if check(seq, at, wc) {
// return false;
// }
// }
// true
// }

pub fn park(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
let mut parked = self.parked.lock();
@@ -860,12 +869,12 @@ impl FutWait {
f: F,
mut val: T,
) -> Result<(), TrySendError<T>> {
for _ in 0..self.spins_first {
match f(val) {
Err(TrySendError::Full(v)) => val = v,
v => return v,
}
}
// for _ in 0..self.spins_first {
// match f(val) {
// Err(TrySendError::Full(v)) => val = v,
// v => return v,
// }
// }

for _ in 0..self.spins_yield {
yield_now();
@@ -971,7 +980,7 @@ impl<RW: QueueRW<T>, T> Clone for FutInnerRecv<RW, T> {

impl Clone for FutWait {
fn clone(&self) -> FutWait {
FutWait::with_spins(self.spins_first, self.spins_yield)
FutWait::new()
}
}

@@ -31,14 +31,15 @@ pub fn load_tagless(val: &AtomicUsize) -> usize {
#[inline(always)]
pub fn check(seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
let cur_count = load_tagless(at);

if wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1 {
true
} else {
use std::{thread, time};
thread::sleep(time::Duration::from_millis(DEFAULT_CHECK_DELAY));
false
}
wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1

// if wc.load(Relaxed) == 0 || seq == cur_count || past(seq, cur_count).1 {
// true
// } else {
// use std::{thread, time};
// thread::sleep(time::Duration::from_millis(DEFAULT_CHECK_DELAY));
// false
// }
}

/// This is the trait that something implements to allow receivers

0 comments on commit f311c7c

Please sign in to comment.
You can’t perform that action at this time.