Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactor Integration #706

Merged
merged 21 commits into from
Jun 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies {
compile 'io.arrow-kt:arrow-mtl:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-rx2:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-reactor:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-kotlinx-coroutines:0.7.2' //optional
compile 'io.arrow-kt:arrow-optics:0.7.2' //optional
compile 'io.arrow-kt:arrow-generic:0.7.2' //optional
Expand Down
60 changes: 30 additions & 30 deletions infographic/arrow-infographic.txt

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions modules/docs/arrow-docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
compile project(':arrow-instances-data')
compile project(':arrow-effects')
compile project(':arrow-effects-rx2')
compile project(':arrow-effects-reactor')
compile project(':arrow-effects-kotlinx-coroutines')
compile project(':arrow-optics')
compile project(':arrow-recursion')
Expand All @@ -27,6 +28,7 @@ dependencies {

compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion"
compile "io.reactivex.rxjava2:rxjava:2.1.13"
compile "io.projectreactor:reactor-core:3.1.3.RELEASE"
}

task printcp {
Expand Down
3 changes: 3 additions & 0 deletions modules/docs/arrow-docs/docs/_data/menu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ options:
- title: Rx2
url: /docs/integrations/rx2/

- title: Reactor
url: /docs/integrations/reactor/

- title: kotlinx.coroutines
url: /docs/integrations/kotlinxcoroutines/

Expand Down
1 change: 1 addition & 0 deletions modules/docs/arrow-docs/docs/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
compile 'io.arrow-kt:arrow-mtl:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-rx2:0.7.2' //optional
compile 'io.arrow-kt:arrow-effects-reactor:0.7.2' //optional
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to add this line to the README in the root of the repo. It isn't the same as this one, sadly.

compile 'io.arrow-kt:arrow-effects-kotlinx-coroutines:0.7.2' //optional
compile 'io.arrow-kt:arrow-optics:0.7.2' //optional
compile 'io.arrow-kt:arrow-generic:0.7.2' //optional
Expand Down
157 changes: 157 additions & 0 deletions modules/docs/arrow-docs/docs/docs/integrations/reactor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
---
layout: docs
title: Reactor
permalink: /docs/integrations/reactor/
---

## Project Reactor

Arrow aims to enhance the user experience when using Project Reactor. While providing other datatypes that are capable of handling effects, like IO, the style of programming encouraged by the library allows users to generify behavior for any existing abstractions.

One of such abstractions is Project Reactor, a library that, like RxJava, offers reactive streams.

```kotlin
val flux = Flux.just(7, 4, 11 ,3)
.map { it + 1 }
.filter { it % 2 == 0 }
.scan { acc, value -> acc + value }
.collectList()
.subscribeOn(Schedulers.parallel())
.block()
//[8, 20, 24]
```

### Integration with your existing Flux chains

The largest quality of life improvement when using Flux streams in Arrow is the introduction of the [Monad Comprehension]({{ '/docs/patterns/monad_comprehensions' | relative_url }}). This library construct allows expressing asynchronous Flux sequences as synchronous code using binding/bind.

#### Arrow Wrapper

To wrap any existing Flux in its Arrow Wrapper counterpart you can use the extension function `k()`.

```kotlin:ank
import arrow.effects.*
import reactor.core.publisher.*

val flux = Flux.just(1, 2, 3, 4, 5).k()
flux
```

```kotlin:ank
val mono = Mono.just(1).k()
mono
```

You can return to their regular forms using the function `value()`.

```kotlin:ank
flux.value()
```

```kotlin:ank
mono.value()
```

### Observable comprehensions

The library provides instances of [`MonadError`]({{ '/docs/typeclasses/monaderror' | relative_url }}) and [`MonadDefer`]({{ '/docs/effects/monaddefer' | relative_url }}).

[`MonadDefer`]({{ '/docs/effects/async' | relative_url }}) allows you to generify over datatypes that can run asynchronous code. You can use it with `FluxK` or `MonoK`.

```kotlin
fun <F> getSongUrlAsync(MS: MonadDefer<F>) =
MS { getSongUrl() }

val songFlux: FluxKOf<Url> = getSongUrlAsync(FluxK.monadDefer())
val songMono: MonoKOf<Url> = getSongUrlAsync(MonoK.monadDefer())
```

[`MonadError`]({{ '/docs/typeclasses/monaderror' | relative_url }}) can be used to start a [Monad Comprehension]({{ '/docs/patterns/monad_comprehensions' | relative_url }}) using the method `bindingCatch`, with all its benefits.

Let's take an example and convert it to a comprehension. We'll create an observable that loads a song from a remote location, and then reports the current play % every 100 milliseconds until the percentage reaches 100%:

```kotlin
getSongUrlAsync()
.map { songUrl -> MediaPlayer.load(songUrl) }
.flatMap {
val totalTime = musicPlayer.getTotaltime()
Flux.interval(Duration.ofMillis(100))
.flatMap {
Flux.create { musicPlayer.getCurrentTime() }
.map { tick -> (tick / totalTime * 100).toInt() }
}
.takeUntil { percent -> percent >= 100 }
}
```

When rewritten using `bindingCatch` it becomes:

```kotlin
import arrow.effects.*
import arrow.typeclasses.*

ForFluxK extensions {
bindingCatch {
val songUrl = getSongUrlAsync().bind()
val musicPlayer = MediaPlayer.load(songUrl)
val totalTime = musicPlayer.getTotaltime()

val end = DirectProcessor.create<Unit>()
Flux.interval(Duration.ofMillis(100)).takeUntilOther(end).bind()

val tick = musicPlayer.getCurrentTime().bind()
val percent = (tick / totalTime * 100).toInt()
if (percent >= 100) {
end.onNext(Unit)
}

percent
}.fix()
}
```

Note that any unexpected exception, like `AritmeticException` when `totalTime` is 0, is automatically caught and wrapped inside the flux.

### Subscription and cancellation

Flux streams created with comprehensions like `bindingCatch` behave the same way regular flux streams do, including cancellation by disposing the subscription.

```kotlin
val disposable =
songFlux.value()
.subscribe({ println("Song $it") }, { System.err.println("Error $it") })

disposable.dispose()
```
Note that [`MonadDefer`]({{ '/docs/effects/monaddefer' | relative_url }}) provides an alternative to `bindingCatch` called `bindingCancellable` returning a `arrow.Disposable`.
Invoking this `Disposable` causes an `BindingCancellationException` in the chain which needs to be handled by the subscriber, similarly to what `Deferred` does.

```kotlin
val (flux, disposable) =
FluxK.monadDefer().bindingCancellable {
val userProfile = Flux.create { getUserProfile("123") }
val friendProfiles = userProfile.friends().map { friend ->
bindDefer { getProfile(friend.id) }
}
listOf(userProfile) + friendProfiles
}

flux.value()
.subscribe({ println("User $it") }, { System.err.println("Boom! caused by $it") })

disposable()
// Boom! caused by BindingCancellationException
```

## Available Instances

* [Applicative]({{ '/docs/typeclasses/applicative' | relative_url }})
* [ApplicativeError]({{ '/docs/typeclasses/applicativeerror' | relative_url }})
* [Functor]({{ '/docs/typeclasses/functor' | relative_url }})
* [Monad]({{ '/docs/typeclasses/monad' | relative_url }})
* [MonadError]({{ '/docs/typeclasses/monaderror' | relative_url }})
* [MonadDefer]({{ '/docs/effects/monaddefer' | relative_url }})
* [Async]({{ '/docs/effects/async' | relative_url }})
* [Effect]({{ '/docs/effects/effect' | relative_url }})
* [Foldable]({{ '/docs/typeclasses/foldable' | relative_url }})
* [Traverse]({{ '/docs/typeclasses/traverse' | relative_url }})
19 changes: 19 additions & 0 deletions modules/effects/arrow-effects-reactor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
dependencies {
compile project(':arrow-data')
compile project(':arrow-effects')
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlinVersion"
compile project(':arrow-annotations')
kapt project(':arrow-annotations-processor')
kaptTest project(':arrow-annotations-processor')
compileOnly project(':arrow-annotations-processor')
testCompileOnly project(':arrow-annotations-processor')
testCompile "io.kotlintest:kotlintest:$kotlinTestVersion"
testCompile project(':arrow-test')

compile "io.projectreactor:reactor-core:3.1.3.RELEASE"
testCompile "io.projectreactor:reactor-test:3.1.3.RELEASE"
}

apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
apply from: rootProject.file('gradle/generated-kotlin-sources.gradle')
apply plugin: 'kotlin-kapt'
4 changes: 4 additions & 0 deletions modules/effects/arrow-effects-reactor/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Maven publishing configuration
POM_NAME=Arrow-Effects-Reactor
POM_ARTIFACT_ID=arrow-effects-reactor
POM_PACKAGING=jar
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package arrow.effects

import arrow.Kind
import arrow.core.Either
import arrow.core.Eval
import arrow.core.Left
import arrow.core.Right
import arrow.core.identity
import arrow.effects.typeclasses.Proc
import arrow.higherkind
import arrow.typeclasses.Applicative
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

fun <A> Flux<A>.k(): FluxK<A> = FluxK(this)

fun <A> FluxKOf<A>.value(): Flux<A> =
this.fix().flux

@higherkind
data class FluxK<A>(val flux: Flux<A>) : FluxKOf<A>, FluxKKindedJ<A> {
fun <B> map(f: (A) -> B): FluxK<B> =
flux.map(f).k()

fun <B> ap(fa: FluxKOf<(A) -> B>): FluxK<B> =
flatMap { a -> fa.fix().map { ff -> ff(a) } }

fun <B> flatMap(f: (A) -> FluxKOf<B>): FluxK<B> =
flux.flatMap { f(it).fix().flux }.k()

fun <B> concatMap(f: (A) -> FluxKOf<B>): FluxK<B> =
flux.concatMap { f(it).fix().flux }.k()

fun <B> switchMap(f: (A) -> FluxKOf<B>): FluxK<B> =
flux.switchMap { f(it).fix().flux }.k()

fun <B> foldLeft(b: B, f: (B, A) -> B): B = flux.reduce(b, f).block()

fun <B> foldRight(lb: Eval<B>, f: (A, Eval<B>) -> Eval<B>): Eval<B> {
fun loop(fa_p: FluxK<A>): Eval<B> = when {
fa_p.flux.hasElements().map { !it }.block() -> lb
else -> f(fa_p.flux.blockFirst(), Eval.defer { loop(fa_p.flux.skip(1).k()) })
}

return Eval.defer { loop(this) }
}

fun <G, B> traverse(GA: Applicative<G>, f: (A) -> Kind<G, B>): Kind<G, FluxK<B>> = GA.run {
foldRight(Eval.always { GA.just(Flux.empty<B>().k()) }) { a, eval ->
f(a).map2Eval(eval) { Flux.concat(Flux.just<B>(it.a), it.b.flux).k() }
}.value()
}

fun handleErrorWith(function: (Throwable) -> FluxK<A>): FluxK<A> =
this.fix().flux.onErrorResume { t: Throwable -> function(t).flux }.k()

fun runAsync(cb: (Either<Throwable, A>) -> FluxKOf<Unit>): FluxK<Unit> =
flux.flatMap { cb(Right(it)).value() }.onErrorResume { cb(Left(it)).value() }.k()

companion object {
fun <A> just(a: A): FluxK<A> =
Flux.just(a).k()

fun <A> raiseError(t: Throwable): FluxK<A> =
Flux.error<A>(t).k()

operator fun <A> invoke(fa: () -> A): FluxK<A> =
defer { just(fa()) }

fun <A> defer(fa: () -> FluxKOf<A>): FluxK<A> =
Flux.defer { fa().value() }.k()

fun <A> runAsync(fa: Proc<A>): FluxK<A> =
Flux.create { emitter: FluxSink<A> ->
fa { either: Either<Throwable, A> ->
either.fold({
emitter.error(it)
}, {
emitter.next(it)
emitter.complete()
})
}
}.k()

tailrec fun <A, B> tailRecM(a: A, f: (A) -> FluxKOf<Either<A, B>>): FluxK<B> {
val either = f(a).fix().value().blockFirst()
return when (either) {
is Either.Left -> tailRecM(either.a, f)
is Either.Right -> Flux.just(either.b).k()
}
}

fun monadFlat(): FluxKMonadInstance = monad()

fun monadConcat(): FluxKMonadInstance = object : FluxKMonadInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().concatMap { f(it).fix() }
}

fun monadSwitch(): FluxKMonadInstance = object : FluxKMonadErrorInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().switchMap { f(it).fix() }
}

fun monadErrorFlat(): FluxKMonadErrorInstance = monadError()

fun monadErrorConcat(): FluxKMonadErrorInstance = object : FluxKMonadErrorInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().concatMap { f(it).fix() }
}

fun monadErrorSwitch(): FluxKMonadErrorInstance = object : FluxKMonadErrorInstance {
override fun <A, B> Kind<ForFluxK, A>.flatMap(f: (A) -> Kind<ForFluxK, B>): FluxK<B> =
fix().switchMap { f(it).fix() }
}
}
}

inline fun <A, G> FluxKOf<Kind<G, A>>.sequence(GA: Applicative<G>): Kind<G, FluxK<A>> =
fix().traverse(GA, ::identity)
Loading