Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule.repeatAsFlow #2676

Merged
merged 27 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f6f5af8
add recursAndCollect
i-walker Feb 27, 2022
b52345e
cleanUp
i-walker Feb 27, 2022
172df3a
simplify impl
i-walker Feb 27, 2022
83f80c4
Schedule -> Flow
i-walker Feb 28, 2022
779c2ce
rm recurseAndCollect
i-walker Feb 28, 2022
ad6053d
Update API files
i-walker Feb 28, 2022
a2a7558
rm recurseAndCollect and add Validated version
i-walker Mar 1, 2022
f940f44
Merge remote-tracking branch 'origin/schedule-add' into schedule-add
i-walker Mar 1, 2022
a77bfe4
Merge remote-tracking branch 'origin/main' into schedule-add
i-walker Mar 1, 2022
89b9860
rm repeatAsFlowValidated
i-walker Mar 1, 2022
43b4a58
change api files
i-walker Mar 1, 2022
7e2aa5f
Merge remote-tracking branch 'origin/main' into schedule-add
i-walker Mar 1, 2022
dd3b1bb
rm docs
i-walker Mar 1, 2022
d209eed
add repeatAsFlow tests and Docs
i-walker Mar 1, 2022
4ce8055
clean up
i-walker Mar 1, 2022
70d6156
fix tests
i-walker Mar 2, 2022
e2e90a6
Use Boolean instead of Atomic
i-walker Mar 2, 2022
7cee181
Merge branch 'main' into schedule-add
i-walker Mar 2, 2022
bf31b5d
add last output in `orElse`
i-walker Mar 2, 2022
b998bc9
Update API files
i-walker Mar 2, 2022
653762a
fix
i-walker Mar 2, 2022
f2419d2
fix imports/ depreactions
i-walker Mar 2, 2022
66facbf
Merge branch 'main' into schedule-add
i-walker Mar 2, 2022
a5b4249
Merge branch 'main' into schedule-add
i-walker Mar 8, 2022
0fc7a1d
adjust names
i-walker Mar 8, 2022
79a30de
Merge branch 'main' into schedule-add
i-walker Mar 9, 2022
847e3d5
Merge branch 'main' into schedule-add
nomisRev Mar 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package arrow.fx.coroutines

import arrow.core.Either
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
Expand Down
3 changes: 3 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,11 @@ public abstract class arrow/fx/coroutines/Schedule {
public final fun or (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule;
public abstract fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule;
public final fun repeat (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun repeatOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun repeatOrElseAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun repeatOrElseEitherAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun untilInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule;
public final fun untilOutput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule;
public final fun void ()Larrow/fx/coroutines/Schedule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.nanoseconds
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.retry

/**
* # Retrying and repeating effects
Expand Down Expand Up @@ -217,6 +221,28 @@ public sealed class Schedule<Input, Output> {
public suspend fun repeatOrElse(fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> Output): Output =
repeatOrElseEither(fa, orElse).fold(::identity, ::identity)

public abstract suspend fun <C> repeatOrElseEitherAsFlow(
fa: suspend () -> Input,
orElse: suspend (Throwable, Output?) -> C
): Flow<Either<C, Output>>

/**
* Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay.
* This will raise an error if a repeat failed.
*/
public suspend fun repeatAsFlow(fa: suspend () -> Input): Flow<Output> =
repeatOrElseAsFlow(fa) { e, _ -> throw e }

/**
* Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay.
* Also offers a function to handle errors if they are encountered during repetition.
*/
public suspend fun repeatOrElseAsFlow(
fa: suspend () -> Input,
orElse: suspend (Throwable, Output?) -> Output
): Flow<Output> =
repeatOrElseEitherAsFlow(fa, orElse).map { it.fold(::identity, ::identity) }

/**
* Changes the output of a schedule. Does not alter the decision of the schedule.
*/
Expand Down Expand Up @@ -292,7 +318,10 @@ public sealed class Schedule<Input, Output> {
/**
* Accumulates the results of a schedule by folding over them effectfully.
*/
public abstract fun <C> foldLazy(initial: suspend () -> C, f: suspend (acc: C, output: Output) -> C): Schedule<Input, C>
public abstract fun <C> foldLazy(
initial: suspend () -> C,
f: suspend (acc: C, output: Output) -> C
): Schedule<Input, C>

/**
* Composes this schedule with the other schedule by piping the output of this schedule
Expand Down Expand Up @@ -458,6 +487,38 @@ public sealed class Schedule<Input, Output> {
}
}

override suspend fun <C> repeatOrElseEitherAsFlow(
fa: suspend () -> Input,
orElse: suspend (Throwable, Output?) -> C
): Flow<Either<C, Output>> =
flow {
var loop = true
var last: (() -> Output)? = null // We haven't seen any input yet
var state: State = initialState.invoke()

while (loop) {
coroutineContext.ensureActive()
try {
val a = fa.invoke()
val step = update(a, state)
if (!step.cont) {
emit(Either.Right(step.finish.value()))
loop = false
} else {
delay((step.delayInNanos / 1_000_000).toLong())
val output = step.finish.value()
// Set state before looping again and emit Output
emit(Either.Right(output))
last = { output }
state = step.state
}
} catch (e: Throwable) {
emit(Either.Left(orElse(e.nonFatalOrThrow(), last?.invoke())))
loop = false
}
}
}

override fun <B> map(f: (output: Output) -> B): Schedule<Input, B> =
ScheduleImpl(initialState) { i, s -> update(i, s).map(f) }

Expand Down Expand Up @@ -621,7 +682,12 @@ public sealed class Schedule<Input, Output> {
/**
* A single decision. Contains the decision to continue, the delay, the new state and the (lazy) result of a Schedule.
*/
public data class Decision<out A, out B>(val cont: Boolean, val delayInNanos: Double, val state: A, val finish: Eval<B>) {
public data class Decision<out A, out B>(
val cont: Boolean,
val delayInNanos: Double,
val state: A,
val finish: Eval<B>
) {

@ExperimentalTime
val duration: Duration
Expand All @@ -638,6 +704,7 @@ public sealed class Schedule<Input, Output> {

public fun <D> map(g: (B) -> D): Decision<A, D> =
bimap(::identity, g)

public fun <C, D, E> combineNanos(
other: Decision<C, D>,
f: (Boolean, Boolean) -> Boolean,
Expand Down Expand Up @@ -736,7 +803,7 @@ public sealed class Schedule<Input, Output> {
unfold(0) { it + 1 }

/**
* Creates a Schedule that continues n times and returns the number of iterations.
* Creates a Schedule that continues [n] times and returns the number of iterations.
*/
public fun <A> recurs(n: Int): Schedule<A, Int> =
Schedule(suspend { 0 }) { _: A, acc ->
Expand Down Expand Up @@ -940,7 +1007,10 @@ public suspend fun <A, B> Schedule<Throwable, B>.retry(fa: suspend () -> A): A =
* Runs an effect and, if it fails, decide using the provided policy if the effect should be retried and if so, with how much delay.
* Also offers a function to handle errors if they are encountered during retrial.
*/
public suspend fun <A, B> Schedule<Throwable, B>.retryOrElse(fa: suspend () -> A, orElse: suspend (Throwable, B) -> A): A =
public suspend fun <A, B> Schedule<Throwable, B>.retryOrElse(
fa: suspend () -> A,
orElse: suspend (Throwable, B) -> A
): A =
retryOrElseEither(fa, orElse).fold(::identity, ::identity)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.math.pow
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlin.time.milliseconds
import kotlin.time.nanoseconds
import kotlin.time.seconds
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip

@ExperimentalTime
class ScheduleTest : ArrowFxSpec(
Expand Down Expand Up @@ -173,6 +178,10 @@ class ScheduleTest : ArrowFxSpec(
checkRepeat(Schedule.recurs(20_000), expected = 20_000)
}

"repeatAsFlow is stack-safe" {
checkRepeatAsFlow(Schedule.recurs(500_000), expected = (1..500_000).asFlow())
}

"repeat" {
val stop = RuntimeException("WOOO")
val dec = Schedule.Decision(true, 10.0, 0, Eval.now("state"))
Expand All @@ -192,15 +201,48 @@ class ScheduleTest : ArrowFxSpec(
l shouldBe Either.Left(stop)
}

"repeatAsFlow" {
val stop = RuntimeException("WOOO")
val dec = Schedule.Decision(true, 10.0, 0, Eval.now("state"))
val n = 100
val schedule = Schedule({ 0 }) { _: Unit, _ -> dec }

val eff = SideEffect()

val l = Either.catch {
schedule.repeatAsFlow {
if (eff.counter >= n) throw stop
else eff.increment()
}.collect()
}

eff.counter shouldBe 100
l shouldBe Either.Left(stop)
}

"repeat fails fast on errors" {
val ex = Throwable("Hello")
Schedule.recurs<Int>(0).repeatOrElseEither({ throw ex }) { exc, _ -> exc }
.fold({ it shouldBe ex }, { fail("The impossible happened") })
}

"repeatAsFlow fails fast on errors" {
val ex = Throwable("Hello")
Schedule.recurs<Int>(0).repeatOrElseEitherAsFlow({ throw ex }, { t, _ -> t })
.collect { either -> either.fold({ it shouldBe ex }, { fail("The impossible happened") }) }
}

"repeat should run the schedule with the correct input" {
var i = 0
(Schedule.recurs<Int>(10).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..10).toList()
val n = 10
(Schedule.recurs<Int>(n).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..n).toList()
}

"repeatAsFlow should run the schedule with the correct input" {
var i = 0
val n = 10
(Schedule.recurs<Int>(n).zipRight(Schedule.collect())).repeatAsFlow { i++ }.toList() shouldBe
(0..n).map { (0..it).toList() }
}

"retry is stack-safe" {
Expand Down Expand Up @@ -291,6 +333,14 @@ private suspend fun <B> checkRepeat(schedule: Schedule<Int, B>, expected: B): Un
} shouldBe expected
}

private suspend fun <B> checkRepeatAsFlow(schedule: Schedule<Int, B>, expected: Flow<B>): Unit {
val count = Atomic(0)
schedule.repeatAsFlow {
count.updateAndGet { it + 1 }
}.zip(expected, ::Pair)
.collect { (a, b) -> a shouldBe b }
}

@ExperimentalTime
private infix fun <A> Schedule.Decision<Any?, A>.eqv(other: Schedule.Decision<Any?, A>): Unit {
require(cont == other.cont) { "Decision#cont: ${this.cont} shouldBe ${other.cont}" }
Expand Down