Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces fold as a Flow transformation and generalizes Sink.fold … #17831

Merged
merged 1 commit into from Jun 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions akka-docs-dev/rst/stages-overview.rst
Expand Up @@ -35,6 +35,7 @@ filter the given predicate returns true for the element
collect the provided partial function is defined for the element the partial function is defined for the element and downstream backpressures upstream completes
grouped the specified number of elements has been accumulated or upstream completed a group has been assembled and downstream backpressures upstream completes
scan the function scanning the element returns a new element downstream backpressures upstream completes
fold upstream completes downstream backpressures upstream completes
drop the specified number of elements has been dropped already the specified number of elements has been dropped and downstream backpressures upstream completes
take the specified number of elements to take has not yet been reached downstream backpressures the defined number of elements has been taken or upstream completes
takeWhile the predicate is true and until the first false result downstream backpressures predicate returned false or upstream completes
Expand Down
Expand Up @@ -54,7 +54,7 @@ public void strictCollection() throws Exception {

final Future<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4))
.runWith(sinkUnderTest, mat);
final Integer result = Await.result(future, Duration.create(100, TimeUnit.MILLISECONDS));
final Integer result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
assert(result == 20);
//#strict-collection
}
Expand All @@ -69,7 +69,7 @@ public void groupedPartOfInfiniteStream() throws Exception {
.grouped(10)
.runWith(Sink.head(), mat);
final List<Integer> result =
Await.result(future, Duration.create(100, TimeUnit.MILLISECONDS));
Await.result(future, Duration.create(1, TimeUnit.SECONDS));
assertEquals(result, Collections.nCopies(10, 2));
//#grouped-infinite
}
Expand All @@ -82,7 +82,7 @@ public void foldedStream() throws Exception {

final Future<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
.via(flowUnderTest).runWith(Sink.fold(0, (agg, next) -> agg + next), mat);
final Integer result = Await.result(future, Duration.create(100, TimeUnit.MILLISECONDS));
final Integer result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
assert(result == 10);
//#folded-stream
}
Expand All @@ -99,7 +99,7 @@ public void pipeToTestProbe() throws Exception {
.grouped(2)
.runWith(Sink.head(), mat);
akka.pattern.Patterns.pipe(future, system.dispatcher()).to(probe.ref());
probe.expectMsg(Duration.create(100, TimeUnit.MILLISECONDS),
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))
);
//#pipeto-testprobe
Expand All @@ -120,9 +120,9 @@ public void sinkActorRef() throws Exception {
.to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat);
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK);
probe.expectNoMsg(Duration.create(100, TimeUnit.MILLISECONDS));
probe.expectMsg(Duration.create(200, TimeUnit.MILLISECONDS), Tick.TOCK);
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK);
cancellable.cancel();
probe.expectMsg(Duration.create(200, TimeUnit.MILLISECONDS), Tick.COMPLETED);
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.COMPLETED);
//#sink-actorref
}

Expand All @@ -145,7 +145,7 @@ public void sourceActorRef() throws Exception {
ref.tell(3, ActorRef.noSender());
ref.tell(new akka.actor.Status.Success("done"), ActorRef.noSender());

final String result = Await.result(future, Duration.create(100, TimeUnit.MILLISECONDS));
final String result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
assertEquals(result, "123");
//#source-actorref
}
Expand Down Expand Up @@ -190,7 +190,7 @@ public void injectingFailure() throws Exception {
final Future<Integer> future = probeAndFuture.second();
probe.sendError(new Exception("boom"));

Await.ready(future, Duration.create(100, TimeUnit.MILLISECONDS));
Await.ready(future, Duration.create(1, TimeUnit.SECONDS));
final Throwable exception = ((Failure)future.value().get()).exception();
assertEquals(exception.getMessage(), "boom");
//#injecting-failure
Expand Down
Expand Up @@ -12,8 +12,7 @@ import scala.concurrent.Promise
class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
import HeadSink._

override def createSubscriber(): Subscriber[Int] =
new HeadSinkSubscriber[Int](Promise[Int]())
override def createSubscriber(): Subscriber[Int] = new HeadSinkSubscriber[Int]

override def createElement(element: Int): Int = element
}
Expand Up @@ -22,13 +22,15 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender {
}

"properly shut down actors associated with it" in {
pending // FIXME disabled due to https://github.com/akka/akka/issues/17849

val m = ActorMaterializer.create(system)

val f = Source.lazyEmpty[Int].runFold(0)(_ + _)(m)

m.shutdown()

an[AbruptTerminationException] should be thrownBy
Await.result(f, 3.seconds)
an[AbruptTerminationException] should be thrownBy Await.result(f, 3.seconds)
}

"refuse materialization after shutdown" in {
Expand Down Expand Up @@ -65,7 +67,6 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender {
sys.awaitTermination()
m.isShutdown should ===(true)
}

}

}
Expand Up @@ -324,7 +324,7 @@ class InterpreterSupervisionSpec extends InterpreterSpecKit {
val pf: PartialFunction[Int, Int] =
{ case x: Int ⇒ if (x == 0) throw TE else x }
new TestSetup(Seq(
Collect(restartingDecider)(pf))) {
Collect(pf, restartingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
Expand Down
19 changes: 9 additions & 10 deletions akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala
Expand Up @@ -346,17 +346,17 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] =
Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right)

val binding = Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒
conn.flow.join(writeButIgnoreRead).run()
})(Keep.left).run()
val binding = Tcp()
.bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false)
.toMat(Sink.foreach(_.flow.join(writeButIgnoreRead).run()))(Keep.left).run()

val result = Source(() ⇒ Iterator.continually(ByteString("client data")))
val result = Source.repeat(ByteString("client data"))
.via(Tcp().outgoingConnection(serverAddress.getHostName, serverAddress.getPort))
.runFold(ByteString.empty)(_ ++ _)

Await.result(result, 3.seconds) should ===(ByteString("Early response"))

binding.map(_.unbind())
val r: ByteString = Await.result(result, 3.seconds)
r should ===(ByteString("Early response"))
binding.foreach(_.unbind())
}

"Echo should work even if server is in full close mode" in {
Expand All @@ -374,7 +374,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-

Await.result(result, 3.seconds) should ===(10000)

binding.map(_.unbind())
binding.foreach(_.unbind())
}

"handle when connection actor terminates unexpectedly" in {
Expand Down Expand Up @@ -510,7 +510,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-

val folder = Source(immutable.Iterable.fill(1000)(ByteString(0)))
.via(Tcp().outgoingConnection(address))
.toMat(Sink.fold(0)(_ + _.size))(Keep.right)
.fold(0)(_ + _.size).toMat(Sink.head)(Keep.right)

val total = folder.run()
val rejected = folder.run()
Expand All @@ -521,7 +521,6 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
Await.result(rejected, 5.seconds) should ===(1000)
}
}

}

def validateServerClientCommunication(testData: ByteString,
Expand Down
Expand Up @@ -15,23 +15,42 @@ class FlowFoldSpec extends AkkaSpec {
implicit val mat = ActorMaterializer()

"A Fold" must {
val input = 1 to 100
val expected = input.fold(0)(_ + _)
val inputSource = Source(input).filter(_ ⇒ true).map(identity)
val foldSource = inputSource.fold[Int](0)(_ + _).filter(_ ⇒ true).map(identity)
val foldFlow = Flow[Int].filter(_ ⇒ true).map(identity).fold(0)(_ + _).filter(_ ⇒ true).map(identity)
val foldSink = Sink.fold[Int, Int](0)(_ + _)

"work when using Source.runFold" in assertAllStagesStopped {
Await.result(inputSource.runFold(0)(_ + _), 3.seconds) should be(expected)
}

"work when using Source.fold" in assertAllStagesStopped {
Await.result(foldSource runWith Sink.head, 3.seconds) should be(expected)
}

"work when using Sink.fold" in assertAllStagesStopped {
Await.result(inputSource runWith foldSink, 3.seconds) should be(expected)
}

"work when using Flow.fold" in assertAllStagesStopped {
Await.result(inputSource via foldFlow runWith Sink.head, 3.seconds) should be(expected)
}

"fold" in assertAllStagesStopped {
val input = 1 to 100
val future = Source(input).runFold(0)(_ + _)
val expected = input.fold(0)(_ + _)
Await.result(future, 3.seconds) should be(expected)
"work when using Source.fold + Flow.fold + Sink.fold" in assertAllStagesStopped {
Await.result(foldSource via foldFlow runWith foldSink, 3.seconds) should be(expected)
}

"propagate an error" in assertAllStagesStopped {
val error = new Exception with NoStackTrace
val future = Source[Unit](() ⇒ throw error).runFold(())(Keep.none)
val future = inputSource.map(xif (x > 50) throw error else x).runFold(())(Keep.none)
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
}

"complete future with failure when function throws" in assertAllStagesStopped {
"complete future with failure when folding function throws" in assertAllStagesStopped {
val error = new Exception with NoStackTrace
val future = Source.single(1).runFold(0)((_, _) ⇒ throw error)
val future = inputSource.runFold(0)((x, y) ⇒ if (x > 50) throw error else x + y)
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
}

Expand Down