From baace5236340831789cddffaa75865f3c5dc3bb3 Mon Sep 17 00:00:00 2001 From: Ajay Bhat Date: Mon, 19 Jan 2015 16:15:24 +0530 Subject: [PATCH 1/2] [FLINK-1165] No createCollectionsEnvironment in Java API This commit adds a new method to ExecutionEnvironment to create a CollectionEnvironment, and applies the method to cases where a CollectionEnvironment() may be needed --- .../java/misc/CollectionExecutionExample.java | 3 +-- .../flink/api/java/ExecutionEnvironment.java | 16 ++++++++++++++-- .../CollectionExecutionAccumulatorsTest.java | 3 +-- .../CollectionExecutionIterationTest.java | 7 +++---- ...ectionExecutionWithBroadcastVariableTest.java | 5 ++--- 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java index ff1a413402af3..4657af6b6c4dd 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.Collection; -import org.apache.flink.api.java.CollectionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; @@ -73,7 +72,7 @@ public String toString() { } public static void main(String[] args) throws Exception { // initialize a new Collection-based execution environment - final ExecutionEnvironment env = new CollectionEnvironment(); + final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); // create objects for users and emails User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") }; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 77fed97031e4b..76f3ed135dd3b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -843,7 +843,7 @@ private static String getDefaultName() { // -------------------------------------------------------------------------------------------- // Instantiation of Execution Contexts // -------------------------------------------------------------------------------------------- - + /** * Creates an execution environment that represents the context in which the program is currently executed. * If the program is invoked standalone, this method returns a local execution environment, as returned by @@ -856,7 +856,19 @@ public static ExecutionEnvironment getExecutionEnvironment() { return contextEnvironmentFactory == null ? createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment(); } - + + /** + * Createa a {@link CollectionEnvironment} that uses Java Collections underneath. This will execute in a + * single thread in the current JVM. It is very fast but will fail if the data does not fit into + * memory. Degree of parallelism will always be 1. This is useful during implementation and for debugging. + * @return A Collection Environment + */ + public static CollectionEnvironment createCollectionsEnvironment(){ + CollectionEnvironment ce = new CollectionEnvironment(); + ce.setDegreeOfParallelism(1); + return ce; + } + /** * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a * multi-threaded fashion in the same JVM as the environment was created in. The default degree of diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java index f83669255a7d4..449d355648ff8 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.CollectionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOuputFormat; import org.apache.flink.configuration.Configuration; @@ -38,7 +37,7 @@ public void testAccumulator() { try { final int NUM_ELEMENTS = 100; - ExecutionEnvironment env = new CollectionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); env.generateSequence(1, NUM_ELEMENTS) .map(new CountingMapper()) diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java index 34f9137aea92a..6ca0eb181005f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java @@ -28,7 +28,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.CollectionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; @@ -45,7 +44,7 @@ public class CollectionExecutionIterationTest implements java.io.Serializable { @Test public void testBulkIteration() { try { - ExecutionEnvironment env = new CollectionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); IterativeDataSet iteration = env.fromElements(1).iterate(10); @@ -68,7 +67,7 @@ public void testBulkIteration() { @Test public void testBulkIterationWithTerminationCriterion() { try { - ExecutionEnvironment env = new CollectionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); IterativeDataSet iteration = env.fromElements(1).iterate(100); @@ -99,7 +98,7 @@ public boolean filter(Integer value) { @Test public void testDeltaIteration() { try { - ExecutionEnvironment env = new CollectionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); @SuppressWarnings("unchecked") DataSet> solInput = env.fromElements( diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java index c2db7c9217fd5..2cdd68faedf12 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichCrossFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.CollectionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; @@ -43,7 +42,7 @@ public class CollectionExecutionWithBroadcastVariableTest { @Test public void testUnaryOp() { try { - ExecutionEnvironment env = new CollectionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); DataSet bcData = env.fromElements(SUFFIX); @@ -69,7 +68,7 @@ public void testUnaryOp() { @Test public void testBinaryOp() { try { - ExecutionEnvironment env = new CollectionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); DataSet bcData = env.fromElements(SUFFIX); DataSet inData = env.fromElements(TEST_DATA); From 320709389ba83611a2a5c242c804341330c048c7 Mon Sep 17 00:00:00 2001 From: Ajay Bhat Date: Mon, 19 Jan 2015 17:21:32 +0530 Subject: [PATCH 2/2] [FLINK-1165] Fix typos in Javadoc for Java and Scala APIs and rename method to createCollectionEnvironment() --- .../examples/java/misc/CollectionExecutionExample.java | 2 +- .../org/apache/flink/api/java/ExecutionEnvironment.java | 4 ++-- .../operators/CollectionExecutionAccumulatorsTest.java | 2 +- .../common/operators/CollectionExecutionIterationTest.java | 6 +++--- .../CollectionExecutionWithBroadcastVariableTest.java | 4 ++-- .../org/apache/flink/api/scala/ExecutionEnvironment.scala | 4 ++-- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java index 4657af6b6c4dd..85aaff5403ef5 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java @@ -72,7 +72,7 @@ public String toString() { } public static void main(String[] args) throws Exception { // initialize a new Collection-based execution environment - final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + final ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment(); // create objects for users and emails User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") }; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 76f3ed135dd3b..3fc9bbf2c4480 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -858,12 +858,12 @@ public static ExecutionEnvironment getExecutionEnvironment() { } /** - * Createa a {@link CollectionEnvironment} that uses Java Collections underneath. This will execute in a + * Creates a {@link CollectionEnvironment} that uses Java Collections underneath. This will execute in a * single thread in the current JVM. It is very fast but will fail if the data does not fit into * memory. Degree of parallelism will always be 1. This is useful during implementation and for debugging. * @return A Collection Environment */ - public static CollectionEnvironment createCollectionsEnvironment(){ + public static CollectionEnvironment createCollectionEnvironment(){ CollectionEnvironment ce = new CollectionEnvironment(); ce.setDegreeOfParallelism(1); return ce; diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java index 449d355648ff8..8b89b2f9d4509 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java @@ -37,7 +37,7 @@ public void testAccumulator() { try { final int NUM_ELEMENTS = 100; - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment(); env.generateSequence(1, NUM_ELEMENTS) .map(new CountingMapper()) diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java index 6ca0eb181005f..7c877553b813f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java @@ -44,7 +44,7 @@ public class CollectionExecutionIterationTest implements java.io.Serializable { @Test public void testBulkIteration() { try { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment(); IterativeDataSet iteration = env.fromElements(1).iterate(10); @@ -67,7 +67,7 @@ public void testBulkIteration() { @Test public void testBulkIterationWithTerminationCriterion() { try { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment(); IterativeDataSet iteration = env.fromElements(1).iterate(100); @@ -98,7 +98,7 @@ public boolean filter(Integer value) { @Test public void testDeltaIteration() { try { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment(); @SuppressWarnings("unchecked") DataSet> solInput = env.fromElements( diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java index 2cdd68faedf12..c0900652a5579 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java @@ -42,7 +42,7 @@ public class CollectionExecutionWithBroadcastVariableTest { @Test public void testUnaryOp() { try { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment(); DataSet bcData = env.fromElements(SUFFIX); @@ -68,7 +68,7 @@ public void testUnaryOp() { @Test public void testBinaryOp() { try { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment(); DataSet bcData = env.fromElements(SUFFIX); DataSet inData = env.fromElements(TEST_DATA); diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 391986d624d06..a71ddb984c21d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -488,12 +488,12 @@ object ExecutionEnvironment { } /** - * Createa an execution environment that uses Java Collections underneath. This will execute in a + * Creates an execution environment that uses Java Collections underneath. This will execute in a * single thread in the current JVM. It is very fast but will fail if the data does not fit into * memory. This is useful during implementation and for debugging. * @return */ - def createCollectionsEnvironment: ExecutionEnvironment = { + def createCollectionEnvironment: ExecutionEnvironment = { new ExecutionEnvironment(new CollectionEnvironment) }