Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Structured Concurrency Support #81

Closed
matklad opened this issue Apr 10, 2022 · 5 comments
Closed

Structured Concurrency Support #81

matklad opened this issue Apr 10, 2022 · 5 comments

Comments

@matklad
Copy link

matklad commented Apr 10, 2022

disclaimer: I am async noob, and I've just had my first morning cup of tea, so I maybe talking nonsense :-)

So, today I was writing some tokio code which looked like this:

pub async fn run_service(token: CancellationToken) -> Result<()> {
    let server = tokio::spawn(run_server(token.clone()));
    let client = tokio::spawn(run_client(token.clone()));
    server.await.unwrap()?;
    client.await.unwrap()?;
    Ok(())
}

async fn run_server(token: CancellationToken) -> Result<()> { ... }
async fn run_client(token: CancellationToken) -> Result<()> { ... }

Essentially, I am running two bits of concurrent work.

This code made me mildly uncomfortable -- if the server panics (or, if I add some other early-return logic), the client will be left running, violating the structured concurrency.

This isn't easy to fix: I can add let guard = token.clone().drop_guard(), but that requires client to cooperate and is not strictly structured. I can also add some code to call JoinHandle::abort, but that is annoying to write and is still not strictly structured. In general, I think it is well-understood that what I want to do here is impossible without some kind of language support for async drop.

But than it dawned on me that I can just do

pub async fn run_service(token: CancellationToken) -> Result<()> {
    let server = run_server(token.clone());
    let client = run_client(token.clone());
    let (server_res, client_res) = tokio::join!(server, client);
    server_res.or(client_res)
}

this is strictly structured and has almost exactly the semantics I want (the exact semantics would require select! shenanigans).

It's very curious that, what is impossible with spawn, falls out of join! naturally. I think that's just how the world works -- with spawn, the task we want to structurally cancel may be, at this moment, running on a different thread, so we physically can't preempt it. With join!, select!, we know that both "tasks" run on the same thread, so, if during execution of a task we decide that we need to kill another one, we can just do that, as we know that that other task isn't currently running.

Which finally brings me to this issue :) It seems to me that async structured concurrency is actually possible for single-threaded executors. As tokio-uring is aimed at thread-per-core architecture, it seems that it might give a shot at supporting structured concurrency as well!

cc tokio-rs/tokio#1879

@Ralith
Copy link

Ralith commented Apr 10, 2022

almost exactly the semantics I want (the exact semantics would require select! shenanigans).

Did you see try_join?

@matklad
Copy link
Author

matklad commented Apr 10, 2022

Yeah, try_join is close, but it doesn't support graceful cancellation. Here's what I came up with for my use-case (hiding behind details to not distract from the main idea of "structured concurrency works with thread-per-core"):

pub async fn try_par<E, F1, F2>(token: &CancellationToken, f1: F1, f2: F2) -> Result<(), E>
where
    F1: Future<Output = Result<(), E>>,
    F2: Future<Output = Result<(), E>>,
{
    let f1 = f1.fuse();
    let f2 = f2.fuse();
    tokio::pin!(f1, f2);

    let mut rs: [Option<Result<(), E>>; 2] = [None, None];
    loop {
        rs = match rs {
            [Some(r1), Some(r2)] => return r1.or(r2),
            _ => rs,
        };
        let (r, _) = futures_util::future::select(&mut f1, &mut f2).await.factor_first();
        if r.is_err() {
            token.cancel();
        }
        *if rs[0].is_none() { &mut rs[0] } else { &mut rs[1] } = Some(r);
    }
}

@Noah-Kennedy
Copy link
Contributor

@matklad I'm not sure really that this falls within the scope of tokio-uring. IMO the goal of tokio-uring is to figure out the kinks and quirks of io_uring and work things out from an API perspective before eventually implementing the finalized API into tokio. In particular it would be great to get the API to work on the multi threaded runtime.

I think a better place for this would be in tokio-util or tokio itself. You could probably build something around localsets that would work for this.

@Noah-Kennedy
Copy link
Contributor

Closing this as its probably a better question for the broader tokio project.

@matklad
Copy link
Author

matklad commented Feb 18, 2024

For posterity, the API along these lines was implemented in the moro crate: https://github.com/nikomatsakis/moro

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants