Replace event channel with broadcast channel #5478
Conversation
```rust
pub struct Events {
    receiver: Receiver<Event>,
    sender: Sender<Event>,
    /// Unused receiver to prevent the channel from closing.
```
This is ugly, but removing it actually makes a lot of tests fail.
I have deactivated the receiver. This is a documented use of InactiveReceiver, so I guess this is how this is supposed to be done:
https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.InactiveReceiver.html
```rust
// Before:
#[pin_project]
pub struct EventEmitter(#[pin] Receiver<Event>);

// After:
#[derive(Debug)]
pub struct EventEmitter(Mutex<async_broadcast::Receiver<Event>>);
```
This Mutex is needed because `recv` of a broadcast channel takes `&mut self` instead of `&self` like a normal channel does. This is the same for the `async-broadcast` crate I used and for tokio broadcast channels. I think the idea is that no synchronization is needed if the receiver is only used by a single thread, as in broadcast channels each receiver reads from its own chunk of memory. In our case, however, we expose the event emitter over FFI and otherwise allow using it from multiple threads, so we want thread safety and have to add our own synchronization.
src/events.rs
```rust
    ///
    /// Returns `None` if no events are available for reception.
    pub async fn try_recv(&self) -> Option<Event> {
        let mut lock = self.0.lock().await;
```
So, `EventEmitter::recv()` can be called in parallel by multiple tasks, and `try_recv()` as well, but they can't be called in parallel with each other, as `try_recv()` would wait for `recv()`. How critical is that? Afaiu not at all, as each task should use its own emitter?
You mean that if we call `recv()` and it starts waiting, an attempt to call `try_recv()` will block until `recv()` returns? The other way round we cannot easily deadlock, as `try_recv()` returns immediately.
This is actually a bug; at the very least `recv` needs a comment saying that it may lock out `try_recv`, if there is no easy way to solve it.
https://docs.rs/async-broadcast/0.7.0/async_broadcast/index.html describes the broadcast channel as "MPMC", but it takes `&mut self` in both https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.Receiver.html#method.recv and https://docs.rs/async-broadcast/0.7.0/async_broadcast/struct.Receiver.html#method.try_recv
So it is not MPMC in the same sense as an ordinary channel: multiple "consumers" cannot read from the same receiver at the same time. I filed an upstream issue: smol-rs/async-broadcast#57
> So it is not MPMC in the same sense as ordinary channel is, multiple "consumers" cannot read from the same receiver at the same time. Maybe this should be filed as a bug to async-broadcast?
I think they meant that multiple consumers should use multiple receivers. But then yes, it's not MPMC because those receivers don't "steal" messages from each other, but receive message copies.
> This is actually a bug, at least needs a comment above `recv` that it may lock out `try_recv` if there is no easy way to solve it.
Seems that adding some flag like `is_empty: RwLock<bool>` (set by `recv()`) solves this, but then `try_recv()` can miss events if the flag isn't reset yet while new events have already arrived.
EDIT: But probably it's ok to miss events if there are other consumers working in parallel.
Even a bool isn't needed, just `RwLock<()>`: `recv()` should do `try_recv()` first, which takes a read lock (i.e. calls `try_read()`), and only if `try_recv()` didn't succeed, take a write lock.
I have made `try_recv()` use `try_lock` on the mutex. So `try_recv` may return an error if there are concurrent calls to `recv` or `try_recv`, but we don't use it from concurrent threads, so it is fine. It is now documented that it may return an error, and the good thing is that `try_recv` is non-async again.
src/events.rs
```rust
    /// Tries to receive an event without blocking.
    ///
    /// Returns error if no events are available for reception
    /// or if receiver is blocked by a concurrent call to [`recv`].
```
Or a concurrent call to `try_recv()`, I guess, if it's done from another OS thread.
This makes `EventTracker` receive events immediately instead of events being moved from the event emitter to the event tracker by a task spawned from `TestContext::new_internal`. This makes `EventTracker.clear_events` reliable, as it is guaranteed to remove all events emitted by the time it is called, rather than only the events that have already been moved.
The purpose of this PR is to make emitted events immediately available to the `TestContext` "event tracker". Before this PR, the "event tracker" relied on a task copying events from `EventEmitter` to the "event tracker" and `LogSink`:
https://github.com/deltachat/deltachat-core-rust/blob/65822e53e669900d21f2c361458c1aa61851f480/src/test_utils.rs#L367-L376

The problem with the current approach is that events are not available to the event tracker immediately after they are emitted, because this task may not be woken up immediately. This makes `clear_events` unreliable, as it only consumes events that have already been moved from the event emitter to the event tracker.

This PR replaces the event channel with a broadcast channel, so both the primary `EventEmitter` and the "event tracker" receive events immediately. `LogSink` now also gets its own broadcast receiver for each account and runs a task moving events from these receivers to the log channel. This may reorder events in the logs, but that is not as critical as the "event tracker" not receiving events immediately, as it does not break tests.

Related comment: #5471 (comment)

There is another PR that solves the problem of clearing events in the event tracker in Python by using special "checkpoint" events: #5477

In Python, the approach from this PR is not possible, because events are polled via a `get_next_event` call, which is not guaranteed to return a result immediately once an event is available.