Question regarding usage in a non-trivial case #25

Closed
DASPRiD opened this issue May 5, 2022 · 19 comments · Fixed by #26

Comments

@DASPRiD

DASPRiD commented May 5, 2022

Hi there! First off, thanks for this amazing project. It really helped simplify my previous setup, where I handled shutdowns manually!

I tried to find examples of how people use this library in real-world applications, but I couldn't really find much. I have a relatively simple question, so maybe you can point me in the right direction:

Say I have 4 subsystems attached to the top level. Three of those need to send messages to the fourth one. Where would you create tx and how would you pass it down? At the moment I'm creating all of the channel ends in my main function and passing them into the new() function of every subsystem before calling run(subsys) on them. It looks a little bit like this, and I'm wondering if there's a cleaner solution (to be fair, I'm relatively new to Rust):

#[tokio::main]
async fn main() -> Result<()> {
    Builder::from_env(Env::default().default_filter_or("debug")).init();

    let (foo_for_alpha_tx, foo_rx) = mpsc::channel(8);
    let foo_for_beta_tx = foo_for_alpha_tx.clone();
    let foo_for_gamma_tx = foo_for_alpha_tx.clone();

    Toplevel::new()
        .start("Alpha", |s| async move {
            Alpha::new(foo_for_alpha_tx).run(s).await
        })
        .start("Beta", |s| async move {
            Beta::new(foo_for_beta_tx).run(s).await
        })
        .start("Gamma", |s| async move {
            Gamma::new(foo_for_gamma_tx).run(s).await
        })
        .start("Delta", |s| async move {
            Delta::new(foo_rx).run(s).await
        })
        .catch_signals()
        .handle_shutdown_requests(Duration::from_millis(1000))
        .await
}

In general, it'd be really nice to see a more complete example with subsystem intercommunication in the example directory I guess :)

Another question which popped into my mind, which I couldn't find an answer for: When a subsystem has nested subsystems and a shutdown is requested, do the nested subsystems shut down first before the parent, or is the order not guaranteed?

Thanks a lot for your time!

@Finomnis
Owner

Finomnis commented May 6, 2022

Hey!

First of all, thanks for your interest in this library :) I wrote it for myself and am happy to see that other people find it useful too.
We haven't had too many complex scenarios yet, so I'm happy to adjust the API if there are suggestions on how to improve it :)
It's in active development, so if you have suggestions/API change requests, feel free to let me know :) The API isn't fixed yet.

To address your questions:
Yes, that is currently the way I would also do it to pass channels into the subsystems.
I thought about adding an example that would work like this:

.start("Alpha", Alpha::new(tx).run)

But I got problems with the lifetimes. I might revisit it though, maybe I find a way.

About shutdown order: The order is unspecified, it could happen in any order. I would best describe it as "simultaneously", as much as that is possible in an asynchronous system.
So if your use case is that a parent system needs to wait for something to finish in a child system, I would only use the shutdown token in the child system, and then wait for the child to finish in the parent system.

I just realized there is no is_shutdown_requested function in SubsystemHandle yet that you could use to poll it, which would be useful in that scenario. I might add it in the next version.
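
The "only the child watches the shutdown signal, the parent waits for the child" pattern can be sketched without the library at all. The following is a minimal illustration using std threads and an AtomicBool as stand-ins for subsystems and the shutdown token; the names run_parent and shutdown are hypothetical and this is not the library's API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Stand-in for a parent subsystem that owns one child.
// Returns the order in which the two finished.
fn run_parent(shutdown: Arc<AtomicBool>) -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));

    let child_log = Arc::clone(&log);
    let child = thread::spawn(move || {
        // Only the child reacts to the shutdown request.
        while !shutdown.load(Ordering::SeqCst) {
            thread::sleep(Duration::from_millis(5));
        }
        child_log.lock().unwrap().push("child finished");
    });

    // The parent never inspects the flag; it simply waits for the child,
    // which guarantees the child is done before the parent cleans up.
    child.join().unwrap();
    log.lock().unwrap().push("parent finished");

    let events = log.lock().unwrap().clone();
    events
}

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&shutdown);
    let parent = thread::spawn(move || run_parent(flag));

    // Simulate a shutdown request arriving a little later.
    thread::sleep(Duration::from_millis(20));
    shutdown.store(true, Ordering::SeqCst);

    let events = parent.join().unwrap();
    assert_eq!(events, vec!["child finished", "parent finished"]);
}
```

Because the parent blocks on the child rather than on the signal, the ordering is enforced by the code itself instead of by whatever order the shutdown happens to propagate in.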

@DASPRiD
Author

DASPRiD commented May 6, 2022

But I got problems with the lifetimes. I might revisit it though, maybe I find a way.

Yea, because of that lifetime issue I ended up with that async move block with the await inside, that seemed to resolve that issue for me.

@Finomnis
Owner

Finomnis commented May 6, 2022

Actually, I think the async move wouldn't be necessary in your case. I think your problem lies in the implementation of Alpha, Beta and Delta. Would you mind showing me the code of those?

I think either the tx/rx object or self in run() is passed as reference. That's why you need the move. If you would pass ownership into it, it shouldn't be necessary, like in the example.

@DASPRiD
Author

DASPRiD commented May 6, 2022

Sure, here's one of them in a simplified version:

pub struct AudioPlayer {
    rx: mpsc::Receiver<Play>,
    stream_handle: OutputStreamHandle,
}

impl AudioPlayer {
    pub fn new(rx: mpsc::Receiver<Play>, stream_handle: OutputStreamHandle) -> Self {
        Self {
            rx,
            stream_handle,
        }
    }

    pub async fn run(&mut self, subsys: SubsystemHandle) -> Result<()> {
        tokio::select! {
            _ = subsys.on_shutdown_requested() => {
                info!("Audio player shutting down");
            },
            res = self.process() => res?
        }

        Ok(())
    }

    async fn process(&mut self) -> Result<()> {
        while let Some(play) = self.rx.recv().await {
            // …
        }

        Ok(())
    }
}

@Finomnis
Owner

Finomnis commented May 6, 2022

Replace fn run(&mut self, with fn run(mut self,. Then (theoretically) this should work without the async move.

@DASPRiD
Author

DASPRiD commented May 6, 2022

Indeed, that does seem to compile without issues, neat :). As I said, I'm still relatively new to Rust, and I'm not entirely sure when I need to pass self as a mutable reference and when as an owned mutable value :D

@DASPRiD
Author

DASPRiD commented May 6, 2022

What I dislike a little bit is that I have to do all the tx cloning in the main function, outside of the closures. Do you have a design pattern that would move the cloning elsewhere? Maybe a function for each subsystem that adds it to the top level, borrows the tx and creates its own clone?

@Finomnis
Owner

Finomnis commented May 6, 2022

The difference is who owns the object. With &mut self, the calling function stays the owner and only lends self to the called function. With mut self, the called function takes ownership of the self object, effectively consuming it.
That's why you get the error with &mut self: the main() function still owns the subsystem objects, but Rust can no longer prove that main() lives longer than the object you passed into start(), creating the possibility of a dangling reference.
With async move you move the ownership into the closure that is passed into start(). But simply passing the ownership into the run function achieves the same thing.
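
The same ownership rule shows up with std::thread::spawn, which also requires that the spawned work does not borrow from the caller. The sketch below uses that as an analogy only; Worker, run_borrowed and spawn_owned are hypothetical names, and it is an assumption that start() in this library imposes a comparable constraint.

```rust
use std::thread;

struct Worker {
    processed: u32,
}

impl Worker {
    // Takes ownership: the value moves into whoever calls `run`.
    fn run(mut self) -> u32 {
        self.processed += 1;
        self.processed
    }

    // Borrows: the caller must keep the value alive for the whole call.
    fn run_borrowed(&mut self) -> u32 {
        self.processed += 1;
        self.processed
    }
}

fn spawn_owned(worker: Worker) -> thread::JoinHandle<u32> {
    // `run` consumes `worker`, so the closure owns everything it needs
    // and can safely outlive this function.
    thread::spawn(move || worker.run())
}

fn main() {
    let handle = spawn_owned(Worker { processed: 0 });
    assert_eq!(handle.join().unwrap(), 1);

    // Borrowing is fine while the owner is provably still alive:
    let mut local = Worker { processed: 0 };
    assert_eq!(local.run_borrowed(), 1);

    // This, however, would be rejected by the compiler, because the new
    // thread could outlive `local`:
    //   thread::spawn(|| local.run_borrowed());
}
```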

@Finomnis
Owner

Finomnis commented May 6, 2022

I have an open pull request that I just wrote based on your suggestion, here: #26

This would allow you to write:

    .start("Alpha", Alpha::new(tx.clone()).into_subsystem())

Is that something you would find useful?
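
For readers wondering how a struct can be turned into something start() accepts, here is a rough, synchronous sketch of the general idea: a trait with a consuming run method and a default method that wraps self in a boxed closure. Handle, Task, IntoTask, into_task and this start function are all hypothetical stand-ins; the real trait in #26 is async and its exact signature may differ.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for the handle a subsystem receives.
struct Handle {
    name: &'static str,
}

// Hypothetical stand-in for what a `start`-like function accepts.
type Task = Box<dyn FnOnce(Handle) -> Result<(), String> + Send>;

trait IntoTask: Sized + Send + 'static {
    // Consumes `self`, so no borrow of the caller's stack is involved.
    fn run(self, handle: Handle) -> Result<(), String>;

    // Moves `self` into a closure that calls `run` later.
    fn into_task(self) -> Task {
        Box::new(move |handle| self.run(handle))
    }
}

struct Alpha {
    tx: mpsc::Sender<String>,
}

impl IntoTask for Alpha {
    fn run(self, handle: Handle) -> Result<(), String> {
        self.tx
            .send(format!("{} started", handle.name))
            .map_err(|e| e.to_string())
    }
}

// Hypothetical runner: creates the handle and invokes the task.
fn start(name: &'static str, task: Task) -> Result<(), String> {
    task(Handle { name })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    start("Alpha", Alpha { tx }.into_task()).unwrap();
    assert_eq!(rx.recv().unwrap(), "Alpha started");
}
```

The key point is that into_task takes self by value, which is what lets the call site shrink to a single expression without an explicit move closure.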

@DASPRiD
Author

DASPRiD commented May 6, 2022

Hell yes, that looks a lot cleaner to me 👍

@DASPRiD
Author

DASPRiD commented May 6, 2022

I assume that there's no real issue in rust having the original tx just dangling in main, doing nothing?

@Finomnis
Owner

Finomnis commented May 6, 2022

You could leave out the '.clone()' in the last subsystem, passing the original object into it
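
A small sketch of that suggestion, using std::sync::mpsc and threads as stand-ins for the tokio channel and the subsystems (spawn_producer and collect_messages are hypothetical helper names): every producer except the last gets a clone, and the last one receives the original sender, so nothing is left over in the calling function.

```rust
use std::sync::mpsc;
use std::thread;

fn spawn_producer(name: &'static str, tx: mpsc::Sender<String>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        tx.send(format!("hello from {}", name)).unwrap();
        // `tx` is dropped here, when the producer finishes.
    })
}

fn collect_messages() -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    let handles = vec![
        spawn_producer("Alpha", tx.clone()),
        spawn_producer("Beta", tx.clone()),
        // Last producer: move the original instead of cloning it.
        spawn_producer("Gamma", tx),
    ];

    // The iterator ends once every sender has been dropped. If an unused
    // sender were still alive in this function, this would block forever.
    let mut messages: Vec<String> = rx.iter().collect();

    for handle in handles {
        handle.join().unwrap();
    }

    messages.sort();
    messages
}

fn main() {
    let messages = collect_messages();
    assert_eq!(messages.len(), 3);
    assert_eq!(messages[0], "hello from Alpha");
}
```

A leftover sender is not an error in itself, but it does keep the channel open, so a receiver that relies on the channel closing to stop its loop would never see that happen.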

@DASPRiD
Author

DASPRiD commented May 6, 2022

Good point! Thanks a lot for your input, that's really helpful :)

@Finomnis
Owner

Finomnis commented May 6, 2022

Welcome :)

Finomnis linked a pull request May 6, 2022 that will close this issue
@Finomnis
Owner

Finomnis commented May 6, 2022

Published as 0.7.0.

Let me know how it worked for you :)

@DASPRiD
Author

DASPRiD commented May 6, 2022

Trying it out right now, I get this error message from the compiler:
lifetimes do not match method in trait

Here's my implementation of the trait:

#[async_trait::async_trait]
impl IntoSubsystem<Error> for AudioPlayer {
    async fn run(mut self, subsys: SubsystemHandle) -> Result<()> {
        tokio::select! {
            _ = subsys.on_shutdown_requested() => {
                info!("Audio player shutting down");
            },
            res = self.process() => res?
        }

        Ok(())
    }
}

(Error and Result come from anyhow).

Never mind, I forgot to install async-trait :)

@Finomnis
Owner

Finomnis commented May 8, 2022

Let's talk about this on the tokio discord. Just PM me.

I think I will need to see more code to find the issue, I don't think it's in the code you posted

Finomnis reopened this May 9, 2022
Finomnis closed this as completed May 9, 2022
@Finomnis
Owner

Finomnis commented May 9, 2022

@DASPRiD Just saw that you forgot to install async-trait. Does that mean that it works for you now, and I can sleep in peace? :)

@DASPRiD
Author

DASPRiD commented May 9, 2022

Yes you can, thanks :)
