Skip to content

Commit

Permalink
Fix a few issues with Stream#repeat and repeatElems
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid committed May 26, 2019
1 parent e2691dc commit 67fdc29
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
47 changes: 47 additions & 0 deletions streams/jvm/src/test/scala/scalaz/zio/stream/StreamSpec.scala
Expand Up @@ -28,6 +28,13 @@ class StreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
Stream.unfold $unfold
Stream.unfoldM $unfoldM
Stream.range $range
Stream.repeat
repeat $repeat
short circuits $repeatShortCircuits
Stream.repeatElems
repeatElems $repeatElems
short circuits $repeatElemsShortCircuits

Stream.take
take $take
Expand Down Expand Up @@ -588,4 +595,44 @@ class StreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
list.reverse must_=== (1 to 4).toList
}
)

import scalaz.zio.duration._

private def repeat =
unsafeRun(
Stream(1)
.repeat(Schedule.recurs(4))
.run(Sink.collect[Int])
.map(_ must_=== List(1, 1, 1, 1, 1))
)

private def repeatShortCircuits =
unsafeRun(
for {
ref <- Ref.make[List[Int]](Nil)
_ <- Stream
.fromEffect(ref.update(1 :: _))
.repeat(Schedule.spaced(10.millis))
.take(2)
.run(Sink.drain)
result <- ref.get
} yield result must_=== List(1, 1)
)

private def repeatElems =
unsafeRun(
Stream(1, 2, 3)
.repeatElems(Schedule.recurs(1))
.run(Sink.collect[Int])
.map(_ must_=== List(1, 1, 2, 2, 3, 3))
)

private def repeatElemsShortCircuits =
unsafeRun(
Stream(1, 2, 3)
.repeatElems(Schedule.recurs(1))
.take(3)
.run(Sink.collect[Int])
.map(_ must_=== List(1, 1, 2))
)
}
15 changes: 9 additions & 6 deletions streams/shared/src/main/scala/scalaz/stream/ZStream.scala
Expand Up @@ -396,7 +396,7 @@ trait ZStream[-R, +E, +A] extends Serializable { self =>
self.fold[R2, E1, A1, S].flatMap { f0 =>
f0(s, cont, f).zip(schedule.update((), sched)).flatMap {
case (s, decision) =>
if (decision.cont) IO.unit.delay(decision.delay) *> loop(s, decision.state)
if (decision.cont && cont(s)) IO.unit.delay(decision.delay) *> loop(s, decision.state)
else IO.succeed(s)
}
}
Expand All @@ -413,11 +413,14 @@ trait ZStream[-R, +E, +A] extends Serializable { self =>
override def fold[R2 <: R1 with Clock, E1 >: E, A1 >: A, S]: Fold[R2, E1, A1, S] =
IO.succeedLazy { (s, cont, f) =>
def loop(s: S, sched: schedule.State, a: A): ZIO[R2, E1, S] =
schedule.update(a, sched).flatMap { decision =>
if (decision.cont)
IO.unit.delay(decision.delay) *> f(s, a).flatMap(loop(_, decision.state, a))
else IO.succeed(s)
}
if (!cont(s)) ZIO.succeed(s)
else
f(s, a).zip(schedule.update(a, sched)).flatMap {
case (s, decision) =>
if (decision.cont && cont(s))
IO.unit.delay(decision.delay) *> loop(s, decision.state, a)
else IO.succeed(s)
}

schedule.initial.flatMap { sched =>
self.fold[R2, E1, A, S].flatMap { f =>
Expand Down

0 comments on commit 67fdc29

Please sign in to comment.