Guide to reactive streams with coroutines

This guide explains the key differences between Kotlin coroutines and reactive streams and shows how they can be used together for the greater good. Prior familiarity with the basic coroutine concepts covered in the Guide to kotlinx.coroutines is not required, but is a big plus. If you are familiar with reactive streams, you may find this guide a better introduction to the world of coroutines.

There are several modules in the kotlinx.coroutines project that are related to reactive streams; this guide uses kotlinx-coroutines-reactive and kotlinx-coroutines-rx2.

This guide is mostly based on the Reactive Streams specification and uses its Publisher interface, with some examples based on RxJava 2.x, which implements the Reactive Streams specification.

You are welcome to clone the kotlinx.coroutines project from GitHub to your workstation in order to run all the presented examples. They are contained in the reactive/kotlinx-coroutines-rx2/test/guide directory of the project.

Differences between reactive streams and channels

This section outlines key differences between reactive streams and coroutine-based channels.

Basics of iteration

Channel is a concept somewhat similar to reactive stream classes such as the Reactive Streams Publisher and the RxJava 2.x Flowable.

They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite, and all of them support backpressure.

However, Channel always represents a hot stream of items, to use Rx terminology. Elements are sent into the channel by producer coroutines and are received from it by consumer coroutines. Every receive invocation consumes an element from the channel. Let us illustrate it with the following example:

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    // create a channel that produces numbers from 1 to 3 with 200ms delays between them
    val source = produce<Int>(coroutineContext) {
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.consumeEach { // consume elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.consumeEach { // consume elements from it
        println(it)
    }
}

This code produces the following output:

Elements:
Begin
1
2
3
Again:

Notice how the "Begin" line is printed just once: the produce coroutine builder, when it is executed, launches one coroutine to produce a stream of elements. All the produced elements are consumed by the ReceiveChannel.consumeEach extension function, and there is no way to receive them from this channel again. The channel is closed when the producer coroutine is over, so the second attempt to receive from it gets nothing.

Let us rewrite this code using the publish coroutine builder from the kotlinx-coroutines-reactive module instead of produce from the kotlinx-coroutines-core module. The code stays the same, but where source used to have the ReceiveChannel type, it now has the reactive streams Publisher type.

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*

fun main(args: Array<String>) = runBlocking<Unit> {
    // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
    val source = publish<Int>(coroutineContext) {
    //           ^^^^^^^  <---  Difference from the previous example is here
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.consumeEach { // consume elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.consumeEach { // consume elements from it
        println(it)
    }
}

Now the output of this code changes to:

Elements:
Begin
1
2
3
Again:
Begin
1
2
3

This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order functional concept. While the channel is a stream of elements, the reactive stream defines a recipe for how the stream of elements is produced. It becomes an actual stream of elements on subscription. Each subscriber may receive the same or a different stream of elements, depending on how the corresponding implementation of Publisher works.

The publish coroutine builder used in the above example launches a fresh coroutine on each subscription, and every Publisher.consumeEach invocation creates a fresh subscription. We have two of them in this code, which is why we see "Begin" printed twice.

In Rx lingo this is called a cold publisher. Many standard Rx operators produce cold streams, too. We can iterate over them from a coroutine, and every subscription produces the same stream of elements.
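The recipe-versus-stream distinction is not specific to reactive streams. As an analogy only (this is not the Publisher or channel API), the Kotlin standard library's coroutine-based sequence builder behaves like a cold source: each pass re-runs the recipe. A single Iterator obtained from it behaves more like a channel: once its elements are consumed, a second pass gets nothing. The names coldSource, iterateSequenceTwice and iterateIteratorTwice are hypothetical.

```kotlin
// A cold "recipe": the block runs anew, recording "Begin", every time the sequence is iterated.
fun coldSource(log: MutableList<String>): Sequence<Int> = sequence {
    log.add("Begin")          // mark the beginning of this run
    for (x in 1..3) yield(x)  // suspend until the consumer asks for the next element
}

// Cold: every pass over the Sequence starts a fresh run of the recipe.
fun iterateSequenceTwice(): List<String> {
    val log = mutableListOf<String>()
    val source = coldSource(log)
    log.add("Elements: ${source.toList()}")
    log.add("Again: ${source.toList()}")
    return log
}

// One-shot: a single Iterator is drained by the first pass, much like a channel.
fun iterateIteratorTwice(): List<String> {
    val log = mutableListOf<String>()
    val source = coldSource(log).iterator()
    log.add("Elements: ${source.asSequence().toList()}")
    log.add("Again: ${source.asSequence().toList()}")
    return log
}
```

Here "Begin" is recorded twice for the Sequence and only once for the Iterator, mirroring the publish and produce outputs above.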

WARNING: It is planned that in the future a second invocation of the consumeEach method on a channel that is already being consumed is going to fail fast, that is, immediately throw an IllegalStateException. See this issue for details.

Note that we can replicate the channel behaviour we saw above in Rx by using the publish operator and calling connect on the connectable stream it returns.

Subscription and cancellation

The example in the previous section uses the source.consumeEach { ... } snippet to open a subscription and receive all the elements from it. If we need more control over what to do with the elements being received from the channel, we can use Publisher.openSubscription as shown in the following example:

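import io.reactivex.Flowable
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.reactive.*

fun main(args: Array<String>) = runBlocking<Unit> {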
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... into what's going on
    var cnt = 0 
    source.openSubscription().consume { // open channel to the source
        for (x in this) { // iterate over the channel to receive elements from it
            println(x)
            if (++cnt >= 3) break // break when 3 elements are printed
        }
        // Note: `consume` cancels the channel when this block of code is complete
    }
}

It produces the following output:

OnSubscribe
1
2
3
Finally

With an explicit openSubscription we should cancel the corresponding subscription to unsubscribe from the source. There is no need to invoke cancel explicitly -- under the hood consume does that for us. The installed doFinally listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete" is never printed because we did not consume all of the elements.
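The lifecycle described above can be modeled in plain Kotlin, without Rx. This is an illustrative assumption, not how kotlinx-coroutines-reactive is implemented: a hypothetical LoggingSubscription class records the same events as the doOnSubscribe, doOnComplete and doFinally listeners, and the standard library's use plays the role of consume by closing the subscription however the block exits. The helper receiveAtMost is also hypothetical.

```kotlin
import java.io.Closeable

// Hypothetical stand-in for an opened subscription: iterates over `range` and records
// lifecycle events in `log`, like the Rx listeners in the example above.
class LoggingSubscription(range: IntRange, private val log: MutableList<String>) : Iterator<Int>, Closeable {
    private val inner = range.iterator()
    private var closed = false

    init { log.add("OnSubscribe") }

    override fun hasNext(): Boolean {
        if (closed) return false
        if (inner.hasNext()) return true
        log.add("OnComplete")   // the source ran out of elements on its own
        close()
        return false
    }

    override fun next(): Int = inner.next()

    // Idempotent cancellation: "Finally" is recorded exactly once, whether we complete or cancel.
    override fun close() {
        if (!closed) {
            closed = true
            log.add("Finally")
        }
    }
}

// Receives at most `limit` elements; leaving the `use` block always closes the subscription.
fun receiveAtMost(limit: Int): List<String> {
    val log = mutableListOf<String>()
    LoggingSubscription(1..5, log).use { subscription ->
        var cnt = 0
        for (x in subscription) {
            log.add(x.toString())
            if (++cnt >= limit) break // stop early: "OnComplete" is never recorded
        }
    }
    return log
}
```

With a limit of 3 the log reads OnSubscribe, 1, 2, 3, Finally; with a limit larger than the range it also contains OnComplete before Finally.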

Nor do we need an explicit cancel if we iterate over all the items emitted by the publisher, because consumeEach cancels the subscription automatically:

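import io.reactivex.Flowable
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*

fun main(args: Array<String>) = runBlocking<Unit> {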
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... into what's going on
    // iterate over the source fully
    source.consumeEach { println(it) }
}

We get the following output:

OnSubscribe
1
2
3
4
OnComplete
Finally
5

Notice how "OnComplete" and "Finally" are printed before the last element "5". This happens because our main function in this example is a coroutine that we start with the runBlocking coroutine builder. Our main coroutine receives on the channel using the source.consumeEach { ... } expression and is suspended while it waits for the source to emit an item. When the last item is emitted by Flowable.range(1, 5), it resumes the main coroutine, which gets dispatched onto the main thread to print this last element at a later point in time, while the source completes and prints "OnComplete" and "Finally".

Backpressure

Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can suspend and they provide a natural answer to handling backpressure.

In RxJava 2.x, the backpressure-capable class is called Flowable. In the following example we use the rxFlowable coroutine builder from the kotlinx-coroutines-rx2 module to define a flowable that sends three integers from 1 to 3. It prints a message to the output after each invocation of the suspending send function returns, so that we can study how it operates.

The integers are generated in the context of the main thread, but observation is shifted to another thread using the Rx observeOn operator with a buffer of size 1. The subscriber is slow: it takes 500 ms to process each item, which is simulated using Thread.sleep.

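import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.rx2.*

fun main(args: Array<String>) = runBlocking<Unit> {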
    // coroutine -- fast producer of elements in the context of the main thread
    val source = rxFlowable(coroutineContext) {
        for (x in 1..3) {
            send(x) // this is a suspending function
            println("Sent $x") // print after successfully sent item
        }
    }
    // subscribe on another thread with a slow subscriber using Rx
    source
        .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
        .doOnComplete { println("Complete") }
        .subscribe { x ->
            Thread.sleep(500) // 500ms to process each item
            println("Processed $x")
        }
    delay(2000) // suspend the main coroutine for a couple of seconds
}

The output of this code nicely illustrates how backpressure works with coroutines:

Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete

Here we see how the producer coroutine puts the first element in the buffer and is suspended while trying to send another one. Only after the consumer processes the first item does the producer send the second one and resume, and so on.
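The same mechanism can be modeled with plain JVM threads, under the assumption that a blocking put is an acceptable stand-in for a suspending send (the real difference being that send suspends the coroutine without blocking a thread). The hypothetical producerMillis function below uses a bounded ArrayBlockingQueue as the buffer and measures how long a fast producer needs to hand off all of its items to a slow consumer.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical blocking model of backpressure: `put` blocks while the bounded buffer is full,
// much as a suspending `send` suspends. Returns how long the producer took to hand off all items.
fun producerMillis(capacity: Int, count: Int, processMillis: Long): Long {
    val buffer = ArrayBlockingQueue<Int>(capacity)
    val consumer = thread {
        repeat(count) {
            buffer.take()               // receive the next item
            Thread.sleep(processMillis) // slow processing of that item
        }
    }
    val start = System.nanoTime()
    for (x in 1..count) buffer.put(x)   // blocks until the consumer frees a slot
    val elapsedMillis = (System.nanoTime() - start) / 1_000_000
    consumer.join()
    return elapsedMillis
}
```

With a capacity of 1, five items and 100 ms of processing per item, the producer cannot finish in less than roughly 300 ms: it can run ahead of the consumer only by the size of the buffer.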

Rx Subject vs BroadcastChannel

RxJava has the concept of a Subject, an object that effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a BroadcastChannel. There is a variety of subjects in Rx, with BehaviorSubject being the one used to manage state:

import io.reactivex.subjects.BehaviorSubject

fun main(args: Array<String>) {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost
    // now subscribe to this subject and print everything
    subject.subscribe(System.out::println)
    subject.onNext("three")
    subject.onNext("four")
}

This code prints the current state of the subject on subscription and all its further updates:

two
three
four

You can subscribe to subjects from a coroutine just as with any other reactive stream:

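import io.reactivex.subjects.BehaviorSubject
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.rx2.*

fun main(args: Array<String>) = runBlocking<Unit> {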
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print everything
    launch(Unconfined) { // launch coroutine in unconfined context
        subject.consumeEach { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
}

The result is the same:

two
three
four

Here we use the Unconfined coroutine context to launch the consuming coroutine with the same behaviour as a subscription in Rx. It basically means that the launched coroutine is going to be immediately executed in the same thread that is emitting elements. Contexts are covered in more detail in a separate section.

The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates. A typical UI application does not need to react to every state change; only the most recent state is relevant. A sequence of back-to-back updates to the application state needs to be reflected in the UI only once, as soon as the UI thread is free. For the following example, we simulate this by launching the consuming coroutine in the context of the main thread and using the yield function to simulate a break in the sequence of updates and to release the main thread:

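import io.reactivex.subjects.BehaviorSubject
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.rx2.*

fun main(args: Array<String>) = runBlocking<Unit> {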
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print the most recent update
    launch(coroutineContext) { // use the context of the main thread for a coroutine
        subject.consumeEach { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
    yield() // yield the main thread to the launched coroutine <--- HERE
    subject.onComplete() // now complete subject's sequence to cancel consumer, too    
}

Now the coroutine processes (prints) only the most recent update:

four

The corresponding behavior in a pure coroutines world is implemented by ConflatedBroadcastChannel that provides the same logic on top of coroutine channels directly, without going through the bridge to the reactive streams:

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val broadcast = ConflatedBroadcastChannel<String>()
    broadcast.offer("one")
    broadcast.offer("two")
    // now launch a coroutine to print the most recent update
    launch(coroutineContext) { // use the context of the main thread for a coroutine
        broadcast.consumeEach { println(it) }
    }
    broadcast.offer("three")
    broadcast.offer("four")
    yield() // yield the main thread to the launched coroutine
    broadcast.close() // now close broadcast channel to cancel consumer, too    
}

It produces the same output as the previous example based on BehaviorSubject:

four
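At its core, conflation needs only a single slot that each update overwrites. The sketch below is a hypothetical, thread-safe model of what one consumer observes; ConflatedSlot is not part of kotlinx.coroutines, it is not how ConflatedBroadcastChannel is implemented, and unlike that channel it never suspends a waiting receiver.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical single-slot conflating buffer, seen from one consumer: every offer overwrites
// the previous value, and a consumer that gets around to reading sees only the latest one.
class ConflatedSlot<T : Any> {
    private val slot = AtomicReference<T?>(null)

    // Never blocks; an older value that was not read yet is simply dropped.
    fun offer(value: T) = slot.set(value)

    // Takes the latest value and empties the slot, or returns null if nothing new arrived.
    fun poll(): T? = slot.getAndSet(null)
}
```

Offering "one" through "four" before the consumer reads leaves only "four" to be processed, the same result as the two examples above.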

Another implementation of BroadcastChannel is ArrayBroadcastChannel. It delivers every event to every subscriber from the moment the corresponding subscription is opened. It corresponds to PublishSubject in Rx. The buffer capacity passed to the ArrayBroadcastChannel constructor controls the number of elements that can be sent before the sender is suspended, waiting for receivers to receive those elements.
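The delivery rule (each subscriber receives every element sent after it subscribed, and a full buffer holds the sender back) can be sketched with plain JVM queues. BufferedBroadcast is a hypothetical name, this is not the kotlinx.coroutines implementation, and a blocking put stands in for the suspending send of ArrayBroadcastChannel.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical model of a buffered broadcast: each subscription gets its own bounded buffer,
// and only elements sent after the subscription is opened are delivered to it.
class BufferedBroadcast<T : Any>(private val capacity: Int) {
    private val subscribers = CopyOnWriteArrayList<ArrayBlockingQueue<T>>()

    fun subscribe(): ArrayBlockingQueue<T> =
        ArrayBlockingQueue<T>(capacity).also { subscribers.add(it) }

    // Blocks (instead of suspending) while any subscriber's buffer is full.
    fun send(element: T) {
        for (queue in subscribers) queue.put(element)
    }
}
```

A subscriber that joins between two sends sees only the second element, while an earlier subscriber sees both.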

Operators

Full-featured reactive stream libraries, like Rx, come with a very large set of operators to create, transform, combine and otherwise process the corresponding streams. Creating your own operators with support for back-pressure is notoriously difficult.

Coroutines and channels are designed to provide an opposite experience. There are no built-in operators, but processing streams of elements is extremely simple and back-pressure is supported automatically without you having to explicitly think about it.

This section shows coroutine-based implementations of several reactive stream operators.

Range

Let's roll out our own implementation of the range operator for the reactive streams Publisher interface. The asynchronous clean-slate implementation of this operator for reactive streams is explained in this blog post. It takes a lot of code. Here is the corresponding code with coroutines:

fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) send(x)
}

In this code, CoroutineContext is used instead of an Executor, and all the backpressure aspects are taken care of by the coroutines machinery. Note that this implementation depends only on the small reactive streams library that defines the Publisher interface and its friends.

It is straightforward to use from a coroutine:

fun main(args: Array<String>) = runBlocking<Unit> {
    range(CommonPool, 1, 5).consumeEach { println(it) }
}

The result of this code is quite expected:

1
2
3
4
5

Fused filter-map hybrid

Reactive operators like filter and map are trivial to implement with coroutines. As a bit of a challenge and a showcase, let us combine them into a single fusedFilterMap operator:

fun <T, R> Publisher<T>.fusedFilterMap(
    context: CoroutineContext,   // the context to execute this coroutine in
    predicate: (T) -> Boolean,   // the filter predicate
    mapper: (T) -> R             // the mapper function
) = publish<R>(context) {
    consumeEach {                // consume the source stream 
        if (predicate(it))       // filter part
            send(mapper(it))     // map part
    }        
}

Using range from the previous example we can test our fusedFilterMap by filtering for even numbers and mapping them to strings:

fun main(args: Array<String>) = runBlocking<Unit> {
    range(coroutineContext, 1, 5)
        .fusedFilterMap(coroutineContext, { it % 2 == 0 }, { "$it is even" })
        .consumeEach { println(it) } // print all the resulting strings
}

It is not hard to see that the result is going to be:

2 is even
4 is even
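For comparison, the same operator shape also works for pull-based, synchronous data with the standard library's sequence builder. This is a swapped-in analogue with no backpressure or threading involved: the sequence builder plays the role of publish and yield the role of send, while the filter and map parts stay unchanged.

```kotlin
// The same fused operator over a synchronous Sequence.
fun <T, R> Sequence<T>.fusedFilterMap(
    predicate: (T) -> Boolean,   // the filter predicate
    mapper: (T) -> R             // the mapper function
): Sequence<R> = sequence {
    for (x in this@fusedFilterMap) { // consume the source sequence
        if (predicate(x))            // filter part
            yield(mapper(x))         // map part
    }
}
```

For example, (1..5).asSequence().fusedFilterMap({ it % 2 == 0 }, { "$it is even" }).toList() yields the same two strings as the publisher-based version.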

Take until

Let's implement our own version of the takeUntil operator. It is quite tricky to implement because of the need to track and manage subscriptions to two streams. We need to relay all the elements from the source stream until the other stream either completes or emits anything. However, the select expression comes to the rescue in the coroutines implementation:

fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
    this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
        val current = this
        other.openSubscription().consume { // explicitly open channel to Publisher<U>
            val otherChannel = this
            whileSelect {
                otherChannel.onReceive { false }      // bail out on any received element from `other`
                current.onReceive { send(it); true }  // resend element from this channel and continue
            }
        }
    }
}

This code uses whileSelect as a nicer shortcut for a while(select{...}) {} loop, and the consume extension to cancel the channels on exit, which unsubscribes from the corresponding publishers.

The following hand-written combination of range with interval is used for testing. It is coded using a publish coroutine builder (its pure-Rx implementation is shown in later sections):

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) { 
        delay(time) // wait before sending each number
        send(x)
    }
}

The following code shows how takeUntil works:

fun main(args: Array<String>) = runBlocking<Unit> {
    val slowNums = rangeWithInterval(coroutineContext, 200, 1, 10)         // numbers with 200ms interval
    val stop = rangeWithInterval(coroutineContext, 500, 1, 10)             // the first one after 500ms
    slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it
}

It produces:

1
2

Merge

There are always at least two ways to process multiple streams of data with coroutines. One way, involving select, was shown in the previous example. The other way is just to launch multiple coroutines. Let us implement the merge operator using the latter approach:

fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
    consumeEach { pub ->                 // for each publisher received on the source channel
        launch(coroutineContext) {       // launch a child coroutine
            pub.consumeEach { send(it) } // resend all elements from this publisher
        }
    }
}

Notice the use of coroutineContext in the invocation of the launch coroutine builder. It refers to the context of the enclosing publish coroutine. This way, all the coroutines launched here are children of the publish coroutine and will get cancelled when the publish coroutine is cancelled or otherwise completed. Moreover, since a parent coroutine waits until all its children are complete, this implementation fully merges all the received streams.

For a test, let us start with the rangeWithInterval function from the previous example and write a producer that sends two such publishers, with a delay between them:

fun testPub(context: CoroutineContext) = publish<Publisher<Int>>(context) {
    send(rangeWithInterval(context, 250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms 
    delay(100) // wait for 100 ms
    send(rangeWithInterval(context, 500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
    delay(1100) // wait for 1.1s - done in 1.2 sec after start
}

The test code is to use merge on testPub and to display the results:

fun main(args: Array<String>) = runBlocking<Unit> {
    testPub(coroutineContext).merge(coroutineContext).consumeEach { println(it) } // print the whole stream
}

And the results should be:

1
2
11
3
4
12
13
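The expected order follows from the timestamps in the testPub comments. As a worked check, assuming exact delays and ignoring scheduling overhead, the hypothetical helpers below compute when each element is emitted and sort all elements by that time, which is effectively what merge does by forwarding each element as soon as it arrives.

```kotlin
// Hypothetical helper: emission times of rangeWithInterval(context, time, start, count) whose
// subscription starts at `startMs`, assuming exact delays (the first element comes after one delay).
fun emissions(startMs: Long, time: Long, start: Int, count: Int): List<Pair<Long, Int>> =
    (0 until count).map { i -> (startMs + time * (i + 1)) to (start + i) }

// merge relays each element as soon as it is emitted, so the merged order is the order in time.
fun mergedOrder(vararg streams: List<Pair<Long, Int>>): List<Int> =
    streams.flatMap { it }.sortedBy { it.first }.map { it.second }
```

The first publisher emits at 250, 500, 750 and 1000 ms, the second (subscribed at 100 ms) at 600, 1100 and 1600 ms, so the merged order is 1, 2, 11, 3, 4, 12, 13.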

Coroutine context

All the example operators shown in the previous section have an explicit CoroutineContext parameter. In the Rx world, it roughly corresponds to a Scheduler.

Threads with Rx

The following example shows the basics of threading context management with Rx. Here rangeWithIntervalRx is an implementation of rangeWithInterval function using Rx zip, range, and interval operators.

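import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =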
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main(args: Array<String>) {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

We are explicitly passing the Schedulers.computation() scheduler to our rangeWithIntervalRx operator and it is going to be executed in Rx computation thread pool. The output is going to be similar to the following one:

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

Threads with coroutines

In the world of coroutines Schedulers.computation() roughly corresponds to CommonPool, so the previous example is similar to the following one:

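import io.reactivex.Flowable
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
import kotlin.coroutines.experimental.CoroutineContext

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {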
    for (x in start until start + count) { 
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main(args: Array<String>) {
    Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

The produced output is going to be similar to:

1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1

Here we've used the Rx subscribe operator, which does not have its own scheduler and operates on the same thread as the publisher: on CommonPool in this example.

Rx observeOn

In Rx you use special operators to modify the threading context for operations in the chain. If you are not familiar with them, you can find some good guides about them.

For example, there is the observeOn operator. Let us modify the previous example to observe using Schedulers.computation():

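import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
import kotlin.coroutines.experimental.CoroutineContext

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {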
    for (x in start until start + count) { 
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main(args: Array<String>) {
    Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
        .observeOn(Schedulers.computation())                           // <-- THIS LINE IS ADDED
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

Here is the difference in output, notice "RxComputationThreadPool":

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
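Both behaviors can be modeled with plain JVM threads. In the hypothetical sketch below, which is an analogy and not RxJava's implementation, emitOnThread calls its callback directly on the emitting thread, as subscribe does, while observedOn re-dispatches each element to an executor, roughly as observeOn does with a scheduler. The thread names "producer" and "observer" are chosen for illustration.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

// Hypothetical push-based source: emits 1..count on its own thread and calls `onNext` directly,
// like a subscriber without a scheduler of its own.
fun emitOnThread(threadName: String, count: Int, onNext: (Int) -> Unit) {
    thread(name = threadName) {
        for (x in 1..count) onNext(x)
    }.join() // wait until every element has been emitted
}

// Hypothetical observeOn-like wrapper: hands every element over to `executor`
// instead of running `onNext` on the emitting thread.
fun <T> observedOn(executor: ExecutorService, onNext: (T) -> Unit): (T) -> Unit =
    { value -> executor.execute { onNext(value) } }

fun threadsSeen(shiftThreads: Boolean): List<String> {
    val seen = CopyOnWriteArrayList<String>()
    val record: (Int) -> Unit = { x -> seen.add("$x on ${Thread.currentThread().name}") }
    if (!shiftThreads) {
        emitOnThread("producer", 3, record)
    } else {
        // a single worker thread keeps the elements in their original order
        val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "observer") }
        emitOnThread("producer", 3, observedOn(executor, record))
        executor.shutdown()                            // accept no new tasks; queued ones still run
        executor.awaitTermination(5, TimeUnit.SECONDS) // wait for the queued deliveries
    }
    return seen
}
```

Without the wrapper every element is recorded on "producer"; with it, every element is recorded on "observer".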

Coroutine context to rule them all

A coroutine always works in some context. For example, let us start a coroutine in the main thread with runBlocking and iterate over the result of the Rx-based rangeWithIntervalRx operator, instead of using the Rx subscribe operator:

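import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
import java.util.concurrent.TimeUnit

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =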
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main(args: Array<String>) = runBlocking<Unit> {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}

The resulting messages are going to be printed in the main thread:

1 on thread main
2 on thread main
3 on thread main

Unconfined context

Most Rx operators do not have any specific thread (scheduler) associated with them and work in whatever thread they happen to be invoked in. We've seen this with the subscribe operator in the Threads with Rx section.

In the world of coroutines, the Unconfined context serves a similar role. Let us modify our previous example: instead of iterating over the source Flowable from the runBlocking coroutine, which is confined to the main thread, we launch a new coroutine in the Unconfined context, while the main coroutine simply waits for its completion using Job.join:

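import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
import java.util.concurrent.TimeUnit

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =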
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
    }
    job.join() // wait for our coroutine to complete
}

Now, the output shows that the code of the coroutine is executing in the Rx computation thread pool, just like our initial example using Rx subscribe operator.

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

Note that the Unconfined context should be used with care. It may improve overall performance on certain tests, due to the increased stack locality of operations and lower scheduling overhead, but it also produces deeper stacks and makes it harder to reason about the asynchronicity of the code that uses it.

If a coroutine sends an element to a channel, then the thread that invoked send may start executing the code of a coroutine with the Unconfined dispatcher. The original producer coroutine that invoked send is paused until the unconfined consumer coroutine hits its next suspension point. This is very similar to lock-step single-threaded onNext execution in the Rx world in the absence of thread-shifting operators. It is the normal default for Rx, because operators usually do very small chunks of work and you have to combine many operators for complex processing. However, this is unusual with coroutines, where you can have arbitrarily complex processing in a coroutine. Usually, you only need to chain stream-processing coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines.
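The difference in interleaving can be shown with a deliberately simplified, single-threaded model. The functions lockStepLog and dispatchedLog are hypothetical and do not reflect the kotlinx.coroutines dispatcher machinery: with unconfined delivery the consumer's code runs inside the sender's call, whereas with a dispatched consumer the sender only enqueues and the consumer runs later.

```kotlin
// Unconfined-like delivery: the consumer runs inline, so sender and consumer steps
// alternate in lock-step on the same thread.
fun lockStepLog(count: Int): List<String> {
    val log = mutableListOf<String>()
    val consumer: (Int) -> Unit = { x -> log.add("consume $x") }
    for (x in 1..count) {
        log.add("send $x")
        consumer(x)                 // the sender is paused until the consumer is done with x
        log.add("resume after $x")
    }
    return log
}

// Dispatched delivery: sends only enqueue, and the consumer processes the queue later.
fun dispatchedLog(count: Int): List<String> {
    val log = mutableListOf<String>()
    val queue = ArrayDeque<Int>()
    for (x in 1..count) {
        log.add("send $x")
        queue.addLast(x)            // hand the element over without running consumer code
        log.add("resume after $x")
    }
    while (queue.isNotEmpty()) log.add("consume ${queue.removeFirst()}")
    return log
}
```

In the lock-step log every "consume" appears between its "send" and "resume after" entries; in the dispatched log all consumption happens after the sender is done.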