Skip to content

Commit

Permalink
Add CpuPool support.
Browse files Browse the repository at this point in the history
  • Loading branch information
withoutboats committed Jun 28, 2017
1 parent 6d22bee commit 953e6b3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
4 changes: 3 additions & 1 deletion src/rigging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ version = "0.1.0"
anymap = "0.12.1"
backtrace = "0.3.0"
futures = "0.1.13"
futures-cpupool = "0.1.5"
num_cpus = "1.5.1"
route-recognizer = "0.1.12"
serde = "1.0.0"
tokio-core = "0.1.6"
url = "1.4.0"

[dependencies.c3po]
git = "https://github.com/withoutboats/c3po"
branch = "singlethreaded"
git = "https://github.com/withoutboats/c3po"

[dependencies.hyper]
branch = "new-new-service"
Expand Down
28 changes: 24 additions & 4 deletions src/rigging/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use std::io;
use std::cell::RefCell;
use std::rc::Rc;

use Error;

use anymap::AnyMap;
use core::reactor::Handle;
use c3po::{ConnFuture, Conn, Pool, Config};
use futures::{Future, future, Stream, stream};
use futures::{IntoFuture, Future, future, Stream, stream};
use futures_cpupool::{CpuPool, CpuFuture};
use num_cpus;
use tokio::NewService;

use Error;
use http::StatusCode;
use connections::{Client, Configure, ConnectClient};

Expand All @@ -23,6 +24,7 @@ use connections::{Client, Configure, ConnectClient};
/// * It allows you to cache values in a type map to share them from the
/// format/middleware/resource.
pub struct Environment {
cpu_pool: Rc<CpuPool>,
pools: Rc<AnyMap>,
store: Rc<RefCell<AnyMap>>,
}
Expand Down Expand Up @@ -55,7 +57,22 @@ impl Environment {
"Connection not registered.")))
}
}


/// Spawn a function to run on a thread pool.
///
/// By default, all methods run on the IO thread. If a method performs slow
/// computation or blocking IO, yoiu can choose to spawn it into a thread
/// pool.
pub fn spawn<T, R, F>(&self, f: F) -> CpuFuture<T, Error>
where
T: Send + 'static,
F: FnOnce() -> R + Send + 'static,
R: IntoFuture<Item = T, Error = Error> + Send + 'static,
R::Future: Send + 'static,
{
self.cpu_pool.spawn_fn(f)
}

#[doc(hidden)]
pub fn conn_for<C: NewService>(&self, name: &'static str) -> ConnFuture<Conn<C>, Error> {
if let Some(pools) = self.pools.get::<Vec<(&'static str, Pool<C>)>>() {
Expand Down Expand Up @@ -95,12 +112,14 @@ impl Environment {
#[derive(Clone)]
pub struct PreparedEnv {
pools: Rc<AnyMap>,
cpu_pool: Rc<CpuPool>,
}

impl PreparedEnv {
pub fn new(&self) -> Environment {
Environment {
pools: self.pools.clone(),
cpu_pool: self.cpu_pool.clone(),
store: Rc::new(RefCell::new(AnyMap::new())),
}
}
Expand Down Expand Up @@ -144,6 +163,7 @@ impl EnvBuilder {
pub fn build(self) -> PreparedEnv {
PreparedEnv {
pools: Rc::new(Rc::try_unwrap(self.pools).unwrap().into_inner()),
cpu_pool: Rc::new(CpuPool::new(num_cpus::get() * 4)),
}
}
}
2 changes: 2 additions & 0 deletions src/rigging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
extern crate anymap;
extern crate backtrace;
extern crate futures;
extern crate futures_cpupool;
extern crate hyper;
extern crate tokio_service as tokio;
extern crate tokio_core as core;
extern crate c3po;
extern crate num_cpus;
extern crate route_recognizer as recognizer;
extern crate serde;
extern crate tokio_redis as redis;
Expand Down

0 comments on commit 953e6b3

Please sign in to comment.