
Single Dispatchers.IO is not enough - thread bulkheads are required #1273

Closed
swarmshine opened this issue Jun 15, 2019 · 3 comments

Problem
Suppose that our application works with two databases that provide blocking third-party APIs:

fun queryFirstDatabse(query: String): String
fun querySecondDatabse(query: String): String

And our application wants to publish two REST endpoints:

@Rest
suspend fun first() = withContext(Dispatchers.IO) {
    queryFirstDatabse("well running query")
}
@Rest
suspend fun second() = withContext(Dispatchers.IO) {
    querySecondDatabse("possibly blocking query due to a bug")
}

Suppose the second method starts to block for a long time due to a bug.
After 64 invocations of the second method, there will be no free threads left in Dispatchers.IO.
As a result, the first method will start to block too.

Another case: suppose the second database querying function does not block forever, but starts to execute slowly due to a performance problem in the second database.
This increases latency not only for the second REST method but for the first one as well, and whole-system performance suffers because of a single buggy method.
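
For illustration, here is a minimal sketch of the starvation scenario (the simulate* functions are hypothetical stand-ins for the blocking database calls above; with the default Dispatchers.IO limit of 64 threads, the healthy query is delayed by roughly the full 10 seconds):

import kotlinx.coroutines.*

// Buggy query: blocks its thread for 10 seconds.
fun simulateStuckQuery() = Thread.sleep(10_000)
// Healthy query: returns quickly.
fun simulateFastQuery() = Thread.sleep(10)

fun main() = runBlocking {
    // Occupy all 64 Dispatchers.IO threads with the buggy query.
    repeat(64) {
        launch(Dispatchers.IO) { simulateStuckQuery() }
    }
    val start = System.currentTimeMillis()
    // The healthy query now has to wait for a free IO thread.
    withContext(Dispatchers.IO) { simulateFastQuery() }
    // Typically prints ~10 000 ms instead of ~10 ms.
    println("fast query took ${System.currentTimeMillis() - start} ms")
}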

One solution I can think of is to use separate, explicit thread pools for different methods:

val FirstDispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
val SecondDispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()

@Rest
suspend fun first() = withContext(FirstDispatcher) {
    queryFirstDatabse("well running query")
}

@Rest
suspend fun second() = withContext(SecondDispatcher) {
    querySecondDatabse("possibly blocking query due to a bug")
}

This way, problems in the second method will not affect the first method.
But we lose a nice property of Dispatchers.IO: it prevents unnecessary context switches.
In the following example, if the dispatcher queues are not overloaded, no context switch occurs:

withContext(Dispatchers.Default){
    cpuIntensiveBusinessLogic()
    withContext(Dispatchers.IO) {
        queryFirstDatabse("first query")
    }
    cpuIntensiveBusinessLogic()
    withContext(Dispatchers.IO) {
        querySecondDatabse("second query")
    }
    cpuIntensiveBusinessLogic()
}

With the fix above, where explicit dispatchers are built on top of separate executors, a context switch will always occur:

withContext(Dispatchers.Default){
    cpuIntensiveBusinessLogic()
    withContext(FirstDispatcher) {
        queryFirstDatabse("first query")
    }
    cpuIntensiveBusinessLogic()
    withContext(SecondDispatcher) {
        querySecondDatabse("second query")
    }
    cpuIntensiveBusinessLogic()
}

Current state
The context-switch-reducing approach for the IO and Default dispatchers is implemented internally via kotlinx.coroutines.scheduling.LimitingDispatcher. Dispatchers.IO and Dispatchers.Default both sit on top of a single dispatcher and simply limit the number of blocking and CPU-intensive tasks.
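
This shared scheduler is easy to observe; here is a small sketch (not part of the original issue) that prints thread names:

import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Switching Default -> IO usually stays on the same DefaultDispatcher worker,
// while switching to an executor-backed dispatcher always moves to one of its
// own pool threads.
val PoolDispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        println("Default:        ${Thread.currentThread().name}")
        withContext(Dispatchers.IO) {
            // Typically the same DefaultDispatcher-worker-N as above.
            println("IO:             ${Thread.currentThread().name}")
        }
        withContext(PoolDispatcher) {
            // Always a pool-N-thread-M thread from the dedicated executor.
            println("PoolDispatcher: ${Thread.currentThread().name}")
        }
    }
    PoolDispatcher.close()
}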

Proposal
Provide an API that lets users build lightweight limiting dispatchers that do not own any resources (their own threads) and simply provide a view of the original dispatcher.

val DefaultPoolSize = 4
val FirstPoolSize = 10
val SecondPoolSize = 15

val AppWideDispatcher = Executors.newFixedThreadPool(DefaultPoolSize + FirstPoolSize + SecondPoolSize).asCoroutineDispatcher()

val DefaultDispatcher = AppWideDispatcher.limit(DefaultPoolSize)
val FirstDispatcher = AppWideDispatcher.blocking(FirstPoolSize)
val SecondDispatcher = AppWideDispatcher.blocking(SecondPoolSize)

withContext(DefaultDispatcher){
    cpuIntensiveBusinessLogic()
    withContext(FirstDispatcher) {
        queryFirstDatabse("first query")
    }
    cpuIntensiveBusinessLogic()
    withContext(SecondDispatcher) {
        querySecondDatabse("second query")
    }
    cpuIntensiveBusinessLogic()
}

Or, if we decide to use the default Dispatchers.IO, which is initialized with a maximum of 64 threads:

val FirstDispatcher = Dispatchers.IO.limit(10)
val SecondDispatcher = Dispatchers.IO.limit(10)

withContext(Dispatchers.Default){
    cpuIntensiveBusinessLogic()
    withContext(FirstDispatcher) {
        queryFirstDatabse("first query")
    }
    cpuIntensiveBusinessLogic()
    withContext(SecondDispatcher) {
        querySecondDatabse("second query")
    }
    cpuIntensiveBusinessLogic()
}
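
Until such an API exists, a similar bulkhead effect can be approximated at the call sites with kotlinx.coroutines.sync.Semaphore. This is only a sketch of the idea, not the proposed dispatcher API, and it reuses the blocking query functions declared at the top of the issue:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// At most `permits` coroutines per database occupy Dispatchers.IO threads at
// a time; the rest suspend without holding any thread, so a stuck second
// database can pin at most 10 of the 64 IO threads.
class Bulkhead(permits: Int) {
    private val semaphore = Semaphore(permits)
    suspend fun <T> run(block: () -> T): T =
        semaphore.withPermit { withContext(Dispatchers.IO) { block() } }
}

val firstDbBulkhead = Bulkhead(10)
val secondDbBulkhead = Bulkhead(10)

suspend fun first() = firstDbBulkhead.run { queryFirstDatabse("well running query") }
suspend fun second() = secondDbBulkhead.run { querySecondDatabse("possibly blocking query due to a bug") }
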
@fvasco
Contributor

fvasco commented Jun 15, 2019

@swarmshine take a look here #261

@swarmshine
Author

swarmshine commented Jun 15, 2019

Thank you, @fvasco !
Closing issue as a duplicate.

@glasser
Contributor

glasser commented Aug 16, 2019

FWIW I think you can do this today:

val FirstDispatcher = (Dispatchers.Default as ExperimentalCoroutineDispatcher).blocking(20)

It uses an internal coroutines API which may change when you upgrade, but I believe it does work!
