From 473940eda06e2874d6b2cec5e50ee07d6054f438 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Wed, 26 Aug 2015 16:11:30 +0200 Subject: [PATCH 1/2] [FLINK-2326] Write yarn properties file to temp directory --- docs/setup/config.md | 6 +++ .../org/apache/flink/client/CliFrontend.java | 8 +++- .../flink/client/FlinkYarnSessionCli.java | 42 ++++++++++------- .../CliFrontendAddressConfigurationTest.java | 31 +++++++++++- .../testconfigwithyarn/flink-conf.yaml | 1 - .../flink/configuration/ConfigConstants.java | 9 ++++ .../yarn/AbstractFlinkYarnCluster.java | 47 ++++++++++++++++++- .../runtime/yarn/FlinkYarnClusterStatus.java | 10 ++-- .../apache/flink/yarn/FlinkYarnCluster.java | 8 +++- 9 files changed, 134 insertions(+), 28 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 53b9ae0fb82bd..e2ffda6ce4d34 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -351,6 +351,12 @@ to set the JM host:port manually. It is recommended to leave this option at 1. - `yarn.heartbeat-delay` (Default: 5 seconds). Time between heartbeats with the ResourceManager. +- `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, +the JobManager's host and the number of available processing slots is written into a properties file, +so that the Flink client is able to pick those details up. This configuration parameter allows +changing the default location of that file (for example for environments sharing a Flink +installation between users) + ## Background ### Configuring the Network Buffers diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 9ef2d5f432ea9..ea1a6e933a91f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -103,7 +103,7 @@ public class CliFrontend { private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf"; // YARN-session related constants - public static final String YARN_PROPERTIES_FILE = ".yarn-properties"; + public static final String YARN_PROPERTIES_FILE = ".yarn-properties-"; public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager"; public static final String YARN_PROPERTIES_PARALLELISM = "parallelism"; public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString"; @@ -162,7 +162,11 @@ public CliFrontend(String configDir) throws Exception { this.config = GlobalConfiguration.getConfiguration(); // load the YARN properties - File propertiesFile = new File(configDirectory, YARN_PROPERTIES_FILE); + String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); + String currentUser = System.getProperty("user.name"); + String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + + File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser); if (propertiesFile.exists()) { logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 83993f2b28a8b..ad7c6679c8b6d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -24,6 +24,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; @@ -145,13 +146,13 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { flinkYarnClient.setConfigurationFilePath(confPath); - List shipFiles = new ArrayList(); + List shipFiles = new ArrayList<>(); // path to directory to ship if (cmd.hasOption(SHIP_PATH.getOpt())) { String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); File shipDir = new File(shipPath); if (shipDir.isDirectory()) { - shipFiles = new ArrayList(Arrays.asList(shipDir.listFiles(new FilenameFilter() { + shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { return !(name.equals(".") || name.equals("..")); @@ -215,8 +216,7 @@ public boolean accept(File dir, String name) { flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); if (cmd.hasOption(DETACHED.getOpt())) { - detachedMode = true; - flinkYarnClient.setDetachedMode(detachedMode); + flinkYarnClient.setDetachedMode(true); } if (cmd.hasOption(STREAMING.getOpt())) { @@ -254,7 +254,7 @@ private void printUsage() { } public static AbstractFlinkYarnClient getFlinkYarnClient() { - AbstractFlinkYarnClient yarnClient = null; + AbstractFlinkYarnClient yarnClient; try { Class yarnClientClass = Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class); @@ -288,20 +288,21 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) { int numTaskmanagers = 0; try { BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + label: while (true) { // ------------------ check if there are updates by the cluster ----------- FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) { System.err.println("Number of connected TaskManagers changed to " + - status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots()); + status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots()); numTaskmanagers = status.getNumberOfTaskManagers(); } List messages = yarnCluster.getNewMessages(); if (messages != null && messages.size() > 0) { System.err.println("New messages from the YARN cluster: "); - for(String msg : messages) { + for (String msg : messages) { System.err.println(msg); } } @@ -321,12 +322,17 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) { if (in.ready()) { String command = in.readLine(); - if (command.equals("quit") || command.equals("stop")) { - break; // leave loop, cli will stop cluster. - } else if (command.equals("help")) { - System.err.println(HELP); - } else { - System.err.println("Unknown command '"+command+"'. Showing help: \n"+HELP); + switch (command) { + case "quit": + case "stop": + break label; + + case "help": + System.err.println(HELP); + break; + default: + System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP); + break; } } if (yarnCluster.hasBeenStopped()) { @@ -380,7 +386,7 @@ public int run(String[] args) { // Query cluster for metrics if (cmd.hasOption(QUERY.getOpt())) { AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); - String description = null; + String description; try { description = flinkYarnClient.getClusterDescription(); } catch (Exception e) { @@ -415,8 +421,12 @@ public int run(String[] args) { System.out.println("Flink JobManager is now running on " + jobManagerAddress); System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); // file that we write into the conf/ dir containing the jobManager address and the dop. - String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); - File yarnPropertiesFile = new File(confDirPath + File.separator + CliFrontend.YARN_PROPERTIES_FILE); + + String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); + String currentUser = System.getProperty("user.name"); + String propertiesFileLocation = yarnCluster.getFlinkConfiguration().getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + + File yarnPropertiesFile = new File(propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser); Properties yarnProps = new Properties(); yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java index 7b0dd2b94c553..2d413744debe9 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java @@ -24,19 +24,27 @@ import static org.mockito.Mockito.*; +import java.io.File; import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; import org.apache.flink.client.cli.CommandLineOptions; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** * Tests that verify that the CLI client picks up the correct address for the JobManager * from configuration and configs. */ public class CliFrontendAddressConfigurationTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); @BeforeClass public static void init() { @@ -101,11 +109,30 @@ public void testValidConfig() { fail(e.getMessage()); } } - + + /** + * Test that the CliFrontent is able to pick up the .yarn-properties file from a specified location. + */ @Test public void testYarnConfig() { try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile()); + File tmpFolder = folder.newFolder(); + String currentUser = System.getProperty("user.name"); + + // copy reference flink-conf.yaml to temporary test directory and append custom configuration path. + File confFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile()); + File testConfFile = new File(tmpFolder, "flink-conf.yaml"); + org.apache.commons.io.FileUtils.copyFile(confFile, testConfFile); + String toAppend = "\nyarn.properties-file.location: " + tmpFolder; + // append to flink-conf.yaml + Files.write(testConfFile.toPath(), toAppend.getBytes(), StandardOpenOption.APPEND); + // copy .yarn-properties- + File propertiesFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/.yarn-properties").getFile()); + File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser); + org.apache.commons.io.FileUtils.copyFile(propertiesFile, testPropertiesFile); + + // start CLI Frontend + CliFrontend frontend = new CliFrontend(tmpFolder.getAbsolutePath()); CommandLineOptions options = mock(CommandLineOptions.class); InetSocketAddress address = frontend.getJobManagerAddress(options); diff --git a/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml b/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml index 084c71e8c116a..9e5de340cdb9f 100644 --- a/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml +++ b/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml @@ -23,4 +23,3 @@ jobmanager.rpc.address: 192.168.1.33 jobmanager.rpc.port: 55443 - diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index d145eb24e0abb..28fed301e8948 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -249,6 +249,15 @@ public final class ConfigConstants { */ public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay"; + /** + * When a Flink job is submitted to YARN, the JobManager's host and the number of available + * processing slots is written into a properties file, so that the Flink client is able + * to pick those details up. + * This configuration parameter allows changing the default location of that file (for example + * for environments sharing a Flink installation between users) + */ + public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location"; + // ------------------------ Hadoop Configuration ------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java index c2e897f9a0fa6..9a1e4b1d648a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java @@ -19,23 +19,48 @@ package org.apache.flink.runtime.yarn; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; +/** + * Abstract class for interacting with a running Flink cluster within YARN. + */ public abstract class AbstractFlinkYarnCluster { + /** + * Get hostname and port of the JobManager. + */ public abstract InetSocketAddress getJobManagerAddress(); + /** + * Returns an URL (as a string) to the JobManager web interface, running next to the + * ApplicationMaster and JobManager in a YARN container + */ public abstract String getWebInterfaceURL(); + /** + * Request the YARN cluster to shut down. + * + * @param failApplication If true, the application will be marked as failed in YARN + */ public abstract void shutdown(boolean failApplication); + /** + * Boolean indicating whether the cluster has been stopped already + */ public abstract boolean hasBeenStopped(); + /** + * Returns the latest cluster status, with number of Taskmanagers and slots + */ public abstract FlinkYarnClusterStatus getClusterStatus(); + /** + * Boolean indicating whether the Flink YARN cluster is in an erronous state. + */ public abstract boolean hasFailed(); /** @@ -43,10 +68,24 @@ public abstract class AbstractFlinkYarnCluster { */ public abstract String getDiagnostics(); + /** + * May return new messages from the cluster. + * Messages can be for example about failed containers or container launch requests. + */ public abstract List getNewMessages(); + /** + * Retruns a string representation of the ApplicationID assigned by YARN. + */ public abstract String getApplicationId(); + /** + * Flink's YARN cluster abstraction has two modes for connecting to the YARN AM. + * In the detached mode, the AM is launched and the Flink YARN client is disconnecting + * afterwards. + * In the non-detached mode, it maintains a connection with the AM to control the cluster. + * @return boolean indicating whether the cluster is a detached cluster + */ public abstract boolean isDetached(); /** @@ -69,7 +108,13 @@ public abstract class AbstractFlinkYarnCluster { * Tells the ApplicationMaster to monitor the status of JobId and stop itself once the specified * job has finished. * - * @param jobID + * @param jobID Id of the job */ public abstract void stopAfterJob(JobID jobID); + + /** + * Return the Flink configuration object + * @return The Flink configuration object + */ + public abstract Configuration getFlinkConfiguration(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java index 2aaaaa0a55bcc..7eb3e1f0b4449 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java @@ -20,7 +20,12 @@ import java.io.Serializable; +/** + * Simple status representation of a running YARN cluster. + * It contains the number of available Taskmanagers and processing slots. + */ public class FlinkYarnClusterStatus implements Serializable { + private static final long serialVersionUID = 4230348124179245370L; private int numberOfTaskManagers; private int numberOfSlots; @@ -62,11 +67,8 @@ public boolean equals(Object o) { if (numberOfSlots != that.numberOfSlots) { return false; } - if (numberOfTaskManagers != that.numberOfTaskManagers) { - return false; - } + return numberOfTaskManagers == that.numberOfTaskManagers; - return true; } @Override diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index 5fa3ac7704ba9..3ac17c064dca1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -220,9 +220,8 @@ public void disconnect() { // -------------------------- Interaction with the cluster ------------------------ - /** + /* * This call blocks until the message has been recevied. - * @param jobID */ @Override public void stopAfterJob(JobID jobID) { @@ -234,6 +233,11 @@ public void stopAfterJob(JobID jobID) { } } + @Override + public org.apache.flink.configuration.Configuration getFlinkConfiguration() { + return flinkConfig; + } + @Override public InetSocketAddress getJobManagerAddress() { return jobManagerAddress; From 08c5b11797a91714da14b98b73f1dd8c762460fd Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 27 Aug 2015 10:39:13 +0200 Subject: [PATCH 2/2] address PR comments --- .../java/org/apache/flink/client/FlinkYarnSessionCli.java | 3 ++- .../apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java | 2 +- .../src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index ad7c6679c8b6d..66a48341f1811 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -216,7 +216,8 @@ public boolean accept(File dir, String name) { flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); if (cmd.hasOption(DETACHED.getOpt())) { - flinkYarnClient.setDetachedMode(true); + this.detachedMode = true; + flinkYarnClient.setDetachedMode(detachedMode); } if (cmd.hasOption(STREAMING.getOpt())) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java index 9a1e4b1d648a4..3f7889824e196 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java @@ -75,7 +75,7 @@ public abstract class AbstractFlinkYarnCluster { public abstract List getNewMessages(); /** - * Retruns a string representation of the ApplicationID assigned by YARN. + * Returns a string representation of the ApplicationID assigned by YARN. */ public abstract String getApplicationId(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index 3ac17c064dca1..56be19800883a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -269,10 +269,10 @@ public boolean isDetached() { @Override public FlinkYarnClusterStatus getClusterStatus() { if(!isConnected) { - throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); + throw new IllegalStateException("The cluster is not connected to the ApplicationMaster."); } if(hasBeenStopped()) { - throw new RuntimeException("The FlinkYarnCluster has alread been stopped"); + throw new RuntimeException("The FlinkYarnCluster has already been stopped"); } Future clusterStatusOption = ask(applicationClient, Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout); Object clusterStatus;