From 58f90d4a9f4d00378fe56a27dbf8ace5638588f8 Mon Sep 17 00:00:00 2001
From: "pingyong.xpy"
Date: Tue, 4 Jul 2017 20:29:17 +0800
Subject: [PATCH 1/3] [FLINK-7065] Separate the flink-streaming-java module
from flink-clients
---
flink-clients/pom.xml | 6 +
.../apache/flink/client/LocalExecutor.java | 69 +++++--
.../apache/flink/client/RemoteExecutor.java | 63 ++++++-
.../client/program/ContextEnvironment.java | 2 +
.../program/ContextEnvironmentFactory.java | 2 +-
.../program/OptimizerPlanEnvironment.java | 5 +
.../program/PreviewPlanEnvironment.java | 8 +
.../program}/StreamContextEnvironment.java | 13 +-
.../StreamContextEnvironmentFactory.java | 41 +++++
.../program}/StreamPlanEnvironment.java | 20 ++-
flink-contrib/flink-storm/pom.xml | 2 +-
.../org/apache/flink/api/common/Executor.java | 89 +++++++++
.../flink/api/common/ExecutorFactory.java | 121 +++++++++++++
.../apache/flink/api/common/PlanExecutor.java | 170 +-----------------
.../flink/api/java/LocalEnvironment.java | 5 +-
.../flink/api/java/RemoteEnvironment.java | 5 +-
.../api/java/ScalaShellRemoteEnvironment.java | 3 +-
.../ScalaShellRemoteStreamEnvironment.java | 19 +-
.../apache/flink/api/java/FlinkILoopTest.java | 24 ++-
flink-streaming-java/pom.xml | 2 +-
.../environment/LocalStreamEnvironment.java | 38 ++--
.../environment/RemoteStreamEnvironment.java | 78 ++------
.../StreamExecutionEnvironment.java | 18 +-
.../api/environment/StreamGraphExecutor.java | 52 ++++++
24 files changed, 533 insertions(+), 322 deletions(-)
rename {flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment => flink-clients/src/main/java/org/apache/flink/client/program}/StreamContextEnvironment.java (89%)
create mode 100644 flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironmentFactory.java
rename {flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment => flink-clients/src/main/java/org/apache/flink/client/program}/StreamPlanEnvironment.java (81%)
create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/Executor.java
create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/ExecutorFactory.java
create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 5995ea8978851..4c0f47c071a17 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -57,6 +57,12 @@ under the License.
${project.version}
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${project.version}
+
+
org.apache.flink
flink-java
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index bb74bdb56a9e8..2829f5fcfa8dc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -37,19 +37,22 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.environment.StreamGraphExecutor;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import java.util.List;
/**
- * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
+ * A LocalExecutor that runs Flink programs or streamGraphs on a local embedded Flink runtime instance.
*
- * By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} method,
+ *
By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} method or
+ * the {@link #executeStreamGraph(StreamGraph)} method,
* this executor still start up and shut down again immediately after the program finished.
*
- * To use this executor to execute many dataflow programs that constitute one job together,
+ *
To use this executor to execute streamGraphs or many dataflow programs that constitute one job together,
* then this executor needs to be explicitly started, to keep running across several executions.
*/
-public class LocalExecutor extends PlanExecutor {
+public class LocalExecutor implements PlanExecutor, StreamGraphExecutor {
private static final boolean DEFAULT_OVERWRITE = false;
@@ -70,6 +73,9 @@ public class LocalExecutor extends PlanExecutor {
/** Config flag whether to overwrite existing files by default. */
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
+ /** If true, all execution progress updates are not only logged, but also printed to System.out */
+ private boolean printUpdatesToSysout = true;
+
// ------------------------------------------------------------------------
public LocalExecutor() {
@@ -102,6 +108,16 @@ public int getTaskManagerNumSlots() {
// --------------------------------------------------------------------------------------------
+ @Override
+ public void setPrintStatusDuringExecution(boolean printStatus) {
+ printUpdatesToSysout= printStatus;
+ }
+
+ @Override
+ public boolean isPrintingStatusDuringExecution() {
+ return printUpdatesToSysout;
+ }
+
@Override
public void start() throws Exception {
synchronized (lock) {
@@ -137,6 +153,31 @@ public boolean isRunning() {
}
}
+ /**
+ * Executes the given streamGraph on a local runtime and waits for the job to finish.
+ *
+ * If the executor has not been started before, this starts the executor and shuts it down
+ * after the job finished. If the job runs in session mode, the executor is kept alive until
+ * no more references to the executor exist.
+ *
+ * @param streamGraph The streamGraph to execute.
+ * @return The net runtime of the streamGraph, in milliseconds.
+ *
+ * @throws Exception Thrown, if either the startup of the local execution context, or the execution
+ * caused an exception.
+ */
+ @Override
+ public JobExecutionResult executeStreamGraph(StreamGraph streamGraph) throws Exception {
+ if (streamGraph == null) {
+ throw new IllegalArgumentException("The streamGraph may not be null.");
+ }
+
+ JobGraph jobGraph = streamGraph.getJobGraph();
+ this.configuration.addAll(jobGraph.getJobConfiguration());
+
+ return executeJobGraph(jobGraph, jobGraph.getMaximumParallelism());
+ }
+
/**
* Executes the given program on a local runtime and waits for the job to finish.
*
@@ -156,6 +197,15 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
throw new IllegalArgumentException("The plan may not be null.");
}
+ Optimizer pc = new Optimizer(new DataStatistics(), configuration);
+ OptimizedPlan op = pc.compile(plan);
+ JobGraphGenerator jgg = new JobGraphGenerator(configuration);
+ JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
+
+ return executeJobGraph(jobGraph, plan.getMaximumParallelism());
+ }
+
+ public JobExecutionResult executeJobGraph(JobGraph jobGraph, int maxParallelism) throws Exception {
synchronized (this.lock) {
// check if we start a session dedicated for this execution
@@ -166,9 +216,8 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
- int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
- this.taskManagerNumSlots = maxParallelism;
+ setTaskManagerNumSlots(maxParallelism);
}
}
@@ -181,14 +230,6 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
}
try {
- Configuration configuration = this.flink.configuration();
-
- Optimizer pc = new Optimizer(new DataStatistics(), configuration);
- OptimizedPlan op = pc.compile(plan);
-
- JobGraphGenerator jgg = new JobGraphGenerator(configuration);
- JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
-
boolean sysoutPrint = isPrintingStatusDuringExecution();
return flink.submitJobAndWait(jobGraph, sysoutPrint);
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 1ae9b07dbb2f0..a2c5e264efc78 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -37,9 +37,13 @@
import java.net.URL;
import java.util.Collections;
import java.util.List;
+import org.apache.flink.streaming.api.environment.StreamGraphExecutor;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+
+import java.io.IOException;
/**
- * The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
+ * The RemoteExecutor is a {@link org.apache.flink.api.common.Executor} that takes the program or streamGraph
* and ships it to a remote Flink cluster for execution.
*
* The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
@@ -48,7 +52,7 @@
*
The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
* remotely execute program parts.
*/
-public class RemoteExecutor extends PlanExecutor {
+public class RemoteExecutor implements PlanExecutor, StreamGraphExecutor {
private final Object lock = new Object();
@@ -62,6 +66,9 @@ public class RemoteExecutor extends PlanExecutor {
private int defaultParallelism = 1;
+ /** If true, all execution progress updates are not only logged, but also printed to System.out */
+ private boolean printUpdatesToSysout = true;
+
public RemoteExecutor(String hostname, int port) {
this(hostname, port, new Configuration(), Collections.emptyList(),
Collections.emptyList());
@@ -106,8 +113,14 @@ public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,
List jarFiles, List globalClasspaths) {
this.clientConfiguration = clientConfiguration;
this.jarFiles = jarFiles;
+ for (URL jarFileUrl : jarFiles) {
+ try {
+ JobWithJars.checkJarFile(jarFileUrl);
+ } catch (IOException e) {
+ throw new RuntimeException("Problem with jar file " + jarFileUrl, e);
+ }
+ }
this.globalClasspaths = globalClasspaths;
-
clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
}
@@ -143,6 +156,16 @@ public int getDefaultParallelism() {
// Startup & Shutdown
// ------------------------------------------------------------------------
+ @Override
+ public void setPrintStatusDuringExecution(boolean printStatus) {
+ printUpdatesToSysout = printStatus;
+ }
+
+ @Override
+ public boolean isPrintingStatusDuringExecution() {
+ return printUpdatesToSysout;
+ }
+
@Override
public void start() throws Exception {
synchronized (lock) {
@@ -174,7 +197,6 @@ public boolean isRunning() {
// ------------------------------------------------------------------------
// Executing programs
// ------------------------------------------------------------------------
-
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
@@ -185,6 +207,39 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
return executePlanWithJars(p);
}
+ // ------------------------------------------------------------------------
+ // Executing streamGraph
+ // ------------------------------------------------------------------------
+ @Override
+ public JobExecutionResult executeStreamGraph(StreamGraph streamGraph) throws Exception {
+ ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
+ getClass().getClassLoader());
+
+ synchronized (this.lock) {
+ // check if we start a session dedicated for this execution
+ final boolean shutDownAtEnd;
+
+ if (client == null) {
+ shutDownAtEnd = true;
+ // start the executor for us
+ start();
+ }
+ else {
+ // we use the existing session
+ shutDownAtEnd = false;
+ }
+
+ try {
+ return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader).getJobExecutionResult();
+ }
+ finally {
+ if (shutDownAtEnd) {
+ stop();
+ }
+ }
+ }
+ }
+
public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
if (program == null) {
throw new IllegalArgumentException("The job may not be null.");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 7e478256fd58a..019d737379208 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -108,9 +108,11 @@ public SavepointRestoreSettings getSavepointRestoreSettings() {
static void setAsContext(ContextEnvironmentFactory factory) {
initializeContextEnvironment(factory);
+ StreamContextEnvironment.setAsContext(new StreamContextEnvironmentFactory(factory));
}
static void unsetContext() {
resetContextEnvironment();
+ StreamContextEnvironment.unsetContext();
}
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index 620925429bcb1..3c0635bced427 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -64,7 +64,7 @@ public ContextEnvironmentFactory(ClusterClient client, List jarFilesToAttac
@Override
public ExecutionEnvironment createExecutionEnvironment() {
if (isDetached && lastEnvCreated != null) {
- throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
+ throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
}
lastEnvCreated = isDetached
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index faacd9fe87b51..a48de3db8382b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -37,8 +37,11 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
private FlinkPlan optimizerPlan;
+ private StreamPlanEnvironment streamPlanEnvironment;
+
public OptimizerPlanEnvironment(Optimizer compiler) {
this.compiler = compiler;
+ streamPlanEnvironment = new StreamPlanEnvironment(this);
}
// ------------------------------------------------------------------------
@@ -118,10 +121,12 @@ public ExecutionEnvironment createExecutionEnvironment() {
}
};
initializeContextEnvironment(factory);
+ streamPlanEnvironment.setAsContext();
}
private void unsetAsContext() {
resetContextEnvironment();
+ streamPlanEnvironment.unsetAsContext();
}
// ------------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
index 271864feb60b5..9050ad20af4fd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
@@ -38,6 +38,12 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
Plan plan;
+ StreamPlanEnvironment streamPlanEnvironment;
+
+ public PreviewPlanEnvironment() {
+ this.streamPlanEnvironment = new StreamPlanEnvironment(this);
+ }
+
@Override
public JobExecutionResult execute(String jobName) throws Exception {
this.plan = createProgramPlan(jobName);
@@ -68,10 +74,12 @@ public ExecutionEnvironment createExecutionEnvironment() {
}
};
initializeContextEnvironment(factory);
+ streamPlanEnvironment.setAsContext();
}
public void unsetAsContext() {
resetContextEnvironment();
+ streamPlanEnvironment.unsetAsContext();
}
public void setPreview(String preview) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
similarity index 89%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
rename to flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 010628f81611e..f2d473bb9041f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.environment;
+package org.apache.flink.client.program;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.client.program.DetachedEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Preconditions;
@@ -67,4 +66,12 @@ public JobExecutionResult execute(String jobName) throws Exception {
.getJobExecutionResult();
}
}
+
+ static void setAsContext(StreamContextEnvironmentFactory factory) {
+ initializeContextEnvironment(factory);
+ }
+
+ static void unsetContext() {
+ resetContextEnvironment();
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironmentFactory.java
new file mode 100644
index 0000000000000..327e182c95abe
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironmentFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+
+/**
+ * The factory that create the stream environment to be used when running jobs that are
+ * submitted through a pre-configured client connection.
+ * This happens for example when a job is submitted from the command line.
+ */
+public class StreamContextEnvironmentFactory implements StreamExecutionEnvironmentFactory {
+
+ private ContextEnvironmentFactory contextEnvironmentFactory;
+
+ public StreamContextEnvironmentFactory(ContextEnvironmentFactory contextEnvironmentFactory) {
+ this.contextEnvironmentFactory = contextEnvironmentFactory;
+ }
+
+ @Override
+ public StreamExecutionEnvironment createExecutionEnvironment() {
+ return new StreamContextEnvironment((ContextEnvironment)contextEnvironmentFactory.createExecutionEnvironment());
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
similarity index 81%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
rename to flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
index 50f2ed3fe7fb0..e9b56cec99bb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.environment;
+package org.apache.flink.client.program;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
-import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
/**
@@ -71,4 +71,18 @@ public JobExecutionResult execute(String jobName) throws Exception {
throw new OptimizerPlanEnvironment.ProgramAbortException();
}
+
+ public void setAsContext() {
+ StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
+ @Override
+ public StreamExecutionEnvironment createExecutionEnvironment() {
+ return StreamPlanEnvironment.this;
+ }
+ };
+ initializeContextEnvironment(factory);
+ }
+
+ public void unsetAsContext() {
+ resetContextEnvironment();
+ }
}
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 1baf26b342ff9..71403a7b31941 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -65,7 +65,7 @@ under the License.
org.apache.flink
- flink-streaming-java_${scala.binary.version}
+ flink-clients_${scala.binary.version}
${project.version}
provided
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Executor.java b/flink-core/src/main/java/org/apache/flink/api/common/Executor.java
new file mode 100644
index 0000000000000..69836d391eb42
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Executor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A Executor execute a Flink program's job.
+ *
+ * The specific implementation (such as the org.apache.flink.client.LocalExecutor
+ * and org.apache.flink.client.RemoteExecutor) determines where and how to run the dataflow.
+ * The concrete implementations of the executors are loaded dynamically, because they depend on
+ * the full set of all runtime classes.
+ */
+@Internal
+public interface Executor {
+
+ // ------------------------------------------------------------------------
+ // Config Options
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets whether the executor should print progress results to "standard out" ({@link System#out}).
+ * All progress messages are logged using the configured logging framework independent of the value
+ * set here.
+ *
+ * @param printStatus True, to print progress updates to standard out, false to not do that.
+ */
+ void setPrintStatusDuringExecution(boolean printStatus);
+
+ /**
+ * Gets whether the executor prints progress results to "standard out" ({@link System#out}).
+ *
+ * @return True, if the executor prints progress messages to standard out, false if not.
+ */
+ boolean isPrintingStatusDuringExecution();
+
+ /**
+ * Starts the program executor. After the executor has been started, it will keep
+ * running until {@link #stop()} is called.
+ *
+ * @throws Exception Thrown, if the executor startup failed.
+ */
+ void start() throws Exception;
+
+ /**
+ * Shuts down the executor and releases all local resources.
+ *
+ * This method also ends all sessions created by this executor. Remote job executions
+ * may complete, but the session is not kept alive after that.
+ *
+ * @throws Exception Thrown, if the proper shutdown failed.
+ */
+ void stop() throws Exception;
+
+ /**
+ * Checks if this executor is currently running.
+ *
+ * @return True is the executor is running, false otherwise.
+ */
+ boolean isRunning();
+
+ /**
+ * Ends the job session, identified by the given JobID. Jobs can be kept around as sessions,
+ * if a session timeout is specified. Keeping Jobs as sessions allows users to incrementally
+ * add new operations to their dataflow, that refer to previous intermediate results of the
+ * dataflow.
+ *
+ * @param jobID The JobID identifying the job session.
+ * @throws Exception Thrown, if the message to finish the session cannot be delivered.
+ */
+ void endSession(JobID jobID) throws Exception;
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutorFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutorFactory.java
new file mode 100644
index 0000000000000..f0220f77ebaa8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutorFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A ExecutorFactory create the specific implementation of {@link Executor}
+ *
+ */
+@Internal
+public class ExecutorFactory {
+
+ private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
+ private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";
+
+ private Class clazz;
+
+ public ExecutorFactory(Class clazz) {
+ this.clazz = clazz;
+ }
+
+ // ------------------------------------------------------------------------
+ // Executor Factories
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates an executor that runs locally in a multi-threaded environment.
+ *
+ * @return A local executor.
+ */
+ public T createLocalExecutor(Configuration configuration) {
+ Class extends T> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
+
+ try {
+ return leClass.getConstructor(Configuration.class).newInstance(configuration);
+ }
+ catch (Throwable t) {
+ throw new RuntimeException("An error occurred while loading the local executor ("
+ + LOCAL_EXECUTOR_CLASS + ").", t);
+ }
+ }
+
+ /**
+ * Creates an executor that runs on a remote environment. The remote executor is typically used
+ * to send the program to a cluster for execution.
+ *
+ * @param hostname The address of the JobManager to send the program to.
+ * @param port The port of the JobManager to send the program to.
+ * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
+ * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
+ * from within the UDFs.
+ * @param globalClasspaths A list of URLs that are added to the classpath of each user code classloader of the
+ * program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
+ * @return A remote executor.
+ */
+ public T createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
+ List jarFiles, List globalClasspaths) {
+ if (hostname == null) {
+ throw new IllegalArgumentException("The hostname must not be null.");
+ }
+ if (port <= 0 || port > 0xffff) {
+ throw new IllegalArgumentException("The port value is out of range.");
+ }
+
+ Class extends T> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
+
+ List files = (jarFiles == null) ?
+ Collections.emptyList() : jarFiles;
+ List paths = (globalClasspaths == null) ?
+ Collections.emptyList() : globalClasspaths;
+
+ try {
+ T executor = (clientConfiguration == null) ?
+ reClass.getConstructor(String.class, int.class, List.class)
+ .newInstance(hostname, port, files) :
+ reClass.getConstructor(String.class, int.class, Configuration.class, List.class, List.class)
+ .newInstance(hostname, port, clientConfiguration, files, paths);
+ return executor;
+ }
+ catch (Throwable t) {
+ throw new RuntimeException("An error occurred while loading the remote executor ("
+ + REMOTE_EXECUTOR_CLASS + ").", t);
+ }
+ }
+
+ private Class extends T> loadExecutorClass(String className) {
+ try {
+ Class> leClass = Class.forName(className);
+ return leClass.asSubclass(clazz);
+ }
+ catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("Could not load the executor class (" + className
+ + "). Do you have the 'flink-clients' project in your dependencies?");
+ }
+ catch (Throwable t) {
+ throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 87f0e09997d46..0277605565604 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -19,91 +19,18 @@
package org.apache.flink.api.common;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
/**
* A PlanExecutor executes a Flink program's dataflow plan. All Flink programs are translated to
* dataflow plans prior to execution.
*
* The specific implementation (such as the org.apache.flink.client.LocalExecutor
- * and org.apache.flink.client.RemoteExecutor) determines where and how to run the dataflow.
- * The concrete implementations of the executors are loaded dynamically, because they depend on
- * the full set of all runtime classes.
+ * and org.apache.flink.client.RemoteExecutor) is created by {@link ExecutorFactory}
*
- * PlanExecutors can be started explicitly, in which case they keep running until stopped. If
- * a program is submitted to a plan executor that is not running, it will start up for that
- * program, and shut down afterwards.
*/
@Internal
-public abstract class PlanExecutor {
-
- private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
- private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";
-
- // ------------------------------------------------------------------------
- // Config Options
- // ------------------------------------------------------------------------
-
- /** If true, all execution progress updates are not only logged, but also printed to System.out */
- private boolean printUpdatesToSysout = true;
-
- /**
- * Sets whether the executor should print progress results to "standard out" ({@link System#out}).
- * All progress messages are logged using the configured logging framework independent of the value
- * set here.
- *
- * @param printStatus True, to print progress updates to standard out, false to not do that.
- */
- public void setPrintStatusDuringExecution(boolean printStatus) {
- this.printUpdatesToSysout = printStatus;
- }
-
- /**
- * Gets whether the executor prints progress results to "standard out" ({@link System#out}).
- *
- * @return True, if the executor prints progress messages to standard out, false if not.
- */
- public boolean isPrintingStatusDuringExecution() {
- return this.printUpdatesToSysout;
- }
-
- // ------------------------------------------------------------------------
- // Startup & Shutdown
- // ------------------------------------------------------------------------
-
- /**
- * Starts the program executor. After the executor has been started, it will keep
- * running until {@link #stop()} is called.
- *
- * @throws Exception Thrown, if the executor startup failed.
- */
- public abstract void start() throws Exception;
-
- /**
- * Shuts down the plan executor and releases all local resources.
- *
- * This method also ends all sessions created by this executor. Remote job executions
- * may complete, but the session is not kept alive after that.
- *
- * @throws Exception Thrown, if the proper shutdown failed.
- */
- public abstract void stop() throws Exception;
+public interface PlanExecutor extends Executor {
- /**
- * Checks if this executor is currently running.
- *
- * @return True is the executor is running, false otherwise.
- */
- public abstract boolean isRunning();
-
- // ------------------------------------------------------------------------
- // Program Execution
- // ------------------------------------------------------------------------
-
/**
* Execute the given program.
*
@@ -118,7 +45,7 @@ public boolean isPrintingStatusDuringExecution() {
*
* @throws Exception Thrown, if job submission caused an exception.
*/
- public abstract JobExecutionResult executePlan(Plan plan) throws Exception;
+ JobExecutionResult executePlan(Plan plan) throws Exception;
/**
* Gets the programs execution plan in a JSON format.
@@ -128,94 +55,5 @@ public boolean isPrintingStatusDuringExecution() {
*
* @throws Exception Thrown, if the executor could not connect to the compiler.
*/
- public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;
-
- /**
- * Ends the job session, identified by the given JobID. Jobs can be kept around as sessions,
- * if a session timeout is specified. Keeping Jobs as sessions allows users to incrementally
- * add new operations to their dataflow, that refer to previous intermediate results of the
- * dataflow.
- *
- * @param jobID The JobID identifying the job session.
- * @throws Exception Thrown, if the message to finish the session cannot be delivered.
- */
- public abstract void endSession(JobID jobID) throws Exception;
-
- // ------------------------------------------------------------------------
- // Executor Factories
- // ------------------------------------------------------------------------
-
- /**
- * Creates an executor that runs the plan locally in a multi-threaded environment.
- *
- * @return A local executor.
- */
- public static PlanExecutor createLocalExecutor(Configuration configuration) {
- Class extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
-
- try {
- return leClass.getConstructor(Configuration.class).newInstance(configuration);
- }
- catch (Throwable t) {
- throw new RuntimeException("An error occurred while loading the local executor ("
- + LOCAL_EXECUTOR_CLASS + ").", t);
- }
- }
-
- /**
- * Creates an executor that runs the plan on a remote environment. The remote executor is typically used
- * to send the program to a cluster for execution.
- *
- * @param hostname The address of the JobManager to send the program to.
- * @param port The port of the JobManager to send the program to.
- * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
- * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
- * from within the UDFs.
- * @param globalClasspaths A list of URLs that are added to the classpath of each user code classloader of the
- * program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
- * @return A remote executor.
- */
- public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
- List jarFiles, List globalClasspaths) {
- if (hostname == null) {
- throw new IllegalArgumentException("The hostname must not be null.");
- }
- if (port <= 0 || port > 0xffff) {
- throw new IllegalArgumentException("The port value is out of range.");
- }
-
- Class extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
-
- List files = (jarFiles == null) ?
- Collections.emptyList() : jarFiles;
- List paths = (globalClasspaths == null) ?
- Collections.emptyList() : globalClasspaths;
-
- try {
- PlanExecutor executor = (clientConfiguration == null) ?
- reClass.getConstructor(String.class, int.class, List.class)
- .newInstance(hostname, port, files) :
- reClass.getConstructor(String.class, int.class, Configuration.class, List.class, List.class)
- .newInstance(hostname, port, clientConfiguration, files, paths);
- return executor;
- }
- catch (Throwable t) {
- throw new RuntimeException("An error occurred while loading the remote executor ("
- + REMOTE_EXECUTOR_CLASS + ").", t);
- }
- }
-
- private static Class extends PlanExecutor> loadExecutorClass(String className) {
- try {
- Class> leClass = Class.forName(className);
- return leClass.asSubclass(PlanExecutor.class);
- }
- catch (ClassNotFoundException cnfe) {
- throw new RuntimeException("Could not load the executor class (" + className
- + "). Do you have the 'flink-clients' project in your dependencies?");
- }
- catch (Throwable t) {
- throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
- }
- }
+ String getOptimizerPlanAsJSON(Plan plan) throws Exception;
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 0b2567a9f7171..90315d866eed6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -104,7 +105,7 @@ public String getExecutionPlan() throws Exception {
return executor.getOptimizerPlanAsJSON(p);
}
else {
- PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
+ PlanExecutor tempExecutor = new ExecutorFactory<>(PlanExecutor.class).createLocalExecutor(configuration);
return tempExecutor.getOptimizerPlanAsJSON(p);
}
}
@@ -120,7 +121,7 @@ public void startNewSession() throws Exception {
}
// create a new local executor
- executor = PlanExecutor.createLocalExecutor(configuration);
+ executor = new ExecutorFactory<>(PlanExecutor.class).createLocalExecutor(configuration);
executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
// if we have a session, start the mini cluster eagerly to have it available across sessions
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 34a54babbb545..88c62c37e252c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -186,7 +187,7 @@ public String getExecutionPlan() throws Exception {
return executor.getOptimizerPlanAsJSON(p);
}
else {
- PlanExecutor le = PlanExecutor.createLocalExecutor(null);
+ PlanExecutor le = new ExecutorFactory<>(PlanExecutor.class).createLocalExecutor(null);
String plan = le.getOptimizerPlanAsJSON(p);
le.stop();
@@ -205,7 +206,7 @@ public void startNewSession() throws Exception {
protected PlanExecutor getExecutor() throws Exception {
if (executor == null) {
- executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration,
+ executor = new ExecutorFactory<>(PlanExecutor.class).createRemoteExecutor(host, port, clientConfiguration,
jarFiles, globalClasspaths);
executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index b8e1c01a9ae99..5e9944eac1d2b 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,6 +19,7 @@
* limitations under the License.
*/
+import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
@@ -68,7 +69,7 @@ protected PlanExecutor getExecutor() throws Exception {
List allJarFiles = new ArrayList<>(jarFiles);
allJarFiles.add(jarUrl);
- this.executor = PlanExecutor.createRemoteExecutor(
+ this.executor = new ExecutorFactory<>(PlanExecutor.class).createRemoteExecutor(
host,
port,
clientConfiguration,
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
index 22b280f890e7d..06158a6caeb0c 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java
@@ -25,15 +25,12 @@
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
/**
* A {@link RemoteStreamEnvironment} for the Scala shell.
@@ -71,26 +68,20 @@ public ScalaShellRemoteStreamEnvironment(
/**
* Executes the remote job.
*
- * @param streamGraph
- * Stream Graph to execute
- * @param jarFiles
- * List of jar file URLs to ship to the cluster
+ * @param jobName
+ * job name
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
- protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List jarFiles) throws ProgramInvocationException {
+ public JobExecutionResult execute(String jobName) throws Exception {
URL jarUrl;
try {
jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e) {
throw new ProgramInvocationException("Could not write the user code classes to disk.", e);
}
-
- List allJarFiles = new ArrayList<>(jarFiles.size() + 1);
- allJarFiles.addAll(jarFiles);
- allJarFiles.add(jarUrl);
-
- return super.executeRemotely(streamGraph, allJarFiles);
+ addJarFile(jarUrl);
+ return super.execute(jobName);
}
public void setAsContext() {
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index 689381444ba03..38bbe69ac4ec5 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java;
+import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
@@ -35,7 +36,6 @@
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -52,7 +52,7 @@
* Integration tests for {@link FlinkILoop}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest(PlanExecutor.class)
+@PrepareForTest(ExecutorFactory.class)
public class FlinkILoopTest extends TestLogger {
@Test
@@ -63,16 +63,16 @@ public void testConfigurationForwarding() throws Exception {
final TestPlanExecutor testPlanExecutor = new TestPlanExecutor();
- PowerMockito.mockStatic(PlanExecutor.class);
- BDDMockito.given(PlanExecutor.createRemoteExecutor(
+ ExecutorFactory executorFactory = BDDMockito.mock(ExecutorFactory.class);
+ BDDMockito.given(executorFactory.createRemoteExecutor(
Matchers.anyString(),
Matchers.anyInt(),
Matchers.any(Configuration.class),
Matchers.any(java.util.List.class),
Matchers.any(java.util.List.class)
- )).willAnswer(new Answer() {
+ )).willAnswer(new Answer() {
@Override
- public PlanExecutor answer(InvocationOnMock invocation) throws Throwable {
+ public TestPlanExecutor answer(InvocationOnMock invocation) throws Throwable {
testPlanExecutor.setHost((String) invocation.getArguments()[0]);
testPlanExecutor.setPort((Integer) invocation.getArguments()[1]);
testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]);
@@ -118,7 +118,7 @@ public void testConfigurationForwardingStreamEnvironment() {
assertEquals(configuration, forwardedConfiguration);
}
- static class TestPlanExecutor extends PlanExecutor {
+ static class TestPlanExecutor implements PlanExecutor {
private String host;
private int port;
@@ -126,6 +126,16 @@ static class TestPlanExecutor extends PlanExecutor {
private List jars;
private List globalClasspaths;
+ @Override
+ public void setPrintStatusDuringExecution(boolean printStatus) {
+
+ }
+
+ @Override
+ public boolean isPrintingStatusDuringExecution() {
+ return false;
+ }
+
@Override
public void start() throws Exception {
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 69765198fa46d..ee54f0726445c 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -52,7 +52,7 @@ under the License.
org.apache.flink
- flink-clients_${scala.binary.version}
+ flink-optimizer_${scala.binary.version}
${project.version}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 6b31ff82d1449..d21219c9167bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -18,14 +18,11 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
@@ -51,6 +48,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
/** The configuration to use for the local cluster. */
private final Configuration conf;
+ private StreamGraphExecutor streamGraphExecutor;
+
/**
* Creates a new local stream environment that uses the default configuration.
*/
@@ -86,30 +85,15 @@ public JobExecutionResult execute(String jobName) throws Exception {
// transform the streaming program into a JobGraph
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
+ transformations.clear();
+ StreamGraphExecutor executor = getStreamExecutor();
+ return executor.executeStreamGraph(streamGraph);
+ }
- JobGraph jobGraph = streamGraph.getJobGraph();
-
- Configuration configuration = new Configuration();
- configuration.addAll(jobGraph.getJobConfiguration());
-
- configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
-
- // add (and override) the settings with what the user defined
- configuration.addAll(this.conf);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Running job on local embedded Flink mini cluster");
- }
-
- LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
- try {
- exec.start();
- return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
- }
- finally {
- transformations.clear();
- exec.stop();
+ private StreamGraphExecutor getStreamExecutor() {
+ if (streamGraphExecutor == null) {
+ streamGraphExecutor = new ExecutorFactory<>(StreamGraphExecutor.class).createLocalExecutor(conf);
}
+ return streamGraphExecutor;
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 74b1c68aa0998..eb6b135893423 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -18,22 +18,17 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
@@ -64,6 +59,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
/** The classpaths that need to be attached to each job. */
private final List globalClasspaths;
+ /** The remote executor that runs the streamGraph. */
+ private StreamGraphExecutor remoteExecutor;
+
/**
* Creates a new RemoteStreamEnvironment that points to the master
* (JobManager) described by the given host name and port.
@@ -153,11 +151,8 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
try {
URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
this.jarFiles.add(jarFileUrl);
- JobWithJars.checkJarFile(jarFileUrl);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
- } catch (IOException e) {
- throw new RuntimeException("Problem with jar file " + jarFile, e);
}
}
if (globalClasspaths == null) {
@@ -169,62 +164,19 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
}
@Override
- public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
+ public JobExecutionResult execute(String jobName) throws Exception {
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
transformations.clear();
- return executeRemotely(streamGraph, jarFiles);
+ StreamGraphExecutor executor = getStreamExecutor();
+ return executor.executeStreamGraph(streamGraph);
}
- /**
- * Executes the remote job.
- *
- * @param streamGraph
- * Stream Graph to execute
- * @param jarFiles
- * List of jar file URLs to ship to the cluster
- * @return The result of the job execution, containing elapsed time and accumulators.
- */
- protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List jarFiles) throws ProgramInvocationException {
- if (LOG.isInfoEnabled()) {
- LOG.info("Running remotely at {}:{}", host, port);
- }
-
- ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
- getClass().getClassLoader());
-
- Configuration configuration = new Configuration();
- configuration.addAll(this.clientConfiguration);
-
- configuration.setString(JobManagerOptions.ADDRESS, host);
- configuration.setInteger(JobManagerOptions.PORT, port);
-
- ClusterClient client;
- try {
- client = new StandaloneClusterClient(configuration);
- client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
- }
- catch (Exception e) {
- throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
- }
-
- try {
- return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
- }
- catch (ProgramInvocationException e) {
- throw e;
- }
- catch (Exception e) {
- String term = e.getMessage() == null ? "." : (": " + e.getMessage());
- throw new ProgramInvocationException("The program execution failed" + term, e);
- }
- finally {
- try {
- client.shutdown();
- } catch (Exception e) {
- LOG.warn("Could not properly shut down the cluster client.", e);
- }
+ private StreamGraphExecutor getStreamExecutor() {
+ if (remoteExecutor == null) {
+ remoteExecutor = new ExecutorFactory<>(StreamGraphExecutor.class).createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths);
}
+ return remoteExecutor;
}
@Override
@@ -233,6 +185,14 @@ public String toString() {
+ (getParallelism() == -1 ? "default" : getParallelism()) + ")";
}
+ /**
+ * add a jarFile to jarFiles.
+ * @param jarFile
+ */
+ public void addJarFile(URL jarFile) {
+ this.jarFiles.add(jarFile);
+ }
+
/**
* Gets the hostname of the master (JobManager), where the
* program will be executed.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 46c821edfa2e8..c15a9f22ba6d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,7 +35,6 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -43,9 +42,6 @@
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
-import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -1586,19 +1582,7 @@ public static StreamExecutionEnvironment getExecutionEnvironment() {
if (contextEnvironmentFactory != null) {
return contextEnvironmentFactory.createExecutionEnvironment();
}
-
- // because the streaming project depends on "flink-clients" (and not the other way around)
- // we currently need to intercept the data set environment and create a dependent stream env.
- // this should be fixed once we rework the project dependencies
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- if (env instanceof ContextEnvironment) {
- return new StreamContextEnvironment((ContextEnvironment) env);
- } else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
- return new StreamPlanEnvironment(env);
- } else {
- return createLocalEnvironment();
- }
+ return createLocalEnvironment();
}
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java
new file mode 100644
index 0000000000000..8241c419945cc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.Executor;
+import org.apache.flink.api.common.ExecutorFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+
+/**
+ * A StreamGraphExecutor executes a StreamGraph.
+ *
+ * The specific implementation (such as the org.apache.flink.client.LocalExecutor
+ * and org.apache.flink.client.RemoteExecutor) is created by {@link ExecutorFactory}
+ *
+ */
+@Internal
+public interface StreamGraphExecutor extends Executor {
+
+ /**
+ * Execute the given program.
+ *
+ * If the executor has not been started before, then this method will start the
+ * executor and stop it after the execution has completed. This implies that one needs
+ * to explicitly start the executor for all programs where multiple dataflow parts
+ * depend on each other. Otherwise, the previous parts will no longer
+ * be available, because the executor immediately shut down after the execution.
+ *
+ * @param streamGraph The streamGraph to execute.
+ * @return The execution result, containing for example the net runtime of the program, and the accumulators.
+ *
+ * @throws Exception Thrown, if job submission caused an exception.
+ */
+ JobExecutionResult executeStreamGraph(StreamGraph streamGraph) throws Exception;
+}
From 505032128ff31f07e8553cb556f0aa572f0f7732 Mon Sep 17 00:00:00 2001
From: "pingyong.xpy"
Date: Tue, 11 Jul 2017 14:37:31 +0800
Subject: [PATCH 2/3] fix compile and UT error
---
flink-contrib/flink-connector-wikiedits/pom.xml | 2 +-
.../flink/api/java/ScalaShellRemoteEnvironment.java | 1 +
.../org/apache/flink/api/java/FlinkILoopTest.java | 11 +++++++----
.../api/environment/LocalStreamEnvironment.java | 9 +++++----
.../api/environment/RemoteStreamEnvironment.java | 1 +
5 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/flink-contrib/flink-connector-wikiedits/pom.xml b/flink-contrib/flink-connector-wikiedits/pom.xml
index 1e5fbfdaf6fa8..a7c07e66a9f53 100644
--- a/flink-contrib/flink-connector-wikiedits/pom.xml
+++ b/flink-contrib/flink-connector-wikiedits/pom.xml
@@ -37,7 +37,7 @@ under the License.
org.apache.flink
- flink-streaming-java_${scala.binary.version}
+ flink-clients_${scala.binary.version}
${project.version}
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 5e9944eac1d2b..b07f761408a1c 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -95,6 +95,7 @@ public ExecutionEnvironment createExecutionEnvironment() {
}
};
initializeContextEnvironment(factory);
+ ScalaShellRemoteStreamEnvironment.disableAllContextAndOtherEnvironments();
}
public static void resetContextEnvironments() {
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index 38bbe69ac4ec5..314d397faf77d 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -36,6 +36,7 @@
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -52,7 +53,7 @@
* Integration tests for {@link FlinkILoop}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest(ExecutorFactory.class)
+@PrepareForTest({ExecutorFactory.class, ScalaShellRemoteEnvironment.class})
public class FlinkILoopTest extends TestLogger {
@Test
@@ -63,16 +64,18 @@ public void testConfigurationForwarding() throws Exception {
final TestPlanExecutor testPlanExecutor = new TestPlanExecutor();
- ExecutorFactory executorFactory = BDDMockito.mock(ExecutorFactory.class);
+ ExecutorFactory executorFactory = BDDMockito.mock(ExecutorFactory.class);
+ PowerMockito.mockStatic(ExecutorFactory.class);
+ PowerMockito.whenNew(ExecutorFactory.class).withAnyArguments().thenReturn(executorFactory);
BDDMockito.given(executorFactory.createRemoteExecutor(
Matchers.anyString(),
Matchers.anyInt(),
Matchers.any(Configuration.class),
Matchers.any(java.util.List.class),
Matchers.any(java.util.List.class)
- )).willAnswer(new Answer() {
+ )).willAnswer(new Answer() {
@Override
- public TestPlanExecutor answer(InvocationOnMock invocation) throws Throwable {
+ public PlanExecutor answer(InvocationOnMock invocation) throws Throwable {
testPlanExecutor.setHost((String) invocation.getArguments()[0]);
testPlanExecutor.setPort((Integer) invocation.getArguments()[1]);
testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index d21219c9167bf..7da7d5551fe4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -48,7 +48,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
/** The configuration to use for the local cluster. */
private final Configuration conf;
- private StreamGraphExecutor streamGraphExecutor;
+ private StreamGraphExecutor localExecutor;
/**
* Creates a new local stream environment that uses the default configuration.
@@ -91,9 +91,10 @@ public JobExecutionResult execute(String jobName) throws Exception {
}
private StreamGraphExecutor getStreamExecutor() {
- if (streamGraphExecutor == null) {
- streamGraphExecutor = new ExecutorFactory<>(StreamGraphExecutor.class).createLocalExecutor(conf);
+ if (localExecutor == null) {
+ localExecutor = new ExecutorFactory<>(StreamGraphExecutor.class).createLocalExecutor(conf);
+ localExecutor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
- return streamGraphExecutor;
+ return localExecutor;
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index eb6b135893423..74f9629f8caea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -175,6 +175,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
private StreamGraphExecutor getStreamExecutor() {
if (remoteExecutor == null) {
remoteExecutor = new ExecutorFactory<>(StreamGraphExecutor.class).createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths);
+ remoteExecutor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
return remoteExecutor;
}
From de40d4f0fec26c00a3654252181868fd883f6a59 Mon Sep 17 00:00:00 2001
From: "pingyong.xpy"
Date: Tue, 8 Aug 2017 21:30:05 +0800
Subject: [PATCH 3/3] remove flink-optimizer dependency from
flink-streaming-java
---
.../org/apache/flink/client/CliFrontend.java | 4 ++--
.../apache/flink/client/LocalExecutor.java | 4 ++--
.../apache/flink/client/RemoteExecutor.java | 15 ++++--------
.../flink/client/program/ClusterClient.java | 4 ++--
.../client/program/DetachedEnvironment.java | 2 +-
.../program/OptimizerPlanEnvironment.java | 4 ++--
.../program/PreviewPlanEnvironment.java | 2 +-
.../flink-connector-wikiedits/pom.xml | 9 ++++++-
.../apache/flink/api/common}/FlinkPlan.java | 2 +-
.../apache/flink/api/common/PlanExecutor.java | 4 ++--
.../{Executor.java => ProgramExecutor.java} | 4 ++--
...ctory.java => ProgramExecutorFactory.java} | 24 +++++++------------
.../flink/api/java/LocalEnvironment.java | 6 ++---
.../flink/api/java/RemoteEnvironment.java | 6 ++---
.../flink/optimizer/plan/OptimizedPlan.java | 1 +
.../webmonitor/handlers/JarActionHandler.java | 4 ++--
.../api/java/ScalaShellRemoteEnvironment.java | 5 ++--
.../apache/flink/api/java/FlinkILoopTest.java | 21 ++++++++--------
flink-streaming-java/pom.xml | 6 -----
.../environment/LocalStreamEnvironment.java | 4 ++--
.../environment/RemoteStreamEnvironment.java | 4 ++--
.../api/environment/StreamGraphExecutor.java | 8 +++----
.../streaming/api/graph/StreamGraph.java | 1 -
.../streaming/api/graph}/StreamingPlan.java | 9 +++----
24 files changed, 72 insertions(+), 81 deletions(-)
rename {flink-optimizer/src/main/java/org/apache/flink/optimizer/plan => flink-core/src/main/java/org/apache/flink/api/common}/FlinkPlan.java (95%)
rename flink-core/src/main/java/org/apache/flink/api/common/{Executor.java => ProgramExecutor.java} (97%)
rename flink-core/src/main/java/org/apache/flink/api/common/{ExecutorFactory.java => ProgramExecutorFactory.java} (85%)
rename {flink-optimizer/src/main/java/org/apache/flink/optimizer/plan => flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph}/StreamingPlan.java (88%)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index f89b016052d48..9c547bba1f2bd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -50,9 +50,9 @@
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.api.common.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.streaming.api.graph.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 2829f5fcfa8dc..130e36a263057 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -49,7 +49,7 @@
* the {@link #executeStreamGraph(StreamGraph)} method,
* this executor still start up and shut down again immediately after the program finished.
*
- * To use this executor to execute streamGraphs or many dataflow programs that constitute one job together,
+ *
To use this executor to execute StreamGraphs or many dataflow programs that constitute one job together,
* then this executor needs to be explicitly started, to keep running across several executions.
*/
public class LocalExecutor implements PlanExecutor, StreamGraphExecutor {
@@ -110,7 +110,7 @@ public int getTaskManagerNumSlots() {
@Override
public void setPrintStatusDuringExecution(boolean printStatus) {
- printUpdatesToSysout= printStatus;
+ printUpdatesToSysout = printStatus;
}
@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index a2c5e264efc78..928b8d9fa2181 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.common.ProgramExecutor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.StandaloneClusterClient;
@@ -40,10 +41,8 @@
import org.apache.flink.streaming.api.environment.StreamGraphExecutor;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import java.io.IOException;
-
/**
- * The RemoteExecutor is a {@link org.apache.flink.api.common.Executor} that takes the program or streamGraph
+ * The RemoteExecutor is a {@link ProgramExecutor} that takes the program or streamGraph
* and ships it to a remote Flink cluster for execution.
*
* The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
@@ -113,13 +112,6 @@ public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,
List jarFiles, List globalClasspaths) {
this.clientConfiguration = clientConfiguration;
this.jarFiles = jarFiles;
- for (URL jarFileUrl : jarFiles) {
- try {
- JobWithJars.checkJarFile(jarFileUrl);
- } catch (IOException e) {
- throw new RuntimeException("Problem with jar file " + jarFileUrl, e);
- }
- }
this.globalClasspaths = globalClasspaths;
clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
@@ -212,6 +204,9 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
// ------------------------------------------------------------------------
@Override
public JobExecutionResult executeStreamGraph(StreamGraph streamGraph) throws Exception {
+ for (URL jarFileUrl : jarFiles) {
+ JobWithJars.checkJarFile(jarFileUrl);
+ }
ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
getClass().getClassLoader());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 3018a8caf11e6..0cc21f0bb4456 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -30,9 +30,9 @@
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.api.common.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.streaming.api.graph.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index 63aa8118a4a62..3f6625b1aaaf7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -23,7 +23,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.api.common.FlinkPlan;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.slf4j.Logger;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index a48de3db8382b..9c9f1d17cf7d4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -23,7 +23,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.api.common.FlinkPlan;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
@@ -37,7 +37,7 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
private FlinkPlan optimizerPlan;
- private StreamPlanEnvironment streamPlanEnvironment;
+ private final StreamPlanEnvironment streamPlanEnvironment;
public OptimizerPlanEnvironment(Optimizer compiler) {
this.compiler = compiler;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
index 9050ad20af4fd..bdc28bbc771ad 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
@@ -38,7 +38,7 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
Plan plan;
- StreamPlanEnvironment streamPlanEnvironment;
+ final StreamPlanEnvironment streamPlanEnvironment;
public PreviewPlanEnvironment() {
this.streamPlanEnvironment = new StreamPlanEnvironment(this);
diff --git a/flink-contrib/flink-connector-wikiedits/pom.xml b/flink-contrib/flink-connector-wikiedits/pom.xml
index a7c07e66a9f53..9ca0d1ad89a8f 100644
--- a/flink-contrib/flink-connector-wikiedits/pom.xml
+++ b/flink-contrib/flink-connector-wikiedits/pom.xml
@@ -37,7 +37,7 @@ under the License.
org.apache.flink
- flink-clients_${scala.binary.version}
+ flink-streaming-java_${scala.binary.version}
${project.version}
@@ -46,6 +46,13 @@ under the License.
irclib
1.10
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${project.version}
+ test
+
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java b/flink-core/src/main/java/org/apache/flink/api/common/FlinkPlan.java
similarity index 95%
rename from flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
rename to flink-core/src/main/java/org/apache/flink/api/common/FlinkPlan.java
index d146c83e35dab..1eb1968dca558 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/FlinkPlan.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.optimizer.plan;
+package org.apache.flink.api.common;
/**
* A common interface for compiled Flink plans for both batch and streaming
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 0277605565604..f8498c00514bb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -25,11 +25,11 @@
* dataflow plans prior to execution.
*
* The specific implementation (such as the org.apache.flink.client.LocalExecutor
- * and org.apache.flink.client.RemoteExecutor) is created by {@link ExecutorFactory}
+ * and org.apache.flink.client.RemoteExecutor) is created by {@link ProgramExecutorFactory}
*
*/
@Internal
-public interface PlanExecutor extends Executor {
+public interface PlanExecutor extends ProgramExecutor {
/**
* Execute the given program.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Executor.java b/flink-core/src/main/java/org/apache/flink/api/common/ProgramExecutor.java
similarity index 97%
rename from flink-core/src/main/java/org/apache/flink/api/common/Executor.java
rename to flink-core/src/main/java/org/apache/flink/api/common/ProgramExecutor.java
index 69836d391eb42..4c1f04757064d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ProgramExecutor.java
@@ -21,7 +21,7 @@
import org.apache.flink.annotation.Internal;
/**
- * A Executor execute a Flink program's job.
+ * A ProgramExecutor execute a Flink program's job.
*
* The specific implementation (such as the org.apache.flink.client.LocalExecutor
* and org.apache.flink.client.RemoteExecutor) determines where and how to run the dataflow.
@@ -29,7 +29,7 @@
* the full set of all runtime classes.
*/
@Internal
-public interface Executor {
+public interface ProgramExecutor {
// ------------------------------------------------------------------------
// Config Options
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutorFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/ProgramExecutorFactory.java
similarity index 85%
rename from flink-core/src/main/java/org/apache/flink/api/common/ExecutorFactory.java
rename to flink-core/src/main/java/org/apache/flink/api/common/ProgramExecutorFactory.java
index f0220f77ebaa8..f7d85533fb374 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutorFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ProgramExecutorFactory.java
@@ -26,23 +26,17 @@
import java.util.List;
/**
- * A ExecutorFactory create the specific implementation of {@link Executor}
+ * A ProgramExecutorFactory create the specific implementation of {@link ProgramExecutor}
*
*/
@Internal
-public class ExecutorFactory {
+public class ProgramExecutorFactory {
private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";
- private Class clazz;
-
- public ExecutorFactory(Class clazz) {
- this.clazz = clazz;
- }
-
// ------------------------------------------------------------------------
- // Executor Factories
+ // ProgramExecutor Factories
// ------------------------------------------------------------------------
/**
@@ -50,8 +44,8 @@ public ExecutorFactory(Class clazz) {
*
* @return A local executor.
*/
- public T createLocalExecutor(Configuration configuration) {
- Class extends T> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
+ public static T createLocalExecutor(Class clazz, Configuration configuration) {
+ Class extends T> leClass = loadExecutorClass(clazz, LOCAL_EXECUTOR_CLASS);
try {
return leClass.getConstructor(Configuration.class).newInstance(configuration);
@@ -75,7 +69,7 @@ public T createLocalExecutor(Configuration configuration) {
* program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
* @return A remote executor.
*/
- public T createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
+ public static T createRemoteExecutor(Class clazz, String hostname, int port, Configuration clientConfiguration,
List jarFiles, List globalClasspaths) {
if (hostname == null) {
throw new IllegalArgumentException("The hostname must not be null.");
@@ -83,8 +77,8 @@ public T createRemoteExecutor(String hostname, int port, Configuration clientCon
if (port <= 0 || port > 0xffff) {
throw new IllegalArgumentException("The port value is out of range.");
}
-
- Class extends T> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
+
+ Class extends T> reClass = loadExecutorClass(clazz, REMOTE_EXECUTOR_CLASS);
List files = (jarFiles == null) ?
Collections.emptyList() : jarFiles;
@@ -105,7 +99,7 @@ public T createRemoteExecutor(String hostname, int port, Configuration clientCon
}
}
- private Class extends T> loadExecutorClass(String className) {
+ private static Class extends T> loadExecutorClass(Class clazz, String className) {
try {
Class> leClass = Class.forName(className);
return leClass.asSubclass(clazz);
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 90315d866eed6..cd5287f4f6c41 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -21,7 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.ExecutorFactory;
+import org.apache.flink.api.common.ProgramExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -105,7 +105,7 @@ public String getExecutionPlan() throws Exception {
return executor.getOptimizerPlanAsJSON(p);
}
else {
- PlanExecutor tempExecutor = new ExecutorFactory<>(PlanExecutor.class).createLocalExecutor(configuration);
+ PlanExecutor tempExecutor = ProgramExecutorFactory.createLocalExecutor(PlanExecutor.class, configuration);
return tempExecutor.getOptimizerPlanAsJSON(p);
}
}
@@ -121,7 +121,7 @@ public void startNewSession() throws Exception {
}
// create a new local executor
- executor = new ExecutorFactory<>(PlanExecutor.class).createLocalExecutor(configuration);
+ executor = ProgramExecutorFactory.createLocalExecutor(PlanExecutor.class, configuration);
executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
// if we have a session, start the mini cluster eagerly to have it available across sessions
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 88c62c37e252c..53a4431e9beec 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -20,7 +20,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutorFactory;
+import org.apache.flink.api.common.ProgramExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -187,7 +187,7 @@ public String getExecutionPlan() throws Exception {
return executor.getOptimizerPlanAsJSON(p);
}
else {
- PlanExecutor le = new ExecutorFactory<>(PlanExecutor.class).createLocalExecutor(null);
+ PlanExecutor le = ProgramExecutorFactory.createLocalExecutor(PlanExecutor.class, null);
String plan = le.getOptimizerPlanAsJSON(p);
le.stop();
@@ -206,7 +206,7 @@ public void startNewSession() throws Exception {
protected PlanExecutor getExecutor() throws Exception {
if (executor == null) {
- executor = new ExecutorFactory<>(PlanExecutor.class).createRemoteExecutor(host, port, clientConfiguration,
+ executor = ProgramExecutorFactory.createRemoteExecutor(PlanExecutor.class, host, port, clientConfiguration,
jarFiles, globalClasspaths);
executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
index 311c2861e747c..0e009d2d0ceba 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
@@ -20,6 +20,7 @@
import java.util.Collection;
+import org.apache.flink.api.common.FlinkPlan;
import org.apache.flink.api.common.Plan;
import org.apache.flink.util.Visitable;
import org.apache.flink.util.Visitor;
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index d86a21bec4a49..32217ffa35dcc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.FlinkPlan;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
@@ -29,12 +30,11 @@
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.streaming.api.graph.StreamingPlan;
import org.apache.flink.util.ExceptionUtils;
import com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index b07f761408a1c..024f179152a98 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -19,8 +19,8 @@
* limitations under the License.
*/
-import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.common.ProgramExecutorFactory;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
@@ -69,7 +69,8 @@ protected PlanExecutor getExecutor() throws Exception {
List allJarFiles = new ArrayList<>(jarFiles);
allJarFiles.add(jarUrl);
- this.executor = new ExecutorFactory<>(PlanExecutor.class).createRemoteExecutor(
+ this.executor = ProgramExecutorFactory.createRemoteExecutor(
+ PlanExecutor.class,
host,
port,
clientConfiguration,
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index 314d397faf77d..b7f33db19ada1 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -18,11 +18,11 @@
package org.apache.flink.api.java;
-import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.common.ProgramExecutorFactory;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
@@ -53,7 +53,7 @@
* Integration tests for {@link FlinkILoop}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorFactory.class, ScalaShellRemoteEnvironment.class})
+@PrepareForTest({ProgramExecutorFactory.class, ScalaShellRemoteEnvironment.class})
public class FlinkILoopTest extends TestLogger {
@Test
@@ -64,10 +64,9 @@ public void testConfigurationForwarding() throws Exception {
final TestPlanExecutor testPlanExecutor = new TestPlanExecutor();
- ExecutorFactory executorFactory = BDDMockito.mock(ExecutorFactory.class);
- PowerMockito.mockStatic(ExecutorFactory.class);
- PowerMockito.whenNew(ExecutorFactory.class).withAnyArguments().thenReturn(executorFactory);
- BDDMockito.given(executorFactory.createRemoteExecutor(
+ PowerMockito.mockStatic(ProgramExecutorFactory.class);
+ BDDMockito.given(ProgramExecutorFactory.createRemoteExecutor(
+ Matchers.any(Class.class),
Matchers.anyString(),
Matchers.anyInt(),
Matchers.any(Configuration.class),
@@ -76,11 +75,11 @@ public void testConfigurationForwarding() throws Exception {
)).willAnswer(new Answer() {
@Override
public PlanExecutor answer(InvocationOnMock invocation) throws Throwable {
- testPlanExecutor.setHost((String) invocation.getArguments()[0]);
- testPlanExecutor.setPort((Integer) invocation.getArguments()[1]);
- testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]);
- testPlanExecutor.setJars((List) invocation.getArguments()[3]);
- testPlanExecutor.setGlobalClasspaths((List) invocation.getArguments()[4]);
+ testPlanExecutor.setHost((String) invocation.getArguments()[1]);
+ testPlanExecutor.setPort((Integer) invocation.getArguments()[2]);
+ testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[3]);
+ testPlanExecutor.setJars((List) invocation.getArguments()[4]);
+ testPlanExecutor.setGlobalClasspaths((List) invocation.getArguments()[5]);
return testPlanExecutor;
}
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index ee54f0726445c..a6b742393c767 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -50,12 +50,6 @@ under the License.
${project.version}
-
- org.apache.flink
- flink-optimizer_${scala.binary.version}
- ${project.version}
-
-
org.apache.commons
commons-math3
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 7da7d5551fe4c..a4644f3ab78ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.ProgramExecutorFactory;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -92,7 +92,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
private StreamGraphExecutor getStreamExecutor() {
if (localExecutor == null) {
- localExecutor = new ExecutorFactory<>(StreamGraphExecutor.class).createLocalExecutor(conf);
+ localExecutor = ProgramExecutorFactory.createLocalExecutor(StreamGraphExecutor.class, conf);
localExecutor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
return localExecutor;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 74f9629f8caea..03a350dcce36c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.ProgramExecutorFactory;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -174,7 +174,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
private StreamGraphExecutor getStreamExecutor() {
if (remoteExecutor == null) {
- remoteExecutor = new ExecutorFactory<>(StreamGraphExecutor.class).createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths);
+ remoteExecutor = ProgramExecutorFactory.createRemoteExecutor(StreamGraphExecutor.class, host, port, clientConfiguration, jarFiles, globalClasspaths);
remoteExecutor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
return remoteExecutor;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java
index 8241c419945cc..929a02ace40a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamGraphExecutor.java
@@ -19,20 +19,20 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.Executor;
-import org.apache.flink.api.common.ExecutorFactory;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.ProgramExecutor;
+import org.apache.flink.api.common.ProgramExecutorFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
/**
* A StreamGraphExecutor executes a StreamGraph.
*
* The specific implementation (such as the org.apache.flink.client.LocalExecutor
- * and org.apache.flink.client.RemoteExecutor) is created by {@link ExecutorFactory}
+ * and org.apache.flink.client.RemoteExecutor) is created by {@link ProgramExecutorFactory}
*
*/
@Internal
-public interface StreamGraphExecutor extends Executor {
+public interface StreamGraphExecutor extends ProgramExecutor {
/**
* Execute the given program.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 20a361e312ef2..f886f9f51835f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -27,7 +27,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.AbstractStateBackend;
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingPlan.java
similarity index 88%
rename from flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingPlan.java
index 880f2e3d5db41..120a48849b173 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingPlan.java
@@ -16,16 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.optimizer.plan;
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import java.io.File;
import java.io.IOException;
-import org.apache.flink.runtime.jobgraph.JobGraph;
/**
- * Abstract class representing Flink Streaming plans
- *
+ * Abstract class representing Flink Streaming plans.
*/
public abstract class StreamingPlan implements FlinkPlan {