Skip to content

Commit

Permalink
YARN-2980. Move health check script related functionality to hadoop-c…
Browse files Browse the repository at this point in the history
…ommon (Varun Saxena via aw)
  • Loading branch information
aw-was-here committed Feb 24, 2015
1 parent 73bcfa9 commit d4ac682
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 107 deletions.
Expand Up @@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.util;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
Expand All @@ -34,7 +34,6 @@
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;


/** /**
* *
Expand All @@ -58,14 +57,11 @@ public class NodeHealthScriptRunner extends AbstractService {
/** ShellCommandExecutor used to execute monitoring script */ /** ShellCommandExecutor used to execute monitoring script */
ShellCommandExecutor shexec = null; ShellCommandExecutor shexec = null;


/** Configuration used by the checker */
private Configuration conf;

/** Pattern used for searching in the output of the node health script */ /** Pattern used for searching in the output of the node health script */
static private final String ERROR_PATTERN = "ERROR"; static private final String ERROR_PATTERN = "ERROR";


/** Time out error message */ /** Time out error message */
static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG = "Node health script timed out"; public static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG = "Node health script timed out";


private boolean isHealthy; private boolean isHealthy;


Expand Down Expand Up @@ -192,29 +188,23 @@ private boolean hasErrors(String output) {
} }
} }


public NodeHealthScriptRunner() { public NodeHealthScriptRunner(String scriptName, long chkInterval, long timeout,
String[] scriptArgs) {
super(NodeHealthScriptRunner.class.getName()); super(NodeHealthScriptRunner.class.getName());
this.lastReportedTime = System.currentTimeMillis(); this.lastReportedTime = System.currentTimeMillis();
this.isHealthy = true; this.isHealthy = true;
this.healthReport = ""; this.healthReport = "";
this.nodeHealthScript = scriptName;
this.intervalTime = chkInterval;
this.scriptTimeout = timeout;
this.timer = new NodeHealthMonitorExecutor(scriptArgs);
} }


/* /*
* Method which initializes the values for the script path and interval time. * Method which initializes the values for the script path and interval time.
*/ */
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
this.nodeHealthScript =
conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
this.intervalTime = conf.getLong(YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
this.scriptTimeout = conf.getLong(
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
String[] args = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS,
new String[] {});
timer = new NodeHealthMonitorExecutor(args);
super.serviceInit(conf); super.serviceInit(conf);
} }


Expand All @@ -225,7 +215,7 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
// if health script path is not configured don't start the thread. // if health script path is not configured don't start the thread.
if (!shouldRun(conf)) { if (!shouldRun(nodeHealthScript)) {
LOG.info("Not starting node health monitor"); LOG.info("Not starting node health monitor");
return; return;
} }
Expand All @@ -242,7 +232,7 @@ protected void serviceStart() throws Exception {
*/ */
@Override @Override
protected void serviceStop() { protected void serviceStop() {
if (!shouldRun(conf)) { if (!shouldRun(nodeHealthScript)) {
return; return;
} }
if (nodeHealthScriptScheduler != null) { if (nodeHealthScriptScheduler != null) {
Expand Down Expand Up @@ -322,26 +312,25 @@ private synchronized void setLastReportedTime(long lastReportedTime) {
* <li>Node health check script file exists</li> * <li>Node health check script file exists</li>
* </ol> * </ol>
* *
* @param conf
* @return true if node health monitoring service can be started. * @return true if node health monitoring service can be started.
*/ */
public static boolean shouldRun(Configuration conf) { public static boolean shouldRun(String healthScript) {
String nodeHealthScript = if (healthScript == null || healthScript.trim().isEmpty()) {
conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
return false; return false;
} }
File f = new File(nodeHealthScript); File f = new File(healthScript);
return f.exists() && FileUtil.canExecute(f); return f.exists() && FileUtil.canExecute(f);
} }


private synchronized void setHealthStatus(boolean isHealthy, String output) { private synchronized void setHealthStatus(boolean isHealthy, String output) {
LOG.info("health status being set as " + output);
this.setHealthy(isHealthy); this.setHealthy(isHealthy);
this.setHealthReport(output); this.setHealthReport(output);
} }


private synchronized void setHealthStatus(boolean isHealthy, String output, private synchronized void setHealthStatus(boolean isHealthy, String output,
long time) { long time) {
LOG.info("health status being set as " + output);
this.setHealthStatus(isHealthy, output); this.setHealthStatus(isHealthy, output);
this.setLastReportedTime(time); this.setLastReportedTime(time);
} }
Expand All @@ -350,7 +339,7 @@ private synchronized void setHealthStatus(boolean isHealthy, String output,
* Used only by tests to access the timer task directly * Used only by tests to access the timer task directly
* @return the timer task * @return the timer task
*/ */
TimerTask getTimerTask() { public TimerTask getTimerTask() {
return timer; return timer;
} }
} }
@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestNodeHealthScriptRunner {

protected static File testRootDir = new File("target",
TestNodeHealthScriptRunner.class.getName() +
"-localDir").getAbsoluteFile();

private File nodeHealthscriptFile = new File(testRootDir,
Shell.appendScriptExtension("failingscript"));

@Before
public void setup() {
testRootDir.mkdirs();
}

@After
public void tearDown() throws Exception {
if (testRootDir.exists()) {
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
}

private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
throws IOException {
PrintWriter pw = null;
try {
FileUtil.setWritable(nodeHealthscriptFile, true);
FileUtil.setReadable(nodeHealthscriptFile, true);
pw = new PrintWriter(new FileOutputStream(nodeHealthscriptFile));
pw.println(scriptStr);
pw.flush();
} finally {
pw.close();
}
FileUtil.setExecutable(nodeHealthscriptFile, setExecutable);
}

@Test
public void testNodeHealthScriptShouldRun() throws IOException {
Assert.assertFalse("Node health script should start",
NodeHealthScriptRunner.shouldRun(
nodeHealthscriptFile.getAbsolutePath()));
writeNodeHealthScriptFile("", false);
// Node health script should not start if the node health script is not
// executable.
Assert.assertFalse("Node health script should start",
NodeHealthScriptRunner.shouldRun(
nodeHealthscriptFile.getAbsolutePath()));
writeNodeHealthScriptFile("", true);
Assert.assertTrue("Node health script should start",
NodeHealthScriptRunner.shouldRun(
nodeHealthscriptFile.getAbsolutePath()));
}

@Test
public void testNodeHealthScript() throws Exception {
String errorScript = "echo ERROR\n echo \"Tracker not healthy\"";
String normalScript = "echo \"I am all fine\"";
String timeOutScript =
Shell.WINDOWS ? "@echo off\nping -n 4 127.0.0.1 >nul\necho \"I am fine\""
: "sleep 4\necho \"I am fine\"";
Configuration conf = new Configuration();
writeNodeHealthScriptFile(normalScript, true);
NodeHealthScriptRunner nodeHealthScriptRunner = new NodeHealthScriptRunner(
nodeHealthscriptFile.getAbsolutePath(),
500, 1000, new String[] {});
nodeHealthScriptRunner.init(conf);
TimerTask timerTask = nodeHealthScriptRunner.getTimerTask();

timerTask.run();
// Normal Script runs successfully
Assert.assertTrue("Node health status reported unhealthy",
nodeHealthScriptRunner.isHealthy());
Assert.assertEquals("", nodeHealthScriptRunner.getHealthReport());

// Error script.
writeNodeHealthScriptFile(errorScript, true);
// Run timer
timerTask.run();
Assert.assertFalse("Node health status reported healthy",
nodeHealthScriptRunner.isHealthy());
Assert.assertTrue(
nodeHealthScriptRunner.getHealthReport().contains("ERROR"));

// Healthy script.
writeNodeHealthScriptFile(normalScript, true);
timerTask.run();
Assert.assertTrue("Node health status reported unhealthy",
nodeHealthScriptRunner.isHealthy());
Assert.assertEquals("", nodeHealthScriptRunner.getHealthReport());

// Timeout script.
writeNodeHealthScriptFile(timeOutScript, true);
timerTask.run();
Assert.assertFalse("Node health status reported healthy even after timeout",
nodeHealthScriptRunner.isHealthy());
Assert.assertEquals(
NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG,
nodeHealthScriptRunner.getHealthReport());
}
}
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -17,6 +17,9 @@ Trunk - Unreleased


YARN-2796. deprecate sbin/yarn-daemon.sh (aw) YARN-2796. deprecate sbin/yarn-daemon.sh (aw)


YARN-2980. Move health check script related functionality to hadoop-common
(Varun Saxena via aw)

OPTIMIZATIONS OPTIMIZATIONS


BUG FIXES BUG FIXES
Expand Down
Expand Up @@ -20,6 +20,8 @@


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.NodeHealthScriptRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;


/** /**
* The class which provides functionality of checking the health of the node and * The class which provides functionality of checking the health of the node and
Expand All @@ -33,15 +35,17 @@ public class NodeHealthCheckerService extends CompositeService {


static final String SEPARATOR = ";"; static final String SEPARATOR = ";";


public NodeHealthCheckerService() { public NodeHealthCheckerService(NodeHealthScriptRunner scriptRunner,
LocalDirsHandlerService dirHandlerService) {
super(NodeHealthCheckerService.class.getName()); super(NodeHealthCheckerService.class.getName());
dirsHandler = new LocalDirsHandlerService(); nodeHealthScriptRunner = scriptRunner;
dirsHandler = dirHandlerService;
} }


@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
if (NodeHealthScriptRunner.shouldRun(conf)) { if (NodeHealthScriptRunner.shouldRun(
nodeHealthScriptRunner = new NodeHealthScriptRunner(); conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH))) {
addService(nodeHealthScriptRunner); addService(nodeHealthScriptRunner);
} }
addService(dirsHandler); addService(dirsHandler);
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.NodeHealthScriptRunner;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
Expand Down Expand Up @@ -181,6 +182,25 @@ private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager,
} }
} }


public static NodeHealthScriptRunner getNodeHealthScriptRunner(Configuration conf) {
String nodeHealthScript =
conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
if(!NodeHealthScriptRunner.shouldRun(nodeHealthScript)) {
LOG.info("Abey khali");
return null;
}
long nmCheckintervalTime = conf.getLong(
YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
long scriptTimeout = conf.getLong(
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
String[] scriptArgs = conf.getStrings(
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {});
return new NodeHealthScriptRunner(nodeHealthScript,
nmCheckintervalTime, scriptTimeout, scriptArgs);
}

@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {


Expand Down Expand Up @@ -216,9 +236,11 @@ protected void serviceInit(Configuration conf) throws Exception {
// NodeManager level dispatcher // NodeManager level dispatcher
this.dispatcher = new AsyncDispatcher(); this.dispatcher = new AsyncDispatcher();


nodeHealthChecker = new NodeHealthCheckerService(); dirsHandler = new LocalDirsHandlerService();
nodeHealthChecker =
new NodeHealthCheckerService(
getNodeHealthScriptRunner(conf), dirsHandler);
addService(nodeHealthChecker); addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();


this.context = createNMContext(containerTokenSecretManager, this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager, nmStore); nmTokenSecretManager, nmStore);
Expand Down
Expand Up @@ -63,7 +63,7 @@ public class TestEventFlow {
private static File remoteLogDir = new File("target", private static File remoteLogDir = new File("target",
TestEventFlow.class.getName() + "-remoteLogDir").getAbsoluteFile(); TestEventFlow.class.getName() + "-remoteLogDir").getAbsoluteFile();
private static final long SIMULATED_RM_IDENTIFIER = 1234; private static final long SIMULATED_RM_IDENTIFIER = 1234;

@Test @Test
public void testSuccessfulContainerLaunch() throws InterruptedException, public void testSuccessfulContainerLaunch() throws InterruptedException,
IOException, YarnException { IOException, YarnException {
Expand Down Expand Up @@ -98,9 +98,10 @@ public int getHttpPort() {


DeletionService del = new DeletionService(exec); DeletionService del = new DeletionService(exec);
Dispatcher dispatcher = new AsyncDispatcher(); Dispatcher dispatcher = new AsyncDispatcher();
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(); LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
healthChecker.init(conf); healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeManagerMetrics metrics = NodeManagerMetrics.create();
NodeStatusUpdater nodeStatusUpdater = NodeStatusUpdater nodeStatusUpdater =
new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) {
Expand Down

0 comments on commit d4ac682

Please sign in to comment.