Skip to content

Commit

Permalink
AMBARI-23206 - Use Agent Command Response To Update Mpack Install Sta…
Browse files Browse the repository at this point in the history
…te (#668)
  • Loading branch information
jonathan-hurley committed Mar 15, 2018
1 parent 757101c commit 239d782
Show file tree
Hide file tree
Showing 24 changed files with 623 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ def __init__(self, repo_object):
for repo_def in repos_def:
self.items.append(CommandRepositoryItem(self, repo_def))

def __str__(self):
inner = []
if self.mpack_id:
inner.append("mpack_id: %s" % str(self.mpack_id))
elif self.mpack_name:
inner.append("mpack_name: %s" % str(self.mpack_name))
elif self.version_string:
inner.append("mpack_version: %s" % str(self.version_string))
return "CommandRepository{%s}" % ", ".join(inner)


class CommandRepositoryItem(object):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,26 @@ def get_config_dir_during_stack_upgrade(self, env, base_dir, conf_select_name):
return os.path.realpath(config_path)
return None


def save_mpack_to_structured_out(self, command_name):
"""
Writes out information about the managment pack which was installed if this is an installation command.
:param command_name: command name
:return: None
"""
is_install_command = command_name is not None and command_name.lower() == "install"
if not is_install_command:
return

command_repository = CommandRepository(self.get_config()['repositoryFile'])

Logger.info("Reporting installation state for {0}".format(command_repository))

self.put_structured_out({"mpackId": command_repository.mpack_id})
self.put_structured_out({"mpackName":command_repository.mpack_name})
self.put_structured_out({"mpackVersion":command_repository.version_string})


def save_component_version_to_structured_out(self, command_name):
"""
Saves the version of the component for this command to the structured out file. If the
Expand All @@ -224,51 +244,16 @@ def save_component_version_to_structured_out(self, command_name):
:param command_name: command name
:return: None
"""
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions import stack_select

repository_resolved = default("repositoryFile/resolved", False)
repository_version = default("repositoryFile/repoVersion", None)
is_install_command = command_name is not None and command_name.lower() == "install"

# start out with no version
component_version = None

# install command + trusted repo means use the repo version and don't consult stack-select
# this is needed in cases where an existing symlink is on the system and stack-select can't
# change it on installation (because it's scared to in order to support parallel installs)
if is_install_command and repository_resolved and repository_version is not None:
Logger.info("The repository with version {0} for this command has been marked as resolved."\
" It will be used to report the version of the component which was installed".format(repository_version))

component_version = repository_version

stack_name = Script.get_stack_name()
stack_select_package_name = stack_select.get_package_name()

if stack_select_package_name and stack_name:
# only query for the component version from stack-select if we can't trust the repository yet
if component_version is None:
component_version = version_select_util.get_component_version_from_symlink(stack_name, stack_select_package_name)

# last ditch effort - should cover the edge case where the package failed to setup its
# link and we have to try to see if <stack-select> can help
if component_version is None:
output, code, versions = stack_select.unsafe_get_stack_versions()
if len(versions) == 1:
component_version = versions[0]
Logger.error("The '{0}' component did not advertise a version. This may indicate a problem with the component packaging. " \
"However, the stack-select tool was able to report a single version installed ({1}). " \
"This is the version that will be reported.".format(stack_select_package_name, component_version))
component_version = version_select_util.get_component_version_from_symlink(stack_name, stack_select_package_name)

if component_version:
self.put_structured_out({"version": component_version})

# if repository_version_id is passed, pass it back with the version
from resource_management.libraries.functions.default import default
repo_version_id = default("/hostLevelParams/repository_version_id", None)
if repo_version_id:
self.put_structured_out({"repository_version_id": repo_version_id})
else:
if not self.is_hook():
Logger.error("The '{0}' component did not advertise a version. This may indicate a problem with the component packaging.".format(stack_select_package_name))
Expand Down Expand Up @@ -384,6 +369,8 @@ def execute(self):
ex.pre_raise()
raise
finally:
self.save_mpack_to_structured_out(self.command_name)

if self.should_expose_component_version(self.command_name):
self.save_component_version_to_structured_out(self.command_name)

Expand Down Expand Up @@ -1018,8 +1005,8 @@ def restart(self, env):
else:
self.post_rolling_restart(env)

if self.should_expose_component_version("restart"):
self.save_component_version_to_structured_out("restart")
if self.should_expose_component_version(self.command_name):
self.save_component_version_to_structured_out(self.command_name)


# TODO, remove after all services have switched to post_upgrade_restart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
import org.apache.ambari.server.events.CommandReportReceivedEvent;
import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.CommandReportEventPublisher;
import org.apache.ambari.server.events.publishers.JPAEventPublisher;
import org.apache.ambari.server.metadata.RoleCommandOrder;
import org.apache.ambari.server.metadata.RoleCommandOrderProvider;
Expand Down Expand Up @@ -119,7 +119,7 @@ class ActionScheduler implements Runnable {
private Clusters clusters;

@Inject
private AmbariEventPublisher ambariEventPublisher;
private CommandReportEventPublisher commandReportEventPublisher;

@Inject
private HostsMap hostsMap;
Expand Down Expand Up @@ -235,7 +235,7 @@ public ActionScheduler(@Named("schedulerSleeptime") long sleepTime,
*/
protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db,
ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
UnitOfWork unitOfWork, CommandReportEventPublisher commandReportEventPublisher,
Configuration configuration, Provider<EntityManager> entityManagerProvider,
HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
RoleCommandOrderProvider roleCommandOrderProvider) {
Expand All @@ -248,7 +248,7 @@ protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, Ac
this.maxAttempts = (short) maxAttempts;
this.hostsMap = hostsMap;
this.unitOfWork = unitOfWork;
this.ambariEventPublisher = ambariEventPublisher;
this.commandReportEventPublisher = commandReportEventPublisher;
this.configuration = configuration;
this.entityManagerProvider = entityManagerProvider;
this.hostRoleCommandDAO = hostRoleCommandDAO;
Expand Down Expand Up @@ -278,12 +278,13 @@ protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, Ac
*/
protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db,
ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
UnitOfWork unitOfWork, CommandReportEventPublisher commandReportEventPublisher,
Configuration configuration, Provider<EntityManager> entityManagerProvider,
HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) {

this(sleepTimeMilliSec, actionTimeoutMilliSec, db, actionQueue, fsmObject, maxAttempts, hostsMap, unitOfWork,
ambariEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO, hostRoleCommandFactory,
commandReportEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO,
hostRoleCommandFactory,
null);
}

Expand Down Expand Up @@ -770,6 +771,7 @@ protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionC
for (ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole();
RoleCommand roleCommand = c.getRoleCommand();
HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
i_my++;
if (LOG.isTraceEnabled()) {
Expand Down Expand Up @@ -832,14 +834,12 @@ protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionC
"has been deleted recently. The command has been aborted and dequeued." +
"Execution command details: " +
"cmdId: %s; taskId: %s; roleCommand: %s",
c.getCommandId(), c.getTaskId(), c.getRoleCommand());
c.getCommandId(), c.getTaskId(), roleCommand);
LOG.warn("Host {} has been detected as non-available. {}", host, message);
// Abort the command itself
// We don't need to send CANCEL_COMMANDs in this case
db.abortHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), message);
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
processCommandDeath(cluster.getClusterName(), c.getHostname(), roleStr, roleCommand.name());
status = HostRoleStatus.ABORTED;
} else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)
|| (isHostStateUnknown = isHostStateUnknown(s, hostObj, roleStr))) {
Expand All @@ -866,9 +866,7 @@ protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionC
//commands above don't affect host component state (e.g. no in_progress state in process), transition will fail
transitionToFailedState(cluster.getClusterName(), c.getServiceName(), roleStr, host, now, false);
}
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
processCommandDeath(cluster.getClusterName(), c.getHostname(), roleStr, roleCommand.name());
}

// Dequeue command
Expand Down Expand Up @@ -956,10 +954,9 @@ private void abortOperationsForStage(Stage stage) {
Collection<HostRoleCommandEntity> abortedOperations = db.abortOperation(stage.getRequestId());

for (HostRoleCommandEntity command: abortedOperations) {
if (command.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
String clusterName = stage.getClusterName();
processActionDeath(clusterName, command.getHostName(), command.getRole().name());
}
String clusterName = stage.getClusterName();
processCommandDeath(clusterName, command.getHostName(), command.getRole().name(),
command.getRoleCommand().name());
}
}

Expand Down Expand Up @@ -1282,13 +1279,9 @@ void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String
hostRoleCommand.getStageId(), hostRoleCommand.getRole().name());
}

// If host role is an Action, we have to send an event
if (hostRoleCommand.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
String clusterName = hostRoleCommand.getExecutionCommandWrapper().getExecutionCommand().getClusterName();
processActionDeath(clusterName,
hostRoleCommand.getHostName(),
hostRoleCommand.getRole().name());
}
String clusterName = hostRoleCommand.getExecutionCommandWrapper().getExecutionCommand().getClusterName();
processCommandDeath(clusterName, hostRoleCommand.getHostName(),
hostRoleCommand.getRole().name(), hostRoleCommand.getRoleCommand().name());
}
}

Expand All @@ -1309,28 +1302,30 @@ void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multim


/**
* Attempts to process kill/timeout/abort of action and send
* appropriate event to all listeners
* Attempts to process kill/timeout/abort of a command and send appropriate
* event to all listeners
*/
private void processActionDeath(String clusterName,
String hostname,
String role) {
private void processCommandDeath(String clusterName, String hostname, String role,
String roleCommand) {
try {
// Usually clusterId is defined (except the awkward case when
// "Distribute repositories/install packages" action has been issued
// against a concrete host without binding to a cluster)
Long clusterId = clusterName != null ?
clusters.getCluster(clusterName).getClusterId() : null;

CommandReport report = new CommandReport();
report.setRole(role);
report.setRoleCommand(roleCommand);
report.setStdOut("Action is dead");
report.setStdErr("Action is dead");
report.setStructuredOut("{}");
report.setExitCode(1);
report.setStatus(HostRoleStatus.ABORTED.toString());
ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
clusterId, hostname, report, true);
ambariEventPublisher.publish(event);

CommandReportReceivedEvent event = new CommandReportReceivedEvent(
clusterId, hostname, report);
commandReportEventPublisher.publish(event);
} catch (AmbariException e) {
LOG.error(String.format("Can not get cluster %s", clusterName), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,20 @@ public ExecutionCommand getExecutionCommand() {
executionCommand.setUpgradeSummary(upgradeSummary);
}

ServiceGroupEntity serviceGroupEntity = serviceGroupDAO.find(clusterId,
executionCommand.getServiceGroupName());

long mpackId = serviceGroupEntity.getStack().getMpackId();
Mpack mpack = ambariMetaInfo.getMpack(mpackId);
MpackEntity mpackEntity = mpackDAO.findById(mpackId);

executionCommand.setMpackId(mpackId);

// setting repositoryFile
final Host host = cluster.getHost(executionCommand.getHostname()); // can be null on internal commands

if (null == executionCommand.getRepositoryFile() && null != host) {
final CommandRepository commandRepository;

ServiceGroupEntity serviceGroupEntity = serviceGroupDAO.find(clusterId, executionCommand.getServiceGroupName());
long mpackId = serviceGroupEntity.getStack().getMpackId();
Mpack mpack = ambariMetaInfo.getMpack(mpackId);
MpackEntity mpackEntity = mpackDAO.findById(mpackId);

RepoOsEntity osEntity = repoVersionHelper.getOSEntityForHost(mpackEntity, host);
commandRepository = repoVersionHelper.getCommandRepository(mpack, osEntity);
executionCommand.setRepositoryFile(commandRepository);
Expand Down

0 comments on commit 239d782

Please sign in to comment.