From 3e10da6f80cf47a84efaec20200ccd273ddf3f8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Sun, 17 Feb 2019 21:44:04 +0200 Subject: [PATCH] Re-implement MainContext channel around a manual channel This allows us to ensure that dropping the Receiver and its GSource will also directly drop the closure, instead of having to wait for all Senders to disappear too. We have to use a mutex and a shared struct for this, and as such it makes sense to directly implement the channel as part of this shared struct. As the std mpsc channel internally also uses mutexes this should not cause any considerable performance difference. At the same time also simplify some more code and add a few more tests. Fixes https://github.com/gtk-rs/glib/issues/454 --- src/main_context_channel.rs | 529 +++++++++++++++++++++++++----------- 1 file changed, 377 insertions(+), 152 deletions(-) diff --git a/src/main_context_channel.rs b/src/main_context_channel.rs index e7b99c58..6aca2c39 100644 --- a/src/main_context_channel.rs +++ b/src/main_context_channel.rs @@ -3,11 +3,11 @@ // Licensed under the MIT license, see the LICENSE file or use std::cell::RefCell; -use std::marker::PhantomData; +use std::collections::VecDeque; use std::mem; use std::ptr; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; +use std::sync::{Arc, Condvar, Mutex}; use Continue; use MainContext; @@ -18,28 +18,227 @@ use SourceId; use get_thread_id; use ffi as glib_ffi; -use translate::{from_glib_full, mut_override, ToGlib, ToGlibPtr}; +use translate::{mut_override, FromGlibPtrFull, FromGlibPtrNone, ToGlib, ToGlibPtr}; + +#[derive(Debug)] +struct ChannelInner { + queue: VecDeque, + // This will be None when the receiver disappears and Some(NULL) + // before the receiver is attached + source: Option<*mut glib_ffi::GSource>, +} + +impl ChannelInner { + fn receiver_disconnected(&self) -> bool { + match self.source { + // Disconnected + None => true, + // Not attached or destroyed yet + Some(source) if source.is_null() => false, + // Receiver still there is destroyed already + Some(source) + if unsafe { glib_ffi::g_source_is_destroyed(source) } != glib_ffi::GFALSE => + { + true + } + // Receiver still running + Some(_) => false, + } + } + + fn set_ready_time(&mut self, ready_time: i64) { + if let Some(source) = self.source { + if !source.is_null() { + unsafe { + glib_ffi::g_source_set_ready_time(source, ready_time); + } + } + } + } + + fn source(&self) -> Option { + match self.source { + // Disconnected + None => None, + // Not attached or destroyed yet + Some(source) if source.is_null() => None, + // Receiver still there is destroyed already + Some(source) + if unsafe { glib_ffi::g_source_is_destroyed(source) != glib_ffi::GFALSE } => + { + None + } + // Receiver still running + Some(source) => Some(unsafe { Source::from_glib_none(source) }), + } + } +} + +#[derive(Debug)] +struct ChannelBound { + bound: usize, + cond: Condvar, +} + +#[derive(Debug)] +struct Channel(Arc<(Mutex>, Option)>); + +unsafe impl Send for Channel {} +unsafe impl Sync for Channel {} + +impl Clone for Channel { + fn clone(&self) -> Channel { + Channel(self.0.clone()) + } +} + +impl Channel { + fn new(bound: Option) -> Channel { + Channel(Arc::new(( + Mutex::new(ChannelInner { + queue: VecDeque::new(), + source: Some(ptr::null_mut()), + }), + bound.map(|bound| ChannelBound { + bound, + cond: Condvar::new(), + }), + ))) + } + + fn send(&self, t: T) -> Result<(), mpsc::SendError> { + let mut inner = (self.0).0.lock().unwrap(); + + // If we have a bounded channel then we need to wait here until enough free space is + // available or the receiver disappears + // + // A special case here is a bound of 0: the queue must be empty for accepting + // new data and then we will again wait later for the data to be actually taken + // out + if let Some(ChannelBound { bound, ref cond }) = (self.0).1 { + while inner.queue.len() >= bound + && inner.queue.len() != 0 + && !inner.receiver_disconnected() + { + inner = cond.wait(inner).unwrap(); + } + } + + // Error out directly if the receiver is disconnected + if inner.receiver_disconnected() { + return Err(mpsc::SendError(t)); + } + + // Store the item on our queue + inner.queue.push_back(t); + + // and then wake up the GSource + inner.set_ready_time(0); + + // If we have a bound of 0 we need to wait until the receiver actually + // handled the data + if let Some(ChannelBound { bound: 0, ref cond }) = (self.0).1 { + while inner.queue.len() != 0 && !inner.receiver_disconnected() { + inner = cond.wait(inner).unwrap(); + } + + // If the receiver was destroyed in the meantime take out the item and report an error + if inner.receiver_disconnected() { + // If the item is not in the queue anymore then the receiver just handled it before + // getting disconnected and all is good + if let Some(t) = inner.queue.pop_front() { + return Err(mpsc::SendError(t)); + } + } + } + + Ok(()) + } + + fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError> { + let mut inner = (self.0).0.lock().unwrap(); + + let ChannelBound { bound, ref cond } = (self.0) + .1 + .as_ref() + .expect("called try_send() on an unbounded channel"); + + // Check if the queue is full and handle the special case of a 0 bound + if inner.queue.len() >= *bound && inner.queue.len() != 0 { + return Err(mpsc::TrySendError::Full(t)); + } + + // Error out directly if the receiver is disconnected + if inner.receiver_disconnected() { + return Err(mpsc::TrySendError::Disconnected(t)); + } + + // Store the item on our queue + inner.queue.push_back(t); + + // and then wake up the GSource + inner.set_ready_time(0); + + // If we have a bound of 0 we need to wait until the receiver actually + // handled the data + if *bound == 0 { + while inner.queue.len() != 0 && !inner.receiver_disconnected() { + inner = cond.wait(inner).unwrap(); + } + + // If the receiver was destroyed in the meantime take out the item and report an error + if inner.receiver_disconnected() { + // If the item is not in the queue anymore then the receiver just handled it before + // getting disconnected and all is good + if let Some(t) = inner.queue.pop_front() { + return Err(mpsc::TrySendError::Disconnected(t)); + } + } + } + + Ok(()) + } + + fn try_recv(&self) -> Result { + let mut inner = (self.0).0.lock().unwrap(); + + // Pop item if we have any + if let Some(item) = inner.queue.pop_front() { + // Wake up a sender that is currently waiting, if any + if let Some(ChannelBound { ref cond, .. }) = (self.0).1 { + cond.notify_one(); + } + return Ok(item); + } + + // If there are no senders left we are disconnected or otherwise empty. That's the case if + // the only remaining strong reference is the one of the receiver + if Arc::strong_count(&self.0) == 1 { + Err(mpsc::TryRecvError::Disconnected) + } else { + Err(mpsc::TryRecvError::Empty) + } + } +} #[repr(C)] -struct ChannelSource { +struct ChannelSource Continue + 'static> { source: glib_ffi::GSource, - receiver: Option>, - callback: Option Continue + 'static>>>, + thread_id: usize, source_funcs: Option>, - thread_id: Option, - ready: AtomicBool, + channel: Option>, + callback: Option>, } unsafe extern "C" fn prepare( source: *mut glib_ffi::GSource, timeout: *mut i32, ) -> glib_ffi::gboolean { - let source = &*(source as *const ChannelSource); - *timeout = -1; - // Check if we have at least one item available in the receiver - if source.ready.load(Ordering::SeqCst) { + // We're always ready when the ready time was set to 0. There + // will be at least one item or the senders are disconnected now + if glib_ffi::g_source_get_ready_time(source) == 0 { glib_ffi::GTRUE } else { glib_ffi::GFALSE @@ -47,45 +246,41 @@ unsafe extern "C" fn prepare( } unsafe extern "C" fn check(source: *mut glib_ffi::GSource) -> glib_ffi::gboolean { - let source = &*(source as *const ChannelSource); - - // Check if we have at least one item available in the receiver - if source.ready.load(Ordering::SeqCst) { + // We're always ready when the ready time was set to 0. There + // will be at least one item or the senders are disconnected now + if glib_ffi::g_source_get_ready_time(source) == 0 { glib_ffi::GTRUE } else { glib_ffi::GFALSE } } -unsafe extern "C" fn dispatch( +unsafe extern "C" fn dispatch Continue + 'static>( source: *mut glib_ffi::GSource, callback: glib_ffi::GSourceFunc, _user_data: glib_ffi::gpointer, ) -> glib_ffi::gboolean { - let source = &mut *(source as *mut ChannelSource); + let source = &mut *(source as *mut ChannelSource); assert!(callback.is_none()); glib_ffi::g_source_set_ready_time(&mut source.source, -1); - source.ready.store(false, Ordering::SeqCst); // Check the thread to ensure we're only ever called from the same thread assert_eq!( get_thread_id(), - source - .thread_id - .expect("ChannelSource called before Receiver was attached"), + source.thread_id, "Source dispatched on a different thread than before" ); - // Now iterate over all items that we currently have in the receiver until it is + // Now iterate over all items that we currently have in the channel until it is // empty again. If all senders are disconnected at some point we remove the GSource // from the main context it was attached to as it will never ever be called again. - let receiver = source - .receiver + let channel = source + .channel .as_ref() - .expect("ChannelSource without Receiver"); + .expect("ChannelSource without Channel"); loop { - match receiver.try_recv() { + match channel.try_recv() { Err(mpsc::TryRecvError::Empty) => break, Err(mpsc::TryRecvError::Disconnected) => return glib_ffi::G_SOURCE_REMOVE, Ok(item) => { @@ -103,54 +298,26 @@ unsafe extern "C" fn dispatch( glib_ffi::G_SOURCE_CONTINUE } -unsafe extern "C" fn finalize(source: *mut glib_ffi::GSource) { - let source = &mut *(source as *mut ChannelSource); +unsafe extern "C" fn finalize Continue + 'static>( + source: *mut glib_ffi::GSource, +) { + let source = &mut *(source as *mut ChannelSource); // Drop all memory we own by taking it out of the Options - let _ = source.receiver.take(); - let _ = source.callback.take(); - let _ = source.source_funcs.take(); -} - -impl ChannelSource { - fn new(receiver: mpsc::Receiver, priority: Priority) -> Source { - unsafe { - let source_funcs = Box::new(glib_ffi::GSourceFuncs { - check: Some(check::), - prepare: Some(prepare::), - dispatch: Some(dispatch::), - finalize: Some(finalize::), - closure_callback: None, - closure_marshal: None, - }); - - let source = glib_ffi::g_source_new( - mut_override(&*source_funcs), - mem::size_of::>() as u32, - ) as *mut ChannelSource; - assert!(!source.is_null()); - - { - let source = &mut *source; - ptr::write(&mut source.receiver, Some(receiver)); - ptr::write(&mut source.callback, None); - ptr::write(&mut source.source_funcs, Some(source_funcs)); - source.thread_id = None; - source.ready = AtomicBool::new(false); - } - - glib_ffi::g_source_set_priority(mut_override(&(*source).source), priority.to_glib()); - - from_glib_full(source as *mut glib_ffi::GSource) + let channel = source.channel.take().expect("Receiver without channel"); + + { + // Set the source inside the channel to None so that all senders know that there + // is no receiver left and wake up the condition variable if any + let mut inner = (channel.0).0.lock().unwrap(); + inner.source = None; + if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 { + cond.notify_all(); } } - fn mark_ready(&self) { - self.ready.store(true, Ordering::SeqCst); - unsafe { - glib_ffi::g_source_set_ready_time(mut_override(&self.source), 0); - } - } + let _ = source.callback.take(); + let _ = source.source_funcs.take(); } /// A `Sender` that can be used to send items to the corresponding main context receiver. @@ -161,37 +328,41 @@ impl ChannelSource { /// /// [`MainContext::channel()`]: struct.MainContext.html#method.channel #[derive(Clone, Debug)] -pub struct Sender(Option>, Source); +pub struct Sender(Option>); impl Sender { /// Sends a value to the channel. pub fn send(&self, t: T) -> Result<(), mpsc::SendError> { - // If the source is destroyed the receiver end is disconnected - if self.1.is_destroyed() { - return Err(mpsc::SendError(t)); - } - - let sender = self.0.as_ref().expect("No Sender anymore"); - sender.send(t)?; - - // Once sending succeeded, wake up the receiver - unsafe { - let source = &*(self.1.to_glib_none().0 as *const ChannelSource); - source.mark_ready(); - } - Ok(()) + self.0.as_ref().expect("Sender with no channel").send(t) } } impl Drop for Sender { fn drop(&mut self) { - // Wake up the Source so that it can be removed if this was the last sender. - // We have to drop the Sender first because of that - drop(self.0.take().expect("No Sender anymore")); - unsafe { - let source = &*(self.1.to_glib_none().0 as *const ChannelSource); - source.mark_ready(); + // Wake up the receiver after dropping our own reference to ensure + // that after the last sender is dropped the receiver will see a strong + // reference count of exactly 1 by itself. + let channel = self.0.take().expect("Sender with no channel"); + + let source = { + let inner = (channel.0).0.lock().unwrap(); + + // If there's no receiver left we don't care + if inner.receiver_disconnected() { + return; + } + + // Get a strong reference to the source + match inner.source() { + None => return, + Some(source) => source, + } + }; + + // Drop the channel and wake up the source/receiver + drop(channel); + glib_ffi::g_source_set_ready_time(source.to_glib_none().0, 0); } } } @@ -204,55 +375,46 @@ impl Drop for Sender { /// /// [`MainContext::sync_channel()`]: struct.MainContext.html#method.sync_channel #[derive(Clone, Debug)] -pub struct SyncSender(Option>, Source); +pub struct SyncSender(Option>); impl SyncSender { /// Sends a value to the channel and blocks if the channel is full. pub fn send(&self, t: T) -> Result<(), mpsc::SendError> { - // If the source is destroyed the receiver end is disconnected - if self.1.is_destroyed() { - return Err(mpsc::SendError(t)); - } - - let sender = self.0.as_ref().expect("No Sender anymore"); - sender.send(t)?; - - // Once sending succeeded, wake up the receiver - unsafe { - let source = &*(self.1.to_glib_none().0 as *const ChannelSource); - source.mark_ready(); - } - Ok(()) + self.0.as_ref().expect("Sender with no channel").send(t) } /// Sends a value to the channel. pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError> { - // If the source is destroyed the receiver end is disconnected - if self.1.is_destroyed() { - return Err(mpsc::TrySendError::Disconnected(t)); - } - - let sender = self.0.as_ref().expect("No Sender anymore"); - sender.try_send(t)?; - - // Once sending succeeded, wake up the receiver - unsafe { - let source = &*(self.1.to_glib_none().0 as *const ChannelSource); - source.mark_ready(); - } - Ok(()) + self.0.as_ref().expect("Sender with no channel").try_send(t) } } impl Drop for SyncSender { fn drop(&mut self) { - // Wake up the Source so that it can be removed if this was the last sender. - // We have to drop the Sender first because of that - drop(self.0.take().expect("No Sender anymore")); - unsafe { - let source = &*(self.1.to_glib_none().0 as *const ChannelSource); - source.mark_ready(); + // Wake up the receiver after dropping our own reference to ensure + // that after the last sender is dropped the receiver will see a strong + // reference count of exactly 1 by itself. + let channel = self.0.take().expect("Sender with no channel"); + + let source = { + let inner = (channel.0).0.lock().unwrap(); + + // If there's no receiver left we don't care + if inner.receiver_disconnected() { + return; + } + + // Get a strong reference to the source + match inner.source() { + None => return, + Some(source) => source, + } + }; + + // Drop the channel and wake up the source/receiver + drop(channel); + glib_ffi::g_source_set_ready_time(source.to_glib_none().0, 0); } } } @@ -266,7 +428,7 @@ impl Drop for SyncSender { /// [`MainContext::channel()`]: struct.MainContext.html#method.channel /// [`MainContext::sync_channel()`]: struct.MainContext.html#method.sync_channel #[derive(Debug)] -pub struct Receiver(Option, PhantomData<*const T>); +pub struct Receiver(Option>, Priority); // It's safe to send the Receiver to other threads for attaching it as // long as the items to be sent can also be sent between threads. @@ -274,10 +436,13 @@ unsafe impl Send for Receiver {} impl Drop for Receiver { fn drop(&mut self) { - // If the receiver was never attached to a main context - // we need to destroy the underlying source - if let Some(source) = self.0.take() { - source.destroy(); + // If the receiver was never attached to a main context we need to let all the senders know + if let Some(channel) = self.0.take() { + let mut inner = (channel.0).0.lock().unwrap(); + inner.source = None; + if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 { + cond.notify_all(); + } } } } @@ -291,7 +456,7 @@ impl Receiver { /// # Panics /// /// This function panics if called from a thread that is not the owner of the provided - /// `context`, or if `None` is provided of the thread default main context. + /// `context`, or, if `None` is provided, of the thread default main context. pub fn attach<'a, P: Into>, F: FnMut(T) -> Continue + 'static>( mut self, context: P, @@ -299,14 +464,52 @@ impl Receiver { ) -> SourceId { let context = context.into(); unsafe { - let source = self.0.take().expect("No Source anymore"); + let channel = self.0.take().expect("Receiver without channel"); + + let source_funcs = Box::new(glib_ffi::GSourceFuncs { + check: Some(check::), + prepare: Some(prepare::), + dispatch: Some(dispatch::), + finalize: Some(finalize::), + closure_callback: None, + closure_marshal: None, + }); + + let source = glib_ffi::g_source_new( + mut_override(&*source_funcs), + mem::size_of::>() as u32, + ) as *mut ChannelSource; + assert!(!source.is_null()); + // Set up the GSource { - let source = &mut *(source.to_glib_none().0 as *mut ChannelSource); - source.callback = Some(RefCell::new(Box::new(func))); - source.thread_id = Some(get_thread_id()); + let source = &mut *source; + let mut inner = (channel.0).0.lock().unwrap(); + + glib_ffi::g_source_set_priority(mut_override(&source.source), self.1.to_glib()); + + // We're immediately ready if the queue is not empty or if no sender is left at this point + glib_ffi::g_source_set_ready_time( + mut_override(&source.source), + if !inner.queue.is_empty() || Arc::strong_count(&channel.0) == 1 { + 0 + } else { + -1 + }, + ); + inner.source = Some(&mut source.source); + } + + // Store all our data inside our part of the GSource + { + let source = &mut *source; + source.thread_id = get_thread_id(); + ptr::write(&mut source.channel, Some(channel)); + ptr::write(&mut source.callback, Some(RefCell::new(func))); + ptr::write(&mut source.source_funcs, Some(source_funcs)); } + let source = Source::from_glib_full(mut_override(&(*source).source)); let id = if let Some(context) = context { assert!(context.is_owner()); source.attach(context) @@ -336,12 +539,9 @@ impl MainContext { /// /// The returned `Sender` behaves the same as `std::sync::mpsc::Sender`. pub fn channel(priority: Priority) -> (Sender, Receiver) { - let (sender, receiver) = mpsc::channel(); - - let source = ChannelSource::new(receiver, priority); - - let receiver = Receiver(Some(source.clone()), PhantomData); - let sender = Sender(Some(sender), source); + let channel = Channel::new(None); + let receiver = Receiver(Some(channel.clone()), priority); + let sender = Sender(Some(channel)); (sender, receiver) } @@ -360,17 +560,10 @@ impl MainContext { /// will fail. /// /// The returned `SyncSender` behaves the same as `std::sync::mpsc::SyncSender`. - /// - /// # Panics - /// - /// This function will panic if the current thread is not the owner of the main context. pub fn sync_channel(priority: Priority, bound: usize) -> (SyncSender, Receiver) { - let (sender, receiver) = mpsc::sync_channel(bound); - - let source = ChannelSource::new(receiver, priority); - - let receiver = Receiver(Some(source.clone()), PhantomData); - let sender = SyncSender(Some(sender), source); + let channel = Channel::new(Some(bound)); + let receiver = Receiver(Some(channel.clone()), priority); + let sender = SyncSender(Some(channel)); (sender, receiver) } @@ -465,4 +658,36 @@ mod tests { assert_eq!(sender.send(1), Err(mpsc::SendError(1))); } + + #[test] + fn test_remove_receiver_and_drop_source() { + let c = MainContext::new(); + + c.acquire(); + + let (sender, receiver) = MainContext::channel::(Priority::default()); + + struct Helper(Arc>); + impl Drop for Helper { + fn drop(&mut self) { + *self.0.lock().unwrap() = true; + } + } + + let dropped = Arc::new(Mutex::new(false)); + let helper = Helper(dropped.clone()); + let source_id = receiver.attach(&c, move |_| { + let _helper = &helper; + Continue(true) + }); + + let source = c.find_source_by_id(&source_id).unwrap(); + source.destroy(); + + // This should drop the closure + drop(source); + + assert_eq!(*dropped.lock().unwrap(), true); + assert_eq!(sender.send(1), Err(mpsc::SendError(1))); + } }