Skip to content

Commit

Permalink
fix(core/request_table): move to tokio Notify
Browse files Browse the repository at this point in the history
Removes some old dirty hack preventing high CPU usage in some cases
  • Loading branch information
loyd committed Apr 19, 2024
1 parent cf0492f commit 2bb87df
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- telemetry: now `elfo_message_handling_time_seconds` doesn't include the time of task switching if an actor is preempted due to elfo's budget system.
- telemetry: don't produce allocator metrics if not enabled.
- telemeter: validate quantiles (0.0..=1.0) in the config.
- core: get rid of a dirty hack preventing high CPU usage in case of the large number of responses.

## [0.2.0-alpha.14] - 2024-02-27
### Fixed
Expand Down
31 changes: 9 additions & 22 deletions elfo-core/src/request_table.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{fmt, marker::PhantomData, sync::Arc};

use futures_intrusive::sync::ManualResetEvent;
use idr_ebr::Guard as EbrGuard;
use parking_lot::Mutex;
use slotmap::{new_key_type, Key, SlotMap};
use smallvec::SmallVec;
use tokio::sync::Notify;

use crate::{
address_book::AddressBook, envelope::Envelope, errors::RequestError, message::AnyMessage,
Expand Down Expand Up @@ -41,7 +41,7 @@ impl RequestId {

pub(crate) struct RequestTable {
owner: Addr,
notifier: ManualResetEvent,
notifier: Notify,
requests: Mutex<SlotMap<RequestId, RequestData>>,
}

Expand Down Expand Up @@ -98,7 +98,7 @@ impl RequestTable {
pub(crate) fn new(owner: Addr) -> Self {
Self {
owner,
notifier: ManualResetEvent::new(false),
notifier: Notify::new(),
requests: Mutex::new(SlotMap::default()),
}
}
Expand All @@ -124,34 +124,19 @@ impl RequestTable {
}

pub(crate) async fn wait(&self, request_id: RequestId) -> Responses {
let mut n = 0;

loop {
self.notifier.wait().await;
let waiting = self.notifier.notified();

{
let mut requests = self.requests.lock();
let request = requests.get(request_id).expect("unknown request");

if request.remainder == 0 {
let data = requests.remove(request_id).expect("under lock");

// TODO: use another approach.
if requests.values().all(|data| data.remainder != 0) {
self.notifier.reset();
}

break data.responses;
break requests.remove(request_id).expect("under lock");
}
}

// XXX: dirty fix to avoid high CPU usage.
n += 1;
if n % 10 == 0 {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
} else {
tokio::task::yield_now().await;
}
waiting.await;
}
}

Expand All @@ -169,7 +154,9 @@ impl RequestTable {
let request = ward!(requests.get_mut(data.request_id));

if request.push(response) {
self.notifier.set();
// Actors can perform multiple requests in parallel using different
// wakers, so we should wake all possible wakers up.
self.notifier.notify_waiters();
}
}
}
Expand Down

0 comments on commit 2bb87df

Please sign in to comment.