Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

actix-rt: Make the process of running System in existing Runtime more clear #173

Merged
merged 5 commits into from Sep 6, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions actix-rt/CHANGES.md
@@ -1,5 +1,11 @@
# Changes

## Unreleased - 2020-xx-xx

### Added

* Add `System::attach_to_tokio` method. [#173]

## [1.1.1] - 2020-04-30

### Fixed
Expand Down
3 changes: 3 additions & 0 deletions actix-rt/Cargo.toml
Expand Up @@ -23,3 +23,6 @@ futures-util = { version = "0.3.4", default-features = false, features = ["alloc
copyless = "0.1.4"
smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }

[dev-dependencies]
tokio = { version = "0.2.6", features = ["full"] }
2 changes: 1 addition & 1 deletion actix-rt/src/builder.rs
Expand Up @@ -137,7 +137,7 @@ impl AsyncSystemRunner {
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
};
Arbiter::stop_system();
return res;
res
}
})
.flatten()
Expand Down
124 changes: 122 additions & 2 deletions actix-rt/src/system.rs
Expand Up @@ -57,10 +57,59 @@ impl System {
Self::builder().name(name).build()
}

#[allow(clippy::new_ret_no_self)]
/// Create new system using provided tokio Handle.
/// Create new system using provided tokio `LocalSet`.
///
/// This method panics if it can not spawn system arbiter
///
/// Note: This method uses provided `LocalSet` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by [`tokio_compat`]).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Examples
///
/// ```
/// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let mut runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
///
/// let actix_system_task = LocalSet::new();
/// let sys = System::run_in_tokio("actix-main-system", &actix_system_task);
/// actix_system_task.spawn_local(sys);
///
/// let rest_operations = run_application();
/// runtime.block_on(actix_system_task.run_until(rest_operations));
/// ```
pub fn run_in_tokio<T: Into<String>>(
name: T,
local: &LocalSet,
Expand All @@ -71,6 +120,77 @@ impl System {
.run_nonblocking()
}

/// Consume the provided tokio Runtime and start the `System` in it.
/// This method will create a `LocalSet` object and occupy the current thread
/// for the created `System` exclusively. All the other asynchronous tasks that
/// should be executed as well must be aggregated into one future, provided as the last
/// argument to this method.
///
/// Note: This method uses provided `Runtime` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by `tokio_compat`).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Arguments
///
/// - `name`: Name of the System
/// - `runtime`: A tokio Runtime to run the system in.
/// - `rest_operations`: A future to be executed in the runtime along with the System.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
/// let rest_operations = run_application();
/// System::attach_to_tokio("actix-main-system", runtime, rest_operations);
/// ```
pub fn attach_to_tokio<Fut, R>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of this function, making it much simpler to host actix in an existing tokio runtime.

Are there alternate designs for the rest_operations idea worth discussing? I seem to recall in the past just joining and awaiting the actix LocalSet with any other long running tasks. This approach feels more opinionated and less extensible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I've attempted several other approaches, but found them not as convenient because of the following factors:

  • LocalSet occupies current thread for System to run;
  • Future, obtained via LocalSet is not Send.

Thus, returning a future from this function (instead of blocking the runtime inside) makes the user-side code not really intuitive.

As an alternative approach, we may make rest_operations a function rather than a future, but IMHO it's less flexible: if you have an async function, you can get a future by simply calling it, and at the same time you can get a future without an additional function (e.g. using an async { } block).

Or do you have an idea how to get rid of the rest_operations argument without complications on the caller site?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like we're moving into a simpler Node.js style <req, res> or Koa's ctx and I'm all for it. I'd love to have Actix just be the web layer on top of default Tokio.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is similar to what @fafhrd91 was describing too about the actix-net features.

name: impl Into<String>,
robjtede marked this conversation as resolved.
Show resolved Hide resolved
mut runtime: tokio::runtime::Runtime,
rest_operations: Fut,
) -> R
where
Fut: std::future::Future<Output = R>,
{
let actix_system_task = LocalSet::new();
let sys = System::run_in_tokio(name.into(), &actix_system_task);
actix_system_task.spawn_local(sys);

runtime.block_on(actix_system_task.run_until(rest_operations))
}

/// Get current running system.
pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() {
Expand Down