Skip to content

Commit

Permalink
📝 bump generator to 0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
Xudong-Huang committed Mar 30, 2021
1 parent 9906534 commit d1cd331
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ log = "0.4"
socket2 = { version = "0.4", features = ["all"] }
num_cpus = "1.1"
smallvec = "1.2"
generator = "0.6"
generator = "0.7"
crossbeam = "0.8"
may_queue = { version = "0.1", path = "may_queue" }

Expand Down
18 changes: 9 additions & 9 deletions src/coroutine_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::io;
use std::sync::Arc;
Expand Down Expand Up @@ -267,27 +266,28 @@ impl Builder {
None
};

let done = &DONE as &dyn EventSource as *const _ as *mut dyn EventSource;

// create a join resource, shared by waited coroutine and *this* coroutine
let panic = Arc::new(UnsafeCell::new(None));
let join = Arc::new(UnsafeCell::new(Join::new(panic.clone())));
let panic = Arc::new(AtomicCell::new(None));
let join = Arc::new(Join::new(panic.clone()));
let packet = Arc::new(AtomicCell::new(None));
let their_join = join.clone();
let their_packet = packet.clone();

let subscriber = EventSubscriber {
resource: &DONE as &dyn EventSource as *const _ as *mut dyn EventSource,
};

let closure = move || {
// trigger the JoinHandler
// we must declare the variable before calling f so that stack is prepared
// to unwind these local data. for the panic err we would set it in the
// coroutine local data so that can return from the packet variable
let join = unsafe { &mut *their_join.get() };

// set the return packet
their_packet.swap(Some(f()));

join.trigger();
EventSubscriber { resource: done }
their_join.trigger();
subscriber
};

let mut co = if let Some(mut c) = _co {
Expand Down Expand Up @@ -512,7 +512,7 @@ pub(crate) fn run_coroutine(mut co: CoroutineImpl) {
None => {
// panic happened here
let local = unsafe { &mut *get_co_local(&co) };
let join = unsafe { &mut *local.get_join().get() };
let join = local.get_join();
// set the panic data
if let Some(panic) = co.get_panic_data() {
join.set_panic_data(panic);
Expand Down
9 changes: 4 additions & 5 deletions src/io/sys/windows/net/tcp_stream_connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ impl TcpStreamConnect {
}
};

socket
.bind(&any.into())
.map(|_| socket.into())
.and_then(|s: std::net::TcpStream| {
socket.bind(&any.into()).map(|_| socket.into()).and_then(
|s: std::net::TcpStream| {
// must register io first
s.set_nonblocking(true)?;
add_socket(&s).map(|_io| TcpStreamConnect {
Expand All @@ -67,7 +65,8 @@ impl TcpStreamConnect {
timeout,
can_drop: DelayDrop::new(),
})
})
},
)
})
}

Expand Down
38 changes: 16 additions & 22 deletions src/join.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::any::Any;
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -21,12 +20,12 @@ pub struct Join {
// this is the only place that could set the panic Error
// we use to communicate with JoinHandle so that can return the panic info
// this must be ready before the trigger
panic: Arc<UnsafeCell<Option<Box<dyn Any + Send>>>>,
panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>,
}

// this is the join resource type
impl Join {
pub fn new(panic: Arc<UnsafeCell<Option<Box<dyn Any + Send>>>>) -> Self {
pub fn new(panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>) -> Self {
Join {
to_wake: AtomicOption::none(),
state: AtomicBool::new(true),
Expand All @@ -35,19 +34,18 @@ impl Join {
}

// the the panic for the coroutine
pub fn set_panic_data(&mut self, panic: Box<dyn Any + Send>) {
let p = unsafe { &mut *self.panic.get() };
*p = Some(panic);
pub fn set_panic_data(&self, panic: Box<dyn Any + Send>) {
self.panic.swap(Some(panic));
}

pub fn trigger(&mut self) {
pub fn trigger(&self) {
self.state.store(false, Ordering::Release);
if let Some(w) = self.to_wake.take(Ordering::Acquire) {
w.unpark();
}
}

fn wait(&mut self) {
fn wait(&self) {
if self.state.load(Ordering::Acquire) {
let cur = Blocker::current();
// register the blocker first
Expand All @@ -68,9 +66,9 @@ impl Join {
/// A join handle to a coroutine
pub struct JoinHandle<T> {
co: Coroutine,
join: Arc<UnsafeCell<Join>>,
join: Arc<Join>,
packet: Arc<AtomicCell<Option<T>>>,
panic: Arc<UnsafeCell<Option<Box<dyn Any + Send>>>>,
panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>,
}

unsafe impl<T> Send for JoinHandle<T> {}
Expand All @@ -79,9 +77,9 @@ unsafe impl<T> Sync for JoinHandle<T> {}
/// create a JoinHandle
pub fn make_join_handle<T>(
co: Coroutine,
join: Arc<UnsafeCell<Join>>,
join: Arc<Join>,
packet: Arc<AtomicCell<Option<T>>>,
panic: Arc<UnsafeCell<Option<Box<dyn Any + Send>>>>,
panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>,
) -> JoinHandle<T> {
JoinHandle {
co,
Expand All @@ -99,26 +97,22 @@ impl<T> JoinHandle<T> {

/// return true if the coroutine is finished
pub fn is_done(&self) -> bool {
let join = unsafe { &*self.join.get() };
!join.state.load(Ordering::Acquire)
!self.join.state.load(Ordering::Acquire)
}

/// block until the coroutine is done
pub fn wait(&self) {
let join = unsafe { &mut *self.join.get() };
join.wait();
self.join.wait();
}

/// Join the coroutine, returning the result it produced.
pub fn join(self) -> Result<T> {
let join = unsafe { &mut *self.join.get() };
join.wait();
self.join.wait();

// take the result
self.packet.take().ok_or_else(|| {
let p = unsafe { &mut *self.panic.get() };
p.take().unwrap_or_else(|| Box::new(Error::Cancel))
})
self.packet
.take()
.ok_or_else(|| self.panic.take().unwrap_or_else(|| Box::new(Error::Cancel)))
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/local.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::any::TypeId;
use std::cell::{RefCell, UnsafeCell};
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};
use std::ptr::NonNull;
Expand All @@ -17,14 +17,14 @@ pub struct CoroutineLocal {
// current coroutine handle
co: Coroutine,
// when panic happens, we need to trigger the join here
join: Arc<UnsafeCell<Join>>,
join: Arc<Join>,
// real local data hash map
local_data: LocalMap,
}

impl CoroutineLocal {
/// create coroutine local storage
pub fn new(co: Coroutine, join: Arc<UnsafeCell<Join>>) -> Box<Self> {
pub fn new(co: Coroutine, join: Arc<Join>) -> Box<Self> {
Box::new(CoroutineLocal {
co,
join,
Expand All @@ -38,7 +38,7 @@ impl CoroutineLocal {
}

// get the join handle
pub fn get_join(&self) -> Arc<UnsafeCell<Join>> {
pub fn get_join(&self) -> Arc<Join> {
self.join.clone()
}
}
Expand Down

0 comments on commit d1cd331

Please sign in to comment.