Skip to content

Commit

Permalink
[FLINK-3545][yarn] integrate ResourceManager support
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm committed Mar 29, 2016
1 parent 92ff2b1 commit 4405235
Show file tree
Hide file tree
Showing 27 changed files with 1,886 additions and 1,205 deletions.
Expand Up @@ -339,13 +339,15 @@ public final class ConfigConstants {
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml. * in the flink-conf.yaml.
* @deprecated Please use {@code CONTAINERED_MASTER_ENV_PREFIX}.
*/ */
@Deprecated @Deprecated
public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env."; public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env.";


/** /**
* Similar to the {@link #YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows * Similar to the {@link #YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables. * setting custom environment variables.
* @deprecated Please use {@code CONTAINERED_TASK_MANAGER_ENV_PREFIX}.
*/ */
@Deprecated @Deprecated
public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";
Expand Down
Expand Up @@ -20,13 +20,18 @@


import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SignalHandler;


/** /**
* Yarn application master which starts the {@link TestingYarnJobManager} and the * Yarn application master which starts the {@link TestingYarnJobManager},
* {@link TestingMemoryArchivist}. * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}.
*/ */
public class TestingApplicationMaster extends ApplicationMasterBase { public class TestingApplicationMaster extends YarnApplicationMasterRunner {

@Override @Override
public Class<? extends JobManager> getJobManagerClass() { public Class<? extends JobManager> getJobManagerClass() {
return TestingYarnJobManager.class; return TestingYarnJobManager.class;
Expand All @@ -37,9 +42,23 @@ public Class<? extends MemoryArchivist> getArchivistClass() {
return TestingMemoryArchivist.class; return TestingMemoryArchivist.class;
} }


@Override
protected Class<? extends TaskManager> getTaskManagerClass() {
return TestingYarnTaskManager.class;
}

@Override
public Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
return TestingYarnFlinkResourceManager.class;
}

public static void main(String[] args) { public static void main(String[] args) {
TestingApplicationMaster applicationMaster = new TestingApplicationMaster(); EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
SignalHandler.register(LOG);


applicationMaster.run(args); // run and exit with the proper return code
int returnCode = new TestingApplicationMaster().run(args);
System.exit(returnCode);
} }

} }
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/**
* Flink's testing resource manager for Yarn.
*
* <p>This subclass of {@link YarnFlinkResourceManager} declares no fields or methods of its
* own here; it only exposes a constructor that hands every argument to the superclass
* unchanged.
*/
public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {

/**
* Creates the testing resource manager by delegating to the
* {@link YarnFlinkResourceManager} constructor with the same arguments in the same order.
*
* <p>The meaning of each argument is defined by the superclass constructor; nothing is
* inspected, validated, or modified here.
*
* @param flinkConfig passed through to the superclass
* @param yarnConfig passed through to the superclass
* @param leaderRetrievalService passed through to the superclass
* @param applicationMasterHostName passed through to the superclass
* @param webInterfaceURL passed through to the superclass
* @param taskManagerParameters passed through to the superclass
* @param taskManagerLaunchContext passed through to the superclass
* @param yarnHeartbeatIntervalMillis passed through to the superclass
* @param maxFailedContainers passed through to the superclass
* @param numInitialTaskManagers passed through to the superclass
*/
public TestingYarnFlinkResourceManager(
Configuration flinkConfig,
YarnConfiguration yarnConfig,
LeaderRetrievalService leaderRetrievalService,
String applicationMasterHostName,
String webInterfaceURL,
ContaineredTaskManagerParameters taskManagerParameters,
ContainerLaunchContext taskManagerLaunchContext,
int yarnHeartbeatIntervalMillis,
int maxFailedContainers,
int numInitialTaskManagers) {

// Pure pass-through: argument order must match the superclass constructor signature.
super(
flinkConfig,
yarnConfig,
leaderRetrievalService,
applicationMasterHostName,
webInterfaceURL,
taskManagerParameters,
taskManagerLaunchContext,
yarnHeartbeatIntervalMillis,
maxFailedContainers,
numInitialTaskManagers);
}
}
Expand Up @@ -57,44 +57,52 @@ public void testUberjarLocator() {
@Test @Test
public void testHeapCutoff() { public void testHeapCutoff() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15); conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384); conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 384);


Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) ); Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) ); Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );


// test different configuration // test different configuration
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));


conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000"); conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "1000");
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1"); conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.1");
Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));


conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.5"); conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "0.5");
Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));


conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1"); conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));

// test also deprecated keys
conf = new Configuration();
conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);

Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
} }


@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void illegalArgument() { public void illegalArgument() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1.1"); conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "1.1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
} }


@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void illegalArgumentNegative() { public void illegalArgumentNegative() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "-0.01"); conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, "-0.01");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
} }


@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void tooMuchCutoff() { public void tooMuchCutoff() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "6000"); conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, "6000");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
} }


Expand Down
@@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
Expand Down Expand Up @@ -204,7 +204,9 @@ public void testTaskManagerFailure() {


// test logfile access // test logfile access
String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log"); String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version")); Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster"));
Assert.assertTrue(logs.contains("Starting JobManager"));
Assert.assertTrue(logs.contains("Starting JobManager Web Frontend"));
} catch(Throwable e) { } catch(Throwable e) {
LOG.warn("Error while running test",e); LOG.warn("Error while running test",e);
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
Expand All @@ -228,7 +230,7 @@ public void testTaskManagerFailure() {
ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers(); ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) { for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands()); String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) { if(command.contains(YarnTaskManager.class.getSimpleName())) {
taskManagerContainer = entry.getKey(); taskManagerContainer = entry.getKey();
nodeManager = nm; nodeManager = nm;
nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0); nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
Expand Down Expand Up @@ -279,7 +281,7 @@ public void testTaskManagerFailure() {
int killedOff = o.indexOf("Container killed by the ApplicationMaster"); int killedOff = o.indexOf("Container killed by the ApplicationMaster");
if (killedOff != -1) { if (killedOff != -1) {
o = o.substring(killedOff); o = o.substring(killedOff);
ok = o.indexOf("Launching container") > 0; ok = o.indexOf("Launching TaskManager") > 0;
} }
sleep(1000); sleep(1000);
} while(!ok); } while(!ok);
Expand All @@ -304,9 +306,14 @@ public void testTaskManagerFailure() {
LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC); LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);


// ------ Check if everything happened correctly // ------ Check if everything happened correctly
Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the YARN cluster")); Assert.assertTrue("Expect to see failed container",
Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the ApplicationMaster")); eC.contains("New messages from the YARN cluster"));
Assert.assertTrue("Expect to see new container started", eC.contains("Launching container") && eC.contains("on host"));
Assert.assertTrue("Expect to see failed container",
eC.contains("Container killed by the ApplicationMaster"));

Assert.assertTrue("Expect to see new container started",
eC.contains("Launching TaskManager") && eC.contains("on host"));


// cleanup auth for the subsequent tests. // cleanup auth for the subsequent tests.
remoteUgi.getTokenIdentifiers().remove(nmIdent); remoteUgi.getTokenIdentifiers().remove(nmIdent);
Expand Down Expand Up @@ -502,7 +509,7 @@ public boolean accept(File dir, String name) {
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog); content = FileUtils.readFileToString(jobmanagerLog);
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"; String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'", Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains(expected)); content.contains(expected));
expected = " (2/2) (attempt #0) to "; expected = " (2/2) (attempt #0) to ";
Expand Down
Expand Up @@ -20,9 +20,9 @@


import org.apache.flink.client.FlinkYarnSessionCli; import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;


import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand Down Expand Up @@ -239,14 +239,14 @@ public void testJavaAPI() {
LOG.warn("Failing test", e); LOG.warn("Failing test", e);
Assert.fail("Error while deploying YARN cluster: "+e.getMessage()); Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
} }
FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1); GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever" for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Interrupted", e); LOG.warn("Interrupted", e);
} }
FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); GetClusterStatusResponse status = yarnCluster.getClusterStatus();
if(status != null && status.equals(expectedStatus)) { if(status != null && status.equals(expectedStatus)) {
LOG.info("Cluster reached status " + status); LOG.info("Cluster reached status " + status);
break; // all good, cluster started break; // all good, cluster started
Expand Down
Expand Up @@ -78,7 +78,4 @@ class TestingYarnJobManager(
checkpointRecoveryFactory, checkpointRecoveryFactory,
savepointStore, savepointStore,
jobRecoveryTimeout) jobRecoveryTimeout)
with TestingJobManagerLike { with TestingJobManagerLike {}

override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]
}
Expand Up @@ -18,6 +18,7 @@


package org.apache.flink.yarn package org.apache.flink.yarn


import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.instance.InstanceConnectionInfo import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.NetworkEnvironment
Expand All @@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
* instead of an anonymous class with the respective mixin to obtain a more readable logger name. * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
* *
* @param config Configuration object for the actor * @param config Configuration object for the actor
* @param resourceID The Yarn container id
* @param connectionInfo Connection information of this actor * @param connectionInfo Connection information of this actor
* @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation * @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation
* @param ioManager IOManager responsible for I/O * @param ioManager IOManager responsible for I/O
Expand All @@ -42,6 +44,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
*/ */
class TestingYarnTaskManager( class TestingYarnTaskManager(
config: TaskManagerConfiguration, config: TaskManagerConfiguration,
resourceID: ResourceID,
connectionInfo: InstanceConnectionInfo, connectionInfo: InstanceConnectionInfo,
memoryManager: MemoryManager, memoryManager: MemoryManager,
ioManager: IOManager, ioManager: IOManager,
Expand All @@ -50,10 +53,25 @@ class TestingYarnTaskManager(
leaderRetrievalService: LeaderRetrievalService) leaderRetrievalService: LeaderRetrievalService)
extends YarnTaskManager( extends YarnTaskManager(
config, config,
resourceID,
connectionInfo, connectionInfo,
memoryManager, memoryManager,
ioManager, ioManager,
network, network,
numberOfSlots, numberOfSlots,
leaderRetrievalService) leaderRetrievalService)
with TestingTaskManagerLike {} with TestingTaskManagerLike {

object YarnTaskManager {

/** Entry point (main method) to run the TaskManager on YARN.
* @param args The command line arguments.
*/
def main(args: Array[String]): Unit = {
YarnTaskManagerRunner.runYarnTaskManager(args, classOf[TestingYarnTaskManager])
}

}
}


Expand Up @@ -18,11 +18,11 @@
package org.apache.flink.yarn; package org.apache.flink.yarn;


/** /**
* Default implementation of {@link FlinkYarnClientBase} which starts an {@link ApplicationMaster}. * Default implementation of {@link FlinkYarnClientBase} which starts an {@link YarnApplicationMasterRunner}.
*/ */
public class FlinkYarnClient extends FlinkYarnClientBase { public class FlinkYarnClient extends FlinkYarnClientBase {
@Override @Override
protected Class<?> getApplicationMasterClass() { protected Class<?> getApplicationMasterClass() {
return ApplicationMaster.class; return YarnApplicationMasterRunner.class;
} }
} }

0 comments on commit 4405235

Please sign in to comment.