New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Worker trait #236
add Worker trait #236
Conversation
@thibault-martinez maybe we could try the |
bee-protocol/src/worker/mod.rs
Outdated
|
||
type Receiver: Stream<Item = Self::Event>; | ||
|
||
async fn run(self, receiver: Self::Receiver) -> Result<(), WorkerError>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether we could separate run
into distinct start
and stop
methods such that we can know externally when each worker has started and when it has finished its shutdown (right now it's a bit opaque and this is going to be annoying to work with when we want more granular control over startup/shutdown)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zesterer what should stop
do? I think most (all?) workers just break the loop and write the Stopped
log entry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, stop
would break the loop. I do wonder whether the existing shutdown system might have a part to play here instead though.
bee-protocol/src/worker/mod.rs
Outdated
pub(crate) trait Worker { | ||
type Event; | ||
|
||
type Receiver: Stream<Item = Self::Event>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that Event
/Receiver
should exist here. Doing things through the event bus by passing a reference to the node in might work better here. Something like this:
impl<N: Node> Worker<N> for MyWorker {
fn start(node: &N) -> Result<Self, Self::Error> {
let (tx, rx) = channel();
node.event_bus().register_handler::<MyEvent>(move |e| tx.send(e.clone()));
task::spawn(move || {
while let Ok(e) = rx.recv().await {
// Use `e` here
}
});
Ok(Self)
}
}
Additionally, since this is a pretty common use-case, a utility method on EventBus
like EventBus::register_receiver<E>() -> Receiver<E>
would be quite a neat way to simplify this case to the following:
impl<N: Node> Worker<N> for MyWorker {
fn start(node: &N) -> Result<Self, Self::Error> {
let rx = node.event_bus().register_receiver::<MyEvent>();
task::spawn(move || {
while let Ok(e) = rx.recv().await {
// Use `e` here
}
});
Ok(Self)
}
}
const DEPS: &'static [TypeId] = &[]; | ||
|
||
type Event = BroadcasterWorkerEvent; | ||
type Receiver = ShutdownStream<Fuse<mpsc::UnboundedReceiver<BroadcasterWorkerEvent>>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I mentioned this before, but I'd really like to replace all of this sort of thing with a central event bus + a way to get references to other workers via the N: Node
. Probably for a follow-up PR though.
fixes #234.