Skip to content

Commit

Permalink
Change timeout/delay functions to non-async
Browse files Browse the repository at this point in the history
Because of a compiler bug, the `async` implementations of
`delay`/`delay_until`/`timeout`/`timeout_at` produce much larger RAM
footprint than they should.

Fixes rtic-rs#890.
  • Loading branch information
dalegaard committed Mar 26, 2024
1 parent fa2a5b4 commit 99e69d1
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 98 deletions.
2 changes: 2 additions & 0 deletions rtic-time/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ For each category, *Added*, *Changed*, *Fixed* add new entries at the top!

### Changed

- Replace `async` implementations of `delay`/`delay_until`/`timeout`/`timeout_at` with strucs to reduce memory usage.

### Fixed

- Docs: Rename `DelayUs` to `DelayNs` in docs.
Expand Down
199 changes: 101 additions & 98 deletions rtic-time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,12 @@
#![deny(missing_docs)]
#![allow(incomplete_features)]

use core::future::{poll_fn, Future};
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::task::{Poll, Waker};
use futures_util::{
future::{select, Either},
pin_mut,
};
use linked_list::{Link, LinkedList};
pub use monotonic::Monotonic;
use rtic_common::dropper::OnDrop;

pub mod half_period_counter;
mod linked_list;
Expand Down Expand Up @@ -75,26 +70,6 @@ pub struct TimerQueue<Mono: Monotonic> {
/// This indicates that there was a timeout.
pub struct TimeoutError;

/// This is needed to make the async closure in `delay_until` accept that we "share"
/// the link possible between threads.
struct LinkPtr<Mono: Monotonic>(*mut Option<linked_list::Link<WaitingWaker<Mono>>>);

impl<Mono: Monotonic> Clone for LinkPtr<Mono> {
fn clone(&self) -> Self {
LinkPtr(self.0)
}
}

impl<Mono: Monotonic> LinkPtr<Mono> {
/// This will dereference the pointer stored within and give out an `&mut`.
unsafe fn get(&mut self) -> &mut Option<linked_list::Link<WaitingWaker<Mono>>> {
&mut *self.0
}
}

unsafe impl<Mono: Monotonic> Send for LinkPtr<Mono> {}
unsafe impl<Mono: Monotonic> Sync for LinkPtr<Mono> {}

impl<Mono: Monotonic> TimerQueue<Mono> {
/// Make a new queue.
pub const fn new() -> Self {
Expand Down Expand Up @@ -166,29 +141,25 @@ impl<Mono: Monotonic> TimerQueue<Mono> {
}

/// Timeout at a specific time.
pub async fn timeout_at<F: Future>(
&self,
instant: Mono::Instant,
future: F,
) -> Result<F::Output, TimeoutError> {
let delay = self.delay_until(instant);

pin_mut!(future);
pin_mut!(delay);

match select(future, delay).await {
Either::Left((r, _)) => Ok(r),
Either::Right(_) => Err(TimeoutError),
pub fn timeout_at<F: Future>(&self, instant: Mono::Instant, future: F) -> Timeout<'_, Mono, F> {
Timeout {
delay: Delay::<Mono> {
instant,
queue: &self.queue,
link_ptr: None,
marker: AtomicUsize::new(0),
},
future,
}
}

/// Timeout after at least a specific duration.
#[inline]
pub async fn timeout_after<F: Future>(
pub fn timeout_after<F: Future>(
&self,
duration: Mono::Duration,
future: F,
) -> Result<F::Output, TimeoutError> {
) -> Timeout<'_, Mono, F> {
let now = Mono::now();
let mut timeout = now + duration;
if now != timeout {
Expand All @@ -197,12 +168,12 @@ impl<Mono: Monotonic> TimerQueue<Mono> {

// Wait for one period longer, because by definition timers have an uncertainty
// of one period, so waiting for 'at least' needs to compensate for that.
self.timeout_at(timeout, future).await
self.timeout_at(timeout, future)
}

/// Delay for at least some duration of time.
#[inline]
pub async fn delay(&self, duration: Mono::Duration) {
pub fn delay(&self, duration: Mono::Duration) -> Delay<'_, Mono> {
let now = Mono::now();
let mut timeout = now + duration;
if now != timeout {
Expand All @@ -211,79 +182,111 @@ impl<Mono: Monotonic> TimerQueue<Mono> {

// Wait for one period longer, because by definition timers have an uncertainty
// of one period, so waiting for 'at least' needs to compensate for that.
self.delay_until(timeout).await;
self.delay_until(timeout)
}

/// Delay to some specific time instant.
pub async fn delay_until(&self, instant: Mono::Instant) {
pub fn delay_until(&self, instant: Mono::Instant) -> Delay<'_, Mono> {
if !self.initialized.load(Ordering::Relaxed) {
panic!(
"The timer queue is not initialized with a monotonic, you need to run `initialize`"
);
}
Delay::<Mono> {
instant,
queue: &self.queue,
link_ptr: None,
marker: AtomicUsize::new(0),
}
}
}

let mut link_ptr: Option<linked_list::Link<WaitingWaker<Mono>>> = None;
/// Future returned by `delay` and `delay_until`.
pub struct Delay<'q, Mono: Monotonic> {
instant: Mono::Instant,
queue: &'q LinkedList<WaitingWaker<Mono>>,
link_ptr: Option<linked_list::Link<WaitingWaker<Mono>>>,
marker: AtomicUsize,
}

// Make this future `Drop`-safe
// SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
let mut link_ptr =
LinkPtr(&mut link_ptr as *mut Option<linked_list::Link<WaitingWaker<Mono>>>);
let mut link_ptr2 = link_ptr.clone();
impl<'q, Mono: Monotonic> Future for Delay<'q, Mono> {
type Output = ();

let queue = &self.queue;
let marker = &AtomicUsize::new(0);
fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
// SAFETY: We ensure we never move anything out of this.
let this = unsafe { self.get_unchecked_mut() };

let dropper = OnDrop::new(|| {
queue.delete(marker.load(Ordering::Relaxed));
});
if Mono::now() >= this.instant {
return Poll::Ready(());
}

poll_fn(|cx| {
if Mono::now() >= instant {
return Poll::Ready(());
// SAFETY: this is dereferenced only here and in `drop`. As the queue deletion is done only
// in `drop` we can't do this access concurrently with queue removal.
let link = &mut this.link_ptr;
if link.is_none() {
let link_ref = link.insert(Link::new(WaitingWaker {
waker: cx.waker().clone(),
release_at: this.instant,
was_popped: AtomicBool::new(false),
}));

// SAFETY(new_unchecked): The address to the link is stable as it is defined
// outside this stack frame.
// SAFETY(insert): `link_ref` lfetime comes from `link_ptr` which itself is owned by
// the `Delay` struct. The `Delay::drop` impl ensures that the link is removed from the
// queue on drop, which happens before the struct and thus `link_ptr` goes out of
// scope.
let (head_updated, addr) = unsafe { this.queue.insert(Pin::new_unchecked(link_ref)) };
this.marker.store(addr, Ordering::Relaxed);
if head_updated {
Mono::pend_interrupt()
}
}

// SAFETY: This pointer is only dereferenced here and on drop of the future
// which happens outside this `poll_fn`'s stack frame, so this mutable access cannot
// happen at the same time as `dropper` runs.
let link = unsafe { link_ptr2.get() };
if link.is_none() {
let link_ref = link.insert(Link::new(WaitingWaker {
waker: cx.waker().clone(),
release_at: instant,
was_popped: AtomicBool::new(false),
}));

// SAFETY(new_unchecked): The address to the link is stable as it is defined
//outside this stack frame.
// SAFETY(insert): `link_ref` lifetime comes from `link_ptr` that is shadowed, and
// we make sure in `dropper` that the link is removed from the queue before
// dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives
// until the end of the stack frame.
let (head_updated, addr) = unsafe { queue.insert(Pin::new_unchecked(link_ref)) };

marker.store(addr, Ordering::Relaxed);

if head_updated {
// Pend the monotonic handler if the queue head was updated.
Mono::pend_interrupt()
}
Poll::Pending
}
}

impl<'q, Mono: Monotonic> Drop for Delay<'q, Mono> {
fn drop(&mut self) {
// SAFETY: Drop cannot be run at the same time as poll, so we can't end up
// derefencing this concurrently to the one in `poll`.
match self.link_ptr.as_ref() {
None => return,
// If it was popped from the queue there is no need to run delete
Some(link) if link.val.was_popped.load(Ordering::Relaxed) => return,
_ => {}
}
self.queue.delete(self.marker.load(Ordering::Relaxed));
}
}

/// Future returned by `timeout` and `timeout_at`.
pub struct Timeout<'q, Mono: Monotonic, F> {
delay: Delay<'q, Mono>,
future: F,
}

impl<'q, Mono: Monotonic, F: Future> Future for Timeout<'q, Mono, F> {
type Output = Result<F::Output, TimeoutError>;

fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
let inner = unsafe { self.get_unchecked_mut() };

{
let f = unsafe { Pin::new_unchecked(&mut inner.future) };
if let Poll::Ready(v) = f.poll(cx) {
return Poll::Ready(Ok(v));
}
}

Poll::Pending
})
.await;

// SAFETY: We only run this and dereference the pointer if we have
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
// of this pointer is in the `poll_fn`.
if let Some(link) = unsafe { link_ptr.get() } {
if link.val.was_popped.load(Ordering::Relaxed) {
// If it was popped from the queue there is no need to run delete
dropper.defuse();
{
let d = unsafe { Pin::new_unchecked(&mut inner.delay) };
if d.poll(cx).is_ready() {
return Poll::Ready(Err(TimeoutError));
}
} else {
// Make sure that our link is deleted from the list before we drop this stack
drop(dropper);
}

Poll::Pending
}
}

0 comments on commit 99e69d1

Please sign in to comment.