Skip to content

Commit

Permalink
Errors during partial shutdown no longer cause global shutdown
Browse files Browse the repository at this point in the history
If an error during a partial shutdown happens, that error should
be delivered to the task that issued the partial shutdown.

Instead, there was a bug where the error would cause a global shutdown.
This change fixes that bug.
  • Loading branch information
Finomnis committed Dec 5, 2021
1 parent 3439479 commit f0000d3
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 2 deletions.
61 changes: 61 additions & 0 deletions examples/14_partial_shutdown_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//! This example demonstrates how to perform a partial shutdown of the system.
//!
//! Subsys1 will perform a partial shutdown after 5 seconds, which will in turn
//! shut down Subsys2 and Subsys3, leaving Subsys1 running.

use anyhow::Result;
use env_logger::{Builder, Env};
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};

async fn subsys3(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsys3 started.");
subsys.on_shutdown_requested().await;
panic!("Subsystem3 threw an error!")
}

async fn subsys2(mut subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsys2 started.");
subsys.start("Subsys3", subsys3);
subsys.on_shutdown_requested().await;
log::info!("Subsys2 stopped.");
Ok(())
}

async fn subsys1(mut subsys: SubsystemHandle) -> Result<()> {
// This subsystem shuts down the nested subsystem after 5 seconds.
log::info!("Subsys1 started.");

log::info!("Starting nested subsystem ...");
let nested_subsys = subsys.start("Subsys2", subsys2);
log::info!("Nested subsystem started.");

tokio::select! {
_ = subsys.on_shutdown_requested() => (),
_ = sleep(Duration::from_secs(1)) => {
log::info!("Shutting down nested subsystem ...");
if let Err(err) = subsys.perform_partial_shutdown(nested_subsys).await{
log::warn!("Partial shutdown failed: {}", err);
};
log::info!("Nested subsystem shut down.");
subsys.on_shutdown_requested().await;
}
};

log::info!("Subsys1 stopped.");

Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
}
11 changes: 9 additions & 2 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl SubsystemRunner {
async fn handle_subsystem(
mut inner_joinhandle: JoinHandle<Result<()>>,
shutdown_token: ShutdownToken,
local_shutdown_token: ShutdownToken,
name: String,
cancellation_requested: Event,
) -> Result<Result<(), ()>, JoinError> {
Expand All @@ -34,12 +35,16 @@ impl SubsystemRunner {
Ok(Ok(())) => {Ok(Ok(()))},
Ok(Err(e)) => {
log::error!("Error in subsystem '{}': {:?}", name, e);
shutdown_token.shutdown();
if !local_shutdown_token.is_shutting_down() {
shutdown_token.shutdown();
}
Ok(Err(()))
},
Err(e) => {
log::error!("Error in subsystem '{}': {}", name, e);
shutdown_token.shutdown();
if !local_shutdown_token.is_shutting_down() {
shutdown_token.shutdown();
}
Err(e)
}
}
Expand All @@ -61,6 +66,7 @@ impl SubsystemRunner {
pub fn new<Fut: 'static + Future<Output = Result<()>> + Send>(
name: String,
shutdown_token: ShutdownToken,
local_shutdown_token: ShutdownToken,
subsystem_future: Fut,
) -> Self {
let (cancellation_requested, request_cancellation) = Event::create();
Expand All @@ -71,6 +77,7 @@ impl SubsystemRunner {
let outer_joinhandle = tokio::spawn(Self::handle_subsystem(
inner_joinhandle,
shutdown_token,
local_shutdown_token,
name,
cancellation_requested,
));
Expand Down
1 change: 1 addition & 0 deletions src/subsystem/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl SubsystemHandle {
let subsystem_runner = SubsystemRunner::new(
name,
subsystem_handle.global_shutdown_token().clone(),
new_subsystem.local_shutdown_token.clone(),
subsystem(subsystem_handle),
);

Expand Down
2 changes: 2 additions & 0 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ async fn partial_shutdown_panic_gets_propagated_correctly() {
assert_eq!(result.err(), Some(PartialShutdownError::SubsystemFailed));
assert!(nested_started.get());
assert!(nested_finished.get());
assert!(!subsys.local_shutdown_token().is_shutting_down());

subsys.request_shutdown();
Ok(())
Expand Down Expand Up @@ -643,6 +644,7 @@ async fn partial_shutdown_error_gets_propagated_correctly() {
assert_eq!(result.err(), Some(PartialShutdownError::SubsystemFailed));
assert!(nested_started.get());
assert!(nested_finished.get());
assert!(!subsys.local_shutdown_token().is_shutting_down());

subsys.request_shutdown();
Ok(())
Expand Down

0 comments on commit f0000d3

Please sign in to comment.