Skip to content

Commit

Permalink
将AsyncDrop从AbortSafeFuture中拆分出来
Browse files Browse the repository at this point in the history
  • Loading branch information
TOETOE55 committed May 15, 2022
1 parent d6b184b commit d06f53f
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 109 deletions.
10 changes: 6 additions & 4 deletions README.md
Expand Up @@ -34,15 +34,17 @@ async fn foo() {
## API

```rust
pub trait AbortSafeFuture {
pub trait AbortSafeFuture: AsyncDrop {
type Output;
fn poll(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<Self::Output>;
fn poll_cancel(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()>;
}

pub trait AsyncDrop {
fn poll_drop(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()>;
}
```

为了满足目标一,相比于std的Future,我们多提供了一个`poll_cancel`的方法,在需要中断的时候调用。如果不调用,就可能产生内存泄露(而非UB)。
(有点类似于`AsyncDrop`要做的事情)。
我们通过`AsyncDrop`来达成目标一,在Future完成或者需要中断的时候调用`poll_drop`来传播中断。

而为了满足目标二,`poll_*`的参数是`self: Pin<&mut ManuallyDrop<Self>>`
这样我们除了不能随意移动`Self`,也不能随意析构`Self`
Expand Down
21 changes: 21 additions & 0 deletions src/bin/example.rs
@@ -0,0 +1,21 @@
use abort_safe_future::AbortSafeFutureExt;
use abort_safe_future::combinator::Compat;
use abort_safe_future::executor::block_on;

struct TestDrop;

impl Drop for TestDrop {
fn drop(&mut self) {
println!("`TestDrop` is dropping")
}
}

fn main() {
block_on(Compat::new(async {
let _guard = TestDrop;
async {}.await;
println!("Hello, world!");
}).then(|_| Compat::new(async {
println!("Goodbye, world!");
})));
}
81 changes: 60 additions & 21 deletions src/combinator.rs
Expand Up @@ -4,7 +4,7 @@ use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use pin_project::pin_project;
use crate::future::AbortSafeFuture;
use crate::future::{AbortSafeFuture, AsyncDrop};
use crate::helpers::pin_manually_drop_as_mut;

#[pin_project]
Expand Down Expand Up @@ -40,15 +40,17 @@ impl<Fut: Future> AbortSafeFuture for Compat<Fut> {
let output = if let Some(fut) = this.inner.as_mut().as_pin_mut() {
ready!(fut.poll(cx))
} else {
panic!("Compat::poll called after completion or after cancel")
panic!("Compat::poll called after completion or after canceled")
};

// drop inner future
this.inner.set(None);
Poll::Ready(output)
}
}

fn poll_cancel(mut self: Pin<&mut ManuallyDrop<Self>>, _cx: &mut Context<'_>) -> Poll<()> {
impl<Fut> AsyncDrop for Compat<Fut> {
fn poll_drop(mut self: Pin<&mut ManuallyDrop<Self>>, _cx: &mut Context<'_>) -> Poll<()> {
let mut this = pin_manually_drop_as_mut(&mut self).project();
// drop inner future
this.inner.set(None);
Expand All @@ -57,25 +59,37 @@ impl<Fut: Future> AbortSafeFuture for Compat<Fut> {
}

#[pin_project]
pub struct Then<Fut1, Fut2, F> {
pub struct Then<Fut1, Fut2, F>
where
Fut1: AbortSafeFuture,
Fut2: AbortSafeFuture,
{
#[pin]
inner: ThenInner<Fut1, Fut2>,
f: Option<F>,
}


#[pin_project(project = ThenProj)]
enum ThenInner<Fut1, Fut2> {
Fut1(#[pin] ManuallyDrop<Fut1>),
Fut2(#[pin] ManuallyDrop<Fut2>),
enum ThenInner<Fut1, Fut2>
where
Fut1: AbortSafeFuture,
Fut2: AbortSafeFuture,
{
Fut1(#[pin] ManuallyDrop<Fut1>, Option<Fut1::Output>),
Fut2(#[pin] ManuallyDrop<Fut2>, Option<Fut2::Output>),
Done,
Canceled,
}

impl<Fut1, Fut2, F> Then<Fut1, Fut2, F> {
impl<Fut1, Fut2, F> Then<Fut1, Fut2, F>
where
Fut1: AbortSafeFuture,
Fut2: AbortSafeFuture,
{
pub fn new(fut1: Fut1, f: F) -> Self {
Self {
inner: ThenInner::Fut1(ManuallyDrop::new(fut1)),
inner: ThenInner::Fut1(ManuallyDrop::new(fut1), None),
f: Some(f),
}
}
Expand All @@ -93,32 +107,57 @@ where
let mut this = pin_manually_drop_as_mut(&mut self).project();
let inner = this.inner.as_mut().project();
match inner {
ThenProj::Fut1(fut1) => {
let output = ready!(fut1.poll(cx));
let f = this.f.take().unwrap();
this.inner.set(ThenInner::Fut2(ManuallyDrop::new(f(output))));
ThenProj::Fut1(fut1, tmp @ None) => {
*tmp = Some(ready!(fut1.poll(cx)));
cx.waker().wake_by_ref();
Poll::Pending
}
ThenProj::Fut2(fut2) => {
let output = ready!(fut2.poll(cx));
ThenProj::Fut1(fut1, tmp @ Some(_)) => {
ready!(fut1.poll_drop(cx));
let f = this.f.take().expect("f was None, AndThen::poll may called after canceled");
let tmp = tmp.take().unwrap();
this.inner.set(ThenInner::Fut2(ManuallyDrop::new(f(tmp)), None));
cx.waker().wake_by_ref();
Poll::Pending
}
ThenProj::Fut2(fut2, tmp @ None) => {
*tmp = Some(ready!(fut2.poll(cx)));
cx.waker().wake_by_ref();
Poll::Pending
}
ThenProj::Fut2(fut2, output @ Some(_)) => {
ready!(fut2.poll_drop(cx));
let output = output.take().unwrap();
this.inner.set(ThenInner::Done);
Poll::Ready(output)
}
ThenProj::Done => panic!("AndThen::poll called after completion"),
ThenProj::Canceled => panic!("AndThen::poll called after cancel"),
ThenProj::Canceled => panic!("AndThen::poll called after canceled"),
}
}

fn poll_cancel(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {

}

impl<Fut1, Fut2, F> AsyncDrop for Then<Fut1, Fut2, F>
where
Fut1: AbortSafeFuture,
Fut2: AbortSafeFuture
{
fn poll_drop(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = pin_manually_drop_as_mut(&mut self).project();
// drop `f`
let _ = this.f.take();

let inner = this.inner.as_mut().project();
match inner {
ThenProj::Fut1(fut1) => {
fut1.poll_cancel(cx)
ThenProj::Fut1(fut1, output) => {
let _ = output.take();
fut1.poll_drop(cx)
}
ThenProj::Fut2(fut2) => {
fut2.poll_cancel(cx)
ThenProj::Fut2(fut2, output) => {
let _ = output.take();
fut2.poll_drop(cx)
}
ThenProj::Done => {
this.inner.set(ThenInner::Canceled);
Expand Down
103 changes: 23 additions & 80 deletions src/future.rs
Expand Up @@ -3,124 +3,67 @@ use core::task::Poll;
use core::pin::Pin;
use core::mem::ManuallyDrop;

use std::any::type_name;
use std::task::ready;
use crate::combinator::Then;
use crate::helpers::pin_manually_drop_as_mut;


/// abort safe future
pub trait AbortSafeFuture {
///
// FIXME: 是否需要AbortSafeFuture: AsyncDrop这个约束?
pub trait AbortSafeFuture: AsyncDrop {
/// Future的结果类型
type Output;
/// 与std中的Future类似。
/// 但需要注意:
///
/// * 调用者无法析构`Self`,需要实现者自己实现资源的回收。
/// * 中断子future时,需要调用其`poll_cancel`方法,直到其返回`Poll::Ready`。否则可能内存泄漏。
/// * 完成或中断子future时,需要调用其`poll_drop`方法,直到其返回`Poll::Ready`。否则可能内存泄漏。
fn poll(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub trait AsyncDrop {

/// 当被取消时,调用此方法。
/// 返回值为`Poll::Ready`时,表示取消成功,此时应该完成了一些资源的回收工作
fn poll_cancel(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()>;
/// 当Future成功或者需要中断之后调用,进行一些资源回收的工作。
///
// FIXME: 是否下面这个签名更合适?
// ```rust
// unsafe fn poll_drop(self: *mut self, cx: &mut Context<'_>) -> Poll<()>;
// ```
fn poll_drop(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()>;
}

// FIXME: 这个实现是否合理? 违背了`poll`完需要`poll_drop`的约定
impl<F: AbortSafeFuture + Unpin + ?Sized> AbortSafeFuture for &mut ManuallyDrop<F> {
type Output = F::Output;

fn poll(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<Self::Output> {
F::poll(Pin::new(&mut*self), cx)
}

fn poll_cancel(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {
F::poll_cancel(Pin::new(&mut*self), cx)
}
}

impl<F: AbortSafeFuture + Unpin + ?Sized> AbortSafeFuture for Option<Box<ManuallyDrop<F>>> {
type Output = F::Output;

fn poll(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this: &mut Option<_> = &mut*self;
if let Some(fut) = this {
let output = ready!(F::poll(Pin::new(fut), cx));
// drop box is safe
unsafe {
ManuallyDrop::drop(fut);
}
*this = None;

Poll::Ready(output)
} else {
panic!("`{} poll after completion or after cancelled`", type_name::<Self>())
}
}

fn poll_cancel(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {
let this: &mut Option<_> = &mut*self;
if let Some(fut) = this {
ready!(F::poll_cancel(Pin::new(fut), cx));
// drop box is safe
unsafe {
ManuallyDrop::drop(fut);
}
*this = None;
}

Poll::Ready(())
impl<F: AsyncDrop + Unpin + ?Sized> AsyncDrop for &mut ManuallyDrop<F> {
fn poll_drop(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {
F::poll_drop(Pin::new(&mut*self), cx)
}
}

// FIXME: 这个实现是否合理? 违背了`poll`完需要`poll_drop`的约定
impl<F: AbortSafeFuture + ?Sized> AbortSafeFuture for Pin<&mut ManuallyDrop<F>> {
type Output = F::Output;

fn poll(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = pin_manually_drop_as_mut(&mut self).as_deref_mut();
F::poll(inner, cx)
}

fn poll_cancel(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {
let inner = pin_manually_drop_as_mut(&mut self).as_deref_mut();
F::poll_cancel(inner, cx)
}
}

impl<F: AbortSafeFuture + ?Sized> AbortSafeFuture for Option<Pin<Box<ManuallyDrop<F>>>> {
type Output = F::Output;

fn poll(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut inner = pin_manually_drop_as_mut(&mut self);
if let Some(fut) = inner.as_mut().as_pin_mut().as_deref_mut() {
let output = ready!(F::poll(fut.as_mut(), cx));
// drop box is safe
unsafe {
ManuallyDrop::drop(fut.as_mut().get_unchecked_mut());
}
inner.set(None);

Poll::Ready(output)
} else {
panic!("`{} poll after completion or after cancelled`", type_name::<Self>())
}

}

fn poll_cancel(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {
let mut inner = pin_manually_drop_as_mut(&mut self);
if let Some(fut) = inner.as_mut().as_pin_mut().as_deref_mut() {
ready!(F::poll_cancel(fut.as_mut(), cx));
// drop box is safe
unsafe {
ManuallyDrop::drop(fut.as_mut().get_unchecked_mut());
}
inner.set(None);
}

Poll::Ready(())
impl<F: AsyncDrop + ?Sized> AsyncDrop for Pin<&mut ManuallyDrop<F>> {
fn poll_drop(mut self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()> {
let inner = pin_manually_drop_as_mut(&mut self).as_deref_mut();
F::poll_drop(inner, cx)
}
}


pub trait AbortSafeFutureExt: AbortSafeFuture {
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
where
Expand Down
10 changes: 6 additions & 4 deletions src/lib.rs
Expand Up @@ -34,15 +34,17 @@
//! ## API
//!
//! ```no_run
//! pub trait AbortSafeFuture {
//! pub trait AbortSafeFuture: AsyncDrop {
//! type Output;
//! fn poll(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<Self::Output>;
//! fn poll_cancel(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()>;
//! }
//!
//! pub trait AsyncDrop {
//! fn poll_drop(self: Pin<&mut ManuallyDrop<Self>>, cx: &mut Context<'_>) -> Poll<()>;
//! }
//! ```
//!
//! 为了满足目标一,相比于std的Future,我们多提供了一个`poll_cancel`的方法,在需要中断的时候调用。如果不调用,就可能产生内存泄露(而非UB)。
//! (有点类似于`AsyncDrop`要做的事情)。
//! 我们通过`AsyncDrop`来达成目标一,在Future完成或者需要中断的时候调用`poll_drop`来传播中断。
//!
//! 而为了满足目标二,`poll_*`的参数是`self: Pin<&mut ManuallyDrop<Self>>`。
//! 这样我们除了不能随意移动`Self`,也不能随意析构`Self`。
Expand Down

0 comments on commit d06f53f

Please sign in to comment.