RxJava provides many methods for modifying data, but Kotlin Coroutines does not have them. This library's goal is to provide functions that give users the same functionality as in RxJava.

Coroutines Operators

Preview

Kotlin Coroutines are gaining ground in Android programming as a new way to do asynchronous work. Compared with RxJava, which has many functions that help modify data, Coroutines still lack such functions. Coroutines Operators is a library created to fill that gap and give users the possibility to use the same functions as in RxJava, but with Kotlin Coroutines.

The library contains extensions for the Deferred and ReceiveChannel classes.

Note

Coroutines Operators works only with kotlin-coroutines:0.26.0 and above.

List of extended functions

Deferred

  • zip() - combine items from two Deferreds together via a specified function and return items based on the results of this function
  • zipWith() - instance version of zip()
  • filter() - filter items in Deferred
  • map() - transform the items from Deferred by applying some function
  • flatMap() - transform the Collection items in Deferred into Deferreds, then flatten this into a single Collection
  • concatMap() - transform the Collection items in Deferred into Deferreds, then flatten this into a single Collection, without interleaving
  • retryDeferredWithDelay() - try to await the result of a Deferred; if it fails, retry after a custom delay
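
The retry-with-delay idea described for retryDeferredWithDelay() can be sketched with plain kotlinx.coroutines. This is only an illustration of the technique, not the library's implementation: the helper name `retryWithDelay` and its parameters `attempts`, `delayMillis` and `block` are hypothetical, and the code assumes a current kotlinx.coroutines 1.x release.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of the library): runs `block` up to `attempts`
// times and suspends for `delayMillis` between failed attempts.
suspend fun <T> retryWithDelay(
    attempts: Int,
    delayMillis: Long,
    block: suspend () -> T
): T {
    require(attempts >= 1) { "attempts must be at least 1" }
    var lastError: Exception? = null
    for (attempt in 1..attempts) {
        try {
            return block()
        } catch (e: CancellationException) {
            throw e // never swallow coroutine cancellation
        } catch (e: Exception) {
            lastError = e
            // Wait only if another attempt will follow.
            if (attempt < attempts) delay(delayMillis)
        }
    }
    // All attempts failed: rethrow the last failure.
    throw checkNotNull(lastError)
}

fun main() = runBlocking {
    var calls = 0
    val result = retryWithDelay(attempts = 3, delayMillis = 100) {
        calls++
        if (calls < 3) throw IllegalStateException("attempt $calls failed")
        "ok after $calls attempts"
    }
    println(result)
    // prints "ok after 3 attempts"
}
```

The sketch takes a suspending block rather than a Deferred because a Deferred that has already failed cannot be restarted; each retry has to start the work again.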

ReceiveChannel

  • asyncFlatMap() - transform the items emitted in a Channel into Deferreds, then flatten these into a single Channel
  • asyncConcatMap() - transform the items emitted in a Channel into Deferreds, then flatten these into a single Channel, without interleaving
  • asyncMap() - transform the items emitted in a Channel by applying some function
  • distinctUntilChanged() - suppress duplicate consecutive items emitted by the ReceiveChannel
  • reduce() - apply a function to each emitted item, sequentially, and emit only the final accumulated value
  • concat() - concatenate two ReceiveChannels sequentially
  • concatWith() - instance version of concat()
  • debounce() - only emit an item from the source ReceiveChannel after a particular timespan has passed without the ReceiveChannel emitting any other items
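
The debounce() behaviour described in the list can be sketched with `select` and `onTimeout` from kotlinx.coroutines. This is a minimal sketch under stated assumptions, not the library's code: `debounceSketch` and `quietMillis` are hypothetical names, the code assumes kotlinx.coroutines 1.5 or newer, and flushing the last pending item when the source closes is a design choice made here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper (not part of the library): forwards an item only after
// `quietMillis` have passed without the source emitting anything newer.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T : Any> CoroutineScope.debounceSketch(
    source: ReceiveChannel<T>,
    quietMillis: Long
): ReceiveChannel<T> = produce<T> {
    var pending: T? = null // latest item that has not been forwarded yet
    var open = true
    while (open) {
        select<Unit> {
            // A newer item replaces the pending one; the loop then restarts the timer.
            source.onReceiveCatching { result ->
                val item = result.getOrNull()
                if (item != null) pending = item else open = false // null: source closed
            }
            // The timeout clause is registered only while an item is waiting.
            val candidate = pending
            if (candidate != null) {
                onTimeout(quietMillis) {
                    send(candidate)
                    pending = null
                }
            }
        }
    }
    // Assumption of this sketch: flush the last pending item when the source closes.
    pending?.let { send(it) }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val source = produce<Int> {
        send(1); delay(10)
        send(2); delay(10)
        send(3); delay(300) // quiet period longer than quietMillis
        send(4)
    }
    println(debounceSketch(source, quietMillis = 100).toList())
    // with these timings, prints [3, 4]
}
```

Items 1 and 2 are dropped because a newer item arrives within 100 ms; 3 survives the quiet period, and 4 is flushed when the source closes.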

Examples

zip

Combine items from two Deferreds together via a specified function and return items based on the results of this function

    val firstName = async { "James" }
    val secondName = async { "Kirk" }
    println(zip(firstName, secondName, BiFunction { first: String, second: String -> "$first $second" }))
    // prints "James Kirk"

zipWith

Instance version of zip()

    val firstName = async { "James" }
    val secondName = async { "Kirk" }
    println(firstName.zipWith(secondName, BiFunction { first: String, second: String -> "$first $second" }))
    // prints "James Kirk"

map

Transform the items from Deferred by applying some function

    val hello = async { "Hello" }
    println(hello.map { item: String -> "$item world" })
    // prints "Hello world"

flatMap

Transform the Collection items in Deferred into Deferreds, then flatten this into a single Collection

    val time = System.currentTimeMillis()
    val dataArray = async { listOf(1, 2, 3, 4, 5) }
    val result = dataArray.flatMap ({
        Thread.sleep(5000 - it * 1000L)
        it * 2
    }).toList()
    println(result)
    println("Working time is ${System.currentTimeMillis() - time}")
    // prints [10, 8, 6, 4, 2] Working time is 4097

concatMap

Transform the Collection items in Deferred into Deferreds, then flatten this into a single Collection, without interleaving

    val time = System.currentTimeMillis()
    val dataArray = async { listOf(1, 2, 3, 4, 5) }
    val result = dataArray.concatMap ({
        Thread.sleep(5000 - it * 1000L)
        it * 2
    }).toList()
    println(result)
    println("Working time is ${System.currentTimeMillis() - time}")
    // prints [2, 4, 6, 8, 10] Working time is 4303
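
Both runs above take roughly four seconds, which suggests that the mappers run concurrently in both cases and only the order of the results differs. The difference between completion order and source order can be sketched with plain kotlinx.coroutines. The helpers `mapInCompletionOrder` and `mapInSourceOrder` are hypothetical names for this illustration and are not the library's implementation; the sketch uses `delay` instead of `Thread.sleep` and assumes kotlinx.coroutines 1.x.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: results arrive in the order the tasks finish (interleaving).
suspend fun <T, R> mapInCompletionOrder(
    items: List<T>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val results = Channel<R>(Channel.UNLIMITED)
    // Each task pushes its result into the channel as soon as it is done.
    items.forEach { item -> launch { results.send(transform(item)) } }
    // Take exactly one result per item, in arrival order.
    List(items.size) { results.receive() }
}

// Hypothetical helper: tasks still run concurrently, but results keep source order.
suspend fun <T, R> mapInSourceOrder(
    items: List<T>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    items.map { item -> async { transform(item) } }.awaitAll()
}

fun main() = runBlocking {
    val items = listOf(1, 2, 3)
    // Smaller numbers wait longer, so they finish last.
    val slowDouble: suspend (Int) -> Int = { delay(400L - it * 100L); it * 2 }
    println(mapInCompletionOrder(items, slowDouble))
    // prints [6, 4, 2]
    println(mapInSourceOrder(items, slowDouble))
    // prints [2, 4, 6]
}
```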

asyncFlatMap

Transform the items emitted in a Channel into Deferreds, then flatten these into a single Channel

    val time = System.currentTimeMillis()
    val list = listOf(1, 2, 3, 4)
    val result = list.asReceiveChannel().asyncFlatMap(mapper = {
        Thread.sleep(5000 - it * 1000L)
        listOf(it * 2).asReceiveChannel()
    }).toList()
    println(result)
    print("Working time ${System.currentTimeMillis() - time}")
    // prints [6, 4, 8, 2] Working time 4167

asyncConcatMap

Transform the items emitted in a Channel into Deferreds, then flatten these into a single Channel, without interleaving

    val time = System.currentTimeMillis()
    val list = listOf(1, 2, 3, 4)
    val result = list.asReceiveChannel().asyncConcatMap(mapper = {
        Thread.sleep(5000 - it * 1000L)
        listOf(it * 2).asReceiveChannel()
    }).toList()
    println(result)
    print("Working time ${System.currentTimeMillis() - time}")
    // prints [2, 4, 6, 8] Working time 4222

asyncMap

Transform the items emitted in a Channel by applying some function

    val list = listOf(1, 2, 3, 4)
    print(list.asReceiveChannel().asyncMap(mapper = { it * 2 }).toList())
    // prints [2, 4, 6, 8]

distinctUntilChanged

Suppress duplicate consecutive items emitted by the ReceiveChannel

    val list = listOf(1, 1, 1, 2, 2, 2, 3, 4, 4, 4)
    print(list.asReceiveChannel().distinctUntilChanged().toList())
    // prints [1, 2, 3, 4]

Version with comparator

    val list = listOf(1, 1, 2, 6, 8, 3, 4, 6, 8)
    print(list.asReceiveChannel().distinctUntilChanged(comparator = { first, second -> first % 2 == second % 3 }).toList())
    // prints [1, 2, 6, 3]

reduce

Apply a function to each emitted item, sequentially, and emit only the final accumulated value

    val list = listOf(1, 2, 3, 4)
    print(list.asReceiveChannel().reduce(accumulator = { first, second -> first + second }).toList())
    // prints [10]

Version with initial value

    val list = listOf(1, 2, 3, 4)
    print(list.asReceiveChannel().reduce(initialValue = 5, accumulator = { first, second -> first + second }).toList())
    // prints [15]

concat

Concatenate two ReceiveChannels sequentially

    val list1 = listOf(1, 2, 3, 4)
    val list2 = listOf(5, 6, 7)
    print(concat(first = list1.asReceiveChannel(), second = list2.asReceiveChannel()).toList())
    // prints [1, 2, 3, 4, 5, 6, 7]

concatWith

Instance version of concat()

    val list1 = listOf(1, 2, 3, 4)
    val list2 = listOf(5, 6, 7)
    print(list1.asReceiveChannel().concatWith(other = list2.asReceiveChannel()).toList())
    // prints [1, 2, 3, 4, 5, 6, 7]