Proposal: spawn_blocking with a thread pool #11778

Open
lbguilherme opened this issue Jan 28, 2022 · 6 comments

@lbguilherme
Contributor

lbguilherme commented Jan 28, 2022

Crystal currently has cooperative Fibers running on top of an event loop. This usually happens in a single thread, or in a small group of worker threads with -Dpreview_mt. Either way, multiple Fibers run per thread and each Fiber is pinned to its thread: there is no work stealing (moving a Fiber from a busy thread to an available one), nor is there preemption (pausing a Fiber that is taking too long so other Fibers can use that thread for a while). Both of these things are hard to do and can involve performance compromises. Some runtimes do them, some don't. I'm assuming we won't.

It all works well as long as the Fiber code doesn't block. This means:

  • You can't perform long heavy computation in a Fiber, or else no other Fiber will get a chance to run until it finishes;
  • You can't do blocking system operations from a Fiber (like waitpid or IO through an mmap-ed file), or else your program will stay idle even when other Fibers have work to do;
  • You can't call into native C libraries that do either of those two things (very common), as they are not aware of Fibers, and many of them don't provide some async non-blocking alternative;
  • You can't use any mutex implementation other than the one provided by the stdlib because they are not aware of the event loop, this may happen when calling into native C libraries;
  • You can't rely on thread-local data as if it were fiber-local data.
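A minimal illustration of the first restriction (the tight loop below has no yield points, so on a single-threaded build the printing fiber is starved until it finishes; the iteration count is arbitrary):

```crystal
# The heavy loop never reaches a scheduler yield point, so the
# printing fiber cannot run until the computation completes.
spawn do
  x = 0_i64
  1_000_000_000_i64.times { x += 1 } # pure computation: no IO, no yields
  puts "Done: #{x}"
end

spawn do
  10.times do |i|
    p i          # only printed once the heavy fiber is done
    sleep 0.1
  end
end

sleep 5
```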

Please refer to #1454

We have been doing fine with these restrictions so far, but they are quite limiting. Also, the stdlib currently has places where it may block the event loop and there is no good solution for this (#4236, for one).

I propose we add a spawn_blocking method with semantics very similar to spawn, but for running Fibers outside the event loop. Instead, we will have a thread pool with many threads: spawn_blocking creates a Fiber and sends it to a thread from this pool, where it runs until it completes. That thread will actually run the event loop, but only to ensure non-blocking calls still work. There will never be more than one Fiber on the same thread. This method should only be available in -Dpreview_mt programs.

Tokio (an async executor for Rust) has exactly this concept: https://docs.rs/tokio/1.16.1/tokio/task/fn.spawn_blocking.html. By default, its thread pool has 512 threads for this, on the grounds that most likely they will be blocked on IO.

spawn_blocking do
  x = 0_i64
  50_000_000_000_i64.times { x += 1 }
  puts "Done: #{x}"
end

10.times do |i|
  p i
  sleep 1
end
@yxhuvud
Contributor

yxhuvud commented Jan 28, 2022

I'm quite optimistic that we may get a work-stealing scheduler at some point. But that would only alleviate the problem you describe: if there are more blocking tasks than workers, progress is still blocked until the blocking tasks complete.

Running multiple separate thread pools raises a bunch of questions though, even if restricting it to one active fiber per thread does simplify a lot of the logic compared to arbitrary spawning of thread pools. That said, I think making it possible to create arbitrary thread pools running in a few dedicated threads would also be pretty nice.

  • Should the threads be created up front at program start, or on demand? Creating new threads can be slow.
  • If something runs forever, do we need to be able to catch that somehow?
  • Does there need to be an upper limit on the number of threads used?
  • What happens if the fiber that is spawned blocking spawns a fiber? Or a spawn_blocking, for that matter?
  • Should the API for spawn_blocking block the current fiber, even if it doesn't block the rest of the event loop?

It would also be a good idea to think about how it relates to #6468 about structured concurrency. Two of the leading tenets in the underlying blog posts for that issue are: (a) all tasks have a predefined lifetime in code, and (b) errors bubble up. (*) There is also a lot of talk of cancellation, which may or may not be relevant here. However, if the nursery concept is expanded with a parameter that tells it to create one or many threads to run fibers in, then a lot of the functionality falls out for very little effort.

Consider the following example using my nested_scheduler shard:

require "nested_scheduler"

NestedScheduler::ThreadPool.nursery(thread_count: 1) do |pl|
  pl.spawn do
    x = 0_i64
    50_000_000_000_i64.times do
      x += 1
    end
    puts "Done: #{x}"
  end

  10.times do |i|
    p i
    sleep 1
  end
end

This will execute the loop in parallel without interfering with the main execution. It will also propagate any exceptions raised in the spawned fiber and block program exit until the blocking computation is done. (It also works today, though any future version of Crystal could of course break it.)

Looking at what can be achieved this is not all that far from what the proposed solution would achieve.

(*) The third tenet being that it doesn't interfere with normal resource cleanup. In practice this means local variables from the scope that created the nursery are safe to use in any spawned fibers. That is a really nice quality-of-life feature but doesn't matter as much here.

It should also be noted that this would make a work-stealing scheduler a bit harder, as it breaks the interchangeability of fiber execution: fibers that wait need to be enqueued in the correct thread pool (for example, if the spawn_blocking above communicated intermediate results over a channel, it should be re-enqueued in the private pool), so the obvious way of scheduling fibers on the same thread would no longer work for fibers that need to execute on a different thread. This is probably not an argument against the feature, though.

@rdp
Contributor

rdp commented Feb 2, 2022

I'd name it something like 'spawn_in_dedicated_thread' since it doesn't "always" block? Maybe the core functionality would be a 1:1 fiber to thread option?

@straight-shoota
Member

I would perhaps consider, as a very basic step, a method that runs a proc in a thread, waits for it to finish, and then returns its result.
This would be a simple structural piece, like an adapter to integrate a thread-blocking call into the scheduler.

A simple implementation could look like this:

def run_blocking(&proc : -> T) forall T
  done = Atomic(Int8).new(0)
  result = uninitialized T

  Thread.new do
    result = proc.call
    done.set 1
  end

  while done.get == 0
    Fiber.yield
  end
  result
end

Error handling is missing for simplicity, but should fit in perfectly. Integration of a thread pool should also be an easy enhancement.
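A hedged sketch of how that error handling might fit into the run_blocking sketch above (same busy-wait structure; the assumption is simply that an exception raised in the thread is captured and re-raised in the calling fiber):

```crystal
def run_blocking(&proc : -> T) forall T
  done = Atomic(Int8).new(0)
  result = uninitialized T
  error : Exception? = nil

  Thread.new do
    begin
      result = proc.call
    rescue ex
      error = ex # capture instead of crashing the thread
    ensure
      done.set 1 # always signal completion, even on error
    end
  end

  while done.get == 0
    Fiber.yield
  end

  if ex = error
    raise ex # re-raise in the calling fiber
  end
  result
end
```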

It integrates well into the idea of structured concurrency because the offloaded control flow is fully contained.
spawn_blocking would just be a composition with spawn (but that's not contained anymore).

This kind of concurrency mechanism is used for calls to blocking lib functions, so the proc executed in the thread should typically be pretty slim, maybe just that single call.
I think for this use case there is not really even a need for Crystal concurrency operations in the proc, so it would probably be fine if spawning from within that thread were not available.
But it really shouldn't be hard to allow that by delegating to the scheduler of the originating fiber.

@straight-shoota
Member

straight-shoota commented Aug 16, 2022

This use case has come up again in #12392 and https://stackoverflow.com/questions/73016552/making-a-blocking-call-asynchronous

We're missing a proper way to implement blocking function calls in a dedicated thread.

Currently, the best option is probably to use the undocumented Thread API. Perhaps we should document it and make it part of the supported API?
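For illustration, a hedged sketch of what using that undocumented API looks like today (assuming Thread.new keeps its current shape; Thread#join would block the whole calling thread, so the main fiber polls an atomic flag instead):

```crystal
done = Atomic(Int8).new(0)

Thread.new do
  # stands in for a hard-blocking call: a C library function, waitpid, ...
  sum = 0_i64
  1_000_000_000_i64.times { |i| sum += i }
  done.set 1
end

# Busy-wait cooperatively so other fibers on this thread keep running.
while done.get == 0
  Fiber.yield
end
puts "blocking work finished"
```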

Another alternative would be an enhancement to the scheduler to signal that a fiber is blocking and thus needs to run on a dedicated thread.

@yxhuvud
Contributor

yxhuvud commented Aug 16, 2022

One problem with exposing Thread is that it doesn't have a run loop going, which means interaction with the event loop, mutexes, and channels doesn't work properly; reschedule is not sufficient. It would, for example, be very nice to be able to send data back on a channel to be consumed by Crystal proper, even if the thread itself is blocking hard most of the time.

Or in other words, how much are you willing to limit what the spawned fiber is allowed to do?

@straight-shoota
Member

There are obviously lots of use cases with different semantics and requirements.

I think it would be a good start to have a basic means for running a function in a dedicated thread, without any direct interaction with the main program, except some input arguments and maybe a return value (as demonstrated in #11778 (comment)). That would already cover a lot of simple use cases.
Concurrency primitives that build on top of Crystal's scheduler and fibers obviously will not work in such a thread, but that should be an acceptable concession for a low-level approach.
