Skip to content

Commit

Permalink
Fix deadlocks in some situations
Browse files Browse the repository at this point in the history
  • Loading branch information
DelSkayn committed Apr 29, 2024
1 parent 49ec387 commit b3530cb
Show file tree
Hide file tree
Showing 8 changed files with 614 additions and 22 deletions.
447 changes: 447 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ keywords = ["stack","call","async","memory","runtime"]
repository = "https://github.com/DelSkayn/reblessive.git"

[dev-dependencies]
criterion = "0.5.1"
futures-util = "0.3.30"
pollster = "0.3.0"
tokio = { version = "1.36.0", features = ["full"] }
Expand All @@ -22,3 +23,7 @@ tree = []
all-features = true
# defines the configuration attribute `docsrs`
rustdoc-args = ["--cfg", "docsrs"]

[[bench]]
name = "fibbo"
harness = false
101 changes: 101 additions & 0 deletions benches/fibbo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use criterion::{criterion_group, criterion_main, Criterion};
use reblessive::{Stack, Stk};

async fn heavy_fibbo(ctx: &mut Stk, n: usize) -> usize {
match n {
0 => 1,
1 => 1,
x => {
ctx.run(move |ctx| heavy_fibbo(ctx, x - 1)).await
+ ctx.run(move |ctx| heavy_fibbo(ctx, x - 2)).await
}
}
}

fn bench_fibbo(c: &mut Criterion) {
c.bench_function("fib 15", |b| {
b.iter(|| {
// Create a stack to run the function in.
let mut stack = Stack::new();

// run the function to completion on the stack.
let res = stack.enter(|ctx| heavy_fibbo(ctx, 15)).finish();
assert_eq!(res, 987);
})
});

c.bench_function("fib 20", |b| {
b.iter(|| {
// Create a stack to run the function in.
let mut stack = Stack::new();

// run the function to completion on the stack.
let res = stack.enter(|ctx| heavy_fibbo(ctx, 20)).finish();
assert_eq!(res, 10946);
})
});

c.bench_function("fib 25", |b| {
b.iter(|| {
// Create a stack to run the function in.
let mut stack = Stack::new();

// run the function to completion on the stack.
let res = stack.enter(|ctx| heavy_fibbo(ctx, 30)).finish();
assert_eq!(res, 1346269);
})
});
}

fn bench_fibbo_async(c: &mut Criterion) {
c.bench_function("fib async 15", |b| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
b.iter(|| {
rt.block_on(async {
// Create a stack to run the function in.
let mut stack = Stack::new();

// run the function to completion on the stack.
let res = stack.enter(|ctx| heavy_fibbo(ctx, 15)).finish_async().await;
assert_eq!(res, 987);
})
})
});

c.bench_function("fib async 20", |b| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
b.iter(|| {
rt.block_on(async {
// Create a stack to run the function in.
let mut stack = Stack::new();

// run the function to completion on the stack.
let res = stack.enter(|ctx| heavy_fibbo(ctx, 20)).finish_async().await;
assert_eq!(res, 10946);
})
})
});

c.bench_function("fib async 25", |b| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
b.iter(|| {
rt.block_on(async {
// Create a stack to run the function in.
let mut stack = Stack::new();

// run the function to completion on the stack.
let res = stack.enter(|ctx| heavy_fibbo(ctx, 30)).finish_async().await;
assert_eq!(res, 1346269);
})
})
});
}

criterion_group!(benches, bench_fibbo, bench_fibbo_async);
criterion_main!(benches);
12 changes: 12 additions & 0 deletions src/stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl<'a, R> Future for FinishFuture<'a, R> {
enter_stack_context(self.runner.ptr, || {
unsafe {
let tasks = &self.runner.ptr.tasks;
self.runner.ptr.context.set(NonNull::from(&*cx).cast());

loop {
let Some(mut task) = tasks.last() else {
Expand Down Expand Up @@ -111,6 +112,7 @@ impl<'a, 'b, R> Future for StepFuture<'a, 'b, R> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
enter_stack_context(self.runner.ptr, || {
self.runner.ptr.context.set(NonNull::from(&*cx).cast());
unsafe {
match self.runner.ptr.drive_head(cx) {
Poll::Pending => {
Expand Down Expand Up @@ -188,6 +190,7 @@ impl<'a, R> Runner<'a, R> {
enter_stack_context(self.ptr, || {
let waker = stub_ctx::get();
let mut context = Context::from_waker(&waker);
self.ptr.context.set(NonNull::from(&context).cast());

while let Some(mut task) = self.ptr.tasks.last() {
loop {
Expand Down Expand Up @@ -237,6 +240,7 @@ impl<'a, R> Runner<'a, R> {
unsafe {
let waker = stub_ctx::get();
let mut context = Context::from_waker(&waker);
self.ptr.context.set(NonNull::from(&context).cast());

match self.ptr.drive_head(&mut context) {
Poll::Pending => match self.stack_state() {
Expand All @@ -255,6 +259,7 @@ impl<'a, R> Runner<'a, R> {
}
}
}
self.ptr.context.set(NonNull::dangling());
None
})
}
Expand Down Expand Up @@ -338,6 +343,7 @@ where
pub struct Stack {
state: Cell<State>,
tasks: StackTasks,
context: Cell<NonNull<()>>,
}

unsafe impl Send for Stack {}
Expand All @@ -351,6 +357,7 @@ impl Stack {
Stack {
state: Cell::new(State::Base),
tasks: StackTasks::new(),
context: Cell::new(NonNull::dangling()),
}
}

Expand All @@ -360,6 +367,7 @@ impl Stack {
Stack {
state: Cell::new(State::Base),
tasks: StackTasks::with_capacity(cap),
context: Cell::new(NonNull::dangling()),
}
}

Expand Down Expand Up @@ -423,6 +431,10 @@ impl Stack {
self.state.set(state)
}

pub(crate) fn set_context(&self, cx: NonNull<()>) -> NonNull<()> {
self.context.replace(cx)
}

pub(crate) fn clear(&self) {
self.set_state(State::Cancelled);
enter_stack_context(self, || self.tasks.clear());
Expand Down
22 changes: 18 additions & 4 deletions src/stack/stk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ where
type Output = R;

#[inline]
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: Pinning isn't structural for any of the fields.
let this = unsafe { self.get_unchecked_mut() };
unsafe {
Expand All @@ -55,9 +55,23 @@ where
let place = NonNull::from(&this.res);
let fut = (x)(M::create());

stack
.tasks
.push(async move { place.as_ref().get().write(Some(fut.await)) });
// If the context is not the one created at the root we have entered this
// future from a different schedular, this means that this future might not
// wake up when it needs to unless we wake the waker. However we don't always
// wan't to clone and run the waker cause that has significant performance
// impacts.
if stack.context.get() == NonNull::from(&*cx).cast() {
stack.tasks.push(async move {
place.as_ref().get().write(Some(fut.await));
});
} else {
let waker = cx.waker().clone();

stack.tasks.push(async move {
place.as_ref().get().write(Some(fut.await));
waker.wake()
});
}

// Set the state to new task, signifying that no new future can be pushed and
// that we should yield back to stack executor.
Expand Down
7 changes: 4 additions & 3 deletions src/stub_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::{
task::{RawWaker, RawWakerVTable, Waker},
};

unsafe fn stub_clone(_: *const ()) -> RawWaker {
panic!("Called an non-reblessive async function withing a non-async reblessive context");
unsafe fn stub_clone(data: *const ()) -> RawWaker {
//panic!("Called an non-reblessive async function withing a non-async reblessive context");
RawWaker::new(data, &STUB_WAKER_V_TABLE)
}

unsafe fn stub_wake(_: *const ()) {
panic!("Called an non-reblessive async function withing a non-async reblessive context");
//panic!("Called an non-reblessive async function withing a non-async reblessive context");
}

unsafe fn stub_drop(_: *const ()) {}
Expand Down
41 changes: 26 additions & 15 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub use crate::stack::YieldFuture;
use crate::{
defer::Defer,
stack::{enter_stack_context, State},
Stack,
};
Expand Down Expand Up @@ -32,35 +33,40 @@ impl<'a, R> Future for FinishFuture<'a, R> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
enter_stack_context(&self.runner.ptr.root, || {
let ptr = self.runner.ptr.root.set_context(NonNull::from(&*cx).cast());
let defer = Defer::new(self.runner.ptr, |schedular| {
schedular.root.set_context(ptr);
});

enter_tree_context(&self.runner.ptr.fanout, || {
loop {
// First we need finish all fanout futures.
while !self.runner.ptr.fanout.is_empty() {
if unsafe { self.runner.ptr.fanout.poll(cx) }.is_pending() {
while !defer.fanout.is_empty() {
if unsafe { defer.fanout.poll(cx) }.is_pending() {
return Poll::Pending;
}
}

// No futures left in fanout, run on the root stack.
match self.runner.ptr.root.drive_head(cx) {
match defer.root.drive_head(cx) {
Poll::Ready(_) => {
if self.runner.ptr.root.tasks().is_empty() {
if defer.root.tasks().is_empty() {
unsafe {
return Poll::Ready(
(*self.runner.place.as_ref().get()).take().unwrap(),
);
}
}
}
Poll::Pending => match self.runner.ptr.root.get_state() {
Poll::Pending => match defer.root.get_state() {
State::Base => {
if self.runner.ptr.fanout.is_empty() {
if defer.fanout.is_empty() {
return Poll::Pending;
}
}
State::Cancelled => unreachable!("TreeStack dropped while stepping"),
State::NewTask | State::Yield => {
self.runner.ptr.root.set_state(State::Base);
defer.root.set_state(State::Base);
}
},
}
Expand All @@ -80,35 +86,40 @@ impl<'a, 'b, R> Future for StepFuture<'a, 'b, R> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
enter_stack_context(&self.runner.ptr.root, || {
enter_tree_context(&self.runner.ptr.fanout, || {
if !self.runner.ptr.fanout.is_empty() {
if unsafe { self.runner.ptr.fanout.poll(cx) }.is_pending() {
let ptr = self.runner.ptr.root.set_context(NonNull::from(&*cx).cast());
let defer = Defer::new(self.runner.ptr, |schedular| {
schedular.root.set_context(ptr);
});

enter_tree_context(&defer.fanout, || {
if !defer.fanout.is_empty() {
if unsafe { defer.fanout.poll(cx) }.is_pending() {
return Poll::Pending;
}
}

// No futures left in fanout, run on the root stack.l
match self.runner.ptr.root.drive_head(cx) {
match defer.root.drive_head(cx) {
Poll::Ready(_) => {
if self.runner.ptr.root.tasks().is_empty() {
if defer.root.tasks().is_empty() {
unsafe {
return Poll::Ready(Some(
(*self.runner.place.as_ref().get()).take().unwrap(),
));
}
}
}
Poll::Pending => match self.runner.ptr.root.get_state() {
Poll::Pending => match defer.root.get_state() {
State::Base => {
if self.runner.ptr.fanout.is_empty() {
if defer.fanout.is_empty() {
return Poll::Pending;
} else {
return Poll::Ready(None);
}
}
State::Cancelled => unreachable!("TreeStack dropped while stepping"),
State::NewTask | State::Yield => {
self.runner.ptr.root.set_state(State::Base);
defer.root.set_state(State::Base);
}
},
}
Expand Down
1 change: 1 addition & 0 deletions src/tree/stk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl Stk {
YieldFuture { done: false }
}

/// Create a scope in which multiple reblessive futures can be polled at the same time.
pub fn scope<'a, F, Fut, R>(&'a mut self, f: F) -> ScopeFuture<'a, F, R>
where
F: FnOnce(&'a ScopeStk) -> Fut,
Expand Down

0 comments on commit b3530cb

Please sign in to comment.