Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Arrays;
import java.util.Collection;

import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
Expand Down Expand Up @@ -73,7 +72,7 @@ public String toString() {
}
public static void main(String[] args) throws Exception {
// initialize a new Collection-based execution environment
final ExecutionEnvironment env = new CollectionEnvironment();
final ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment();

// create objects for users and emails
User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ private static String getDefaultName() {
// --------------------------------------------------------------------------------------------
// Instantiation of Execution Contexts
// --------------------------------------------------------------------------------------------

/**
* Creates an execution environment that represents the context in which the program is currently executed.
* If the program is invoked standalone, this method returns a local execution environment, as returned by
Expand All @@ -856,7 +856,19 @@ public static ExecutionEnvironment getExecutionEnvironment() {
return contextEnvironmentFactory == null ?
createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
}


/**
* Creates a {@link CollectionEnvironment} that uses Java Collections underneath. This will execute in a
* single thread in the current JVM. It is very fast but will fail if the data does not fit into
* memory. Degree of parallelism will always be 1. This is useful during implementation and for debugging.
* @return A Collection Environment
*/
public static CollectionEnvironment createCollectionEnvironment(){
CollectionEnvironment ce = new CollectionEnvironment();
ce.setDegreeOfParallelism(1);
return ce;
}

/**
* Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
* multi-threaded fashion in the same JVM as the environment was created in. The default degree of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOuputFormat;
import org.apache.flink.configuration.Configuration;
Expand All @@ -38,7 +37,7 @@ public void testAccumulator() {
try {
final int NUM_ELEMENTS = 100;

ExecutionEnvironment env = new CollectionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment();

env.generateSequence(1, NUM_ELEMENTS)
.map(new CountingMapper())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
Expand All @@ -45,7 +44,7 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
@Test
public void testBulkIteration() {
try {
ExecutionEnvironment env = new CollectionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment();

IterativeDataSet<Integer> iteration = env.fromElements(1).iterate(10);

Expand All @@ -68,7 +67,7 @@ public void testBulkIteration() {
@Test
public void testBulkIterationWithTerminationCriterion() {
try {
ExecutionEnvironment env = new CollectionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment();

IterativeDataSet<Integer> iteration = env.fromElements(1).iterate(100);

Expand Down Expand Up @@ -99,7 +98,7 @@ public boolean filter(Integer value) {
@Test
public void testDeltaIteration() {
try {
ExecutionEnvironment env = new CollectionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment();

@SuppressWarnings("unchecked")
DataSet<Tuple2<Integer, Integer>> solInput = env.fromElements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import org.apache.flink.api.common.functions.RichCrossFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
Expand All @@ -43,7 +42,7 @@ public class CollectionExecutionWithBroadcastVariableTest {
@Test
public void testUnaryOp() {
try {
ExecutionEnvironment env = new CollectionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment();

DataSet<String> bcData = env.fromElements(SUFFIX);

Expand All @@ -69,7 +68,7 @@ public void testUnaryOp() {
@Test
public void testBinaryOp() {
try {
ExecutionEnvironment env = new CollectionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.createCollectionEnvironment();

DataSet<String> bcData = env.fromElements(SUFFIX);
DataSet<String> inData = env.fromElements(TEST_DATA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,12 @@ object ExecutionEnvironment {
}

/**
* Createa an execution environment that uses Java Collections underneath. This will execute in a
* Creates an execution environment that uses Java Collections underneath. This will execute in a
* single thread in the current JVM. It is very fast but will fail if the data does not fit into
* memory. This is useful during implementation and for debugging.
* @return
*/
def createCollectionsEnvironment: ExecutionEnvironment = {
def createCollectionEnvironment: ExecutionEnvironment = {
new ExecutionEnvironment(new CollectionEnvironment)
}

Expand Down