
[RFC] Structured Concurrency #6468

Open
straight-shoota opened this issue Jul 30, 2018 · 45 comments

@straight-shoota
Member

straight-shoota commented Jul 30, 2018

Crystal has a great concurrency model based on Fibers and Channels, which can be used to pass messages around.

Fibers are conceptually pretty simple. If you spawn one, it takes off from the main context and runs concurrently for an indefinite amount of time, depending on what it does and how long it takes to do this. From the perspective of the main control flow, it's essentially fire and forget.

Real-life problems typically call for a more sophisticated way of handling concurrent tasks. Sometimes you need to wait for one, some, or all tasks to finish before continuing the main scope. Error handling in concurrent tasks is also important, as is the ability to cancel the remaining tasks if others have finished or errored.

Fibers and Channels can be used to implement a model for structured concurrency. Given that this is a pretty common idiom, I'd like to see a generalized implementation in Crystal's stdlib.

What we have

HTTP::Server#listen uses a custom wait-group implementation that executes a number of tasks simultaneously and waits for all of them to finish. Other examples are the parallel macro and Crystal::Compiler#codegen_many_units. parallel is the only structured-concurrency feature currently available in the stdlib, but it is only suitable for a fixed number of concurrent tasks known at compile time.

A more generalized approach would help make this concept easily reusable.
It can be implemented based on the existing features that Fiber and Channel provide. The only thing that's missing is a way to deliberately cancel fibers and unwind their stacks (see #3561, and a proposed implementation in #6450).

Background

I recommend reading the articles referenced below. They both describe a model of structured concurrency which essentially restricts the execution of concurrent tasks to a specific scope and provides tools to manage them. This contrasts with the model of go (Go) and spawn (Crystal), which just fire off a new fiber without caring about its life cycle. That makes control flow hard to follow: what happens where, when, and in which scope.

The main idea of this proposal is to understand that each fiber is limited to the scope it is executed in:

Every time our control splits into multiple concurrent paths, we want to make sure that they join up again.

This ensures that fibers don't get lost doing work they might no longer even be supposed to do.
I believe this concept can be applied to almost any real-life use case of fibers.
Having a structured flow of control also allows for a proper exception flow. Right now, unhandled exceptions within a fiber are just printed and ignored. When a fiber is scoped to some parent context, an exception can just be propagated there.

Prototype

I have implemented a simple prototype of a concurrency feature (based on Fiber.cancel from #6450). The idea is to have a coordination tool for running fibers, called a Spindle. It is used to spawn fibers and ensure they are collected. This particular implementation runs multiple tasks concurrently and, if one of them fails, cancels all the others. This is of course just one example of behaviour; there are many different ways to react.

The code can be found at: https://gist.github.com/straight-shoota/4437971943bae7000f03fabf3d814a2f
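For illustration, a minimal sketch of the scoping idea can be built on nothing but stdlib Channel. The names (Spindle, concurrent) follow the prototype, but this is a simplified reimplementation that only joins fibers and re-raises the first error; the gist's cancel-on-failure behaviour would additionally require the Fiber.cancel from #6450.

```crystal
# Simplified Spindle: fibers spawned in the block are all joined before
# `concurrent` returns, and the first exception raised in any of them is
# re-raised in the parent scope.
class Spindle
  def initialize
    @results = Channel(Exception?).new
    @count = 0
  end

  def spawn(&block : ->) : Nil
    @count += 1
    results = @results
    ::spawn do
      begin
        block.call
        results.send nil
      rescue ex
        results.send ex
      end
    end
  end

  def join : Nil
    error = nil
    @count.times { error ||= @results.receive }
    raise error if error
  end
end

def concurrent
  spindle = Spindle.new
  yield spindle
  spindle.join
end

results = [] of Int32
concurrent do |s|
  s.spawn { results << 1 }
  s.spawn { results << 2 }
end
results.sort # both fibers finished before `concurrent` returned
```

Because join drains one channel message per spawned fiber, the parent scope cannot exit while a child is still running, which is exactly the "join up again" property quoted above.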

I don't have a concrete proposal for how this should be implemented in terms of stdlib APIs, but the general idea is to provide tools for running tasks concurrently. We could even think about removing unscoped spawn (it can be considered harmful, after all), but that's not necessarily required and can probably be decided later.

References

Some examples of similar libraries:

@RX14
Contributor

RX14 commented Jul 31, 2018

I'd rather write a "good" Promise implementation and then have Promise.all/Promise.first etc. As far as I'm concerned, the abstraction of a fiber with a return value, called a promise or future, is better and more composable than wait groups, and can still be used as a wait group pretty easily.

Talking about abstractions based on cancellation should wait until we actually have cancellation.

@straight-shoota
Member Author

Promises are more about communication than structuring. You can do a lot of things in a similar way, but it feels like an inferior solution to me.

Running two methods concurrently would look like this with a promise:

promise = future ->do_some_stuff
do_some_other_stuff
promise.get

In my example implementation it looks like this:

concurrent do |spindle|
  spindle.spawn ->do_some_stuff
  spindle.spawn ->do_some_other_stuff
end

It's a little bit more verbose, but easier to follow the control flow, especially when it gets more complicated.

(Verbosity could actually be reduced by using with spindle yield, but I think I like the expressiveness better.)

One way or the other: a good promise implementation would need a way to cancel a fiber as well, wouldn't it?

@RX14
Contributor

RX14 commented Jul 31, 2018

@straight-shoota you could do it like that or you could use promises like this:

Promise.all(
  Promise.new ->func,
  Promise.new ->func2
)

which is both less verbose and more powerful since you can do

value1, value2 = Promise.all(
  Promise.new ->func,
  Promise.new ->func2
)

very easily to get the function's return values.

Changing it to "first fiber wins", a.k.a. a race, becomes s/all/first/: just changing one function instead of having to refactor your code to use a different concurrency mechanism (consider the difference between a function taking a spindle as a param and one returning a promise: that's a big refactor).

@RX14
Contributor

RX14 commented Jul 31, 2018

And JS promises have no way to cancel them; we don't need cancellation for them.

@straight-shoota
Member Author

These Promise.all and Promise.first are nice, but they only allow a static number of concurrent tasks. This is just like the parallel macro and not sufficient for a real-world application where tasks can be dynamically added at runtime (like connection handlers of a network server for example).

@straight-shoota
Member Author

At least I would want to have a way to cancel a promise/fiber (and do it gracefully) in case its result/effect is no longer required. It shouldn't block resources for unnecessary work.

@RX14
Contributor

RX14 commented Jul 31, 2018

@straight-shoota No, .all and .first can easily take arrays.

@straight-shoota
Member Author

How would that work with adding child tasks at runtime? You would have to append them to the array; somehow that could probably be made to work with .all and .first. But using an array as a registry for tasks makes a really ugly API.

@RX14
Contributor

RX14 commented Jul 31, 2018

Ah I see what you want. We could easily add block-based versions of Promise.first and Promise.all which do what you want, but I can't help but feel that adding tasks at runtime would be hard to track. I'd be happy to do that though.

@ysbaddaden
Contributor

Promises aren't Structured Concurrency. Don't mix concepts, please :)

Structured Concurrency is about controlling the lifetime of nested coroutines (they can't outlive their parent). We can spawn new coroutines at any time (e.g. on Socket#accept) and eventually tell one or many to stop at any time. Lifetime is all that's cared about; use channels to pass values.

A promise defers computation of a fixed set of fibers to return a set of values. Since Promise.all waits for values, you can't add a fiber to the list at any time (e.g. Socket#accept), and the current fiber is blocked. Promise.all only happens to fake Structured Concurrency.

Passing a mutable Array to Promise.all would be an ugly hack, and prone to concurrency issues: can I push a fiber? What if previous fibers have already finished? Do we raise an exception? What if we have a TCP server that responded to all requests and went idle, and then a new request comes in? What if I pop or delete a fiber from the array?

Promises != Structured Concurrency.

@RX14
Contributor

RX14 commented Aug 1, 2018

I was thinking more of a

Promise.all do |promises|
  promises << ...
end

interface in addition to the other promise interfaces to make it do wait groups as well.

I guess it's probably quite a hack though, and better to separate the two concepts.

@straight-shoota
Member Author

@RX14 That doesn't seem much different from

concurrent do |spindle|
  spindle.spawn ...
end

So we pretty much want the same thing in this regard ;)

@refi64
Contributor

refi64 commented Aug 1, 2018

Honestly I kind of like the idea of having something other than promises, which I sometimes feel like is a bad solution but manages to be used everywhere...

@RX14
Contributor

RX14 commented Aug 1, 2018

@straight-shoota yeah I like the concept I was just wondering if we could work it into the "promise" concept for simplicity.

@stakach
Contributor

stakach commented Aug 2, 2018

I've been writing a Promise library for Crystal Lang and it's almost complete (core implementation complete with specs)
https://github.com/spider-gazelle/promise

It might not be "Structured Concurrency" in the strictest sense; however, it does simplify coordinating a bunch of async events, and it's quite a popular paradigm. Would love to see it in the standard library if that's something you would consider.

I think my implementation is pretty neat in any case:

require "promise"

promise = Promise.new(Int32)
result = promise.then { |result| result.not_nil! + 100 }.then { |result| "change #{result} type" }
promise.resolve(10)

# Can also grab the value via a future
result.value # => "change 110 type"

Any .then block can change promise types as it propagates down the chain.
.catch blocks can only propagate exceptions and are used to recover values back to the initiating promise value type.

Thanks to the Crystal type safety it puts most promise implementations to shame.

@RX14
Contributor

RX14 commented Aug 2, 2018

I don't think a promise library in Crystal would look anything like a JS promise library. .then isn't really required at all. Libraries should expose synchronous APIs, and any uses of Promise should really stay internal to the application.

In fact, making promises too much like JS will mean people start using promises like JS, and Crystal promises absolutely should not be used the same way. They should be used pretty sparingly.

@stakach
Contributor

stakach commented Aug 2, 2018

Well, yeah (exposing synchronous APIs).
Not sure how you escape from using the equivalent of .then, though; it's kind of the je ne sais quoi of promises. Without it, all you have is futures.

@RX14
Contributor

RX14 commented Aug 2, 2018

@stakach ah, my terminology was all messed up. I'd like futures in the stdlib and promises perhaps can be a shard built on that.

@RX14
Contributor

RX14 commented Aug 2, 2018

I'm still not sure what the conceptual difference between a promise and future would be in crystal though

@jhass
Member

jhass commented Aug 2, 2018

I like to think of promises and futures as either end of a pipe: the promise is where I can put something in, and the future is where I can wait and listen for the result.

Now taking this picture, I just described a Channel with a buffer capacity of one that can only be written to once. So perhaps for us promises would actually just be redundant with channels.
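That picture can be sketched directly. This is illustrative only, not a proposed stdlib API; a real version would also guard against double-resolve and memoize the value for multiple readers.

```crystal
# A buffered, write-once Channel acting as a promise/future pair:
# `resolve` is the promise end, `value` the future end.
class OneShot(T)
  def initialize
    @channel = Channel(T).new(1) # capacity 1: resolving never blocks
  end

  def resolve(value : T) : Nil
    @channel.send value
  end

  def value : T
    @channel.receive
  end
end

result = OneShot(Int32).new
spawn { result.resolve(42) }
answer = result.value # blocks until the other fiber resolves
```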

@stakach
Contributor

stakach commented Aug 2, 2018

Promises are really complementary to channels: channels for distributing async work and promises for handling the results.

For instance, I'm working on updating this influxdb library to the latest version of Crystal lang as the original maintainer doesn't have the time.

The original version uses spawn to perform an async request (a channel here would be ideal) - however either way you can never know if the request succeeded and you don't always want to wait around at this point for the response. That bit of code looks pretty messy:

      if sync
        send_write(body).status_code == 204
      else
        spawn { send_write(body) }
        true
      end

A Promise in this case is the perfect solution.

  • You can ignore the result like the async implementation above
  • You can be synchronous if you want to be, using Promise#value
  • You can use a call back to handle a success or failure condition

For comparison, the promise version of the above would be:

promise = Promise.new(Bool)
channel.send({promise, body})
promise

Promises provide flexibility and are simple to use. I don't think they should be seen as competition to other flow-control solutions; they are just one of many tools.

@RX14
Contributor

RX14 commented Aug 2, 2018

Well, regardless of the terminology what I want is a fiber which can return a value, and you can wait for it to complete with a value or error

@straight-shoota
Member Author

@stakach I don't think a promise is the right tool in your example. The library method #write should just be blocking like any Crystal IO method and directly call #send_write. It should be the caller's task to manage concurrency and execute this method in a separate fiber where appropriate. The library simply doesn't need to care about that.

This would be idiomatic Crystal with a simplified API, removing the sync argument and the dependency on a promise implementation. It's also consistent with other IO methods; others on the same type (select, query, drop) don't seem to have an option to execute asynchronously.
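A hypothetical sketch of that shape (InfluxClient and send_write are local stand-ins for illustration, not the library's actual code):

```crystal
class InfluxClient
  # Stand-in for the actual HTTP request; returns the status code.
  def send_write(body : String) : Int32
    204
  end

  # Blocking, like any other Crystal IO method: no sync flag, no promise.
  def write(body : String) : Bool
    send_write(body) == 204
  end
end

client = InfluxClient.new
ok = client.write("measurement value=1") # caller blocks until done

# Fire-and-forget is the caller's explicit choice:
spawn { client.write("measurement value=2") }
```

Concurrency then lives entirely at the call site, just as with any other IO method.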

@stakach
Contributor

stakach commented Aug 2, 2018

@straight-shoota the problem is that running the write concurrently doesn't work on the current version of Crystal, so it has to go through the channel to ensure serial writes.

Basically, because the HTTP client response from influx is chunked, Crystal yields the current fiber while it waits for IO in the middle of the HTTP response cycle.
Then I can start another request while the client is still processing the previous response, which leads to weird errors. (Not sure if this is an issue with the Crystal HTTP::Client; however, if you take that influxdb library, make the minor changes to run on 0.25.1, and run the tests, it blows up.) This is a common issue with fibers, and the library should definitely deal with it, not the caller.

@straight-shoota
Member Author

Yes, that's a shortcoming of HTTP::Client. It currently can't multiplex concurrent requests to the same endpoint over multiple connections. See #6011

Until this is resolved, you would need to use several client instances or guard one with a mutex. But this issue exists whether #write is async or not (it could always be called from a different fiber).
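The mutex variant could look like this sketch, where fake_request is a hypothetical stand-in that yields the fiber mid-request the way HTTP::Client does while reading a chunked response:

```crystal
mutex = Mutex.new
log = [] of String
done = Channel(Nil).new

# Yields the fiber in the middle, like a client waiting on IO mid-response.
fake_request = ->(id : Int32) do
  log << "start #{id}"
  Fiber.yield
  log << "end #{id}"
end

2.times do |i|
  spawn do
    # The mutex guarantees a second fiber can never start a request
    # while the first is suspended mid-response.
    mutex.synchronize { fake_request.call(i) }
    done.send nil
  end
end
2.times { done.receive }
log # requests never interleave: each "start i" is directly followed by its "end i"
```

Without the synchronize call, the two fibers would interleave at the Fiber.yield point, which is exactly the "weird errors" scenario described above.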

@stakach
Contributor

stakach commented Aug 2, 2018

The problem of being called from multiple different fibers is already solved by using the channel - any fiber can call the code, the channel is used to make the HTTP::Client requests in serial and the promise returns the result to the calling fibers.

Very little complexity vs locks and/or multiple client instances

@stakach
Contributor

stakach commented Aug 3, 2018

@RX14 I deliver you "a fiber which can return a value, and you can wait for it to complete with a value or error"
https://github.com/spider-gazelle/promise#simple-concurrency-control

It's not an alternative to structured concurrency, but I still think it's pretty cool.

@stakach
Contributor

stakach commented Aug 4, 2018

@straight-shoota thanks for the advice. I threw a lock around the influxdb HTTP client; you were right, that was definitely the way to go.
Also watched the Trio video; it does seem pretty cool. Looking forward to Spindles landing!

@asterite
Member

Indeed. Go doesn't have promises either; I'm sure there's a good reason (might be lack of generics, but I'm not sure). I remember @waj was always against promises. The whole point of non-blocking IO and spawn is to avoid callback hell. But wait groups are a good thing.

@vinipsmaker

He overlooks the problem too much and the metaphors between one domain and the other are forced to match his worldview.

This article is nice.

Just two notes:

GC vs systems language

GC languages and systems languages (i.e. languages where the sole runtime is the execution stack itself and you have to care about pointers and lifetimes) behave differently in such situations.

Thread-start functions (or fiber-start functions) in GC'ed languages will be closures rather than plain functions and, if some value is captured from the parent scope, those values won't be destroyed when the parent dies. So we don't usually worry about child threads/fibers outliving the parent thread/fiber (just like in Go). If you need deterministic destruction of objects, just be explicit about it: pthread_cancel and pthread_join will do in C. Extending the use case beyond C, you'll probably follow Java's approach and change the cancellation API to throw exceptions and trigger stack unwinding on the target thread/fiber. If you have C++, there will also be RAII scope guards properly handling cancellation requests (and I bet RAII is already outside of Crystal's scope).

Don't complicate the state machine

This author also gets fancy with abstractions. There are two strategies here: graceful vs. ungraceful shutdown. An interruption/cancellation API is about graceful shutdown. Don't get fancy with timeouts and such. Ask the thread/fiber to gracefully shut down and give it its time.

Want an ungraceful shutdown that leaks resources and possibly leaves live locks and broken invariants on data structures? As a rule of thumb this is bad design, and you don't want it to become an idiom culturally seeded in your language. Two approaches here would be:

  • Spawn a real process (os.exit() won't clean any resources) with a greater level of isolation and kill the process when it fails to satisfy an associated watchdog.
  • kill() that unschedules the fiber forever as soon as the scheduler has a chance. Marked with a BIG BOLD “don't use it if you don't know what you're doing” in the documentation. No fancy timeout handling, just a simple API. If you want to give the fiber time to finish, it's cancel() + timedjoin() (the only API specifically designed for dirty shutdown here is kill(); all the others are fundamental vocabulary that emerged elsewhere).

@stakach
Contributor

stakach commented Dec 12, 2018

@vinipsmaker the fiber cancel method that @straight-shoota has implemented raises a CancelledException, which unwinds the stack (including ensure blocks, which would clean up locks). So it uses existing language features to implement nurseries in a really nice, clean way.

@chocolateboy
Contributor

Is this impacted (e.g. made easier, harder, or superseded) in any way by the proposed MT support? Even if it's orthogonal, as suggested here, it would be nice to have eyes on the MT proposal with compatibility (or non-incompatibility) with this proposal in mind.

@straight-shoota
Member Author

@chocolateboy Structured concurrency does not depend on multithreading, but it can easily integrate with it. A concurrency context allows configuring very specifically whether tasks can be executed in parallel, how many threads to use, error handling, etc.

@yxhuvud
Contributor

yxhuvud commented Mar 4, 2020

I suppose only a limited version of this would be possible after 1.0, as an unlimited ability to spawn without a lifetime restriction isn't really compatible with the concept.

@straight-shoota
Member Author

@yxhuvud Not sure I understand.
I'm convinced that when a tool for structured concurrency is available, there should ideally be no reason to use a random spawn anywhere. Best case, that can be avoided completely. But you should still be able to have both structured and unstructured fibers.
I don't see a reason why unrestricted spawns wouldn't work with a concept of structured concurrency. Technically, the structure would be built upon the unstructured primitives anyway.
Removing spawn as a top-level method would be possible once there's a different tool for spawning fibers in context. But it's not strictly necessary.

@asterite
Member

asterite commented Mar 4, 2020

The way I see it, spawn coupled with Channel and select are the low-level concurrency primitives in Crystal; in fact, they are the same in Go. I see them as similar to Erlang/Elixir's receive and ! (send). People don't usually use them directly; they use abstractions on top of them (like GenServer), but from time to time they are useful... especially for building more abstractions.

So we should keep those in Crystal, and build good abstractions on top of them. For instance, actors would be nice.

@yxhuvud
Contributor

yxhuvud commented Mar 5, 2020

"I don't see a reason why unrestricted spawns wouldn't work with a concept of structured concurrency."

Because it defeats the point. Users need to be forced to think about the lifetime of the things they spawn, and it should be possible to see by visual inspection of the code which fibers can be running at any given point. Or to quote the blog post referred to above, with the (quite telling) title notes-on-structured-concurrency-or-go-statement-considered-harmful:

The good news, though, is that these problems can all be solved: Dijkstra showed us how! We need to:

-   Find a replacement for go statements that has similar power, but follows the "black box rule",
-   Build that new construct into our concurrency framework as a primitive, and don't include any form of go statement.

The whole point of that whole post is a denouncement of the Go model with unbounded spawns!

And yes, fibers would obviously still be implemented using low-level constructs. But they'd probably be used about as often as Thread.new by end-user applications (i.e. never). The easiest way to create them (spawn) should not be the one with the worst long-term maintainability properties. Hence my comment about a limited implementation (similar to how C still has goto).

@straight-shoota
Member Author

straight-shoota commented Mar 5, 2020

Okay, I see what you mean. There's no technical reason, but it would make sense to advocate the higher-level interface. I agree with that. And frankly, assuming it's used in every API, there would be no reason for direct access to the low-level interface.

I just feel that can only be achieved as a second step. The first step would be to design and implement a high-level interface for structured concurrency. Then wait for it to be adopted. After that, the now (hopefully) unused features can be removed or reduced.

@raphendyr

raphendyr commented Aug 12, 2021

I would like to link a presentation about structured concurrency in the context of C++ to this conversation. Memory management doesn't apply to Crystal, but allocated resources could be external, e.g. a database connection. Hence, the presentation might help people see how resource allocation can be done more safely with structured concurrency.

In addition, I like how Martin draws a connection between object lifetime management and fiber lifetime management (see Martin Sústrik's blog post linked in the issue description).

@yxhuvud
Contributor

yxhuvud commented Aug 27, 2021

BTW, the swift proposal around structured concurrency https://github.com/apple/swift-evolution/blob/main/proposals/0304-structured-concurrency.md is also very interesting.

@ludovicdeluna

ludovicdeluna commented Nov 13, 2021

Interesting. The concept of Executors confuses me. I can understand wrapping the Job around a Task, but the way Executors are expressed seems over-complex to me.

I've played with Task in C#, which is very close to the concept of future in C++ materialized by the Async template. For both, a lightweight process is scheduled on a thread pool. We can grab the result later and continue our work. The way Task is used in C# permits organizing the work (when it starts, when we need to wait), performing checks on the task, and acting on failure. This is a good way to materialize asynchronous execution.

The other way is to have long-running work and use the classical channel/subroutine approach as in Go or Elixir (with the Mailbox/Job primitives). A bit light for more advanced management, but everything is possible (Elixir has other models to organize long-running jobs, but the basics are there).

A very interesting topic anyway :) Thanks a lot.

@cyangle
Contributor

cyangle commented Dec 5, 2021

There's also a discussion of Go concurrency bugs and structured concurrency. Some bugs were caused by unbounded go statements in the Go stdlib.

I think structured concurrency should be provided by the language as primitives and the stdlib should always use it.

Kotlin's coroutines are a very good implementation of structured concurrency. I think Crystal should have something similar.

@chocolateboy
Contributor

chocolateboy commented Dec 5, 2021

Ruby implementation: Polyphony (discussion)

@stakach
Contributor

stakach commented Jun 6, 2023

@straight-shoota even Java has Structured Concurrency. Now there is one reason to use Java over Crystal, how long are we going to allow this?
