diff --git a/CHANGELOG.md b/CHANGELOG.md index 531e5d57..77124f3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system. - telemetry: don't produce allocator metrics if not enabled. - telemeter: validate quantiles (0.0..=1.0) in the config. +- core: get rid of a dirty hack preventing high CPU usage in case of the large number of responses. ## [0.2.0-alpha.14] - 2024-02-27 ### Fixed diff --git a/elfo-core/src/request_table.rs b/elfo-core/src/request_table.rs index ed47dc55..c0184def 100644 --- a/elfo-core/src/request_table.rs +++ b/elfo-core/src/request_table.rs @@ -1,10 +1,10 @@ use std::{fmt, marker::PhantomData, sync::Arc}; -use futures_intrusive::sync::ManualResetEvent; use idr_ebr::Guard as EbrGuard; use parking_lot::Mutex; use slotmap::{new_key_type, Key, SlotMap}; use smallvec::SmallVec; +use tokio::sync::Notify; use crate::{ address_book::AddressBook, envelope::Envelope, errors::RequestError, message::AnyMessage, @@ -41,7 +41,7 @@ impl RequestId { pub(crate) struct RequestTable { owner: Addr, - notifier: ManualResetEvent, + notifier: Notify, requests: Mutex>, } @@ -98,7 +98,7 @@ impl RequestTable { pub(crate) fn new(owner: Addr) -> Self { Self { owner, - notifier: ManualResetEvent::new(false), + notifier: Notify::new(), requests: Mutex::new(SlotMap::default()), } } @@ -124,34 +124,19 @@ impl RequestTable { } pub(crate) async fn wait(&self, request_id: RequestId) -> Responses { - let mut n = 0; - loop { - self.notifier.wait().await; + let waiting = self.notifier.notified(); { let mut requests = self.requests.lock(); let request = requests.get(request_id).expect("unknown request"); if request.remainder == 0 { - let data = requests.remove(request_id).expect("under lock"); - - // TODO: use another approach. - if requests.values().all(|data| data.remainder != 0) { - self.notifier.reset(); - } - - break data.responses; + break requests.remove(request_id).expect("under lock"); } } - // XXX: dirty fix to avoid high CPU usage. - n += 1; - if n % 10 == 0 { - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - } else { - tokio::task::yield_now().await; - } + waiting.await; } } @@ -169,7 +154,9 @@ impl RequestTable { let request = ward!(requests.get_mut(data.request_id)); if request.push(response) { - self.notifier.set(); + // Actors can perform multiple requests in parallel using different + // wakers, so we should wake all possible wakers up. + self.notifier.notify_waiters(); } } }