Skip to content

Commit

Permalink
(WIP) monix#166 Generalize Task.sequence for arbitrary collections
Browse files Browse the repository at this point in the history
  • Loading branch information
guersam committed Jun 29, 2016
1 parent fd67eed commit c63ad81
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
17 changes: 15 additions & 2 deletions monix-eval/shared/src/main/scala/monix/eval/Coeval.scala
Expand Up @@ -21,6 +21,7 @@ import monix.eval.Coeval._
import monix.types.{Bimonad, Evaluable}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.generic.CanBuildFrom
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -335,8 +336,20 @@ object Coeval {
*
* For [[Coeval]] this has the same behavior as [[zipList]].
*/
def sequence[A](sources: Seq[Coeval[A]]): Coeval[List[A]] =
zipList(sources)
def sequence[A, M[X] <: TraversableOnce[X]](sources: M[Coeval[A]])
(implicit cbf: CanBuildFrom[M[Coeval[A]], A, M[A]]): Coeval[M[A]] = {
val init = evalAlways(cbf(sources))
val r = sources.foldLeft(init)((acc,elem) => acc.zipWith(elem)(_ += _))
r.map(_.result())
}

def traverse[A, B, M[X] <: TraversableOnce[X]](sources: M[A])
(f: A => Coeval[B])
(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Coeval[M[B]] = {
val init = evalAlways(cbf(sources))
val r = sources.foldLeft(init)((acc,elem) => acc.zipWith(f(elem))(_ += _))
r.map(_.result())
}

/** Zips together multiple [[Coeval]] instances. */
def zipList[A](sources: Seq[Coeval[A]]): Coeval[List[A]] = {
Expand Down
35 changes: 24 additions & 11 deletions monix-eval/shared/src/main/scala/monix/eval/Task.scala
Expand Up @@ -28,6 +28,7 @@ import org.reactivestreams.Subscriber
import monix.execution.atomic.{Atomic, AtomicAny}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Future, Promise, TimeoutException}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -775,10 +776,19 @@ object Task extends TaskInstances {
* both effects and results will be ordered. See [[gather]] and [[gatherUnordered]]
* for unordered results or effects, and thus potential of running in paralel.
*/
def sequence[A](in: Seq[Task[A]]): Task[List[A]] = {
val init = evalAlways(mutable.ListBuffer.empty[A])
def sequence[A, M[X] <: TraversableOnce[X]](in: M[Task[A]])
(implicit cbf: CanBuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] = {
val init = evalAlways(cbf(in))
val r = in.foldLeft(init)((acc,elem) => acc.flatMap(lb => elem.map(e => lb += e)))
r.map(_.toList)
r.map(_.result())
}

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])
(f: A => Task[B])
(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Task[M[B]] = {
val init = evalAlways(cbf(in))
val r = in.foldLeft(init)((acc,elem) => acc.flatMap(lb => f(elem).map(e => lb += e)))
r.map(_.result())
}

/** Nondeterministically gather results from the given sequence of tasks,
Expand All @@ -793,11 +803,13 @@ object Task extends TaskInstances {
*
* Although the effects are unordered, we ensure the order of results
* matches the order of the input sequence. Also see [[gatherUnordered]].
*
* Alias for [[zipList]].
*/
def gather[A](in: Seq[Task[A]]): Task[List[A]] =
zipList(in)
def gather[A, M[X] <: TraversableOnce[X]](in: M[Task[A]])
(implicit cbf: CanBuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] = {
val init = evalAlways(cbf(in))
val r = in.foldLeft(init)((acc,elem) => Task.mapBoth(acc,elem)(_ += _))
r.map(_.result())
}

/** Nondeterministically gather results from the given sequence of tasks
* to a sequence, without keeping the original ordering of results.
Expand All @@ -806,9 +818,10 @@ object Task extends TaskInstances {
* results will be ordered. Useful when you don't need ordering because it
* can be more efficient than `gather`.
*/
def gatherUnordered[A](in: Seq[Task[A]]): Task[Seq[A]] =
def gatherUnordered[A](in: Seq[Task[A]]): Task[List[A]] =
Async { (scheduler, conn, finalCallback) =>
val atom = Atomic(Vector.empty[A])
val atom = Atomic(List.empty[A])
// TODO better completion detection which does not rely on length
val expected = in.length

val composite = CompositeCancelable()
Expand All @@ -824,7 +837,7 @@ object Task extends TaskInstances {
atom.get match {
case null => ()
case current =>
val update = current :+ value
val update = value :: current
if (!atom.compareAndSet(current, update))
onSuccess(value)
else if (update.length == expected) {
Expand Down Expand Up @@ -1591,4 +1604,4 @@ private[eval] trait TaskInstances {
override def recoverWith[A](fa: Task[A])(pf: PartialFunction[Throwable, Task[A]]): Task[A] =
fa.onErrorRecoverWith(pf)
}
}
}
Expand Up @@ -30,7 +30,7 @@ object TaskGatherUnorderedSuite extends BaseTestSuite {
s.tick(2.seconds)
assertEquals(f.value, None)
s.tick(1.second)
assertEquals(f.value, Some(Success(Seq(2, 1, 3))))
assertEquals(f.value, Some(Success(List(3, 1, 2))))
}

test("Task.gatherUnordered should onError if one of the tasks terminates in error") { implicit s =>
Expand Down Expand Up @@ -62,4 +62,4 @@ object TaskGatherUnorderedSuite extends BaseTestSuite {
s.tick(1.second)
assertEquals(f.value, None)
}
}
}

0 comments on commit c63ad81

Please sign in to comment.