Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync wrapper for BlockingPermitFuture #1

Merged
merged 25 commits into from Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bf11c40
clarify Sync
dekellum Dec 11, 2019
d35588b
add Sync wrapper for BlockingPermitFuture
dekellum Dec 14, 2019
c381e5d
improve Sync wrapper poll impl
dekellum Dec 15, 2019
b51355e
cleanup sync asserts
dekellum Dec 15, 2019
82696ed
lock updates
dekellum Dec 19, 2019
d7d7759
Add ctors to (Sync)BlockingPermitFuture types
dekellum Dec 19, 2019
ae6b0f3
BlockingPermit::run always available
dekellum Dec 19, 2019
20d4c18
cosmetic
dekellum Dec 19, 2019
3442cd0
re-integrate with new tokio::sync::Semaphore
dekellum Dec 20, 2019
ddf2dd4
Semphorish for common ctor
dekellum Dec 20, 2019
53c00cd
dep cleanup
dekellum Dec 20, 2019
4c8f6e0
test more feature permutations
dekellum Dec 20, 2019
4ec3d5f
fix travis.yml
dekellum Dec 20, 2019
ae298db
CI with RUSTFLAGS=-Dwarnings
dekellum Dec 20, 2019
8f4ff47
tokio::sync::Semaphore is Sync if asked correctly
dekellum Dec 20, 2019
2006a84
typo
dekellum Dec 20, 2019
31aba40
Assert sync-ness of tokio variant at compile time
dekellum Dec 20, 2019
36cca06
One impl of blocking_permit_future is sufficient
dekellum Dec 20, 2019
90a8423
always use BlockingPermit::run in macro
dekellum Dec 20, 2019
fb8f453
PinBox for safer tokio semaphore
dekellum Dec 20, 2019
8983490
safety notes for intrusive
dekellum Dec 20, 2019
70786af
fix up benches, avoid deadlocking
dekellum Dec 20, 2019
eea322a
permit.make_sync() for more fair in_place benchmarks
dekellum Dec 20, 2019
182d35a
feature cleanup, add tokio-omnibus
dekellum Dec 20, 2019
48390f8
use tokio-omnibus as CI shorthand
dekellum Dec 20, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions .travis.yml
Expand Up @@ -6,6 +6,11 @@ script:
- 'cargo test'
- 'cargo test --features current-thread'
- 'cargo test --no-default-features --features tokio-oneshot'
- 'cargo test --no-default-features --features tokio-threaded,tokio-oneshot'
- 'cargo test --no-default-features --features tokio-semaphore,tokio-oneshot'
- 'cargo test --no-default-features --features tokio-omnibus'
- 'cargo test --features futures-intrusive'
- 'cargo test --features tokio-threaded,futures-intrusive'
- 'cargo test --all-features'
- 'if [ "$TRAVIS_RUST_VERSION" == "nightly" ]; then cargo build --all-features --all-targets; fi'

Expand All @@ -17,6 +22,8 @@ before_cache:
- rm -rf /home/travis/.cargo/registry

env: # important!
global: RUSTFLAGS=-Dwarnings

matrix:
include:
- rust: 1.39.0
Expand Down
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 20 additions & 11 deletions Cargo.toml
Expand Up @@ -16,29 +16,38 @@ build = "build.rs"
[dependencies]
futures-core = { version=">=0.3.1, <0.4" }
futures-channel = { version=">=0.3.1, <0.4", optional=true }
lazy_static = { version=">= 1.3.0, <1.5" }
log = { version=">=0.4.4, <0.5" }
crossbeam-channel = { version=">=0.4.0, <0.5" }
num_cpus = { version=">=1.11.1, <1.12" }
futures-intrusive = { version=">=0.2.2, <0.3" }
futures-intrusive = { version=">=0.2.2, <0.3", optional=true }
tokio = { version=">=0.2.2, <0.3", optional=true }

[dev-dependencies]
env_logger = { version=">=0.7.1, <0.8", default-features=false }
tempfile = { version=">=3.1.0, <3.2" }
rand = { version=">=0.7.0, <0.8" }
futures-executor = { version=">=0.3.1, <0.4" }
futures-util = { version=">=0.3.1, <0.4" }
lazy_static = { version=">=1.3.0, <1.5" }
rand = { version=">=0.7.0, <0.8" }
tempfile = { version=">=3.1.0, <3.2" }

[features]
# Need some oneshot channel for dispatch_rx, default to futures
# Use default-features=false in combination with tokio-oneshot, if desired.
default=["futures-channel"]
tokio-oneshot=[
"tokio", "tokio/sync"
]
tokio-threaded=[
"tokio", "tokio/rt-core", "tokio/rt-threaded", "tokio/blocking"
]
current-thread=[] # testing hack

# All available tokio integrations
tokio-omnibus=["tokio-oneshot", "tokio-threaded", "tokio-semaphore"]

# Instead of (default) futures-channel oneshot
tokio-oneshot=["tokio", "tokio/sync"]

# Tokio threaded runtime, including blocking support
tokio-threaded=["tokio", "tokio/rt-core", "tokio/rt-threaded", "tokio/blocking"]

# Instead of futures-intrusive semaphore. (Both are optional)
tokio-semaphore=["tokio", "tokio/sync"]

current-thread=[] # testing hack only

[profile.bench]
lto = "thin"
Expand Down
61 changes: 38 additions & 23 deletions benches/dispatch.rs
@@ -1,3 +1,4 @@
#![cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
#![warn(rust_2018_idioms)]

#![feature(test)]
Expand All @@ -19,11 +20,13 @@ use blocking_permit::{
blocking_permit_future,
dispatch_rx, DispatchPool,
deregister_dispatch_pool, register_dispatch_pool,
Semaphore
Semaphore,
Semaphorish,
};

lazy_static! {
static ref TEST_SET: Semaphore = Semaphore::new(true, 20);
static ref DEFAULT_SET: Semaphore = Semaphore::default_new(4);
static ref SLEEP_SET: Semaphore = Semaphore::default_new(20);
}

#[cfg(feature="tokio-threaded")]
Expand All @@ -34,7 +37,8 @@ fn noop_threaded_dispatch_rx(b: &mut Bencher) {
.create();

let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4)
.threaded_scheduler()
.on_thread_start(move || {
register_dispatch_pool(pool.clone());
Expand All @@ -48,7 +52,7 @@ fn noop_threaded_dispatch_rx(b: &mut Bencher) {
b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.await
.unwrap();
p.enter();
Expand All @@ -70,15 +74,16 @@ fn noop_threaded_dispatch_rx(b: &mut Bencher) {
#[bench]
fn noop_threaded_spawn_blocking(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4+4+1)
.threaded_scheduler()
.build()
.unwrap();

b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.await
.unwrap();
p.enter();
Expand All @@ -100,15 +105,17 @@ fn noop_threaded_spawn_blocking(b: &mut Bencher) {
#[bench]
fn noop_threaded_in_place(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4+4+1)
.threaded_scheduler()
.build()
.unwrap();

b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.make_sync()
.await
.unwrap();
let r = p.run(|| 41);
Expand All @@ -134,7 +141,7 @@ fn noop_local_dispatch_rx(b: &mut Bencher) {
let sp = pool.spawner();
for _ in 0..100 {
sp.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.await
.unwrap();
p.enter();
Expand All @@ -157,7 +164,8 @@ fn r_expensive_threaded_dispatch_rx(b: &mut Bencher) {
.create();

let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4)
.threaded_scheduler()
.on_thread_start(move || {
register_dispatch_pool(pool.clone());
Expand All @@ -171,7 +179,7 @@ fn r_expensive_threaded_dispatch_rx(b: &mut Bencher) {
b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.await
.unwrap();
p.enter();
Expand All @@ -198,15 +206,16 @@ fn r_expensive_threaded_dispatch_rx(b: &mut Bencher) {
#[bench]
fn r_expensive_threaded_spawn_blocking(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4+4+1)
.threaded_scheduler()
.build()
.unwrap();

b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.await
.unwrap();
p.enter();
Expand All @@ -228,15 +237,17 @@ fn r_expensive_threaded_spawn_blocking(b: &mut Bencher) {
#[bench]
fn r_expensive_threaded_in_place(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4+4+1)
.threaded_scheduler()
.build()
.unwrap();

b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.make_sync()
.await
.unwrap();
let r = p.run(|| expensive_comp());
Expand All @@ -262,7 +273,7 @@ fn r_expensive_local_dispatch_rx(b: &mut Bencher) {
let sp = pool.spawner();
for _ in 0..100 {
sp.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&DEFAULT_SET)
.await
.unwrap();
p.enter();
Expand All @@ -285,7 +296,8 @@ fn sleep_threaded_dispatch_rx(b: &mut Bencher) {
.create();

let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4)
.threaded_scheduler()
.on_thread_start(move || {
register_dispatch_pool(pool.clone());
Expand All @@ -299,7 +311,7 @@ fn sleep_threaded_dispatch_rx(b: &mut Bencher) {
b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&SLEEP_SET)
.await
.unwrap();
p.enter();
Expand All @@ -321,15 +333,16 @@ fn sleep_threaded_dispatch_rx(b: &mut Bencher) {
#[bench]
fn sleep_threaded_spawn_blocking(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.num_threads(4)
.core_threads(4)
.max_threads(4+20+1)
.threaded_scheduler()
.build()
.unwrap();

b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&SLEEP_SET)
.await
.unwrap();
p.enter();
Expand All @@ -351,15 +364,17 @@ fn sleep_threaded_spawn_blocking(b: &mut Bencher) {
#[bench]
fn sleep_threaded_in_place(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.num_threads(20)
.core_threads(4)
.max_threads(4+20+1)
.threaded_scheduler()
.build()
.unwrap();

b.iter(|| {
let futures: FuturesUnordered<_> = (0..100).map(|_| {
rt.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&SLEEP_SET)
.make_sync()
.await
.unwrap();
let r = p.run(|| random_sleep());
Expand All @@ -385,7 +400,7 @@ fn sleep_local_dispatch_rx(b: &mut Bencher) {
let sp = pool.spawner();
for _ in 0..100 {
sp.spawn(async {
let p = blocking_permit_future(&TEST_SET)
let p = blocking_permit_future(&SLEEP_SET)
.await
.unwrap();
p.enter();
Expand Down
10 changes: 6 additions & 4 deletions src/fs.rs
@@ -1,15 +1,16 @@
// TODO: This is currently just a subset of potential usage from tokio_fs
// TODO: This is currently just a subset of potential usage from tokio_fs,
// retained for testing.

use std::fs;
use std::io;
use std::path::Path;

use lazy_static::lazy_static;

use crate::{dispatch_or_permit, Semaphore};
use crate::{dispatch_or_permit, Semaphore, Semaphorish};

lazy_static! {
pub static ref BLOCKING_SET: Semaphore = Semaphore::new(true, 1);
static ref BLOCKING_SET: Semaphore = Semaphore::default_new(1);
}

/// Creates a new, empty directory at the provided path
Expand Down Expand Up @@ -71,7 +72,8 @@ mod tests {

{
let mut rt = tokio::runtime::Builder::new()
.num_threads(2)
.core_threads(2)
.max_threads(2)
.threaded_scheduler()
.build()
.unwrap();
Expand Down