From 84792e6dbe44921a7c998c048c47b65b6f381115 Mon Sep 17 00:00:00 2001 From: Vipul Modi Date: Wed, 30 Nov 2016 21:42:09 +0530 Subject: [PATCH 1/2] Initial try --- .../MultiThreadedFlatMapFunction.java | 91 +++++++++++++++++++ .../MultiThreadedFlatMapFunctionTest.java | 16 ++++ 2 files changed, 107 insertions(+) create mode 100644 flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunction.java create mode 100644 flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunction.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunction.java new file mode 100644 index 0000000000000..e8bb4576b0101 --- /dev/null +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunction.java @@ -0,0 +1,91 @@ +package org.apache.flink.contrib.streaming; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.NamedThreadFactory; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * Created by vipulmodi on 11/30/16. + */ + +//To return Futures to calling function we can do OUT extends Future.
+public class MultiThreadedFlatMapFunction extends RichFlatMapFunction { + + private static final long serialVersionUID; + private ExecutorService flatMapExecutors; + private Integer numberOfthreads; + private List>> callables; + private List>> futures; + static { + serialVersionUID = 1L; + } + + public FlatMapFunction flatMapFunction; + + public MultiThreadedFlatMapFunction(FlatMapFunction flatMapFunction, Integer numberOfthreads) { + this.flatMapFunction = flatMapFunction; + this.numberOfthreads = numberOfthreads; + this.callables = new ArrayList<>(); + this.futures = new ArrayList<>(); + } + + // Understand side effects of making out final. + @Override + public void flatMap(final IN value, final Collector out) throws Exception { + // Figure out a better way to call + /*callables.add(new Callable>() { + @Override + public ArrayList call() throws Exception { + ArrayList result = new ArrayList(); + Collector collector = new ListCollector(result); + flatMapFunction.flatMap(value, collector); + return result; + } + });*/ + + futures.add(flatMapExecutors.submit(new Callable>() { + @Override + public ArrayList call() throws Exception { + ArrayList result = new ArrayList(); + Collector collector = new ListCollector(result); + flatMapFunction.flatMap(value, collector); + // Populate out synchronously. + synchronized (out) { + for(OUT o: result) { + out.collect(o); + } + } + return result; + } + })); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + // Create a thread pool. + flatMapExecutors = Executors.newFixedThreadPool( + numberOfthreads, + new NamedThreadFactory("flatmap-executors", "-thread-") + ); + } + + @Override + public void close() throws Exception { + super.close(); + // TODO: Wait for futures instead of doing invoke all. + // Assumption all Callables or Futures can be held in memory. + flatMapExecutors.invokeAll(callables); + //shutdown thread pool.
+ flatMapExecutors.shutdown(); + //wait for all threads to finish + while(!flatMapExecutors.awaitTermination(1,TimeUnit.MINUTES)); + } +} diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java new file mode 100644 index 0000000000000..cd3fd297b29aa --- /dev/null +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java @@ -0,0 +1,16 @@ +package org.apache.flink.contrib.streaming; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.operators.DataSource; +import org.junit.Test; + +/** + * Created by vipulmodi on 11/30/16. + */ +public class MultiThreadedFlatMapFunctionTest { + + @Test + public void testMultiThreadedFlatMap() { + new DataSource() + } +} From a77b0682323667b1ebfa2a67a7c7053622cdbf35 Mon Sep 17 00:00:00 2001 From: Vipul Modi Date: Fri, 2 Dec 2016 13:08:01 +0530 Subject: [PATCH 2/2] Adding a sample test. 
--- .../MultiThreadedFlatMapFunctionTest.java | 85 +++++++++++++++++-- .../contrib/streaming/WordCountData.java | 52 ++++++++++++ 2 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/WordCountData.java diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java index cd3fd297b29aa..76507cc4a7e02 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/MultiThreadedFlatMapFunctionTest.java @@ -1,16 +1,91 @@ package org.apache.flink.contrib.streaming; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.operators.DataSource; -import org.junit.Test; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; /** * Created by vipulmodi on 11/30/16. 
*/ public class MultiThreadedFlatMapFunctionTest { - @Test - public void testMultiThreadedFlatMap() { - new DataSource() + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + // Checking input parameters + final ParameterTool params = ParameterTool.fromArgs(args); + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // make parameters available in the web interface + env.getConfig().setGlobalJobParameters(params); + + // get input data + DataStream text; + if (params.has("input")) { + // read the text file from given input path + text = env.readTextFile(params.get("input")); + } else { + System.out.println("Executing WordCount example with default input data set."); + System.out.println("Use --input to specify file input."); + // get default test text data + text = env.fromElements(WordCountData.WORDS); + } + + DataStream> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + //text.flatMap(new Tokenizer()).keyBy(0).sum(1); + text.flatMap(new MultiThreadedFlatMapFunction>(new Tokenizer(),10)). + returns(new TypeHint>(){}).keyBy(0).sum(1); + + // emit result + if (params.has("output")) { + counts.writeAsText(params.get("output")); + } else { + System.out.println("Printing result to stdout. Use --output to specify output path."); + counts.print(); + } + + // execute program + env.execute("Streaming WordCount"); } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a + * user-defined FlatMapFunction. 
The function takes a line (String) and + * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2}). + */ + public static final class Tokenizer implements FlatMapFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector> out) + throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + } + } + } diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/WordCountData.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/WordCountData.java new file mode 100644 index 0000000000000..a52c4d42c0c38 --- /dev/null +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/WordCountData.java @@ -0,0 +1,52 @@ +package org.apache.flink.contrib.streaming; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * Created by vipulmodi on 12/2/16. + */ +public class WordCountData { + + public static final String[] WORDS = new String[] { + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles,", + "And by opposing end them?--To die,--to sleep,--", + "No more; and by a sleep to say we end", + "The heartache, and the thousand natural shocks", + "That flesh is heir to,--'tis a consummation", + "Devoutly to be wish'd. To die,--to sleep;--", + "To sleep! 
perchance to dream:--ay, there's the rub;", + "For in that sleep of death what dreams may come,", + "When we have shuffled off this mortal coil,", + "Must give us pause: there's the respect", + "That makes calamity of so long life;", + "For who would bear the whips and scorns of time,", + "The oppressor's wrong, the proud man's contumely,", + "The pangs of despis'd love, the law's delay,", + "The insolence of office, and the spurns", + "That patient merit of the unworthy takes,", + "When he himself might his quietus make", + "With a bare bodkin? who would these fardels bear,", + "To grunt and sweat under a weary life,", + "But that the dread of something after death,--", + "The undiscover'd country, from whose bourn", + "No traveller returns,--puzzles the will,", + "And makes us rather bear those ills we have", + "Than fly to others that we know not of?", + "Thus conscience does make cowards of us all;", + "And thus the native hue of resolution", + "Is sicklied o'er with the pale cast of thought;", + "And enterprises of great pith and moment,", + "With this regard, their currents turn awry,", + "And lose the name of action.--Soft you now!", + "The fair Ophelia!--Nymph, in thy orisons", + "Be all my sins remember'd." + }; + + public static DataSet getDefaultTextLineDataSet(ExecutionEnvironment env) { + return env.fromElements(WORDS); + } +}