Skip to content

Commit

Permalink
Merge 9e65603 into 15bfcb0
Browse files Browse the repository at this point in the history
  • Loading branch information
Finomnis committed Oct 18, 2023
2 parents 15bfcb0 + 9e65603 commit b766802
Show file tree
Hide file tree
Showing 54 changed files with 2,426 additions and 2,021 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/audit-check@v1
- uses: rustsec/audit-check@v1.4.1
with:
token: ${{ secrets.GITHUB_TOKEN }}
19 changes: 18 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ jobs:
- name: Run cargo test
run: cargo test -- --test-threads 1

msrv:
name: Minimum Supported Rust Version
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-D warnings"
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Install MSRV toolchain
uses: dtolnay/rust-toolchain@1.63.0

#- uses: Swatinem/rust-cache@v1

- name: Run cargo build
run: cargo build

lints:
name: Lints
runs-on: ubuntu-latest
Expand Down Expand Up @@ -115,7 +132,7 @@ jobs:
runs-on: ubuntu-latest
environment: production
if: github.event_name == 'release'
needs: [build, test, lints, docs, leaks]
needs: [build, test, msrv, lints, docs, leaks]
steps:
- name: Checkout sources
uses: actions/checkout@v3
Expand Down
37 changes: 17 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
[package]
name = "tokio-graceful-shutdown"
authors = ["Finomnis <finomnis@gmail.com>"]
version = "0.13.0"
edition = "2018"
version = "0.14.0"
edition = "2021"
rust-version = "1.63"
license = "MIT OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/Finomnis/tokio-graceful-shutdown"
Expand All @@ -20,39 +21,35 @@ exclude = [
]

[dependencies]
# Error definitions
thiserror = "1.0.32"
miette = "5.3.0"
tracing = { version = "0.1.37", default-features = false }

# For async utilities
tokio = { version = "1.20.1", default-features = false, features = [
tokio = { version = "1.32.0", default-features = false, features = [
"signal",
"rt",
"macros",
"time",
] }
tokio-util = { version = "0.7.2", default-features = false }
futures = "0.3.23"
async-recursion = "1.0.0"
pin-project-lite = "0.2.9"
tokio-util = { version = "0.7.8", default-features = false }

# For 'IntoSubsystem' trait
async-trait = "0.1.57"

# For logging
log = "0.4.17"
pin-project-lite = "0.2.13"
thiserror = "1.0.49"
miette = "5.10.0"
async-trait = "0.1.73"
atomic = "0.6.0"
bytemuck = { version = "1.14.0", features = ["derive"] }

[dev-dependencies]
# Error propagation
anyhow = "1.0.61"
anyhow = "1.0.75"
eyre = "0.6.8"
miette = { version = "5.3.0", features = ["fancy"] }
miette = { version = "5.10.0", features = ["fancy"] }

# Logging
env_logger = "0.10.0"
tracing-subscriber = "0.3.17"
tracing-test = "0.2.4"

# Tokio
tokio = { version = "1.20.1", features = ["full"] }
tokio = { version = "1.32.0", features = ["full"] }

# Hyper example
hyper = { version = "0.14.20", features = ["full"] }
Expand Down
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ This subsystem can now be executed like this:
```rust
#[tokio::main]
async fn main() -> Result<()> {
Toplevel::new()
.start("Subsys1", subsys1)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1))
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
```

The `Toplevel` object is the root object of the subsystem tree.
Subsystems can then be started using the `start()` functionality of the toplevel object.
Subsystems can then be started in it using the `start()` method
of its `SubsystemHandle` object.

The `catch_signals()` method signals the `Toplevel` object to listen for SIGINT/SIGTERM/Ctrl+C and initiate a shutdown thereafter.

Expand Down
29 changes: 29 additions & 0 deletions TODO.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
- Search for all TODOs in code
- Port over documentation
- Port over tests






Done:

- Name
- Error handling

- SubsystemBuilder (or PreparedSubsystem, or something similar)!
- Allows creating subsystems:
- from FnOnce (non-restartable)
- from Fn (restartable)
- from `trait Subsystem`
- from `trait RestartableSubsystem`
- maybe gets passed directly into `start`

- Solution to the entire "restartable subsystems" problem:
- Make single subsystems awaitable! (Through the object returned from `start`)
- Restart is then trivial to implement.
- Make subsystem "Shutdown on Error/Panic"
- Start/await it in a loop in the parent subsystem

- Error generic
48 changes: 48 additions & 0 deletions Thoughts.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
need:
- function that runs on all exit paths of the subsystem
- that also has access to the locked parent, to remove itself from the list of children
- is list of children really important?
- atomic counter could work
- but what about error propagation?

- error propagation maybe not necessary.
- register closure that will be executed on error/shutdown of the child
- every subsystem can have 'shutdown triggers' attached to it

fixed facts:
- subsystems will never change their parent

Suggestions:
Subsystem have **no** reference to their parents.
They
- Use `joiner_tokens` for awaiting children
- Use `cancellation_token` for shutdown
- Simply 'drop' a subsystem to hard cancel an entire tree

Open question: How to propagate errors?
- Not at all? (Do we need error propagation?)
- Through the joiner_tokens?
If through the joiner_tokens:
- Have `none`/`some(vec)` in every joiner_token for collecting errors
- While walking up, put the error in the first available location
- When dropping the token, propagate errors up if unconsumed
- Uncaught errors simply get printed
This should provide a quite natural way of propagating/dropping errors,
and should work well with partial shutdown.

Open question: How to deal with errors?
- Every spawned subsystem can register functions that handle errors of their children
- Possibilities are:
- pass further up
- Ignore
- shutdown self and children
- Probably a mechanism that will be inside of the joiner_token

Open question: Ownership?
- Parents should own children
- But: HOW do children remove themselves from the parent once they are finished?
- IMPORTANT QUESTION this is. Might be the one stone that breaks this construct.
- The solution *might* be: The joiner_token removes them. That way, there is no recursive dependency.
- Might need a fancy data structure that allows efficient RAII based object tracking
- solved! (?)
-> Implemented in remote_drop_collection
36 changes: 19 additions & 17 deletions examples/01_normal_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,42 @@
//! If custom arguments for the subsystem coroutines are required,
//! a struct has to be used instead, as seen in other examples.

use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
tracing::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(400)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}

async fn subsys2(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem2 started.");
tracing::info!("Subsystem2 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem2 ...");
tracing::info!("Shutting down Subsystem2 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem2 stopped.");
tracing::info!("Subsystem2 stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.start("Subsys2", subsys2)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
s.start(SubsystemBuilder::new("Subsys2", subsys2));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
38 changes: 20 additions & 18 deletions examples/02_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@
//! custom parameters to be passed to the subsystem.
//!
//! There are two ways of using structs as subsystems, by either
//! wrapping them in an async closure, or by implementing the
//! wrapping them in a closure, or by implementing the
//! IntoSubsystem trait. Note, though, that the IntoSubsystem
//! trait requires an additional dependency, `async-trait`.

use async_trait::async_trait;
use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle, Toplevel};

struct Subsystem1 {
arg: u32,
}

impl Subsystem1 {
async fn run(self, subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started. Extra argument: {}", self.arg);
tracing::info!("Subsystem1 started. Extra argument: {}", self.arg);
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}
}
Expand All @@ -34,29 +33,32 @@ struct Subsystem2 {
#[async_trait]
impl IntoSubsystem<miette::Report> for Subsystem2 {
async fn run(self, subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem2 started. Extra argument: {}", self.arg);
tracing::info!("Subsystem2 started. Extra argument: {}", self.arg);
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem2 ...");
tracing::info!("Shutting down Subsystem2 ...");
sleep(Duration::from_millis(500)).await;
log::info!("Subsystem2 stopped.");
tracing::info!("Subsystem2 stopped.");
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

let subsys1 = Subsystem1 { arg: 42 };
let subsys2 = Subsystem2 { arg: 69 };

// Create toplevel
Toplevel::new()
.start("Subsys1", |a| subsys1.run(a))
.start("Subsys2", subsys2.into_subsystem())
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", |a| subsys1.run(a)));
s.start(SubsystemBuilder::new("Subsys2", subsys2.into_subsystem()));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
28 changes: 15 additions & 13 deletions examples/03_shutdown_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,32 @@
//! so the subsystem gets cancelled and the program returns an appropriate
//! error code.

use env_logger::{Builder, Env};
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel};

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
tracing::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
log::info!("Shutting down Subsystem1 ...");
tracing::info!("Shutting down Subsystem1 ...");
sleep(Duration::from_millis(2000)).await;
log::info!("Subsystem1 stopped.");
tracing::info!("Subsystem1 stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init logging
Builder::from_env(Env::default().default_filter_or("debug")).init();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();

// Create toplevel
Toplevel::new()
.start("Subsys1", subsys1)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(500))
.await
.map_err(Into::into)
// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(500))
.await
.map_err(Into::into)
}

0 comments on commit b766802

Please sign in to comment.