Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

Commit

Permalink
[REEF-589] Support node addition while REEF applications run
Browse files Browse the repository at this point in the history
If an admin happens to dynamically add a new node to the cluster, and
future allocations are done in that node, REEF crashes as it is not able
to find that node in the catalog.

The change involves adding a node to our catalog if an allocation event
is received and the node is not already in the catalog.

JIRA:
  [REEF-589](https://issues.apache.org/jira/browse/REEF-589)

Pull Request:
  This closes #370
  • Loading branch information
nachocano authored and Markus Weimer committed Aug 14, 2015
1 parent 73ff564 commit 7bef8e7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Expand Up @@ -18,11 +18,15 @@
*/
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.commons.lang3.Validate;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.driver.catalog.ResourceCatalog;
import org.apache.reef.driver.evaluator.EvaluatorProcessFactory;
import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.tang.Injector;
Expand Down Expand Up @@ -90,10 +94,20 @@ private EvaluatorManager getNewEvaluatorManagerInstance(final String id, final E
*/
public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator(
final ResourceAllocationEvent resourceAllocationEvent) {
final NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId());
NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId());

if (nodeDescriptor == null) {
throw new RuntimeException("Unknown resource: " + resourceAllocationEvent.getNodeId());
final String nodeId = resourceAllocationEvent.getNodeId();
LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", nodeId);
final String[] hostNameAndPort = nodeId.split(":");
Validate.isTrue(hostNameAndPort.length == 2);
final NodeDescriptorEvent nodeDescriptorEvent = NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId)
.setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1]))
.setMemorySize(resourceAllocationEvent.getResourceMemory())
.setRackName(resourceAllocationEvent.getRackName().get()).build();
// downcasting not to change the API
((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent);
nodeDescriptor = this.resourceCatalog.getNode(nodeId);
}
final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor,
resourceAllocationEvent.getResourceMemory(), resourceAllocationEvent.getVirtualCores().get(),
Expand Down
Expand Up @@ -392,13 +392,28 @@ private void handleNewContainer(final Container container) {
this.requestsAfterSentToRM.remove();
doHomogeneousRequests();

// the rack name comes as part of the host name, e.g.
// <rackName>-<hostNumber>
// we perform some checks just in case it doesn't
final String hostName = container.getNodeId().getHost();
String rackName = null;
if (hostName != null) {
final String[] rackNameAndNumber = hostName.split("-");
if (rackNameAndNumber.length == 2) {
rackName = rackNameAndNumber[0];
} else {
LOG.log(Level.WARNING, "Could not get information from the rack name, should use the default");
}
}

LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}",
new Object[]{container.getResource().getMemory(), container.getResource().getVirtualCores()});
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
.setIdentifier(container.getId().toString())
.setNodeId(container.getNodeId().toString())
.setResourceMemory(container.getResource().getMemory())
.setVirtualCores(container.getResource().getVirtualCores())
.setRackName(rackName)
.build());
this.updateRuntimeStatus();
} else {
Expand Down

0 comments on commit 7bef8e7

Please sign in to comment.