Kotlin multiplatform implementation of Reactive Extensions.

Setup

The recommended minimum Gradle version is 5.3. Please read the documentation about metadata publishing mode first.

Add the Bintray repository to your root build.gradle file:

repositories {
    maven {
        url "https://dl.bintray.com/badoo/maven"
    }
}

The following modules are published:

  • reaktive - the main Reaktive library (multiplatform)
  • reaktive-annotations - collection of annotations (multiplatform)
  • reaktive-testing - testing utilities (multiplatform)
  • utils - some utilities like Clock, AtomicReference, Lock, etc. (multiplatform)
  • coroutines-interop - Kotlin coroutines interoperability helpers (multiplatform)
  • rxjava2-interop - RxJava2 interoperability helpers (JVM and Android)
  • rxjava3-interop - RxJava3 interoperability helpers (JVM and Android)

Multiplatform module publications

Kotlin common (root publication):

implementation 'com.badoo.reaktive:<module-name>:<latest-version>'

JVM:

implementation 'com.badoo.reaktive:<module-name>-jvm:<latest-version>'

Android (debug and release):

implementation 'com.badoo.reaktive:<module-name>-android:<latest-version>'

iOS 32:

implementation 'com.badoo.reaktive:<module-name>-ios32:<latest-version>'

iOS 64:

implementation 'com.badoo.reaktive:<module-name>-ios64:<latest-version>'

iOS sim:

implementation 'com.badoo.reaktive:<module-name>-iossim:<latest-version>'

macOS x64:

implementation 'com.badoo.reaktive:<module-name>-macosx64:<latest-version>'

watchOS ARM32:

implementation 'com.badoo.reaktive:<module-name>-watchosarm32:<latest-version>'

watchOS ARM64:

implementation 'com.badoo.reaktive:<module-name>-watchosarm64:<latest-version>'

watchOS sim:

implementation 'com.badoo.reaktive:<module-name>-watchossim:<latest-version>'

tvOS ARM64:

implementation 'com.badoo.reaktive:<module-name>-tvosarm64:<latest-version>'

tvOS sim:

implementation 'com.badoo.reaktive:<module-name>-tvossim:<latest-version>'

JavaScript:

implementation 'com.badoo.reaktive:<module-name>-js:<latest-version>'

Linux x64:

implementation 'com.badoo.reaktive:<module-name>-linuxx64:<latest-version>'

Linux ARM 32 hfp:

implementation 'com.badoo.reaktive:<module-name>-linuxarm32hfp:<latest-version>'

Regular modules:

implementation 'com.badoo.reaktive:<module-name>:<latest-version>'

Typical dependency configuration for an MPP module (metadata mode)

kotlin {
    sourceSets {
        commonMain {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive:<latest-version>'
                implementation 'com.badoo.reaktive:reaktive-annotations:<latest-version>'
                implementation 'com.badoo.reaktive:coroutines-interop:<latest-version>'
            }
        }

        commonTest {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive-testing:<latest-version>'
            }
        }
    }
}
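
For projects that use the Gradle Kotlin DSL, the same configuration might look roughly like the sketch below. It assumes a build.gradle.kts file with the Kotlin Multiplatform plugin already applied; the `<latest-version>` placeholder is kept as-is and must be replaced with a real version.

```kotlin
// build.gradle.kts (assumed file name; the README itself shows the Groovy DSL)
kotlin {
    sourceSets {
        // "commonMain" and "commonTest" are created by the multiplatform plugin
        val commonMain by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive:<latest-version>")
                implementation("com.badoo.reaktive:reaktive-annotations:<latest-version>")
                implementation("com.badoo.reaktive:coroutines-interop:<latest-version>")
            }
        }

        val commonTest by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive-testing:<latest-version>")
            }
        }
    }
}
```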

Features:

  • Multiplatform: JVM, Android, iOS, macOS, watchOS, tvOS, JavaScript, Linux x64, Linux ARM 32 hfp
  • Schedulers support: computation, IO, trampoline, main
  • True multithreading for Kotlin/Native (there are some limitations)
  • Thread local subscriptions without freezing for Kotlin/Native
  • Supported sources: Observable, Maybe, Single, Completable
  • Subjects: PublishSubject, BehaviorSubject, ReplaySubject, UnicastSubject
  • Interoperability with Kotlin Coroutines: conversions between coroutines (including Flow) and Reaktive
  • Interoperability with RxJava2 and RxJava3: conversion of sources between Reaktive and RxJava, ability to reuse RxJava's schedulers

Kotlin/Native pitfalls

The Kotlin/Native memory model and concurrency are very special. In general, shared mutable state between threads is not allowed. Since Reaktive supports multithreading in Kotlin/Native, please read the Kotlin/Native documentation on its memory model and concurrency before using it.

Object detachment is relatively difficult to achieve and is very error-prone when the objects are created from outside and are not fully managed by the library. This is why Reaktive prefers frozen state. Here are some hints:

  • Any callback (and any captured objects) submitted to a Scheduler will be frozen
  • subscribeOn freezes both its upstream source and its downstream observer, as well as all the Disposables (upstream's and downstream's); the values (including errors) are not frozen by the operator
  • observeOn freezes only its downstream observer, all the values (including errors) passed through it, and all the Disposables; the upstream source is not frozen by the operator
  • Other operators that use a scheduler (like debounce, timer, delay, etc.) behave the same as observeOn in most cases
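
The hints above can be illustrated with a small sketch. It assumes the `singleFromFunction`, `subscribeOn`, `observeOn` and `subscribe` functions from `com.badoo.reaktive.single` and the `ioScheduler` and `mainScheduler` properties from `com.badoo.reaktive.scheduler`; `User`, `loadUser` and `showUser` are hypothetical names. The idea is to pass only immutable values through the chain, so that freezing them is harmless.

```kotlin
import com.badoo.reaktive.scheduler.ioScheduler
import com.badoo.reaktive.scheduler.mainScheduler
import com.badoo.reaktive.single.observeOn
import com.badoo.reaktive.single.singleFromFunction
import com.badoo.reaktive.single.subscribe
import com.badoo.reaktive.single.subscribeOn

// Hypothetical immutable value: it has no mutable state, so freezing it changes nothing
data class User(val id: Long, val name: String)

// Hypothetical stand-in for a real blocking I/O call
fun loadUser(): User = User(id = 1L, name = "Alice")

fun showUser() {
    singleFromFunction { loadUser() } // Executed on ioScheduler because of subscribeOn
        .subscribeOn(ioScheduler)     // Freezes the upstream source and the downstream observer
        .observeOn(mainScheduler)     // Freezes the downstream observer and the emitted User
        .subscribe(
            // These callbacks are frozen, so they must not mutate captured state
            onSuccess = { user -> println(user.name) },
            onError = { error -> println(error) }
        )
}
```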

Thread local tricks to avoid freezing

Sometimes freezing is not acceptable, e.g. we might want to load some data in the background and then update the UI. Obviously, the UI cannot be frozen. With Reaktive, this can be achieved in two ways:

Use the threadLocal operator:

val values = mutableListOf<Any>()
var isFinished = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .threadLocal()
    .doOnBeforeNext { values += it } // Callback is not frozen, we can update the mutable list
    .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
    .subscribe()

Set the isThreadLocal flag to true in the subscribe operator:

val values = mutableListOf<Any>()
var isComplete = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .subscribe(
        isThreadLocal = true,
        onNext = { values += it }, // Callback is not frozen, we can update the mutable list
        onComplete = { isComplete = true } // Callback is not frozen, we can change the flag
    )

In both cases, the subscription (the subscribe call) must be performed on the main thread.

Subscription management with DisposableScope

Reaktive provides an easy way to manage subscriptions: DisposableScope.

Take a look at the following examples:

val scope =
    disposableScope {
        observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed

        doOnDispose {
            // Will be called when the scope is disposed
        }

        someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed
    }

// At some point later
scope.dispose()

class MyPresenter(
    private val view: MyView,
    private val longRunningAction: Completable
) : DisposableScope by DisposableScope() {

    init {
        doOnDispose {
            // Will be called when the presenter is disposed
        }
    }

    fun load() {
        view.showProgressBar()

        // Subscription will be disposed when the presenter is disposed
        longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)
    }
}

class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        MyPresenter(...).scope()
    }

    override fun onDestroy() {
        dispose()

        super.onDestroy()
    }
}

Samples:
