Permalink
Browse files

fix implementation of PromiseStream.<<(elem1,elem2,elems)

- was doing the first two item specially and the sequencing the rest,
  which makes for some weird semantics

cherry-picked from 3a785b9
  • Loading branch information...
1 parent 8fdcd0f commit 700060cbf23e45674c92f280f2b79fede1d9671d @rkuhn rkuhn committed Mar 2, 2012
@@ -95,6 +95,17 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
assert(Await.result(c, timeout.duration) === 4)
}
+ "map futures" in {
+ val q = PromiseStream[String]()
+ flow {
+ q << (Future("a"), Future("b"), Future("c"))
+ }
+ val a, b, c = q.dequeue
+ Await.result(a, timeout.duration) must be("a")
+ Await.result(b, timeout.duration) must be("b")
+ Await.result(c, timeout.duration) must be("c")
+ }
+
"not fail under concurrent stress" in {
implicit val timeout = Timeout(60 seconds)
val q = PromiseStream[Long](timeout.duration.toMillis)
@@ -246,7 +246,10 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout:
shift { cont: (PromiseStream[A] Future[Any]) elem map (a cont(this += a)) }
final def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStream[A] @cps[Future[Any]] =
- shift { cont: (PromiseStream[A] Future[Any]) Future.flow(this << elem1 << elem2 <<< Future.sequence(elems.toSeq)) map cont }
+ shift { cont: (PromiseStream[A] Future[Any])
+ val seq = Future.sequence(elem1 +: elem2 +: elems)
+ seq map (a cont(this ++= a))
+ }
final def <<<(elems: Traversable[A]): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] Future[Any]) cont(this ++= elems) }

0 comments on commit 700060c

Please sign in to comment.