Wait on NestedSubsystem #37

Closed
viveksjain opened this issue May 22, 2022 · 27 comments

Comments

@viveksjain

Thanks for writing this library! It seems helpful for my async code. I couldn't figure out how to handle this use case though: I have a nested subsystem, which will shut itself down on error. I want to retry from the top level in a loop. Something like

async fn top_subsystem(subsys: SubsystemHandle) -> Result<()> {
  let init = one_time_expensive_setup();
  loop {
    let s = cheap_setup();
    let nested = subsys.start("nested", nested_subsystem);
    nested.wait();
  }
}

async fn nested_subsystem(subsys: SubsystemHandle) -> Result<()> {
  ...
  // On error:
  subsys.request_shutdown();
}

How would I do the nested.wait() part?

@Finomnis
Owner

Finomnis commented May 22, 2022

Hey!
Thanks for your kind words, much appreciated.
It is currently not yet possible to wait for nested subsystems. I was close to rewriting the partial shutdown mechanism to include just that, but I stopped the effort due to a bunch of unanswered conceptual questions.

That was the PR: https://github.com/Finomnis/tokio-graceful-shutdown/pull/36/files

The reason I stopped is that it inherently created a couple of race conditions. For example, where should potential errors be delivered: to Toplevel or to join? It can only be one of them, because errors do not implement Clone. If join is the answer, what happens if an error occurs after start but before join? Should errors in subsystems that are being joined still trigger program shutdown?

The tl;dr is that instead of making subsystems joinable, I might instead make Toplevel nestable. That would mean that errors and panics in subsystems could be properly handled, as they would only bubble up to the next Toplevel object, instead of causing a global program shutdown.

What are your thoughts?

@viveksjain
Author

Yeah that sounds good. Those are the semantics I want from NestedSubsystem anyways, I was wondering why it has to be a different type from Toplevel (I would expect just a single Subsystem type). I guess you'd have to figure out what semantics of catch_signals should be but otherwise it seems clear to me.

@Finomnis
Owner

Finomnis commented May 23, 2022

Nested subsystems are part of the subsystem tree that propagates errors up to the toplevel object. If you think that's a bad design decision we can rethink the API in general. Toplevel objects, on the other hand, would catch errors and force you to consume them.

Yeah, I thought about catch_signals, but I came to the conclusion that it shouldn't really matter. It is based on tokio's signal handlers, and they don't specify that you can't have multiple running at the same time. Removing the function from the nested version of Toplevel just feels like removing something for the sake of it. Maybe there even is a use case for someone who wants to handle events in a nested Toplevel instead of the top Toplevel.

So I think all I will add, in addition to Toplevel::new(), is a Toplevel::nested(&SubsystemHandle) constructor. It produces the same Toplevel object, with the difference that it reacts to shutdowns/cancellations from its parent.

@Finomnis
Owner

One of my acceptance criteria for this task was even that I rewrite the example with the restarting subsystem, which would be exactly your usecase, I guess.

@Finomnis
Owner

I'll continue working on it probably next week, I sadly don't have much time left this week

@viveksjain
Author

Nested subsystems are part of the subsystem tree that propagates errors up to the toplevel object. If you think that's a bad design decision we can rethink the API in general. Toplevel objects, on the other hand, would catch errors and force you to consume them.

Certainly would require some thought, but at least to me having just one type and recursive structure seems clearer. Still seems possible to create the same propagation semantics as today with some additional code tweaks to the API. Example of how I might imagine it working:

let top_handle = Subsystem::new();
let handle2 = top_handle.clone();
let s1_handle = top_handle.start("subsys1", |sub_handle| fn1(sub_handle, handle2));
let s2_handle = top_handle.start("subsys2", fn2);
top_handle.wait_shutdown(Duration::from_millis(100))

async fn fn1(handle: SubsystemHandle, top_handle: SubsystemHandle) -> Result<()> {
  let sub_handle = handle.start("subsys3", fn2);
  sleep(Duration::from_millis(1000)).await;
  top_handle.request_shutdown();
  handle.wait_shutdown(Duration::from_millis(100))
}

async fn fn2(handle: SubsystemHandle) -> Result<()> {
  loop {
    if handle.is_shutdown_requested() {
      println!("Shutting down");
      return Ok(());
    }
  }
}

My 2c as someone new to the APIs, but I haven't spent too much time thinking through it so certainly possible the above doesn't make sense/isn't possible for some reason :)

@Finomnis
Owner

Finomnis commented May 24, 2022

In general this is the right thought, the problem is error handling.
How far should an error propagate up? All the way to the top, or to the next join?
Keep in mind that errors aren't copy/clone, so every error must have exactly one destination.
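The single-destination constraint can be illustrated with a minimal sketch that uses only the standard library, not this crate's API. The two channels are hypothetical stand-ins for the "Toplevel" and "join" error paths, and `failing_subsystem` and `deliver_error` are names made up for this example:

```rust
use std::io;
use std::sync::mpsc;

/// Hypothetical stand-in for a subsystem that fails.
/// `io::Error` does not implement `Clone`, like most error types.
fn failing_subsystem() -> Result<(), io::Error> {
    Err(io::Error::new(io::ErrorKind::Other, "subsystem failed"))
}

/// Delivers the error to exactly one of two destinations and returns
/// what each destination received as `(toplevel, join)`.
fn deliver_error(to_join: bool) -> (Option<String>, Option<String>) {
    let (toplevel_tx, toplevel_rx) = mpsc::channel::<io::Error>();
    let (join_tx, join_rx) = mpsc::channel::<io::Error>();

    if let Err(err) = failing_subsystem() {
        // `err` is moved into the chosen channel. Sending it to both
        // channels would not compile, because the value cannot be cloned.
        if to_join {
            join_tx.send(err).unwrap();
        } else {
            toplevel_tx.send(err).unwrap();
        }
    }

    // Close both senders so the receivers see either a message or nothing.
    drop(toplevel_tx);
    drop(join_tx);

    (
        toplevel_rx.try_recv().ok().map(|e| e.to_string()),
        join_rx.try_recv().ok().map(|e| e.to_string()),
    )
}
```

Whichever destination is chosen, the other one never sees the error, which is why the API has to decide up front who owns it.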

One could of course say error handling should be done by the user and all we provide is the shutdown token, but I think then it is easier to use tokio's shutdown token directly.

This will be the only difference between a nested subsystem and a nested toplevel: subsystems propagate errors upwards, while toplevels collect and process them.

Maybe you have an idea of how to combine those two, I am running out of ideas.

I had the idea of having a start and a start_detached that spawn subsystems that do or do not propagate errors. But that led to other problems, although the idea might be worth investigating again.

But still, there would be two different structs, one for a subsystem that can be joined and which does not propagate errors up, and the other one that can't be joined and forwards errors to the parent. (as their API would be different)

@viveksjain
Author

I imagine each subsystem would be responsible for propagating errors manually up the chain if necessary. For most usage I expect calling wait_shutdown at the end would be sufficient. If a subsystem wants to propagate errors but not wait, it could always be done manually with something like

async fn fn1(handle: SubsystemHandle , top_handle: SubsystemHandle) -> Result<()> {
  let sub_handle = handle.start("subsys3", fn2);
  tokio::spawn(async move {
    // Propagate error
    if let Err(err) = sub_handle.wait_shutdown() {
      top_handle.set_error(err);
      // Or maybe request_shutdown should take Option<ErrType>?
    }
  });
  ...
}

Maybe it's not ideal for some types of use cases though.

@Finomnis
Owner

Finomnis commented May 24, 2022

I'm thinking about removing error propagation entirely, because after thinking about it a little more, you really don't gain much from automatic error-propagation to the top.

The new api could resemble something like this:

async fn my_subsys(handle: SubsystemHandle) -> Result<()> {
    let (nested_joinhandle1, nested_subsys1) = handle.spawn("nested1", nested1);
    let (nested_joinhandle2, nested_subsys2) = handle.spawn("nested2", nested2);

    tokio::join!(
        nested_joinhandle1,
        nested_joinhandle2,
    )
}

I think the term spawn fits better instead of start as it then resembles tokio::spawn more closely.

It's important to make the joinhandles must_use to prevent them from just being discarded. But then all usecases should be satisfied:

  • Errors get propagated up automatically as users have to join nested subsystems and therefore are obliged to handle errors of nested subsystems.
  • The question of where to deliver errors is gone, as errors are obviously delivered to the joinhandle
  • If a nested_subsys::request_shutdown exists, the question of partial shutdown is also solved
  • The question of how cancellation propagates to subsystems could be solved by having the joinhandle auto-cancel subsystems on drop

Implementing your restart-a-subsystem-until-failure should also be trivial with this.
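As a rough analogy for the restart loop, here is a sketch built on `std::thread` join handles instead of the proposed subsystem join handles. It is not the proposed API: `nested_worker` and `run_with_restarts` are hypothetical names, and the worker is hard-coded to fail twice before succeeding.

```rust
use std::thread;

/// Hypothetical nested worker: fails on the first two attempts, then succeeds.
fn nested_worker(attempt: u32) -> Result<u32, String> {
    if attempt < 3 {
        Err(format!("attempt {} failed", attempt))
    } else {
        Ok(attempt)
    }
}

/// Restarts the worker until it succeeds or `max_attempts` is exhausted.
/// Every failure ends up at the join site, so the caller decides what to do with it.
fn run_with_restarts(max_attempts: u32) -> Result<u32, Vec<String>> {
    let mut errors = Vec::new();
    for attempt in 1..=max_attempts {
        let handle = thread::spawn(move || nested_worker(attempt));
        match handle.join() {
            // The worker finished cleanly: stop restarting.
            Ok(Ok(value)) => return Ok(value),
            // The worker returned an error: record it and restart.
            Ok(Err(err)) => errors.push(err),
            // The worker panicked: treat it like any other failure.
            Err(_) => errors.push(format!("attempt {} panicked", attempt)),
        }
    }
    Err(errors)
}
```

Because the error is delivered to whoever joins, the parent can retry, give up, or pass the collected errors further up.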

The reason I kept nested_joinhandle and nested_subsys separate is because there is the usecase where you would want to move/copy the handle to the subsystem to another task/context to perform a partial shutdown from there while joining it from somewhere else.

What are your thoughts about that?

Edit:
After re-reading your suggestion I think your idea and what I just wrote aren't that far apart from each other.
tokio::spawn would still escape cancellation, but having a tokio::spawn without any shutdown/stopping/error propagation mechanism is an error in programming imo anyway.

@Finomnis
Owner

Finomnis commented May 24, 2022

I might attempt to re-write this library within the next weeks, by simply implementing an API skeleton with a bunch of todo!() everywhere and then rewriting all the examples, to see what corner cases we forgot to consider.
I'll keep you posted here to get your thoughts on it.

@viveksjain
Author

Yeah, that looks good to me! Agreed that both suggestions are fairly similar. While searching around for some rust async stuff, I had also come across tokio-rs/tokio#1879 and this proposal sounds quite similar to Structured Concurrency. I guess there's nothing new under the sun (or perhaps, great minds think alike 😛) Disclaimer: I haven't read through the issue in detail or any links. The last comment does mention some libs though, probably worth seeing how they compare to this proposal.

@Finomnis
Owner

I think structured concurrency, or specifically the 'nursery' pattern is pretty cool. I don't think it is strongly connected to this library though, this library should leave the specific use case up to the user imo and be as non-specialized as possible.

@viveksjain
Author

viveksjain commented May 25, 2022

OK, so just to clarify for me: the main difference is that structured concurrency requires nested scopes? And this proposal does not, because you can pass around the joinhandle (although it does make structured concurrency very easy)? I also don't see cancellation support in async_nursery, this proposal has it which is definitely useful IMO, but that seems orthogonal to structured concurrency.

@Finomnis
Owner

Tbh I haven't read that deeply into it. I'm familiar with the concepts, but both of them aren't really relevant for this library, so I haven't dealt with them in detail yet.

@Finomnis
Owner

Finomnis commented May 25, 2022

Turns out there are problems with the simplified API. The biggest one is that the code the user will write is very quick to revert to cancellation instead of graceful shutdown.

If we say the user has to forward errors, then that means that the user also has to initiate shutdown requests when a nested error happens.

Example subsystem structure:

SubsysA
   -> SubsysB
       -> SubsysC1
       -> SubsysC2

If subsystem C1 throws an error, what happens? Ideally, if B is unable to restart/recover from an error from C1, then the whole system should shut down gracefully.

But there is a high chance that B is implemented like this:

tokio::select!(
    _ = c1_joinhandle => (...something...),
    _ = c2_joinhandle => (...something...),
)

This code would cancel C2 instead of shutting it down gracefully, and at the same time it would disconnect from the C2 error propagation.

So how would you implement the behaviour "If either C1 or C2 throws an error, shut down the entire Program"?

async fn subsys_b(subsys: SubsystemHandle) -> Result<()> {
    let (c1_joinhandle, c1) = subsys.start("SubsysC1", subsys_c1);
    let (c2_joinhandle, c2) = subsys.start("SubsysC2", subsys_c2);

    let subsys_clone1 = subsys.clone();
    let subsys_clone2 = subsys.clone();

    let (err1, err2) = tokio::join!(
        async move {
            c1_joinhandle.await.map_err(|err| {
                subsys_clone1.global_shutdown();
                err
            })
        },
        async move {
            c2_joinhandle.await.map_err(|err| {
                subsys_clone2.global_shutdown();
                err
            })
        },
    );

    combine_errors(err1, err2)
}

For one, this is a mess and not the programming style I'd like to cause with my API.
Further, there are a couple of problems:

  • Errors only get propagated to Toplevel once all subsystems have stopped, as tokio::join!() waits for every branch to complete
  • To actually propagate errors, there needs to be a magical function combine_errors, as tokio::join!() returns the results as a tuple
  • Using tokio::try_join!() is not an option, as it again cancels the other branches instead of shutting them down gracefully.
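The combine_errors helper used above does not exist in tokio or in this library. A minimal sketch of what it might look like, assuming both branches return `Result<(), E>` and that the combined error is simply a `Vec` of the individual errors:

```rust
/// Hypothetical `combine_errors`: keeps every failure instead of only the first one.
/// Returns `Ok(())` only if both branches succeeded.
fn combine_errors<E>(first: Result<(), E>, second: Result<(), E>) -> Result<(), Vec<E>> {
    // `Result::err` turns `Ok(_)` into `None` and `Err(e)` into `Some(e)`,
    // so `filter_map` keeps only the errors, in branch order.
    let errors: Vec<E> = vec![first, second]
        .into_iter()
        .filter_map(Result::err)
        .collect();

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
```

Even with such a helper, the caller still has to change its own return type to carry a list of errors, which is part of the boilerplate being criticized here.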

@viveksjain
Author

Those are good points. Would some helper like subsys.join() help here? Docs would need some warning about potential problems with using tokio::join. But the custom join would have the following behavior:

  • Wait on all nested subsystems that have been started
  • If one subsystem returns an error, the entire subsystem is canceled
  • Combines errors into a Vec (or GracefulShutdownError)

It would still be the case that errors are only propagated to Toplevel once all subsystems are stopped, but this seems fine for typical usage?

I believe this would work for my use case, but not sure if it addresses all your concerns and whether there are any other edge cases.
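The three behaviours listed above can be sketched with standard-library threads and a shared shutdown flag. This is only an analogy under stated assumptions, not the crate's API: `join_all`, `Worker`, `wait_for_shutdown` and `fail_immediately` are hypothetical names, and "canceled" is modelled as a cooperative shutdown request that workers have to poll.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical worker type: receives the shared shutdown flag.
type Worker = Box<dyn FnOnce(Arc<AtomicBool>) -> Result<(), String> + Send>;

/// Hypothetical `join_all`: waits on every worker, requests shutdown of the
/// others as soon as one returns an error, and collects all errors into a Vec.
/// Limitation of this sketch: a panicking worker does not set the flag.
fn join_all(workers: Vec<Worker>) -> Result<(), Vec<String>> {
    let shutdown = Arc::new(AtomicBool::new(false));

    let handles: Vec<_> = workers
        .into_iter()
        .map(|worker| {
            let shutdown = Arc::clone(&shutdown);
            thread::spawn(move || {
                let result = worker(Arc::clone(&shutdown));
                if result.is_err() {
                    // One failure asks every sibling to shut down.
                    shutdown.store(true, Ordering::SeqCst);
                }
                result
            })
        })
        .collect();

    let mut errors = Vec::new();
    for handle in handles {
        match handle.join() {
            Ok(Ok(())) => {}
            Ok(Err(err)) => errors.push(err),
            Err(_) => errors.push("worker panicked".to_string()),
        }
    }

    if errors.is_empty() { Ok(()) } else { Err(errors) }
}

/// Example worker that runs until shutdown is requested, then exits cleanly.
fn wait_for_shutdown(shutdown: Arc<AtomicBool>) -> Result<(), String> {
    while !shutdown.load(Ordering::SeqCst) {
        thread::sleep(Duration::from_millis(1));
    }
    Ok(())
}

/// Example worker that fails right away.
fn fail_immediately(_shutdown: Arc<AtomicBool>) -> Result<(), String> {
    Err("C1 failed".to_string())
}
```

As in the proposal, the caller only learns about the error after every worker has stopped, since the errors are returned together at the end.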

@Finomnis
Owner

While that would work, we are then back at square one where we cannot react to the termination of a single subsystem any more. I remind you that errors are non-copyable, so a singular error path is required. That would mean that joining a single subsystem would have to remove its error path from the handle's join call.

I think we are slowly shifting back to the existing pattern then, and I feel like making the original Toplevel nestable becomes the most attractive option again.

@viveksjain
Author

Ah of course, I keep forgetting that because I don't need it, and it is difficult for me to understand without a specific example. In that case nesting Toplevel makes sense. Just one request: please rename the classes :). I think a Toplevel that can be nested and a NestedSubsystem would only add to the confusion.

@Finomnis
Owner

I agree with your naming feedback. I'm open to suggestions.

@viveksjain
Author

I didn't give a suggestion because I'm not good at naming :). Name should make it clear that the only real difference between the two (AIUI) is that one of them propagates errors up the chain and one doesn't. Some initial ideas are Subsystem/ScopedSubsystem and ErrPropagatingSubsystem/PassthroughSubsystem.

@Finomnis
Owner

Toplevel itself isn't a subsystem, though. It doesn't run any code, it is only a carrier of subsystems.

@Finomnis
Owner

@viveksjain
Author

Yup, that looks great!

@Finomnis
Owner

Finomnis commented Jun 1, 2022

@viveksjain Released as 0.10.0-beta.0. Would you mind testing it in your project to see if you run into any issues? If you still have a use case for it, of course.

@viveksjain
Author

Happy to, will try to do so over the next week or so

@viveksjain
Author

I did get the chance to try out the beta version, and happy to say that it works great. Helped me clean up my code quite a bit. Feel free to close this issue!

One observation I will mention is that in most cases, I would like a newly started subsystem to select! on the parent's on_shutdown_requested so that it shuts down immediately on error, and this was causing quite a bit of boilerplate. I was able to address it with the following macro though

/// Start a nested subsystem that `select!`s on `subsys.on_shutdown_requested()` to stop automatically.
/// `subsystem_to_start` must have type `Future<Output=anyhow::Result<()>>`.
#[macro_export]
macro_rules! start {
    ($subsys:expr, $name:expr, $subsystem_to_start:expr) => {
        let subsys_clone = $subsys.clone();
        $subsys.start($name, move |_h: SubsystemHandle| async move {
            tokio::select! {
                r = $subsystem_to_start => r,
                _ = subsys_clone.on_shutdown_requested() => Ok::<(), anyhow::Error>(())
            }
        });
    };
}

Will leave it up to you on whether such a feature makes sense to be in the lib or not (in which case it can probably be another function on SubsystemHandle, rather than a macro).

@Finomnis
Owner

Finomnis commented Jun 6, 2022

Moved to new issue.

@Finomnis Finomnis closed this as completed Jun 6, 2022