Skip to content

Commit

Permalink
[FLINK-7113] Make ClusterDescriptor independent of cluster size
Browse files Browse the repository at this point in the history
The deploySession method now is given a ClusterSpecification which specifies the
size of the cluster which it is supposed to deploy.

Remove 2 line breaks, unnecessary parameters for YarnTestBase#Runner, add builder for ClusterSpecification

This closes #4271.
  • Loading branch information
tillrohrmann committed Jul 26, 2017
1 parent 5e19a0d commit 7cf997d
Show file tree
Hide file tree
Showing 16 changed files with 540 additions and 297 deletions.
Expand Up @@ -90,7 +90,6 @@
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
Expand Down Expand Up @@ -126,16 +125,27 @@ public class CliFrontend {


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


private static final List<CustomCommandLine> customCommandLine = new LinkedList<>(); private static final List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(3);


static { static {
// Command line interface of the YARN session, with a special initialization here // Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn. // to prefix all options with y/yarn.
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
// active CustomCommandLine in order and DefaultCLI isActive always return true. // active CustomCommandLine in order and DefaultCLI isActive always return true.
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn"); final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", "y", "yarn"); final String flinkYarnCLI = "org.apache.flink.yarn.cli.FlinkYarnCLI";
customCommandLine.add(new DefaultCLI()); try {
customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI, "y", "yarn"));
} catch (Exception e) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}

try {
customCommandLines.add(loadCustomCommandLine(flinkYarnCLI, "y", "yarn"));
} catch (Exception e) {
LOG.warn("Could not load CLI class {}.", flinkYarnCLI, e);
}
customCommandLines.add(new DefaultCLI());
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1172,7 +1182,7 @@ public static void setJobManagerAddressInConfig(Configuration config, InetSocket
* @return custom command-line which is active (may only be one at a time) * @return custom command-line which is active (may only be one at a time)
*/ */
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) { public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
for (CustomCommandLine cli : customCommandLine) { for (CustomCommandLine cli : customCommandLines) {
if (cli.isActive(commandLine, config)) { if (cli.isActive(commandLine, config)) {
return cli; return cli;
} }
Expand All @@ -1184,38 +1194,30 @@ public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
* Retrieves the loaded custom command-lines. * Retrieves the loaded custom command-lines.
* @return An unmodifiyable list of loaded custom command-lines. * @return An unmodifiyable list of loaded custom command-lines.
*/ */
public static List<CustomCommandLine> getCustomCommandLineList() { public static List<CustomCommandLine<?>> getCustomCommandLineList() {
return Collections.unmodifiableList(customCommandLine); return Collections.unmodifiableList(customCommandLines);
} }


/** /**
* Loads a class from the classpath that implements the CustomCommandLine interface. * Loads a class from the classpath that implements the CustomCommandLine interface.
* @param className The fully-qualified class name to load. * @param className The fully-qualified class name to load.
* @param params The constructor parameters * @param params The constructor parameters
*/ */
private static void loadCustomCommandLine(String className, Object... params) { private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {


try { Class<? extends CustomCommandLine> customCliClass =
Class<? extends CustomCommandLine> customCliClass = Class.forName(className).asSubclass(CustomCommandLine.class);
Class.forName(className).asSubclass(CustomCommandLine.class);

// construct class types from the parameters
Class<?>[] types = new Class<?>[params.length];
for (int i = 0; i < params.length; i++) {
Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
types[i] = params[i].getClass();
}


Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types); // construct class types from the parameters
final CustomCommandLine cli = constructor.newInstance(params); Class<?>[] types = new Class<?>[params.length];
for (int i = 0; i < params.length; i++) {
Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
types[i] = params[i].getClass();
}


customCommandLine.add(cli); Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);


} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException return constructor.newInstance(params);
| InvocationTargetException e) {
LOG.warn("Unable to locate custom CLI class {}. " +
"Flink is not compiled with support for this class.", className, e);
}
} }


} }
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.client.cli; package org.apache.flink.client.cli;


import org.apache.flink.client.ClientUtils; import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -83,6 +84,8 @@ public StandaloneClusterClient createCluster(
List<URL> userJarFiles) throws UnsupportedOperationException { List<URL> userJarFiles) throws UnsupportedOperationException {


StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
return descriptor.deploySessionCluster(); ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);

return descriptor.deploySessionCluster(clusterSpecification);
} }
} }
Expand Up @@ -42,10 +42,11 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {


/** /**
* Triggers deployment of a cluster. * Triggers deployment of a cluster.
* @param clusterSpecification Cluster specification defining the cluster to deploy
* @return Client for the cluster * @return Client for the cluster
* @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/ */
ClientType deploySessionCluster() throws UnsupportedOperationException; ClientType deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;


/** /**
* Deploys a per-job cluster with the given job on the cluster. * Deploys a per-job cluster with the given job on the cluster.
Expand Down
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;

/**
* Description of the cluster to start by the {@link ClusterDescriptor}.
*/
public final class ClusterSpecification {
private final int masterMemoryMB;
private final int taskManagerMemoryMB;
private final int numberTaskManagers;
private final int slotsPerTaskManager;

private ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
this.masterMemoryMB = masterMemoryMB;
this.taskManagerMemoryMB = taskManagerMemoryMB;
this.numberTaskManagers = numberTaskManagers;
this.slotsPerTaskManager = slotsPerTaskManager;
}

public int getMasterMemoryMB() {
return masterMemoryMB;
}

public int getTaskManagerMemoryMB() {
return taskManagerMemoryMB;
}

public int getNumberTaskManagers() {
return numberTaskManagers;
}

public int getSlotsPerTaskManager() {
return slotsPerTaskManager;
}

@Override
public String toString() {
return "ClusterSpecification{" +
"masterMemoryMB=" + masterMemoryMB +
", taskManagerMemoryMB=" + taskManagerMemoryMB +
", numberTaskManagers=" + numberTaskManagers +
", slotsPerTaskManager=" + slotsPerTaskManager +
'}';
}

public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);

return new ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
.setTaskManagerMemoryMB(taskManagerMemoryMb)
.setNumberTaskManagers(1)
.setSlotsPerTaskManager(slots)
.createClusterSpecification();
}

/**
* Builder for the {@link ClusterSpecification} instance.
*/
public static class ClusterSpecificationBuilder {
private int masterMemoryMB = 768;
private int taskManagerMemoryMB = 768;
private int numberTaskManagers = 1;
private int slotsPerTaskManager = 1;

public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) {
this.masterMemoryMB = masterMemoryMB;
return this;
}

public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) {
this.taskManagerMemoryMB = taskManagerMemoryMB;
return this;
}

public ClusterSpecificationBuilder setNumberTaskManagers(int numberTaskManagers) {
this.numberTaskManagers = numberTaskManagers;
return this;
}

public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) {
this.slotsPerTaskManager = slotsPerTaskManager;
return this;
}

public ClusterSpecification createClusterSpecification() {
return new ClusterSpecification(
masterMemoryMB,
taskManagerMemoryMB,
numberTaskManagers,
slotsPerTaskManager);
}
}
}
Expand Up @@ -51,7 +51,7 @@ public StandaloneClusterClient retrieve(String applicationID) {
} }


@Override @Override
public StandaloneClusterClient deploySessionCluster() throws UnsupportedOperationException { public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
throw new UnsupportedOperationException("Can't deploy a standalone cluster."); throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
} }


Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.yarn.cli.FlinkYarnSessionCli;


import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
Expand Down Expand Up @@ -66,7 +67,7 @@
* Tests that verify that the CLI client picks up the correct address for the JobManager * Tests that verify that the CLI client picks up the correct address for the JobManager
* from configuration and configs. * from configuration and configs.
*/ */
public class CliFrontendYarnAddressConfigurationTest { public class CliFrontendYarnAddressConfigurationTest extends TestLogger {


@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand Down Expand Up @@ -378,6 +379,8 @@ protected YarnClient getYarnClient() {
@Override @Override
protected YarnClusterClient createYarnClusterClient( protected YarnClusterClient createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor, AbstractYarnClusterDescriptor descriptor,
int numberTaskManagers,
int slotsPerTaskManager,
YarnClient yarnClient, YarnClient yarnClient,
ApplicationReport report, ApplicationReport report,
Configuration flinkConfiguration, Configuration flinkConfiguration,
Expand Down
Expand Up @@ -21,10 +21,11 @@
import org.apache.flink.client.CliFrontend; import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.yarn.cli.FlinkYarnSessionCli;


import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
Expand All @@ -48,7 +49,7 @@
/** /**
* Tests for the FlinkYarnSessionCli. * Tests for the FlinkYarnSessionCli.
*/ */
public class FlinkYarnSessionCliTest { public class FlinkYarnSessionCliTest extends TestLogger {


@Rule @Rule
public TemporaryFolder tmp = new TemporaryFolder(); public TemporaryFolder tmp = new TemporaryFolder();
Expand All @@ -57,12 +58,11 @@ public class FlinkYarnSessionCliTest {
public void testDynamicProperties() throws Exception { public void testDynamicProperties() throws Exception {


Map<String, String> map = new HashMap<String, String>(System.getenv()); Map<String, String> map = new HashMap<String, String>(System.getenv());
File tmpFolder = tmp.newFolder();
File fakeConf = new File(tmpFolder, "flink-conf.yaml");
fakeConf.createNewFile();
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, tmpFolder.getAbsolutePath());
TestBaseUtils.setEnv(map); TestBaseUtils.setEnv(map);
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false); FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
"",
"",
false);
Options options = new Options(); Options options = new Options();
cli.addGeneralOptions(options); cli.addGeneralOptions(options);
cli.addRunOptions(options); cli.addRunOptions(options);
Expand Down Expand Up @@ -96,11 +96,11 @@ public void testNotEnoughTaskSlots() throws Exception {


FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn"); FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");


AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine()); ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());


// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
Assert.assertEquals(4, descriptor.getTaskManagerSlots()); Assert.assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
Assert.assertEquals(2, descriptor.getTaskManagerCount()); Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
} }


@Test @Test
Expand All @@ -118,14 +118,19 @@ public void testCorrectSettingOfMaxSlots() throws Exception {
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn"); FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");


AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine()); AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());


// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
Assert.assertEquals(3, descriptor.getTaskManagerSlots()); Assert.assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
Assert.assertEquals(2, descriptor.getTaskManagerCount()); Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());


Configuration config = new Configuration(); Configuration config = new Configuration();
CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000)); CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
ClusterClient client = new TestingYarnClusterClient(descriptor, config); ClusterClient client = new TestingYarnClusterClient(
descriptor,
clusterSpecification.getNumberTaskManagers(),
clusterSpecification.getSlotsPerTaskManager(),
config);
Assert.assertEquals(6, client.getMaxSlots()); Assert.assertEquals(6, client.getMaxSlots());
} }


Expand Down Expand Up @@ -170,8 +175,14 @@ protected AbstractYarnClusterDescriptor getClusterDescriptor() {


private static class TestingYarnClusterClient extends YarnClusterClient { private static class TestingYarnClusterClient extends YarnClusterClient {


public TestingYarnClusterClient(AbstractYarnClusterDescriptor descriptor, Configuration config) throws Exception { public TestingYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
int numberTaskManagers,
int slotsPerTaskManager,
Configuration config) throws Exception {
super(descriptor, super(descriptor,
numberTaskManagers,
slotsPerTaskManager,
Mockito.mock(YarnClient.class), Mockito.mock(YarnClient.class),
Mockito.mock(ApplicationReport.class), Mockito.mock(ApplicationReport.class),
config, config,
Expand Down

0 comments on commit 7cf997d

Please sign in to comment.