Skip to content

Commit

Permalink
YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S.
Browse files Browse the repository at this point in the history
  • Loading branch information
sunilgovind committed Jul 24, 2017
1 parent 2054324 commit e315328
Show file tree
Hide file tree
Showing 13 changed files with 451 additions and 242 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
new HAServiceProtocol.StateChangeRequestInfo( new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC); HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);


private RMContext rmContext; private ResourceManager rm;


private byte[] localActiveNodeInfo; private byte[] localActiveNodeInfo;
private ActiveStandbyElector elector; private ActiveStandbyElector elector;
Expand All @@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@VisibleForTesting @VisibleForTesting
final Object zkDisconnectLock = new Object(); final Object zkDisconnectLock = new Object();


ActiveStandbyElectorBasedElectorService(RMContext rmContext) { ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
super(ActiveStandbyElectorBasedElectorService.class.getName()); super(ActiveStandbyElectorBasedElectorService.class.getName());
this.rmContext = rmContext; this.rm = rm;
} }


@Override @Override
Expand Down Expand Up @@ -140,7 +140,7 @@ public void becomeActive() throws ServiceFailedException {
cancelDisconnectTimer(); cancelDisconnectTimer();


try { try {
rmContext.getRMAdminService().transitionToActive(req); rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) { } catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e); throw new ServiceFailedException("RM could not transition to Active", e);
} }
Expand All @@ -151,7 +151,7 @@ public void becomeStandby() {
cancelDisconnectTimer(); cancelDisconnectTimer();


try { try {
rmContext.getRMAdminService().transitionToStandby(req); rm.getRMContext().getRMAdminService().transitionToStandby(req);
} catch (Exception e) { } catch (Exception e) {
LOG.error("RM could not transition to Standby", e); LOG.error("RM could not transition to Standby", e);
} }
Expand Down Expand Up @@ -205,7 +205,7 @@ public void run() {
@SuppressWarnings(value = "unchecked") @SuppressWarnings(value = "unchecked")
@Override @Override
public void notifyFatalError(String errorMessage) { public void notifyFatalError(String errorMessage) {
rmContext.getDispatcher().getEventHandler().handle( rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage)); errorMessage));
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements


private static final Log LOG = LogFactory.getLog(AdminService.class); private static final Log LOG = LogFactory.getLog(AdminService.class);


private final RMContext rmContext;
private final ResourceManager rm; private final ResourceManager rm;
private String rmId; private String rmId;


Expand All @@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements
@VisibleForTesting @VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true; boolean isCentralizedNodeLabelConfiguration = true;


public AdminService(ResourceManager rm, RMContext rmContext) { public AdminService(ResourceManager rm) {
super(AdminService.class.getName()); super(AdminService.class.getName());
this.rm = rm; this.rm = rm;
this.rmContext = rmContext;
} }


@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
autoFailoverEnabled = autoFailoverEnabled =
rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf); rm.getRMContext().isHAEnabled()
&& HAUtil.isAutomaticFailoverEnabled(conf);


masterServiceBindAddress = conf.getSocketAddr( masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_BIND_HOST,
Expand Down Expand Up @@ -189,7 +188,7 @@ protected void startServer() throws Exception {
RMPolicyProvider.getInstance()); RMPolicyProvider.getInstance());
} }


if (rmContext.isHAEnabled()) { if (rm.getRMContext().isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);


Expand Down Expand Up @@ -265,7 +264,7 @@ private void checkHaStateChange(StateChangeRequestInfo req)
} }


private synchronized boolean isRMActive() { private synchronized boolean isRMActive() {
return HAServiceState.ACTIVE == rmContext.getHAServiceState(); return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState();
} }


private void throwStandbyException() throws StandbyException { private void throwStandbyException() throws StandbyException {
Expand Down Expand Up @@ -304,7 +303,7 @@ public synchronized void transitionToActive(
// call all refresh*s for active RM to get the updated configurations. // call all refresh*s for active RM to get the updated configurations.
refreshAll(); refreshAll();
} catch (Exception e) { } catch (Exception e) {
rmContext rm.getRMContext()
.getDispatcher() .getDispatcher()
.getEventHandler() .getEventHandler()
.handle( .handle(
Expand Down Expand Up @@ -363,7 +362,7 @@ public synchronized void transitionToStandby(
@Override @Override
public synchronized HAServiceStatus getServiceStatus() throws IOException { public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState"); checkAccess("getServiceState");
HAServiceState haState = rmContext.getHAServiceState(); HAServiceState haState = rm.getRMContext().getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState); HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive(); ret.setReadyToBecomeActive();
Expand Down Expand Up @@ -395,11 +394,12 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
} }


private void refreshQueues() throws IOException, YarnException { private void refreshQueues() throws IOException, YarnException {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); rm.getRMContext().getScheduler().reinitialize(getConfig(),
this.rm.getRMContext());
// refresh the reservation system // refresh the reservation system
ReservationSystem rSystem = rmContext.getReservationSystem(); ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
if (rSystem != null) { if (rSystem != null) {
rSystem.reinitialize(getConfig(), rmContext); rSystem.reinitialize(getConfig(), rm.getRMContext());
} }
} }


Expand All @@ -418,14 +418,14 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
switch (request.getDecommissionType()) { switch (request.getDecommissionType()) {
case NORMAL: case NORMAL:
rmContext.getNodesListManager().refreshNodes(conf); rm.getRMContext().getNodesListManager().refreshNodes(conf);
break; break;
case GRACEFUL: case GRACEFUL:
rmContext.getNodesListManager().refreshNodesGracefully( rm.getRMContext().getNodesListManager().refreshNodesGracefully(
conf, request.getDecommissionTimeout()); conf, request.getDecommissionTimeout());
break; break;
case FORCEFUL: case FORCEFUL:
rmContext.getNodesListManager().refreshNodesForcefully(); rm.getRMContext().getNodesListManager().refreshNodesForcefully();
break; break;
} }
RMAuditLogger.logSuccess(user.getShortUserName(), operation, RMAuditLogger.logSuccess(user.getShortUserName(), operation,
Expand All @@ -440,7 +440,7 @@ private void refreshNodes() throws IOException, YarnException {
Configuration conf = Configuration conf =
getConfiguration(new Configuration(false), getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getNodesListManager().refreshNodes(conf); rm.getRMContext().getNodesListManager().refreshNodes(conf);
} }


@Override @Override
Expand Down Expand Up @@ -559,10 +559,11 @@ private void refreshActiveServicesAcls() throws IOException, YarnException {
Configuration conf = Configuration conf =
getConfiguration(new Configuration(false), getConfiguration(new Configuration(false),
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); rm.getRMContext().getClientRMService().refreshServiceAcls(conf,
rmContext.getApplicationMasterService().refreshServiceAcls( policyProvider);
rm.getRMContext().getApplicationMasterService().refreshServiceAcls(
conf, policyProvider); conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls( rm.getRMContext().getResourceTrackerService().refreshServiceAcls(
conf, policyProvider); conf, policyProvider);
} }


Expand Down Expand Up @@ -601,7 +602,7 @@ public UpdateNodeResourceResponse updateNodeResource(
// if any invalid nodes, throw exception instead of partially updating // if any invalid nodes, throw exception instead of partially updating
// valid nodes. // valid nodes.
for (NodeId nodeId : nodeIds) { for (NodeId nodeId : nodeIds) {
RMNode node = this.rmContext.getRMNodes().get(nodeId); RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) { if (node == null) {
LOG.error("Resource update get failed on all nodes due to change " LOG.error("Resource update get failed on all nodes due to change "
+ "resource on an unrecognized node: " + nodeId); + "resource on an unrecognized node: " + nodeId);
Expand All @@ -619,14 +620,14 @@ public UpdateNodeResourceResponse updateNodeResource(
for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) { for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
ResourceOption newResourceOption = entry.getValue(); ResourceOption newResourceOption = entry.getValue();
NodeId nodeId = entry.getKey(); NodeId nodeId = entry.getKey();
RMNode node = this.rmContext.getRMNodes().get(nodeId); RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);


if (node == null) { if (node == null) {
LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
allSuccess = false; allSuccess = false;
} else { } else {
// update resource to RMNode // update resource to RMNode
this.rmContext.getDispatcher().getEventHandler() this.rm.getRMContext().getDispatcher().getEventHandler()
.handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
LOG.info("Update resource on node(" + node.getNodeID() LOG.info("Update resource on node(" + node.getNodeID()
+ ") with resource(" + newResourceOption.toString() + ")"); + ") with resource(" + newResourceOption.toString() + ")");
Expand Down Expand Up @@ -661,7 +662,8 @@ public RefreshNodesResourcesResponse refreshNodesResources(
DynamicResourceConfiguration newConf; DynamicResourceConfiguration newConf;


InputStream drInputStream = InputStream drInputStream =
this.rmContext.getConfigurationProvider().getConfigurationInputStream( this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(
configuration, YarnConfiguration.DR_CONFIGURATION_FILE); configuration, YarnConfiguration.DR_CONFIGURATION_FILE);


if (drInputStream != null) { if (drInputStream != null) {
Expand All @@ -679,7 +681,7 @@ public RefreshNodesResourcesResponse refreshNodesResources(
updateNodeResource(updateRequest); updateNodeResource(updateRequest);
} }
// refresh dynamic resource in ResourceTrackerService // refresh dynamic resource in ResourceTrackerService
this.rmContext.getResourceTrackerService(). this.rm.getRMContext().getResourceTrackerService().
updateDynamicResourceConfiguration(newConf); updateDynamicResourceConfiguration(newConf);
RMAuditLogger.logSuccess(user.getShortUserName(), operation, RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService"); "AdminService");
Expand All @@ -692,7 +694,8 @@ public RefreshNodesResourcesResponse refreshNodesResources(
private synchronized Configuration getConfiguration(Configuration conf, private synchronized Configuration getConfiguration(Configuration conf,
String... confFileNames) throws YarnException, IOException { String... confFileNames) throws YarnException, IOException {
for (String confFileName : confFileNames) { for (String confFileName : confFileNames) {
InputStream confFileInputStream = this.rmContext.getConfigurationProvider() InputStream confFileInputStream =
this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(conf, confFileName); .getConfigurationInputStream(conf, confFileName);
if (confFileInputStream != null) { if (confFileInputStream != null) {
conf.addResource(confFileInputStream); conf.addResource(confFileInputStream);
Expand Down Expand Up @@ -746,7 +749,7 @@ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLab
AddToClusterNodeLabelsResponse response = AddToClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
try { try {
rmContext.getNodeLabelManager() rm.getRMContext().getNodeLabelManager()
.addToCluserNodeLabels(request.getNodeLabels()); .addToCluserNodeLabels(request.getNodeLabels());
RMAuditLogger.logSuccess(user.getShortUserName(), operation, RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService"); "AdminService");
Expand All @@ -769,7 +772,8 @@ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsResponse response = RemoveFromClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
try { try {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); rm.getRMContext().getNodeLabelManager()
.removeFromClusterNodeLabels(request.getNodeLabels());
RMAuditLogger RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService"); .logSuccess(user.getShortUserName(), operation, "AdminService");
return response; return response;
Expand Down Expand Up @@ -805,19 +809,20 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
boolean isKnown = false; boolean isKnown = false;
// both active and inactive nodes are recognized as known nodes // both active and inactive nodes are recognized as known nodes
if (requestedNode.getPort() != 0) { if (requestedNode.getPort() != 0) {
if (rmContext.getRMNodes().containsKey(requestedNode) if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm
|| rmContext.getInactiveRMNodes().containsKey(requestedNode)) { .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) {
isKnown = true; isKnown = true;
} }
} else { } else {
for (NodeId knownNode : rmContext.getRMNodes().keySet()) { for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) { if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true; isKnown = true;
break; break;
} }
} }
if (!isKnown) { if (!isKnown) {
for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) { for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes()
.keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) { if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true; isKnown = true;
break; break;
Expand All @@ -841,7 +846,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
} }
} }
try { try {
rmContext.getNodeLabelManager().replaceLabelsOnNode( rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(
request.getNodeToLabels()); request.getNodeToLabels());
RMAuditLogger RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService"); .logSuccess(user.getShortUserName(), operation, "AdminService");
Expand Down Expand Up @@ -878,7 +883,7 @@ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(


checkRMStatus(user.getShortUserName(), operation, msg); checkRMStatus(user.getShortUserName(), operation, msg);


Set<NodeId> decommissioningNodes = rmContext.getNodesListManager() Set<NodeId> decommissioningNodes = rm.getRMContext().getNodesListManager()
.checkForDecommissioningNodes(); .checkForDecommissioningNodes();
RMAuditLogger.logSuccess(user.getShortUserName(), operation, RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService"); "AdminService");
Expand Down Expand Up @@ -914,6 +919,6 @@ private void refreshClusterMaxPriority() throws IOException, YarnException {
getConfiguration(new Configuration(false), getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);


rmContext.getScheduler().setClusterMaxPriority(conf); rm.getRMContext().getScheduler().setClusterMaxPriority(conf);
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService
LogFactory.getLog(CuratorBasedElectorService.class); LogFactory.getLog(CuratorBasedElectorService.class);
private LeaderLatch leaderLatch; private LeaderLatch leaderLatch;
private CuratorFramework curator; private CuratorFramework curator;
private RMContext rmContext;
private String latchPath; private String latchPath;
private String rmId; private String rmId;
private ResourceManager rm; private ResourceManager rm;


public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) { public CuratorBasedElectorService(ResourceManager rm) {
super(CuratorBasedElectorService.class.getName()); super(CuratorBasedElectorService.class.getName());
this.rmContext = rmContext;
this.rm = rm; this.rm = rm;
} }


Expand Down Expand Up @@ -102,7 +100,8 @@ public String getZookeeperConnectionState() {
public void isLeader() { public void isLeader() {
LOG.info(rmId + "is elected leader, transitioning to active"); LOG.info(rmId + "is elected leader, transitioning to active");
try { try {
rmContext.getRMAdminService().transitionToActive( rm.getRMContext().getRMAdminService()
.transitionToActive(
new HAServiceProtocol.StateChangeRequestInfo( new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) { } catch (Exception e) {
Expand All @@ -123,7 +122,8 @@ private void closeLeaderLatch() throws IOException {
public void notLeader() { public void notLeader() {
LOG.info(rmId + " relinquish leadership"); LOG.info(rmId + " relinquish leadership");
try { try {
rmContext.getRMAdminService().transitionToStandby( rm.getRMContext().getRMAdminService()
.transitionToStandby(
new HAServiceProtocol.StateChangeRequestInfo( new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) { } catch (Exception e) {
Expand Down
Loading

0 comments on commit e315328

Please sign in to comment.