
Reimplement DefaultContext and newSingle/FixedThreadPoolContext via a shared pool of threads #261

Open · elizarov opened this issue Feb 27, 2018 · 13 comments


@elizarov (Member) commented Feb 27, 2018

Background

newFixedThreadPoolContext is actively used in coroutines code as a concurrency-limiting mechanism. For example, to limit the number of concurrent requests to the database to 10, one typically defines:

val DB = newFixedThreadPoolContext(10, "DB")

and then wraps all DB invocations in withContext(DB) { ... } blocks.
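
For illustration, a typical call site might look like this (a sketch; loadUser, database, and queryUser are hypothetical):

suspend fun loadUser(id: Long): User = withContext(DB) {
    // At most 10 of these blocks execute at once, because DB has 10 threads.
    database.queryUser(id) // hypothetical blocking JDBC-style call
}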

This approach has the following problems:

  • This withContext(DB) invocation performs an actual switch to a different thread, which is extremely expensive.
  • The result of newFixedThreadPoolContext references the underlying threads and must be explicitly closed when no longer used. This is quite error-prone, as programmers may use newFixedThreadPoolContext in their code without realizing this fact, thus leaking threads.

Solution

The plan is to reimplement newFixedThreadPoolContext from scratch so that it does not create any threads. Instead, there will be one shared pool of threads that creates new threads strictly when they are needed. Thus, newFixedThreadPoolContext does not create its own threads, but acts only as a semaphore that limits the number of concurrent operations running in this context.
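
To make the semaphore idea concrete, here is a minimal sketch of such a dispatcher, assuming a shared underlying pool (LimitedDispatcher and its internals are illustrative, not the actual implementation):

import kotlinx.coroutines.CoroutineDispatcher
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

class LimitedDispatcher(
    private val underlying: CoroutineDispatcher, // the shared pool
    private val parallelism: Int                 // the "semaphore" permits
) : CoroutineDispatcher() {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val running = AtomicInteger(0)

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queue.add(block)
        tryDispatch()
    }

    private fun tryDispatch() {
        while (true) {
            val current = running.get()
            if (current >= parallelism) return // at the limit; the task stays queued
            if (!running.compareAndSet(current, current + 1)) continue
            val task = queue.poll()
            if (task == null) { // another caller took the task
                running.decrementAndGet()
                if (queue.isEmpty()) return // re-check to avoid stranding a late arrival
                continue
            }
            // Run on the shared pool; when done, release the permit and try again.
            underlying.dispatch(EmptyCoroutineContext, Runnable {
                try { task.run() } finally {
                    running.decrementAndGet()
                    tryDispatch()
                }
            })
            return
        }
    }
}

The key property is that leaking such an object is harmless: it owns no threads, only a counter and a queue.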

Moreover, DefaultContext, which is currently equal to CommonPool (backed by ForkJoinPool.commonPool), is going to be redefined in this way:

val DefaultContext = newFixedThreadPoolContext(defaultParallelism, "DefaultContext")

The current plan is to set defaultParallelism to nCPUs + 1 as a compromise value that ensures utilization of the underlying hardware even if one coroutine accidentally blocks, and helps us avoid issue #198.
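
In code, that default might be computed as follows (a sketch; the constant name is from this proposal):

val defaultParallelism = Runtime.getRuntime().availableProcessors() + 1 // nCPUs + 1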

Now, with this redefinition of DefaultContext, the code that used to define its own DB context continues to work as before (limiting the number of concurrent DB operations). However, both issues identified above are solved:

  • This withContext(DB) invocation does not actually perform a thread context switch anymore. It only switches the coroutine context, and separately keeps track of and limits the number of concurrently running coroutines in the DB context.
  • There is no need to close newFixedThreadPoolContext anymore; since it is not backed by any physical threads, there is no risk of leaking threads.

This change also affects newSingleThreadContext, as its implementation is:

fun newSingleThreadContext(name: String) = newFixedThreadPoolContext(1, name)

This might break some code (feedback is welcome!), as there could have been code in the wild that assumed that everything working in newSingleThreadContext was indeed happening on a single Thread instance and used a ThreadLocal to store something, for example. The workaround for such code is to use Executors.newSingleThreadExecutor().asCoroutineDispatcher().
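
A sketch of that workaround, which keeps ThreadLocal-based code working because every block runs on the same real thread (openConnection() and execute() are hypothetical placeholders):

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

val dbThread = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val connection = ThreadLocal.withInitial { openConnection() } // hypothetical factory

suspend fun query(sql: String) = withContext(dbThread) {
    connection.get().execute(sql) // always the same thread, so ThreadLocal is safe
}

Unlike the rewritten newSingleThreadContext, this dispatcher owns a real thread and must be closed with dbThread.close() when no longer needed.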

This issue is related to the discussion of an IO dispatcher in #79. It is inefficient to use Executors.newCachedThreadPool().asCoroutineDispatcher() due to the thread context switches. The plan, as part of this issue, is to define the following constant:

val IO: CoroutineContext = ...

The name is to be discussed in #79.

Coroutines working in this context share the same thread pool as DefaultContext, so there is no cost of a thread switch when doing withContext(IO) { ... }, but there is no inherent limit on the number of such concurrently executing operations.
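
For example (a sketch of the intended usage; readFile is illustrative):

import java.io.File
import kotlinx.coroutines.withContext

suspend fun readFile(path: String): String = withContext(IO) {
    // Same shared pool as DefaultContext, so no thread switch on entry;
    // the pool may grow because this block is allowed to block on I/O.
    File(path).readText()
}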

Note that we also avoid issue #216 with this rewrite.

Open questions

  • Shall we rename newFixedThreadPoolContext and newSingleThreadContext after this rewrite, or leave their names as is? Can we name them better?

  • Should we leave newSingleThreadContext defined as before (with all the context switch cost) to avoid potentially breaking existing code? This would work especially well if newFixedThreadPoolContext is somehow renamed (with the old name deprecated), while newSingleThreadContext retains the old name.

@fvasco (Contributor) commented Feb 27, 2018

newFixedThreadPoolContext is actively used in coroutines code as a concurrency-limiting mechanism

Why not define a JVM-like Semaphore?

fun Mutex(permits: Int = 1): Mutex

It is also possible to consider introducing:

fun Mutex.asCoroutineDispatcher(delegated: CoroutineDispatcher = DefaultDispatcher): CoroutineDispatcher
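
In use, the proposed counted Mutex (a hypothetical API, not in the library) might look like this:

val dbPermits = Mutex(permits = 10) // hypothetical multi-permit constructor

suspend fun query(sql: String) = dbPermits.withLock {
    runQuery(sql) // at most 10 concurrent queries; runQuery is a placeholder
}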

The current plan is to set defaultParallelism to nCPUs + 1

Usually, GC pressure is enough to cover accidental blocking.
Setting defaultParallelism = max(nCPUs, 2) should work the same.

Creating a flexible thread pool and mixing blocking and non-blocking operations in it is, I think, a separate question; are you considering benchmarking a prototype?
A context switch is costly, I agree, but this cost is less than that of an I/O operation.

Coroutines working in this context share the same common pool (so no cost of thread switch when doing withContext(IO) { ... })

ForkJoinPool uses a task queue for each thread; withContext(IO) doesn't force a context switch but blocks the thread, so all other tasks are forced to switch to another thread.
Switching a task to another thread on a multiprocessor (NUMA) server requires refreshing the local CPU caches (L1, L2); this is the greatest cost of a context switch.

@elizarov (Member, Author) commented Feb 27, 2018

@fvasco Unfortunately, the Mutex interface (with its suspending lock) is not an appropriate abstraction for efficiently implementing a coroutine dispatcher on top of it. The "semaphore" has to be an internal data structure tightly coupled with the implementation of the corresponding dispatcher.

The appropriate value of defaultParallelism is open to question, but I personally like nCPUs + 1 more than max(nCPUs, 2), because the former is more regular.

The questions of a flexible thread pool and blocking IO are indeed different, but it looks like they can be solved with a single implementation effort. We'll definitely start the implementation with benchmarks and experiment with different strategies.

The idea is that, similarly to FJP, this implementation is going to be "sticky" and will only move coroutines to another thread when absolutely necessary. We plan to be considerably lazier in this respect than FJP (it can be shown that the FJP work-stealing strategy actually has an adverse performance impact on typical CSP-style code).

With respect to blocking operations, this means that we'll give a blocking operation some time to complete before moving all the other coroutines to another thread. This seems to be the most efficient strategy based on our study of other languages and libraries, but we'll see how it actually works out in practice.

@fvasco (Contributor) commented Feb 27, 2018

Hi @elizarov,
thank you for the quick response; I will stay tuned for updates.

Regarding the Semaphore-as-dispatcher idea, I realized your objection too late; sorry for the miss.

However, I consider using a Mutex to limit concurrent access to a resource a valid option, and one more appropriate than a custom CoroutineDispatcher.

@fvasco (Contributor) commented Mar 5, 2018

Should we leave newSingleThreadContext defined as before (with all the context switch cost) to avoid potentially breaking existing code?

I say yes; maybe something like:

fun newDedicatedThreadDispatcher(
        threadPoolSize: Int = 1,
        threadFactory: ThreadFactory = Executors.defaultThreadFactory()
): CoroutineDispatcher
elizarov added a commit that referenced this issue Oct 6, 2018
* They will be replaced by another mechanism in the future.
  See #261 for details.
* The proposed replacement is to use the standard Java API:
  Executors.newSingleThreadExecutor/newFixedThreadPool
  and convert to a dispatcher via the asCoroutineDispatcher() extension.
@voddan commented Oct 6, 2018

Shall we rename newFixedThreadPoolContext and newSingleThreadContext after this rewrite, or leave their names as is? Can we name them better?

I believe we shall, because we can. Something like Dispatchers.fixedPool is more in line with the current naming.

Moreover, I don't think we need a separate factory method to create a single-threaded pool. It seems like an obvious case of a fixed-size pool, even if some optimization happens under the covers. Using Dispatchers.fixedPool(size = 1) should cover all the use cases.

@voddan commented Oct 6, 2018

What should I do if I need a guarantee that my thread pool is never blocked (for more than 50 ms)? For example, I use a coroutine as a timer:

launch {
    delay(1000)
    println("step 1")

    delay(1000)
    println("step 2")

    delay(1000)
    println("step 3")
}

This code fails if for some reason all the threads in the pool are blocked (e.g., doing CPU-bound work) and the scheduler is unable to switch this coroutine onto a thread in time. Will the new pool be able to handle that?

@qwwdfsad added the design label Oct 8, 2018
qwwdfsad added a commit that referenced this issue Oct 8, 2018
* They will be replaced by another mechanism in the future.
  See #261 for details.
* The proposed replacement is to use the standard Java API:
  Executors.newSingleThreadExecutor/newFixedThreadPool
  and convert to a dispatcher via the asCoroutineDispatcher() extension.
@fvasco (Contributor) commented Apr 28, 2019

Regarding my consideration above, issue #1088 is now available.

@justjake commented Feb 21, 2020

Hi, I saw the deprecation notice of https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-single-thread-context.html

Is there a recommended, current way to execute coroutines with FIFO semantics? Specifically, I need to sequence access to a SQLite database so that it occurs on a single thread.

@fvasco (Contributor) commented Feb 22, 2020

@justjake

Is there a recommended, current way to execute coroutines with FIFO semantics?

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
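
A minimal sketch of that suggestion: the library's Mutex is fair, granting the lock in first-come, first-served order, which yields FIFO semantics without pinning work to one thread:

import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val dbMutex = Mutex() // fair: waiters acquire the lock in FIFO order

suspend fun <T> withDb(block: suspend () -> T): T = dbMutex.withLock { block() }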

Specifically, I need to sequence access to a SQLite database so that it occurs on a single thread.

You should not use coroutines for that.

@justjake commented Feb 22, 2020

@fvasco thank you for your reply. However, I am now more confused: why are coroutines not the right tool here? My thought was that callers requiring SQLite could await sub-operations dispatched to a SQLite dispatcher that uses only a single thread. Currently my application does this with a concrete SQLiteJob with callbacks pushed into a queue, a Java thread pool, etc., which is cumbersome and already needs a suspend/resume to be used from coroutines.

@fvasco (Contributor) commented Feb 23, 2020

why are coroutines not the right tool here?

It is possible to run multiple coroutines concurrently on a single thread, but you want to "sequence access to a SQLite database so that it occurs on a single thread".

which is cumbersome and already needs a suspend/resume to be used from coroutines

Put the logic in a function, for example:

// Sketch: executor is a single-thread ExecutorService owning the DB thread;
// Future has no await(), so CompletableFuture plus kotlinx-coroutines-jdk8 await() is used.
suspend fun <T> sqlContext(block: (Sqlite) -> T): T =
    CompletableFuture.supplyAsync({ block(sqlite) }, executor).await()
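
A call site could then read naturally from any coroutine (query() is a hypothetical placeholder):

val rows = sqlContext { db -> db.query("SELECT * FROM users") }
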
@matejdro commented Feb 23, 2020

Is there any reason why this executor would be better than a mutex?

@elizarov (Member, Author) commented Mar 13, 2020

The reason is that it does a different thing than a mutex:

  • With a mutex (or semaphore) you can limit concurrency. That is, you can limit how many concurrent operations of a certain type your code is doing.
  • With this limited dispatcher you can limit parallelism. That is, you can limit how many things your code does in parallel. This is all about CPU-resource allocation: you can limit how many CPU cores/threads this or that kind of code can consume or block. (Both are sketched below.)
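
A hedged illustration of the distinction with current kotlinx.coroutines APIs (Semaphore/withPermit and limitedParallelism are real library APIs; httpGet, heavyCompute, and the data types are placeholders):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Concurrency limit: at most 10 fetches in flight, even while suspended.
val requests = Semaphore(permits = 10)
suspend fun fetch(url: String): String = requests.withPermit { httpGet(url) }

// Parallelism limit: at most 2 threads ever run this code at the same time.
val cpuBound = Dispatchers.Default.limitedParallelism(2)
suspend fun crunch(data: Data): Answer = withContext(cpuBound) { heavyCompute(data) }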