Commit

=str akka#16787 java cookbook
2m committed Jul 3, 2015
1 parent 87c0ece commit 7d198ef
Showing 14 changed files with 1,219 additions and 149 deletions.
382 changes: 377 additions & 5 deletions akka-docs-dev/rst/java/stream-cookbook.rst

Large diffs are not rendered by default.

@@ -15,31 +15,27 @@ class RecipeMultiGroupBy extends RecipeSpec {
case class Topic(name: String)

val elems = Source(List("1: a", "1: b", "all: c", "all: d", "1: e"))
val topicMapper = { msg: Message =>
val extractTopics = { msg: Message =>
if (msg.startsWith("1")) List(Topic("1"))
else List(Topic("1"), Topic("2"))
}

class X {
//#multi-groupby
val topicMapper: (Message) => immutable.Seq[Topic] = ???

//#multi-groupby
}

//#multi-groupby
val topicMapper: (Message) => immutable.Seq[Topic] = extractTopics

val messageAndTopic: Source[(Message, Topic), Unit] = elems.mapConcat { msg: Message =>
val topicsForMessage = topicMapper(msg)
// Create a (Msg, Topic) pair for each of the topics
// the message belongs to
topicsForMessage.map(msg -> _)
}

val multiGroups: Source[(Topic, Source[String, Unit]), Unit] = messageAndTopic.groupBy(_._2).map {
case (topic, topicStream) =>
// chopping off the topic from the (Message, Topic) pairs
(topic, topicStream.map(_._1))
}
val multiGroups: Source[(Topic, Source[String, Unit]), Unit] = messageAndTopic
.groupBy(_._2).map {
case (topic, topicStream) =>
// chopping off the topic from the (Message, Topic) pairs
(topic, topicStream.map(_._1))
}
//#multi-groupby

val result = multiGroups.map {
@@ -22,8 +22,9 @@ class RecipeParseLines extends RecipeSpec {

//#parse-lines
import akka.stream.io.Framing
val linesStream = rawData.via(
Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true)).map(_.utf8String)
val linesStream = rawData.via(Framing.delimiter(
ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true))
.map(_.utf8String)
//#parse-lines

Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
28 changes: 6 additions & 22 deletions akka-docs-dev/rst/scala/stream-cookbook.rst
Expand Up @@ -22,7 +22,7 @@ Working with Flows
==================

In this collection we show simple recipes that involve linear flows. The recipes in this section are rather
general, more targeted recipes are available as separate sections ("Working with rate", "Working with IO").
general, more targeted recipes are available as separate sections (:ref:`stream-rate-scala`, :ref:`stream-io-scala`).

Logging elements of a stream
----------------------------
@@ -60,8 +60,8 @@ Draining a stream to a strict collection

In this recipe we will use the ``grouped`` stream operation, which groups incoming elements into a stream of
limited-size collections (it can be seen as almost the inverse of the "Flattening a stream of sequences" recipe
we showed before). By using a ``grouped(MaxAllowedSeqSize).runWith(Sink.head)`` we first create a stream of groups
with maximum size of ``MaxAllowedSeqSize`` and then we take the first element of this stream. What we get is a
we showed before). By using ``grouped(MaxAllowedSeqSize)`` we create a stream of groups
with a maximum size of ``MaxAllowedSeqSize``, and then we take the first element of this stream by attaching a ``Sink.head``. What we get is a
:class:`Future` containing a sequence with all the elements of the original stream, up to ``MaxAllowedSeqSize`` of them (further
elements are dropped).

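As a quick sketch of this recipe (``myData`` and ``MaxAllowedSeqSize`` are illustrative names, not from the cookbook source; the implicit materializer setup is assumed as elsewhere in these docs):

import scala.collection.immutable
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem("cookbook")
implicit val mat = ActorMaterializer()

val MaxAllowedSeqSize = 100
val myData: Source[String, Unit] = Source(List("a", "b", "c"))

// create a stream of groups of at most MaxAllowedSeqSize elements,
// then take the first (and here only) group as a strict sequence
val strict: Future[immutable.Seq[String]] =
  myData.grouped(MaxAllowedSeqSize).runWith(Sink.head)
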
@@ -80,8 +80,8 @@ chunk will arrive (``onPush``) which we use to update the digest, then it will p

Eventually the stream of ``ByteStrings`` depletes and we get a notification about this event via ``onUpstreamFinish``.
At this point we want to emit the digest value, but we cannot do it in this handler directly. Instead we call
``ctx.absorbTermination`` signalling to our context that we do not yet want to finish. When the environment decides that
we can emit further elements ``onPull`` is called again, and we see ``ctx.isFinishing`` returning true (since the upstream
``ctx.absorbTermination()`` signalling to our context that we do not yet want to finish. When the environment decides that
we can emit further elements ``onPull`` is called again, and we see ``ctx.isFinishing`` returning ``true`` (since the upstream
source has been depleted already). Since we only want to emit a final element it is enough to call ``ctx.pushAndFinish``
passing the digest ByteString to be emitted.

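Pieced together from the prose above, the stage looks roughly like this (a sketch assuming the 1.0-era ``PushPullStage`` API; treat the details as illustrative rather than the exact cookbook source):

import java.security.MessageDigest
import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective }
import akka.util.ByteString

class DigestCalculator(algorithm: String) extends PushPullStage[ByteString, ByteString] {
  private val digest = MessageDigest.getInstance(algorithm)

  override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
    digest.update(chunk.toArray) // fold the chunk into the running digest
    ctx.pull()                   // and request the next chunk
  }

  override def onPull(ctx: Context[ByteString]): SyncDirective =
    if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
    else ctx.pull()

  override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective =
    // do not finish yet: the digest still has to be emitted in onPull
    ctx.absorbTermination()
}
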
@@ -113,7 +113,7 @@ we have a stream of streams, where every substream will serve identical words.
To count the words, we need to process the stream of streams (the actual groups containing identical words). By mapping
over the groups and using ``fold`` (remember that ``fold`` automatically materializes and runs the stream it is used
on) we get a stream with elements of ``Future[(String, Int)]``. Now all we need is to flatten this stream, which
can be achieved by calling ``mapAsynch(identity)``.
can be achieved by calling ``mapAsync`` with the ``identity`` function.

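Putting the steps together, the flow reads roughly as follows (a sketch: ``words`` and ``MaximumDistinctWords`` are illustrative names, and ``runFold`` stands in for the materializing ``fold`` the text refers to; the ``buffer`` call is the tricky detail discussed next):

import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("cookbook")
implicit val mat = ActorMaterializer()

val MaximumDistinctWords = 1000
val words: Source[String, Unit] = Source(List("hello", "world", "hello"))

val counts: Source[(String, Int), Unit] = words
  // split the words into separate substreams, one per distinct word
  .groupBy(identity)
  // runFold materializes and runs each substream,
  // yielding one Future[(String, Int)] per group
  .map {
    case (word, wordStream) =>
      wordStream.runFold((word, 0)) { case ((w, count), _) => (w, count + 1) }
  }
  // the buffer lets substreams beyond the mapAsync window make progress
  .buffer(MaximumDistinctWords, OverflowStrategy.fail)
  // flatten the stream of futures
  .mapAsync(4)(identity)
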
There is one tricky issue to be noted here. The careful reader probably noticed that we put a ``buffer`` between the
``mapAsync()`` operation that flattens the stream of futures and the actual stream of futures. The reason for this is
@@ -379,19 +379,3 @@ whenever the merge can choose because multiple upstream producers have elements
preferred upstream effectively giving it an absolute priority.

.. includecode:: code/docs/stream/cookbook/RecipeKeepAlive.scala#inject-keepalive
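
The included ``RecipeKeepAlive.scala`` is the authoritative snippet; purely as an illustration of the graph's shape, a sketch assuming the 1.0-era ``FlowGraph`` DSL, with hypothetical sources:

import akka.stream.scaladsl._
import akka.util.ByteString

val dataStream: Source[ByteString, Unit] = ???      // the real elements
val keepaliveStream: Source[ByteString, Unit] = ??? // periodic keepalive ticks

val keptAlive = FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._
  val unfairMerge = builder.add(MergePreferred[ByteString](1))
  // the data stream is attached to the preferred port, so it wins
  // whenever both inputs have an element available
  dataStream      ~> unfairMerge.preferred
  keepaliveStream ~> unfairMerge.in(0)
  unfairMerge     ~> Sink.ignore
}
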
@@ -27,7 +27,6 @@
public class FlowGraphDocTest {

static ActorSystem system;


@BeforeClass
public static void setup() {
@@ -39,43 +38,38 @@ public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}

final Materializer mat = ActorMaterializer.create(system);

@Test
public void demonstrateBuildSimpleGraph() throws Exception {
//#simple-flow-graph
final Source<Integer, BoxedUnit> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
final Sink<List<String>, Future<List<String>>> sink = Sink.head();
final Flow<Integer, Integer, BoxedUnit> f1 =
Flow.of(Integer.class).map(elem -> elem + 10);
final Flow<Integer, Integer, BoxedUnit> f2 =
Flow.of(Integer.class).map(elem -> elem + 20);
final Flow<Integer, String, BoxedUnit> f3 =
Flow.of(Integer.class).map(elem -> elem.toString());
final Flow<Integer, Integer, BoxedUnit> f4 =
Flow.of(Integer.class).map(elem -> elem + 30);
final Sink<List<Integer>, Future<List<Integer>>> sink2 = Sink.head();
final Flow<Integer, Integer, BoxedUnit> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
final Flow<Integer, Integer, BoxedUnit> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
final Flow<Integer, String, BoxedUnit> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
final Flow<Integer, Integer, BoxedUnit> f4 = Flow.of(Integer.class).map(elem -> elem + 30);

final RunnableGraph<Future<List<String>>> result = FlowGraph.factory()
.closed(
sink,
(builder, out) -> {
final UniformFanOutShape<Integer, Integer> bcast =
builder.graph(Broadcast.create(2));
final UniformFanInShape<Integer, Integer> merge =
builder.graph(Merge.create(2));

builder.from(in).via(f1).via(bcast).via(f2).via(merge)
.via(f3.grouped(1000)).to(out);
builder.from(bcast).via(f4).to(merge);
});
.closed(
sink,
(builder, out) -> {
final UniformFanOutShape<Integer, Integer> bcast = builder.graph(Broadcast.create(2));
final UniformFanInShape<Integer, Integer> merge = builder.graph(Merge.create(2));

builder.from(in).via(f1).via(bcast).via(f2).via(merge)
.via(f3.grouped(1000)).to(out);
builder.from(bcast).via(f4).to(merge);
});
//#simple-flow-graph
final List<String> list = Await.result(result.run(mat), Duration.create(3, TimeUnit.SECONDS));
final String[] res = list.toArray(new String[] {});
Arrays.sort(res, null);
assertArrayEquals(new String[] {"31", "32", "33", "34", "35", "41", "42", "43", "44", "45"}, res);
assertArrayEquals(new String[] { "31", "32", "33", "34", "35", "41", "42", "43", "44", "45" }, res);
}

@Test
@SuppressWarnings("unused")
public void demonstrateConnectErrors() {
@@ -92,30 +86,29 @@ public void demonstrateConnectErrors() {
//#simple-graph
fail("expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("unconnected"));
assertTrue(e.getMessage().contains("unconnected"));
}
}

@Test
public void demonstrateReusingFlowInGraph() throws Exception {
//#flow-graph-reusing-a-flow
final Sink<Integer, Future<Integer>> topHeadSink = Sink.head();
final Sink<Integer, Future<Integer>> bottomHeadSink = Sink.head();
final Flow<Integer, Integer, BoxedUnit> sharedDoubler =
Flow.of(Integer.class).map(elem -> elem * 2);
final Flow<Integer, Integer, BoxedUnit> sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2);

final RunnableGraph<Pair<Future<Integer>, Future<Integer>>> g = FlowGraph
.factory().closed(
topHeadSink, // import this sink into the graph
bottomHeadSink, // and this as well
Keep.both(),
(b, top, bottom) -> {
final UniformFanOutShape<Integer, Integer> bcast = b
.graph(Broadcast.create(2));
b.from(Source.single(1)).via(bcast).via(sharedDoubler).to(top);
b.from(bcast).via(sharedDoubler).to(bottom);
});
.factory().closed(
topHeadSink, // import this sink into the graph
bottomHeadSink, // and this as well
Keep.both(),
(b, top, bottom) -> {
final UniformFanOutShape<Integer, Integer> bcast = b
.graph(Broadcast.create(2));

b.from(Source.single(1)).via(bcast).via(sharedDoubler).to(top);
b.from(bcast).via(sharedDoubler).to(bottom);
});
//#flow-graph-reusing-a-flow
final Pair<Future<Integer>, Future<Integer>> pair = g.run(mat);
assertEquals(Integer.valueOf(2), Await.result(pair.first(), Duration.create(3, TimeUnit.SECONDS)));
@@ -125,23 +118,22 @@ public void demonstrateReusingFlowInGraph() throws Exception {
@Test
public void demonstrateMatValue() throws Exception {
//#flow-graph-matvalue
final Sink<Integer, Future<Integer>> foldSink = Sink.<Integer, Integer>fold(0, (a, b) -> {
final Sink<Integer, Future<Integer>> foldSink = Sink.<Integer, Integer> fold(0, (a, b) -> {
return a + b;
});

final Flow<Future<Integer>, Integer, BoxedUnit> flatten = Flow.<Future<Integer>>empty()
final Flow<Future<Integer>, Integer, BoxedUnit> flatten = Flow.<Future<Integer>> empty()
.mapAsync(4, x -> {
return x;
});

final Flow<Integer, Integer, Future<Integer>> foldingFlow = Flow.factory().create(foldSink,
(b, fold) -> {
return new Pair<>(
fold.inlet(),
b.from(b.materializedValue()).via(flatten).out()
);
});
//#flow-graph-matvalue
return new Pair<>(
fold.inlet(),
b.from(b.materializedValue()).via(flatten).out());
});
//#flow-graph-matvalue

//#flow-graph-matvalue-cycle
// This cannot produce any value:
@@ -0,0 +1,60 @@
package docs.stream;

import akka.actor.ActorRef;

import java.util.function.Predicate;

/**
* Acts like `System.out.println()` but swallows all messages. Useful for putting
* println calls in examples without polluting the build output with them.
*/
public class SilenceSystemOut {

private SilenceSystemOut() {
}

public static System get() {
return new System(new System.Println() {
@Override
public void println(String s) {
// ignore
}
});
}

public static System get(ActorRef probe) {
return new System(new System.Println() {
@Override
public void println(String s) {
probe.tell(s, ActorRef.noSender());
}
});
}

public static System get(Predicate<String> filter, ActorRef probe) {
return new System(new System.Println() {
@Override
public void println(String s) {
if (filter.test(s))
probe.tell(s, ActorRef.noSender());
}
});
}

public static class System {
public final Println out;

public System(Println out) {
this.out = out;
}

public static abstract class Println {
public abstract void println(String s);

public void println(Object s) {
println(s.toString());
}
}

}

}
