Skip to content

Commit

Permalink
Revert "Just use Sender/Receiver for now in CpuPool"
Browse files Browse the repository at this point in the history
This reverts commit 1e5553a.
  • Loading branch information
alexcrichton committed Aug 7, 2016
1 parent 6614bf1 commit ef16d5d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 17 deletions.
1 change: 1 addition & 0 deletions futures-cpupool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ computation on the threads themselves.

[dependencies]
futures = { path = "..", version = "0.1" }
crossbeam = "0.2"
num_cpus = "1.0"
27 changes: 10 additions & 17 deletions futures-cpupool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@

#![deny(missing_docs)]

extern crate crossbeam;
extern crate futures;
extern crate num_cpus;

use std::any::Any;
use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

use crossbeam::sync::MsQueue;
use futures::{Future, promise, Promise, Task, Poll};

/// A thread pool intended to run CPU intensive work.
Expand All @@ -72,12 +73,10 @@ use futures::{Future, promise, Promise, Task, Poll};
/// the underlying thread pool.
pub struct CpuPool {
inner: Arc<Inner>,
tx: Sender<Message>,
}

struct Inner {
// TODO: use a lock free mpmc queue
queue: Mutex<Receiver<Message>>,
queue: MsQueue<Message>,
cnt: AtomicUsize,
size: u32,
}
Expand Down Expand Up @@ -112,11 +111,9 @@ impl CpuPool {
/// and clones can be made of it to get multiple references to the same
/// thread pool.
pub fn new(size: u32) -> CpuPool {
let (tx, rx) = channel();
let pool = CpuPool {
tx: tx,
inner: Arc::new(Inner {
queue: Mutex::new(rx),
queue: MsQueue::new(),
cnt: AtomicUsize::new(1),
size: size,
}),
Expand Down Expand Up @@ -148,9 +145,9 @@ impl CpuPool {
R: Send + 'static,
{
let (tx, rx) = promise();
self.tx.send(Message::Run(Box::new(|| {
self.inner.queue.push(Message::Run(Box::new(|| {
tx.complete(panic::catch_unwind(AssertUnwindSafe(f)));
}))).unwrap();
})));
CpuFuture { inner: rx }
}

Expand All @@ -161,8 +158,7 @@ impl CpuPool {
// don't have a catch_unwind per unit of work.
let res = panic::catch_unwind(AssertUnwindSafe(|| {
while !done {
let msg = self.inner.queue.lock().unwrap().recv().unwrap();
match msg {
match self.inner.queue.pop() {
Message::Close => done = true,
Message::Run(r) => r.call_box(),
}
Expand All @@ -179,10 +175,7 @@ impl CpuPool {
impl Clone for CpuPool {
fn clone(&self) -> CpuPool {
self.inner.cnt.fetch_add(1, Ordering::Relaxed);
CpuPool {
inner: self.inner.clone(),
tx: self.tx.clone(),
}
CpuPool { inner: self.inner.clone() }
}
}

Expand All @@ -192,7 +185,7 @@ impl Drop for CpuPool {
return
}
for _ in 0..self.inner.size {
self.tx.send(Message::Close).unwrap();
self.inner.queue.push(Message::Close);
}
}
}
Expand Down

0 comments on commit ef16d5d

Please sign in to comment.