Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New signal for changed Mutable, non-blocking functions to obtain guards #77

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 109 additions & 2 deletions src/signal/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::pin::Pin;
use std::marker::Unpin;
use std::ops::{Deref, DerefMut};
// TODO use parking_lot ?
use std::sync::{Arc, Weak, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{
Arc, Weak, Mutex, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
TryLockError, TryLockResult
};
// TODO use parking_lot ?
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Poll, Waker, Context};
Expand Down Expand Up @@ -165,11 +168,11 @@ impl<'a, A> Deref for MutableLockRef<'a, A> {
}
}


#[repr(transparent)]
pub struct ReadOnlyMutable<A>(Arc<MutableState<A>>);

impl<A> ReadOnlyMutable<A> {

// TODO return Result ?
#[inline]
pub fn lock_ref(&self) -> MutableLockRef<A> {
Expand All @@ -178,6 +181,40 @@ impl<A> ReadOnlyMutable<A> {
}
}

/// Attempts to acquire a read lock.
///
/// If the access could not be granted at this time, then `Err` is returned.
/// Otherwise, an RAII guard is returned which will release the shared access
/// when it is dropped.
///
/// This function does not block.
///
/// This function does not provide any guarantees with respect to the ordering
/// of whether contentious readers or writers will acquire the lock first.
///
/// # Errors
///
/// This function will return the `Poisoned` error if the `ReadOnlyMutable`
// is poisoned. A `ReadOnlyMutable` is poisoned whenever a writer panics
/// while holding an exclusive lock. `Poisoned` will only be returned if
/// the lock would have otherwise been acquired.
///
/// This function will return the `WouldBlock` error if the
/// `ReadOnlyMutable` could not be acquired because it was already locked
/// exclusively.
#[inline]
pub fn try_lock_ref(&self) -> TryLockResult<MutableLockRef<A>> {
match self.0.lock.try_read() {
Ok(lock) => Ok(MutableLockRef { lock }),
Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
Err(TryLockError::Poisoned(poison_error)) => {
let lock = poison_error.into_inner();
let poison_error = PoisonError::new(MutableLockRef { lock });
Err(TryLockError::Poisoned(poison_error))
}
}
}

fn signal_state(&self) -> MutableSignalState<A> {
let signal = MutableSignalState::new(self.0.clone());

Expand All @@ -188,6 +225,14 @@ impl<A> ReadOnlyMutable<A> {
signal
}

/** Obtain a signal that emits `()` whenever the underlying value is
* changed.
* The inner `RwLock` is not locked when polling this signal, so
* polling this signal will never block. */
pub fn signal_changed(&self) -> ChangedSignal<A> {
ChangedSignal(self.signal_state())
}

#[inline]
pub fn signal_ref<B, F>(&self, f: F) -> MutableSignalRef<A, F> where F: FnMut(&A) -> B {
MutableSignalRef(self.signal_state(), f)
Expand Down Expand Up @@ -313,6 +358,44 @@ impl<A> Mutable<A> {
lock: self.state().lock.write().unwrap(),
}
}

/// Attempts to obtain a write lock.
///
/// If the lock could not be acquired at this time, then `Err` is returned.
/// Otherwise, an RAII guard is returned which will release the lock when
/// it is dropped.
///
/// This function does not block.
///
/// This function does not provide any guarantees with respect to the ordering
/// of whether contentious readers or writers will acquire the lock first.
///
/// # Errors
///
/// This function will return the `Poisoned` error if the `Mutable` is
/// poisoned. A `Mutable` is poisoned whenever a writer panics while holding
/// an exclusive lock. `Poisoned` will only be returned if the lock would
/// have otherwise been acquired.
///
/// This function will return the `WouldBlock` error if the `Mutable` could
/// not be acquired because it was already locked exclusively.
pub fn try_lock_mut(&self) -> TryLockResult<MutableLockMut<A>> {
match self.state().lock.try_write() {
Ok(lock) => Ok(MutableLockMut {
mutated: false,
lock,
}),
Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
Err(TryLockError::Poisoned(poison_error)) => {
let lock = MutableLockMut {
mutated: false,
lock: poison_error.into_inner(),
};
let poison_error = PoisonError::new(lock);
Err(TryLockError::Poisoned(poison_error))
}
}
}
}

impl<A> From<A> for Mutable<A> {
Expand Down Expand Up @@ -405,6 +488,30 @@ impl<A> Drop for Mutable<A> {
}
}

/** A signal that emits `()` whenever the underlying value is changed.
* The inner `RwLock` is not locked when polling this signal, so
* polling this signal will never block. */
// TODO remove it from signals when it's dropped
#[derive(Debug)]
#[repr(transparent)]
#[must_use = "Signals do nothing unless polled"]
pub struct ChangedSignal<A>(MutableSignalState<A>);

impl<A> Signal for ChangedSignal<A> {
type Item = ();

fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.0.waker.is_changed() {
Poll::Ready(Some(()))
} else if self.0.state.senders.load(Ordering::SeqCst) == 0 {
Poll::Ready(None)
} else {
self.0.waker.set_waker(cx);
Poll::Pending
}
}
}


// TODO remove it from signals when it's dropped
#[derive(Debug)]
Expand Down