From 547b333203db8e01737a690e06ad5f5663d7faca Mon Sep 17 00:00:00 2001 From: Bill Liu Date: Mon, 20 Feb 2017 19:27:46 -0800 Subject: [PATCH] [FLINK-5668] reduce hdfs dependency at startup time --- .../java/org/apache/flink/yarn/Utils.java | 20 ------------------- .../yarn/YarnApplicationMasterRunner.java | 4 +++- .../flink/yarn/YarnResourceManager.java | 4 ++-- .../flink/yarn/YarnTaskManagerRunner.java | 8 ++++++-- .../yarn/YarnApplicationMasterRunnerTest.java | 3 +-- 5 files changed, 12 insertions(+), 27 deletions(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 60f7204152a42..f8056cbe14196 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -308,7 +307,6 @@ static ContainerLaunchContext createTaskExecutorContext( YarnConfiguration yarnConfig, Map env, ContaineredTaskManagerParameters tmParams, - org.apache.flink.configuration.Configuration taskManagerConfig, String workingDirectory, Class taskManagerMainClass, Logger log) throws Exception { @@ -383,26 +381,8 @@ static ContainerLaunchContext createTaskExecutorContext( registerLocalResource(fs, remoteJarPath, flinkJar); } - // register conf with local fs - LocalResource flinkConf = Records.newRecord(LocalResource.class); - { - // write the TaskManager configuration to a local file - final File taskManagerConfigFile = - new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); - log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); - BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); - - Path homeDirPath = new Path(clientHomeDir); - FileSystem fs = homeDirPath.getFileSystem(yarnConfig); - setupLocalResource(fs, appId, - new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir)); - - log.info("Prepared local resource for modified yaml: {}", flinkConf); - } - Map taskManagerLocalResources = new HashMap<>(); taskManagerLocalResources.put("flink.jar", flinkJar); - taskManagerLocalResources.put("flink-conf.yaml", flinkConf); //To support Yarn Secure Integration Test Scenario if(yarnConfResource != null && krb5ConfResource != null) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 9d5673cbf23bb..82d922e63c174 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -318,9 +318,11 @@ protected int runApplicationMaster(Configuration config) { config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT); LOG.debug("TaskManager configuration: {}", taskManagerConfig); + taskManagerParameters.taskManagerEnv().putAll(taskManagerConfig.toMap()); + final ContainerLaunchContext taskManagerContext = Utils.createTaskExecutorContext( config, yarnConfig, ENV, - taskManagerParameters, taskManagerConfig, + taskManagerParameters, currDir, getTaskManagerClass(), LOG); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index ab96441bcb01b..6f35d7bb41eac 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -343,10 +343,10 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( flinkConfig, "", 0, 1, teRegistrationTimeout); LOG.debug("TaskManager configuration: {}", taskManagerConfig); - + taskManagerParameters.taskManagerEnv().putAll(taskManagerConfig.toMap()); ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext( flinkConfig, yarnConfig, ENV, - taskManagerParameters, taskManagerConfig, + taskManagerParameters, currDir, YarnTaskExecutorRunner.class, LOG); // set a special environment variable to uniquely identify this container diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 849a8a64d9450..9e18e040fcb24 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -54,9 +54,13 @@ public static void runYarnTaskManager(String[] args, final Class entry : System.getenv().entrySet()) { + configuration.setString(entry.getKey(), entry.getValue()); + } + } } catch (Throwable t) { LOG.error(t.getMessage(), t); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java index f8748967affb5..7ae4e3258c91d 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java @@ -82,12 +82,11 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { .put(FLINK_JAR_PATH, root.toURI().toString()) .build(); ContaineredTaskManagerParameters tmParams = mock(ContaineredTaskManagerParameters.class); - Configuration taskManagerConf = new Configuration(); String workingDirectory = root.getAbsolutePath(); Class taskManagerMainClass = YarnApplicationMasterRunnerTest.class; ContainerLaunchContext ctx = Utils.createTaskExecutorContext(flinkConf, yarnConf, env, tmParams, - taskManagerConf, workingDirectory, taskManagerMainClass, LOG); + workingDirectory, taskManagerMainClass, LOG); assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme()); } }