
Channel possibly not acting fair #111

RRethy opened this issue Sep 6, 2017 · 10 comments

@RRethy

commented Sep 6, 2017

I noticed a few scenarios where channels do not seem to act fairly and fill their buffer in the wrong order. The main gist of the post can be seen in the first example, while the additional examples provide alternate scenarios with more unexpected outputs.

suspend fun sendString(channel: SendChannel<String>, s: String) {
    while (true) {
        channel.send(s)
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(coroutineContext) { sendString(channel, "foo") }
    launch(coroutineContext) { sendString(channel, "BAR!") }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}

Outputs:

foo
foo
BAR!
foo
foo
BAR!

If the channel is given a buffer, the buffer is filled with as many "foo"s as it can hold, followed by a single "BAR!". With a buffer of 4, this produces the following output:

foo
foo
foo
foo
foo
foo

The buffer is always filled to capacity with "foo" and only gets a "BAR!" afterwards, at which point send("BAR!") suspends until the multiple "foo"s in the buffer are drained.
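For reference, the buffered variant differs from the first example only in the capacity passed to the Channel constructor:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>(4) // buffered channel with capacity 4
    launch(coroutineContext) { sendString(channel, "foo") }
    launch(coroutineContext) { sendString(channel, "BAR!") }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}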

Filling the buffer like this also has some other weird results when a delay is added either before or after channel.receive() is called, as seen below:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>(2)
    launch(coroutineContext) { sendString(channel, "foo") }
    launch(coroutineContext) { sendString(channel, "BAR!") }
    repeat(6) { // receive first six
        delay(1L)
        println(channel.receive())
    }
}

Outputs:

foo
BAR!
foo
BAR!
foo
BAR!

The above is what I anticipated the original code to output, since that looks more like the channel acting fairly.

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(coroutineContext) { sendString(channel, "foo") }
    launch(coroutineContext) { sendString(channel, "BAR!") }
    repeat(6) { // receive first six
        println(channel.receive())
        delay(1L)
    }
}

Outputs:

foo
foo
BAR!
foo
BAR!
foo

From the code above and through some debugging: send("foo") is called, then it gets to send("foo") a second time, at which point it suspends; then it goes to send("BAR!") and suspends, which causes the queued sends to be ordered such that "foo" is before "BAR!" for the first two items emitted.

And the last scenario:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(coroutineContext) { sendString(channel, "foo") }
    launch(coroutineContext) { sendString(channel, "BAR!") }
    delay(1L)
    repeat(6) { // receive first six
        println(channel.receive())
    }
}

Outputs:

foo
BAR!
foo
foo
BAR!
foo

This case was the weirdest, as the execution of the while loop and the suspensions from send in sendString happened in a very strange order that caused two "foo"s to be output in positions 2 and 3.

Sorry for the long post. I noticed this while playing around with coroutines and did not think it was intended, so I thought I would post here. If this is intended, or there is a silly mistake I'm making, any explanation is appreciated. Thanks.

@elizarov

Member

commented Sep 7, 2017

This is a side effect of the fact that all the coroutines run under the same context in the same thread. Let me provide a step-by-step explanation of the first scenario (with an unbuffered rendezvous channel):

  1. From the "main" coroutine we schedule two coroutines to run using launch (let's call them the "foo" coroutine and the "bar" coroutine).
  2. The "main" coroutine attempts to receive from the channel; there is no sender, so the receive function suspends (freeing the execution thread for the next coroutine in the queue). Now the execution queue looks like this: "foo", "bar". The suspended "main" waits on receive and is not scheduled for execution.
  3. The "foo" coroutine is executed. It sends "foo", which rendezvouses with the suspended "main" coroutine and resumes it. Now the execution queue looks like this: "foo", "bar", "main" (to receive "foo").
  4. The "foo" coroutine continues to execute (it was not suspended, but successfully sent) and attempts to send again. This time there is no one on the receive side, so there is no rendezvous and it gets suspended. Execution queue: "bar", "main" (to receive "foo"). The suspended "foo" waits on send.
  5. The "bar" coroutine is executed. It attempts to send "bar", but there is no rendezvous with a receiver, so it suspends, too. Execution queue: "main" (to receive "foo"). The suspended "foo" and "bar" wait on send (in this order).
  6. The "main" coroutine is executed, resuming from receive with a value of "foo". It prints "foo", then does another receive.
  7. Now there is a queue of both "foo" and "bar" waiting on send, so the "main" coroutine successfully rendezvouses with them, printing one more "foo" and then "bar", and resuming the corresponding coroutines. Execution queue: "main", "foo", "bar". We are still executing "main".
  8. The next attempt to receive by the "main" coroutine does not rendezvous and suspends. Execution queue: "foo", "bar". The suspended "main" waits on receive.
  9. You can see that we are now in a state similar to the one after step 2, so you can continue from step 3.

The resulting "foo", "foo", "bar", "foo", "foo", "bar", etc. does look counter-intuitive, though. I'll keep this issue open for two reasons:

  • Think about whether something can be adjusted in the channel rendezvous or execution logic so that the outcome becomes more "fair".
  • Or document and explain this behavior better.
@RRethy

Author

commented Sep 7, 2017

Thanks for the detailed walkthrough, the output makes a lot more sense now; it also helped in figuring out how the delay(1L) calls were affecting it.

@elizarov

Member

commented Sep 8, 2017

One way to solve the issue of fairness and to make this behavior more intuitive is to change the logic (and specification) of send to make it always suspend on invocation. It already suspends when it cannot send (no rendezvous/buffer full), but it does not suspend otherwise. It can yield in the latter case. With this change, the execution in the topic-starter example will become fair. I think we can add an optional fair boolean parameter to channel constructors to enable this behavior.
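A minimal sketch of that idea, written as a user-level extension rather than the actual channel change (the fairSend name and the explicit yield() call are just an illustration, not the proposed library API):

import kotlinx.coroutines.experimental.yield
import kotlinx.coroutines.experimental.channels.SendChannel

// Approximates a "fair" send: even when send() completes without suspending
// (rendezvous available or buffer not full), yield() reschedules this coroutine
// so that other coroutines queued on the same context get a chance to run.
suspend fun <E> SendChannel<E>.fairSend(element: E) {
    send(element) // suspends only when it cannot send
    yield()       // gives other coroutines a turn before the next send
}

Replacing send with fairSend in the sendString loop above should make the two senders take turns, producing the alternating "foo"/"BAR!" output the original poster expected.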

@elizarov

Member

commented Sep 8, 2017

The fair channels would also solve an issue of cancellability that some people are facing with channels. The issue is that suspending invocations are cancellable only when they actually suspend, so when you are continuously sending into a channel it is somewhat down to luck (especially in multi-threaded scenarios) whether the sender can be cancelled. A send on a fair channel will always suspend (even if just to yield), so it will always be cancellable. This leads me to think that we might want to have fair channels by default, while leaving the option of using non-fair channels as a performance optimization.
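A small sketch of that cancellability problem, using the experimental-era API from the examples above (a conflated channel is used here just so that send() never needs to suspend; the exact behavior depends on timing):

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    // Conflated channel: send() always succeeds immediately and never suspends.
    val channel = Channel<Int>(Channel.CONFLATED)

    val sender = launch(CommonPool) {
        var i = 0
        while (true) channel.send(i++) // no suspension point, so no chance to get cancelled
    }

    delay(100)
    sender.cancel()   // cancellation is requested, but only delivered at a suspension point
    delay(100)
    println("sender finished: ${sender.isCompleted}") // typically prints false: the loop keeps spinning
}

With a fair channel, the yield inside send would provide exactly the suspension point at which this cancellation gets delivered.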

@fvasco

Contributor

commented Sep 8, 2017

Is it "safe" to support unfair channels?
Unlimited/Conflated channels should always be fair.

Maybe this decision should be motivated: if the performance gain of unfair channels is negligible, then I don't see any reason to support them.

@elizarov

Member

commented Sep 8, 2017

I'm going to find out the performance difference between fair/unfair channels before making this decision. It is also related to #113, since the performance difference should be more pronounced with SPSC channels.

@fvasco

Contributor

commented Oct 3, 2017

A little case study on Unlimited/Conflated channels.
The following case uses a single thread, but the same can happen with the ForkJoinPool on a loaded machine.

Here is an example:

fun main(vararg args: String) {
    val channel = Channel<Int>(Channel.UNLIMITED) // CONFLATED
    val context = newSingleThreadContext("test")

    // producer
    launch(context) {
        var i = 0
        while (true) {
            channel.send(i++)
            // yield()
        }
    }

    // consumer
    launch(context) {
        channel.consumeEach { println(it) }
    }
    Thread.sleep(15000)
}

As explained above, this program doesn't print anything; uncommenting yield() fixes the issue.
Moreover, producing a large amount of data without consuming it causes problems for the GC and the CPU cache (especially for an unlimited channel).

@AlexTrotsenko


commented Jun 14, 2019

I'm going to find out the performance difference between fair/unfair channels before making this decision.

@elizarov since there have been no posts here for quite some time: are there any updates on the above-mentioned change in behaviour? Is it planned to be implemented before the experimental API and specification become stable?

@elizarov

Member

commented Jun 18, 2019

No update so far. We've been thinking about fairness of the channels, though, and here are some thoughts:

  • Making channels "always fair" adds visible overhead. That is not something we'd want to do.
  • Leaving channels "unfair" as they are now creates various obviously problematic cases (especially notable is the one contributed by @fvasco).

So, the current thinking is that we should make channels "partially fair" -- yield periodically (say, on every 32 sends or something like that). It is not easy to add to the current channel design, but as we work on a different channel implementation, that feature can be incorporated quite easily at little additional cost.
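A rough user-level sketch of that "partial fairness" idea (the PartiallyFairSender wrapper and the yieldEvery parameter are hypothetical; the real change would live inside the channel implementation itself):

import kotlinx.coroutines.experimental.yield
import kotlinx.coroutines.experimental.channels.SendChannel

// Hypothetical wrapper: sends normally, but yields every `yieldEvery` sends
// so that a busy sender periodically lets other coroutines run.
class PartiallyFairSender<E>(
    private val channel: SendChannel<E>,
    private val yieldEvery: Int = 32
) {
    private var count = 0

    suspend fun send(element: E) {
        channel.send(element)
        if (++count % yieldEvery == 0) yield()
    }
}

This keeps the fast path of send untouched most of the time while bounding how long a single sender can monopolize a single-threaded context.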
