/
SingleK.kt
71 lines (54 loc) · 2.13 KB
/
SingleK.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package arrow.effects
import arrow.core.Either
import arrow.core.Left
import arrow.core.Right
import arrow.effects.CoroutineContextRx2Scheduler.asScheduler
import arrow.effects.typeclasses.Proc
import arrow.higherkind
import io.reactivex.Single
import io.reactivex.SingleEmitter
import kotlin.coroutines.experimental.CoroutineContext
/** Wraps this [Single] in a [SingleK]. */
fun <A> Single<A>.k(): SingleK<A> = SingleK(single = this)
/** Downcasts this kind to [SingleK] and returns the [Single] it wraps. */
fun <A> SingleKOf<A>.value(): Single<A> = fix().single
/**
 * Higher-kinded wrapper around an RxJava [Single].
 *
 * Every combinator delegates to the wrapped [single] and re-wraps the result with [k].
 *
 * @property single the wrapped RxJava [Single].
 */
@higherkind
data class SingleK<A>(val single: Single<A>) : SingleKOf<A>, SingleKKindedJ<A> {

    /** Transforms the emitted value with [f]. */
    fun <B> map(f: (A) -> B): SingleK<B> =
        single.map(f).k()

    /**
     * Applies the function emitted by [fa] to the value emitted by this instance.
     *
     * Implemented through [flatMap], so this instance is sequenced before [fa].
     */
    fun <B> ap(fa: SingleKOf<(A) -> B>): SingleK<B> =
        flatMap { a -> fa.fix().map { ff -> ff(a) } }

    /** Sequences this instance with the one produced by [f] from the emitted value. */
    fun <B> flatMap(f: (A) -> SingleKOf<B>): SingleK<B> =
        single.flatMap { f(it).value() }.k()

    /**
     * Recovers from an error by switching to the instance returned by [function].
     *
     * Accepts any [SingleKOf], in line with [flatMap] and [runAsync]; functions
     * returning [SingleK] remain valid arguments.
     */
    fun handleErrorWith(function: (Throwable) -> SingleKOf<A>): SingleK<A> =
        // The explicit parameter type selects the Function overload of onErrorResumeNext.
        single.onErrorResumeNext { t: Throwable -> function(t).value() }.k()

    /** Observes the result on a scheduler derived from [ctx]. */
    fun continueOn(ctx: CoroutineContext): SingleK<A> =
        single.observeOn(ctx.asScheduler()).k()

    /**
     * Feeds the outcome of this instance to [cb]: `Right` on success, `Left` on error.
     *
     * NOTE: the error handler sits downstream of the success branch, so an error raised
     * by the Single returned from `cb(Right(...))` is also passed to `cb` as a `Left`.
     */
    fun runAsync(cb: (Either<Throwable, A>) -> SingleKOf<Unit>): SingleK<Unit> =
        single
            .flatMap { cb(Right(it)).value() }
            .onErrorResumeNext(io.reactivex.functions.Function { cb(Left(it)).value() })
            .k()

    companion object {

        /** Lifts the already computed value [a]. */
        fun <A> just(a: A): SingleK<A> =
            Single.just(a).k()

        /** Creates an instance that fails with [t]. */
        fun <A> raiseError(t: Throwable): SingleK<A> =
            Single.error<A>(t).k()

        /** Wraps the computation [fa]; it is invoked through [defer] rather than at call time. */
        operator fun <A> invoke(fa: () -> A): SingleK<A> =
            defer { just(fa()) }

        /** Defers the construction of the instance produced by [fa] via [Single.defer]. */
        fun <A> defer(fa: () -> SingleKOf<A>): SingleK<A> =
            Single.defer { fa().value() }.k()

        /**
         * Builds an instance from the callback-based procedure [fa].
         *
         * The callback handed to [fa] forwards a `Left` to the emitter's `onError`
         * and a `Right` to its `onSuccess`.
         */
        fun <A> async(fa: Proc<A>): SingleK<A> =
            Single.create { emitter: SingleEmitter<A> ->
                fa { result: Either<Throwable, A> ->
                    result.fold(
                        { error -> emitter.onError(error) },
                        { value -> emitter.onSuccess(value) }
                    )
                }
            }.k()

        /**
         * Repeatedly applies [f], starting from [a], until it yields a `Right`.
         *
         * NOTE: every step is resolved with `blockingGet()` at call time, so the calling
         * thread blocks until the final `Right` is produced and an error in any step is
         * thrown from this function rather than delivered through the returned instance.
         */
        tailrec fun <A, B> tailRecM(a: A, f: (A) -> SingleKOf<Either<A, B>>): SingleK<B> {
            val step = f(a).value().blockingGet()
            return when (step) {
                is Either.Left -> tailRecM(step.a, f)
                is Either.Right -> Single.just(step.b).k()
            }
        }
    }
}