From f006ac6f557340fe620839debec2f076e1e291af Mon Sep 17 00:00:00 2001 From: Shunxin Date: Wed, 31 Aug 2016 11:18:56 -0700 Subject: [PATCH] APEXMALHAR-2201 Suppressed console output in tests of Stream API. --- .../stream/sample/MinimalWordCount.java | 2 +- .../stream/sample/WindowedWordCount.java | 53 +++++++++++-------- .../stream/sample/complete/AutoComplete.java | 53 ++++++++++++------- .../sample/complete/TopWikipediaSessions.java | 1 + .../stream/sample/complete/TrafficRoutes.java | 2 +- .../cookbook/CombinePerKeyExamples.java | 30 +++++++---- .../stream/sample/cookbook/DeDupExample.java | 5 +- .../stream/sample/MinimalWordCountTest.java | 2 +- .../stream/sample/WindowedWordCountTest.java | 3 +- .../sample/complete/AutoCompleteTest.java | 3 +- .../complete/TopWikipediaSessionsTest.java | 1 + .../sample/complete/TrafficRoutesTest.java | 1 + .../cookbook/CombinePerKeyExamplesTest.java | 6 +-- .../sample/cookbook/DeDupExampleTest.java | 2 +- .../apex/malhar/stream/api/ApexStream.java | 6 +++ .../stream/api/impl/ApexStreamImpl.java | 9 ++++ .../sample/ApplicationWithStreamAPI.java | 6 ++- .../sample/ApplicationWithStreamAPITest.java | 2 + .../malhar/stream/sample/MyStreamTest.java | 2 +- 19 files changed, 124 insertions(+), 65 deletions(-) diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java index 21afc5bbf4..03579abe33 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@ -117,7 +117,7 @@ public KeyValPair f(Tuple.WindowedTuple> } }, name("FormatResults")) // Print the result. - .print() + .print(name("console")) // Attach a collector to the stream to collect results. .endWith(collector, collector.input, name("Collector")) // populate the dag using the stream. diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java index c8a0e51dcf..f020ddfeb1 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@ -64,17 +64,11 @@ public class WindowedWordCount implements StreamingApplication */ public static class TextInput extends BaseOperator implements InputOperator { - private static boolean done = false; - public final transient DefaultOutputPort output = new DefaultOutputPort<>(); + private boolean done = false; private transient BufferedReader reader; - public static boolean isDone() - { - return done; - } - @Override public void setup(Context.OperatorContext context) { @@ -101,20 +95,21 @@ public void teardown() @Override public void emitTuples() { - try { - String line = reader.readLine(); - if (line == null) { - done = true; - reader.close(); - Thread.sleep(1000); - } else { - this.output.emit(line); + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + throw Throwables.propagate(e); } - Thread.sleep(50); - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (InterruptedException e) { - throw Throwables.propagate(e); } } } @@ -122,6 +117,19 @@ public void emitTuples() public static class Collector extends BaseOperator { private static Map, Long> result = new HashMap<>(); + private static boolean done = false; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + done = false; + } + + public static boolean isDone() + { + return done; + } public static Map, Long> getResult() { @@ -134,6 +142,9 @@ public static Map, Long> getResult() public void process(PojoEvent tuple) { result.put(new KeyValPair(tuple.getTimestamp(), tuple.getWord()), tuple.getCount()); + if (tuple.getWord().equals("bye")) { + done = true; + } } }; } @@ -270,7 +281,7 @@ public Tuple> f(Tuple.TimestampedTuple input) }, name("count words")) // Format the output and print out the result. - .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(); + .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console")); wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag); } diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java index 00c40e7c88..7ac6621386 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@ -75,16 +75,11 @@ public class AutoComplete implements StreamingApplication */ public static class TweetsInput extends BaseOperator implements InputOperator { - private static boolean done = false; public final transient DefaultOutputPort output = new DefaultOutputPort<>(); + private boolean done; private transient BufferedReader reader; - public static boolean isDone() - { - return done; - } - @Override public void setup(OperatorContext context) { @@ -111,20 +106,21 @@ public void teardown() @Override public void emitTuples() { - try { - String line = reader.readLine(); - if (line == null) { - done = true; - reader.close(); - Thread.sleep(1000); - } else { - this.output.emit(line); + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + // Ignore it. } - Thread.sleep(50); - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (InterruptedException e) { - // Ignore it. } } } @@ -132,6 +128,19 @@ public void emitTuples() public static class Collector extends BaseOperator { private static Map> result = new HashMap<>(); + private static boolean done = false; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + done = false; + } public static Map> getResult() { @@ -143,6 +152,9 @@ public static Map> getResult() @Override public void process(Tuple.WindowedTuple>> tuple) { + if (tuple.getValue().getKey().equals("yarn")) { + done = true; + } result.put(tuple.getValue().getKey(), tuple.getValue().getValue()); } }; @@ -303,7 +315,8 @@ public void populateDAG(DAG dag, Configuration conf) .flatMap(new ExtractHashtags()); tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector")) + .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console")) + .endWith(collector, collector.input, name("collector")) .populateDag(dag); } } diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java index d7d62fea70..a697d5203b 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -335,6 +335,7 @@ public void populateDAG(DAG dag, Configuration conf) Collector collector = new Collector(); StreamFactory.fromInput(sg, sg.output, name("sessionGen")) .addCompositeStreams(new ComputeTopSessions()) + .print(name("console")) .endWith(collector, collector.input, name("collector")).populateDag(dag); } } diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java index 3045238a2a..08aa8c8eef 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@ -514,7 +514,7 @@ public void populateDAG(DAG dag, Configuration conf) .addCompositeStreams(new TrackSpeed()) // print the result to console. - .print() + .print(name("console")) .endWith(collector, collector.input, name("Collector")) .populateDag(dag); } diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java index 7c1652174b..653207ad1b 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -201,11 +201,6 @@ public static class SampleInput extends BaseOperator implements InputOperator private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"}; private static int i; - public static int getI() - { - return i; - } - @Override public void setup(Context.OperatorContext context) { @@ -219,10 +214,10 @@ public void emitTuples() while (i < 1) { for (String word : words) { for (String corpus : corpuses) { - beanOutput.emit(new SampleBean(word, corpus)); try { - Thread.sleep(100); - } catch (InterruptedException e) { + Thread.sleep(50); + beanOutput.emit(new SampleBean(word, corpus)); + } catch (Exception e) { // Ignore it } } @@ -235,12 +230,24 @@ public void emitTuples() public static class Collector extends BaseOperator { - static List result; + private static List result; + private static boolean done = false; + + public static List getResult() + { + return result; + } + + public static boolean isDone() + { + return done; + } @Override public void setup(Context.OperatorContext context) { result = new ArrayList<>(); + done = false; } public final transient DefaultInputPort input = new DefaultInputPort() @@ -248,6 +255,9 @@ public void setup(Context.OperatorContext context) @Override public void process(SampleBean tuple) { + if (tuple.getWord().equals("F")) { + done = true; + } result.add(tuple); } }; @@ -265,7 +275,7 @@ public void populateDAG(DAG dag, Configuration conf) Collector collector = new Collector(); StreamFactory.fromInput(input, input.beanOutput, name("input")) .addCompositeStreams(new PlaysForWord()) - .print() + .print(name("console")) .endWith(collector, collector.input, name("Collector")) .populateDag(dag); diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java index 0cd7c58946..d13e2c315a 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java @@ -117,8 +117,9 @@ public String f(String input) new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1))) // Remove the duplicate words and print out the result. - .accumulate(new RemoveDuplicates(), name("RemoveDuplicates")).print().endWith(collector, collector.input) - + .accumulate(new RemoveDuplicates(), name("RemoveDuplicates")) + .print(name("console")) + .endWith(collector, collector.input) .populateDag(dag); } } diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java index d32da72cb4..c078683d55 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java @@ -37,7 +37,7 @@ public void MinimalWordCountTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + conf.set("dt.application.MinimalWordCount.operator.console.silent", "true"); MinimalWordCount app = new MinimalWordCount(); lma.prepareDAG(app, conf); diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java index f6270d46fa..f0c51f66e9 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java @@ -46,6 +46,7 @@ public void WindowedWordCountTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.WindowedWordCount.operator.console.silent", "true"); lma.prepareDAG(new WindowedWordCount(), conf); LocalMode.Controller lc = lma.getController(); ((StramLocalCluster)lc).setExitCondition(new Callable() @@ -53,7 +54,7 @@ public void WindowedWordCountTest() throws Exception @Override public Boolean call() throws Exception { - return WindowedWordCount.TextInput.isDone(); + return WindowedWordCount.Collector.isDone(); } }); diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java index 97c5ad44a8..4ed2d5d598 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java @@ -39,6 +39,7 @@ public void AutoCompleteTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.AutoComplete.operator.console.silent", "true"); lma.prepareDAG(new AutoComplete(), conf); LocalMode.Controller lc = lma.getController(); @@ -47,7 +48,7 @@ public void AutoCompleteTest() throws Exception @Override public Boolean call() throws Exception { - return AutoComplete.TweetsInput.isDone(); + return AutoComplete.Collector.isDone(); } }); diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java index c0dbaf4204..fddf511c8b 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java @@ -39,6 +39,7 @@ public void TopWikipediaSessionsTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true"); lma.prepareDAG(new TopWikipediaSessions(), conf); LocalMode.Controller lc = lma.getController(); diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java index c532898726..766fa60008 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java @@ -41,6 +41,7 @@ public void TrafficRoutesTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.TrafficRoutes.operator.console.silent", "true"); lma.prepareDAG(new TrafficRoutes(), conf); LocalMode.Controller lc = lma.getController(); diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java index b1308080ba..1e14ffff3b 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java @@ -35,7 +35,7 @@ public void CombinePerKeyExamplesTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + conf.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true"); CombinePerKeyExamples app = new CombinePerKeyExamples(); lma.prepareDAG(app, conf); @@ -46,11 +46,11 @@ public void CombinePerKeyExamplesTest() throws Exception @Override public Boolean call() throws Exception { - return CombinePerKeyExamples.SampleInput.getI() >= 1; + return CombinePerKeyExamples.Collector.isDone(); } }); lc.run(100000); - Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8")); + Assert.assertTrue(CombinePerKeyExamples.Collector.getResult().get(CombinePerKeyExamples.Collector.getResult().size() - 2).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8")); } } diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java index a175cd7cd5..7f93f508e2 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java @@ -38,7 +38,7 @@ public void DeDupExampleTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + conf.set("dt.application.DeDupExample.operator.console.silent", "true"); DeDupExample app = new DeDupExample(); lma.prepareDAG(app, conf); LocalMode.Controller lc = lma.getController(); diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java index 6d4453458d..47f358ff63 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java @@ -102,6 +102,12 @@ public interface ApexStream */ > STREAM union(ApexStream... others); + /** + * Add a stdout console output operator + * @return stream itself + */ + > STREAM print(Option... opts); + /** * Add a stdout console output operator * @return stream itself diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java index 032cb03cb7..ba399de95f 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java @@ -305,6 +305,15 @@ public > STREAM union(ApexStream... others) throw new UnsupportedOperationException(); } + @Override + @SuppressWarnings("unchecked") + public ApexStreamImpl print(Option... opts) + { + ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator(); + addOperator(consoleOutputOperator, (Operator.InputPort)consoleOutputOperator.input, null, opts); + return this; + } + @Override @SuppressWarnings("unchecked") public ApexStreamImpl print() diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java index f65806ebda..a39ff35fe7 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java @@ -35,6 +35,8 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.util.KeyValPair; +import static org.apache.apex.malhar.stream.api.Option.Options.name; + /** * An application example with stream api */ @@ -56,7 +58,7 @@ public Iterable f(String input) return Arrays.asList(input.split("[\\p{Punct}\\s]+")); } }); - stream.print(); + stream.print(name("WordOutput")); stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration .millis(1000)).accumulatingFiredPanes()).countByKey(new Function.ToKeyValue() { @@ -65,7 +67,7 @@ public Tuple> f(String input) { return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); } - }).print(); + }).print(name("WCOutput")); stream.populateDag(dag); } diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java index 70f26f215a..29a2070f40 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java @@ -42,6 +42,8 @@ public void testWordcountApplication() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.WordCountStreamingApiDemo.operator.WCOutput.silent", "true"); + conf.set("dt.application.WordCountStreamingApiDemo.operator.WordOutput.silent", "true"); lma.prepareDAG(new ApplicationWithStreamAPI(), conf); LocalMode.Controller lc = lma.getController(); long start = System.currentTimeMillis(); diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java index 5e48974376..d91211730f 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java @@ -162,7 +162,7 @@ public Tuple> f(String input) { return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); } - }).addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + }).addOperator(collector, collector.inputPort, collector.outputPort).runEmbedded(false, 30000, exitCondition); Map dataMap = new HashMap<>();