Skip to content

Commit

Permalink
[FLINK-3667] refactor client communication classes
Browse files Browse the repository at this point in the history
- ClusterDescriptor: base interface for cluster deployment descriptors
- ClusterDescriptor: YarnClusterDescriptor

- ClusterClient: base class for ClusterClients, handles lifecycle of cluster
- ClusterClient: shares configuration with the implementations
- ClusterClient: StandaloneClusterClient, YarnClusterClient
- ClusterClient: remove run methods and enable detached mode via flag

- CliFrontend: remove all Yarn specific logic
- CliFrontend: remove all cluster setup logic

- CustomCommandLine: interface for other cluster implementations
- Customcommandline: enables creation of new cluster or resuming from existing

- Yarn: move Yarn classes and functionality to the yarn module (yarn
  properties, yarn interfaces)
- Yarn: improve reliability of cluster startup
- Yarn Tests: only disable parallel execution of ITCases

This closes #1978
  • Loading branch information
mxm committed Jun 17, 2016
1 parent efc344a commit f9b52a3
Show file tree
Hide file tree
Showing 57 changed files with 1,567 additions and 1,348 deletions.
Expand Up @@ -19,19 +19,12 @@
package org.apache.flink.api.avro; package org.apache.flink.api.avro;


import java.io.File; import java.io.File;
import java.net.InetAddress;


import org.apache.flink.api.common.Plan; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.ForkableFlinkMiniCluster;


import org.junit.Assert; import org.junit.Assert;
Expand Down Expand Up @@ -64,10 +57,10 @@ public void testExternalProgram() {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort()); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());


Client client = new Client(config); ClusterClient client = new StandaloneClusterClient(config);


client.setPrintStatusDuringExecution(false); client.setPrintStatusDuringExecution(false);
client.runBlocking(program, 4); client.run(program, 4);


} }
catch (Throwable t) { catch (Throwable t) {
Expand Down

0 comments on commit f9b52a3

Please sign in to comment.