Skip to content

Commit

Permalink
AMBARI-171. Agents retry failed actions for a configurable number of …
Browse files Browse the repository at this point in the history
…times after a configurable delay.

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/trunk@1221997 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Devaraj Das committed Dec 22, 2011
1 parent 2d5ca40 commit e1eeeab
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 13 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -2,6 +2,9 @@ Ambari Change log

Release 0.1.0 - unreleased

AMBARI-171. Agents retry failed actions for a configurable number of times
after a configurable delay (ddas)

AMBARI-170. Update the cluster state after state machine transitions it to final ACTIVE/INACTIVE state (vgogate)

AMBARI-168. trim the white spaces from host names returned through getHostnamesFromRageExpressions (vgogate)
Expand Down
42 changes: 36 additions & 6 deletions agent/src/main/python/ambari_agent/ActionQueue.py
Expand Up @@ -49,6 +49,8 @@ def __init__(self, config):
self.config = config
self.sh = shellRunner()
self._stop = threading.Event()
self.maxRetries = config.get('command', 'maxretries')
self.sleepInterval = config.get('command', 'sleepBetweenRetries')

def stop(self):
self._stop.set()
Expand Down Expand Up @@ -97,13 +99,41 @@ def run(self):
'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction,
'NO_OP_ACTION' : self.noOpAction
}
try:
result = switches.get(action['kind'], self.unknownAction)(action)
except Exception, err:
traceback.print_exc()
logger.info(err)

exitCode = 1
retryCount = 1
while (exitCode != 0 and retryCount <= self.maxRetries):
try:
result = switches.get(action['kind'], self.unknownAction)(action)
if ('commandResult' in result):
commandResult = result['commandResult']
exitCode = commandResult['exitCode']
if (exitCode == 0):
break
else:
logger.warn(str(action) + " exited with code " + str(exitCode))
else:
#Really, no commandResult? Is this possible?
#TODO: check
exitCode = 0
break
except Exception, err:
traceback.print_exc()
logger.warn(err)
if ('commandResult' in result):
commandResult = result['commandResult']
if ('exitCode' in commandResult):
exitCode = commandResult['exitCode']
#retry in 5 seconds
time.sleep(self.sleepInterval)
retryCount += 1

if (exitCode != 0):
result = self.genResult(action)
result['exitCode']=1
result['exitCode']=exitCode
result['retryActionCount'] = retryCount - 1
else:
result['retryActionCount'] = retryCount
# Update the result
r.put(result)
if not self.stopped():
Expand Down
4 changes: 4 additions & 0 deletions agent/src/main/python/ambari_agent/AmbariConfig.py
Expand Up @@ -41,6 +41,10 @@
commandpath=/usr/local/bin/puppet apply --modulepath /home/puppet/puppet-ambari/modules
driver=/home/puppet/puppet-ambari/manifests/site.pp
[command]
maxretries=2
sleepBetweenRetries=1
"""
s = StringIO.StringIO(content)
config.readfp(s)
Expand Down
60 changes: 59 additions & 1 deletion agent/src/test/python/TestActionQueue.py
Expand Up @@ -21,7 +21,8 @@
from unittest import TestCase
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.AmbariConfig import AmbariConfig
import os, errno
from ambari_agent.FileUtil import getFilePath
import os, errno, time

class TestActionQueue(TestCase):
def test_ActionQueueStartStop(self):
Expand All @@ -30,3 +31,60 @@ def test_ActionQueueStartStop(self):
actionQueue.stop()
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')

def test_RetryAction(self):
action={'id' : 'tttt'}
config = AmbariConfig().getConfig()
actionQueue = ActionQueue(config)
path = actionQueue.getInstallFilename(action['id'])
configFile = {
"data" : "test",
"owner" : os.getuid(),
"group" : os.getgid() ,
"permission" : 0700,
"path" : path,
"umask" : 022
}

#note that the command in the action is just a listing of the path created
#we just want to ensure that 'ls' can run on the data file (in the actual world
#this 'ls' would be a puppet or a chef command that would work on a data
#file
badAction = {
'id' : 'tttt',
'kind' : 'INSTALL_AND_CONFIG_ACTION',
'workDirComponent' : 'abc-hdfs',
'file' : configFile,
'clusterDefinitionRevision' : 12,
'command' : ['/bin/ls',"/foo/bar/badPath1234"]
}
path=getFilePath(action,path)
goodAction = {
'id' : 'tttt',
'kind' : 'INSTALL_AND_CONFIG_ACTION',
'workDirComponent' : 'abc-hdfs',
'file' : configFile,
'clusterDefinitionRevision' : 12,
'command' : ['/bin/ls',path]
}
actionQueue.start()
response = {'actions' : [badAction,goodAction]}
actionQueue.maxRetries = 2
actionQueue.sleepInterval = 1
result = actionQueue.put(response)
time.sleep(5)
actionQueue.stop()
actionQueue.join()
results = actionQueue.result()
self.assertEqual(len(results), 2, 'Number of results is not 2.')
result = results[0]
maxretries = config.get('command', 'maxretries')
self.assertEqual(int(result['retryActionCount']),
int(maxretries),
"Number of retries is %d and not %d" %
(int(result['retryActionCount']), int(str(maxretries))))
result = results[1]
self.assertEqual(int(result['retryActionCount']),
1,
"Number of retries is %d and not %d" %
(int(result['retryActionCount']), 1))
Expand Up @@ -61,10 +61,10 @@ public class HeartbeatHandler {
private static Log LOG = LogFactory.getLog(HeartbeatHandler.class);
private final Clusters clusters;
private final Nodes nodes;
private StateMachineInvokerInterface stateMachineInvoker;
private FSMDriverInterface driver;
private final StateMachineInvokerInterface stateMachineInvoker;
private final FSMDriverInterface driver;

static final String DEFAULT_USER = "hdfs"; //TBD: this needs to come from the stack definition or something
static final String DEFAULT_USER = "hdfs"; //TODO: this needs to come from the stack definition or something (AMBARI-169)

@Inject
HeartbeatHandler(Clusters clusters, Nodes nodes,
Expand Down Expand Up @@ -192,6 +192,8 @@ public ControllerResponse processHeartBeat(HeartBeat heartbeat)
return createResponse(responseId,allActions,heartbeat);
}

//TODO: this should be moved to the ClusterImpl (a dependency graph
//should be created there)
private boolean dependentComponentsActive(ComponentPlugin plugin,
ClusterFSM cluster) throws IOException {
String[] dependents = plugin.getRequiredComponents();
Expand All @@ -216,7 +218,7 @@ private ControllerResponse createResponse(short responseId,
List<Action> allActions, HeartBeat heartbeat) {
ControllerResponse r = new ControllerResponse();
r.setResponseId(responseId);
if (allActions.size() > 0) {//TODO: REMOVE THIS
if (allActions.size() > 0) {//TODO: REMOVE THIS (AMBARI-158)
Action a = new Action();
a.setKind(Kind.NO_OP_ACTION);
allActions.add(a);
Expand Down Expand Up @@ -260,7 +262,7 @@ private void getInstallAndConfigureAction(String script,
List<Action> allActions) {
ConfigFile file = new ConfigFile();
file.setData(script);
file.setOwner(DEFAULT_USER);
file.setOwner(DEFAULT_USER); //TODO (AMBARI-169)

Action action = new Action();
action.setFile(file);
Expand Down Expand Up @@ -468,7 +470,7 @@ private void fillActionDetails(Action action, String clusterId,
action.setClusterDefinitionRevision(clusterDefRev);
action.setComponent(component);
action.setRole(role);
action.setUser(DEFAULT_USER);
action.setUser(DEFAULT_USER); //TODO (AMBARI-169)
action.setCleanUpCommand(new Command("foobar","",new String[]{"foobar"}));//TODO: this needs fixing at some point
String workDir = role.equals(component + "-client") ?
(clusterId + "-client") : (clusterId + "-" + role);
Expand All @@ -481,4 +483,9 @@ private void fillDetailsAndAddAction(Action action, List<Action> allActions,
fillActionDetails(action, clusterId, clusterDefRev, component, role);
addAction(action, allActions);
}

private class ActionTracker {
//tracks all actions based on agent hostnames. When the agent returns a response
//note all the failed actionIDs and resend them
}
}

0 comments on commit e1eeeab

Please sign in to comment.