Skip to content

Commit

Permalink
Reactor Integration (arrow-kt#706)
Browse files Browse the repository at this point in the history
* Starting Reactor integration

* Adding some tests

* Adding instances for MonoK

* Including arrow-effects-reactor in settings.gradle

* Updating FluxK

* Updating MonoK

* Fixing tests

* Adding another test for FluxK

* Adding tests for MonoK

* Removing duplicated laws for MonoK

* Adding docs for reactor integration

* Including missing dependencies

* Fixing typo in package name

* Adding reactor dependency line to the root README
  • Loading branch information
Cotel authored and RawToast committed Jul 18, 2018
1 parent 328e41e commit d1f9000
Show file tree
Hide file tree
Showing 15 changed files with 855 additions and 30 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies {
compile 'io.arrow-kt:arrow-mtl:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-rx2:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-reactor:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-kotlinx-coroutines:0.7.2' //optional
compile 'io.arrow-kt:arrow-optics:0.7.2' //optional
compile 'io.arrow-kt:arrow-generic:0.7.2' //optional
Expand Down
60 changes: 30 additions & 30 deletions infographic/arrow-infographic.txt

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions modules/docs/arrow-docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
compile project(':arrow-instances-data')
compile project(':arrow-effects')
compile project(':arrow-effects-rx2')
compile project(':arrow-effects-reactor')
compile project(':arrow-effects-kotlinx-coroutines')
compile project(':arrow-optics')
compile project(':arrow-recursion')
Expand All @@ -27,6 +28,7 @@ dependencies {

compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion"
compile "io.reactivex.rxjava2:rxjava:2.1.13"
compile "io.projectreactor:reactor-core:3.1.3.RELEASE"
}

task printcp {
Expand Down
3 changes: 3 additions & 0 deletions modules/docs/arrow-docs/docs/_data/menu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ options:
- title: Rx2
url: /docs/integrations/rx2/

- title: Reactor
url: /docs/integrations/reactor/

- title: kotlinx.coroutines
url: /docs/integrations/kotlinxcoroutines/

Expand Down
1 change: 1 addition & 0 deletions modules/docs/arrow-docs/docs/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
compile 'io.arrow-kt:arrow-mtl:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-rx2:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-reactor:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-kotlinx-coroutines:0.7.2' //optional
compile 'io.arrow-kt:arrow-optics:0.7.2' //optional
compile 'io.arrow-kt:arrow-generic:0.7.2' //optional
Expand Down
157 changes: 157 additions & 0 deletions modules/docs/arrow-docs/docs/docs/integrations/reactor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
---
layout: docs
title: Reactor
permalink: /docs/integrations/reactor/
---

## Project Reactor

Arrow aims to enhance the user experience when using Project Reactor. While providing other datatypes that are capable of handling effects, like IO, the style of programming encouraged by the library allows users to generify behavior for any existing abstractions.

One of such abstractions is Project Reactor, a library that, like RxJava, offers reactive streams.

```kotlin
val flux = Flux.just(7, 4, 11 ,3)
.map { it + 1 }
.filter { it % 2 == 0 }
.scan { acc, value -> acc + value }
.collectList()
.subscribeOn(Schedulers.parallel())
.block()
//[8, 20, 24]
```

### Integration with your existing Flux chains

The largest quality of life improvement when using Flux streams in Arrow is the introduction of the [Monad Comprehension]({{ '/docs/patterns/monad_comprehensions' | relative_url }}). This library construct allows expressing asynchronous Flux sequences as synchronous code using binding/bind.

#### Arrow Wrapper

To wrap any existing Flux in its Arrow Wrapper counterpart you can use the extension function `k()`.

```kotlin:ank
import arrow.effects.*
import reactor.core.publisher.*
val flux = Flux.just(1, 2, 3, 4, 5).k()
flux
```

```kotlin:ank
val mono = Mono.just(1).k()
mono
```

You can return to their regular forms using the function `value()`.

```kotlin:ank
flux.value()
```

```kotlin:ank
mono.value()
```

### Observable comprehensions

The library provides instances of [`MonadError`]({{ '/docs/typeclasses/monaderror' | relative_url }}) and [`MonadDefer`]({{ '/docs/effects/monaddefer' | relative_url }}).

[`MonadDefer`]({{ '/docs/effects/async' | relative_url }}) allows you to generify over datatypes that can run asynchronous code. You can use it with `FluxK` or `MonoK`.

```kotlin
fun <F> getSongUrlAsync(MS: MonadDefer<F>) =
MS { getSongUrl() }

val songFlux: FluxKOf<Url> = getSongUrlAsync(FluxK.monadDefer())
val songMono: MonoKOf<Url> = getSongUrlAsync(MonoK.monadDefer())
```

[`MonadError`]({{ '/docs/typeclasses/monaderror' | relative_url }}) can be used to start a [Monad Comprehension]({{ '/docs/patterns/monad_comprehensions' | relative_url }}) using the method `bindingCatch`, with all its benefits.

Let's take an example and convert it to a comprehension. We'll create an observable that loads a song from a remote location, and then reports the current play % every 100 milliseconds until the percentage reaches 100%:

```kotlin
getSongUrlAsync()
.map { songUrl -> MediaPlayer.load(songUrl) }
.flatMap {
val totalTime = musicPlayer.getTotaltime()
Flux.interval(Duration.ofMillis(100))
.flatMap {
Flux.create { musicPlayer.getCurrentTime() }
.map { tick -> (tick / totalTime * 100).toInt() }
}
.takeUntil { percent -> percent >= 100 }
}
```

When rewritten using `bindingCatch` it becomes:

```kotlin
import arrow.effects.*
import arrow.typeclasses.*

ForFluxK extensions {
bindingCatch {
val songUrl = getSongUrlAsync().bind()
val musicPlayer = MediaPlayer.load(songUrl)
val totalTime = musicPlayer.getTotaltime()

val end = DirectProcessor.create<Unit>()
Flux.interval(Duration.ofMillis(100)).takeUntilOther(end).bind()

val tick = musicPlayer.getCurrentTime().bind()
val percent = (tick / totalTime * 100).toInt()
if (percent >= 100) {
end.onNext(Unit)
}

percent
}.fix()
}
```

Note that any unexpected exception, like `AritmeticException` when `totalTime` is 0, is automatically caught and wrapped inside the flux.

### Subscription and cancellation

Flux streams created with comprehensions like `bindingCatch` behave the same way regular flux streams do, including cancellation by disposing the subscription.

```kotlin
val disposable =
songFlux.value()
.subscribe({ println("Song $it") }, { System.err.println("Error $it") })

disposable.dispose()
```
Note that [`MonadDefer`]({{ '/docs/effects/monaddefer' | relative_url }}) provides an alternative to `bindingCatch` called `bindingCancellable` returning a `arrow.Disposable`.
Invoking this `Disposable` causes an `BindingCancellationException` in the chain which needs to be handled by the subscriber, similarly to what `Deferred` does.

```kotlin
val (flux, disposable) =
FluxK.monadDefer().bindingCancellable {
val userProfile = Flux.create { getUserProfile("123") }
val friendProfiles = userProfile.friends().map { friend ->
bindDefer { getProfile(friend.id) }
}
listOf(userProfile) + friendProfiles
}

flux.value()
.subscribe({ println("User $it") }, { System.err.println("Boom! caused by $it") })

disposable()
// Boom! caused by BindingCancellationException
```

## Available Instances

* [Applicative]({{ '/docs/typeclasses/applicative' | relative_url }})
* [ApplicativeError]({{ '/docs/typeclasses/applicativeerror' | relative_url }})
* [Functor]({{ '/docs/typeclasses/functor' | relative_url }})
* [Monad]({{ '/docs/typeclasses/monad' | relative_url }})
* [MonadError]({{ '/docs/typeclasses/monaderror' | relative_url }})
* [MonadDefer]({{ '/docs/effects/monaddefer' | relative_url }})
* [Async]({{ '/docs/effects/async' | relative_url }})
* [Effect]({{ '/docs/effects/effect' | relative_url }})
* [Foldable]({{ '/docs/typeclasses/foldable' | relative_url }})
* [Traverse]({{ '/docs/typeclasses/traverse' | relative_url }})
19 changes: 19 additions & 0 deletions modules/effects/arrow-effects-reactor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
dependencies {
compile project(':arrow-data')
compile project(':arrow-effects')
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlinVersion"
compile project(':arrow-annotations')
kapt project(':arrow-annotations-processor')
kaptTest project(':arrow-annotations-processor')
compileOnly project(':arrow-annotations-processor')
testCompileOnly project(':arrow-annotations-processor')
testCompile "io.kotlintest:kotlintest:$kotlinTestVersion"
testCompile project(':arrow-test')

compile "io.projectreactor:reactor-core:3.1.3.RELEASE"
testCompile "io.projectreactor:reactor-test:3.1.3.RELEASE"
}

apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
apply plugin: 'kotlin-kapt'
4 changes: 4 additions & 0 deletions modules/effects/arrow-effects-reactor/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Maven publishing configuration
POM_NAME=Arrow-Effects-Reactor
POM_ARTIFACT_ID=arrow-effects-reactor
POM_PACKAGING=jar
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package arrow.effects

import arrow.Kind
import arrow.core.Either
import arrow.core.Eval
import arrow.core.Left
import arrow.core.Right
import arrow.core.identity
import arrow.effects.typeclasses.Proc
import arrow.higherkind
import arrow.typeclasses.Applicative
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

fun <A> Flux<A>.k(): FluxK<A> = FluxK(this)

fun <A> FluxKOf<A>.value(): Flux<A> =
this.fix().flux

@higherkind
data class FluxK<A>(val flux: Flux<A>) : FluxKOf<A>, FluxKKindedJ<A> {
fun <B> map(f: (A) -> B): FluxK<B> =
flux.map(f).k()

fun <B> ap(fa: FluxKOf<(A) -> B>): FluxK<B> =
flatMap { a -> fa.fix().map { ff -> ff(a) } }

fun <B> flatMap(f: (A) -> FluxKOf<B>): FluxK<B> =
flux.flatMap { f(it).fix().flux }.k()

fun <B> concatMap(f: (A) -> FluxKOf<B>): FluxK<B> =
flux.concatMap { f(it).fix().flux }.k()

fun <B> switchMap(f: (A) -> FluxKOf<B>): FluxK<B> =
flux.switchMap { f(it).fix().flux }.k()

fun <B> foldLeft(b: B, f: (B, A) -> B): B = flux.reduce(b, f).block()

fun <B> foldRight(lb: Eval<B>, f: (A, Eval<B>) -> Eval<B>): Eval<B> {
fun loop(fa_p: FluxK<A>): Eval<B> = when {
fa_p.flux.hasElements().map { !it }.block() -> lb
else -> f(fa_p.flux.blockFirst(), Eval.defer { loop(fa_p.flux.skip(1).k()) })
}

return Eval.defer { loop(this) }
}

fun <G, B> traverse(GA: Applicative<G>, f: (A) -> Kind<G, B>): Kind<G, FluxK<B>> = GA.run {
foldRight(Eval.always { GA.just(Flux.empty<B>().k()) }) { a, eval ->
f(a).map2Eval(eval) { Flux.concat(Flux.just<B>(it.a), it.b.flux).k() }
}.value()
}

fun handleErrorWith(function: (Throwable) -> FluxK<A>): FluxK<A> =
this.fix().flux.onErrorResume { t: Throwable -> function(t).flux }.k()

fun runAsync(cb: (Either<Throwable, A>) -> FluxKOf<Unit>): FluxK<Unit> =
flux.flatMap { cb(Right(it)).value() }.onErrorResume { cb(Left(it)).value() }.k()

companion object {
fun <A> just(a: A): FluxK<A> =
Flux.just(a).k()

fun <A> raiseError(t: Throwable): FluxK<A> =
Flux.error<A>(t).k()

operator fun <A> invoke(fa: () -> A): FluxK<A> =
defer { just(fa()) }

fun <A> defer(fa: () -> FluxKOf<A>): FluxK<A> =
Flux.defer { fa().value() }.k()

fun <A> runAsync(fa: Proc<A>): FluxK<A> =
Flux.create { emitter: FluxSink<A> ->
fa { either: Either<Throwable, A> ->
either.fold({
emitter.error(it)
}, {
emitter.next(it)
emitter.complete()
})
}
}.k()

tailrec fun <A, B> tailRecM(a: A, f: (A) -> FluxKOf<Either<A, B>>): FluxK<B> {
val either = f(a).fix().value().blockFirst()
return when (either) {
is Either.Left -> tailRecM(either.a, f)
is Either.Right -> Flux.just(either.b).k()
}
}

fun monadFlat(): FluxKMonadInstance = monad()

fun monadConcat(): FluxKMonadInstance = object : FluxKMonadInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().concatMap { f(it).fix() }
}

fun monadSwitch(): FluxKMonadInstance = object : FluxKMonadErrorInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().switchMap { f(it).fix() }
}

fun monadErrorFlat(): FluxKMonadErrorInstance = monadError()

fun monadErrorConcat(): FluxKMonadErrorInstance = object : FluxKMonadErrorInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().concatMap { f(it).fix() }
}

fun monadErrorSwitch(): FluxKMonadErrorInstance = object : FluxKMonadErrorInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().switchMap { f(it).fix() }
}
}
}

inline fun <A, G> FluxKOf<Kind<G, A>>.sequence(GA: Applicative<G>): Kind<G, FluxK<A>> =
fix().traverse(GA, ::identity)
Loading

0 comments on commit d1f9000

Please sign in to comment.