Skip to content

Commit

Permalink
[ASTERIXDB-2268][CONF] Add Cores Multiplier
Browse files Browse the repository at this point in the history
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add configurable cores multiplier to CC
  config and default it to 3. The multiplier
  will be used to adjust nodes cores capacity.
- Add test case for adjusted node capacity.

Change-Id: I95dd6e0c1add92e70e667321e8ef5f9b9887cda5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2326
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
  • Loading branch information
mhubail committed Jan 27, 2018
1 parent de76607 commit 65b8070
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ public class NodeManager implements INodeManager {
private final IResourceManager resourceManager;
private final Map<String, NodeControllerState> nodeRegistry;
private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
private final int nodeCoresMultiplier;

public NodeManager(ClusterControllerService ccs, CCConfig ccConfig, IResourceManager resourceManager) {
this.ccs = ccs;
this.ccConfig = ccConfig;
this.resourceManager = resourceManager;
this.nodeRegistry = new LinkedHashMap<>();
this.ipAddressNodeNameMap = new HashMap<>();
this.nodeCoresMultiplier = ccConfig.getCoresMultiplier();
}

@Override
Expand Down Expand Up @@ -122,7 +124,7 @@ public void addNode(String nodeId, NodeControllerState ncState) throws HyracksEx
}
// Updates the cluster capacity.
LOGGER.warn("updating cluster capacity");
resourceManager.update(nodeId, ncState.getCapacity());
resourceManager.update(nodeId, getAdjustedNodeCapacity(ncState.getCapacity()));
}

@Override
Expand Down Expand Up @@ -218,4 +220,7 @@ private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksExce
}
}

private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
return new NodeCapacity(nodeCapacity.getMemoryByteSize(), nodeCapacity.getCores() * nodeCoresMultiplier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ public class NodeManagerTest {
@Test
public void testNormal() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
final CCConfig ccConfig = makeCCConfig();
final int coresMultiplier = 1;
ccConfig.setCoresMultiplier(coresMultiplier);
INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);

Expand All @@ -73,6 +76,38 @@ public void testNormal() throws HyracksException, IPCException {
verifyEmptyCluster(resourceManager, nodeManager);
}

@Test
public void testAdjustedNodeCapacity() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
final CCConfig ccConfig = makeCCConfig();
final int coresMultiplier = 3;
ccConfig.setCoresMultiplier(coresMultiplier);
INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);

// verify state after adding two nodes
nodeManager.addNode(NODE1, ncState1);
nodeManager.addNode(NODE2, ncState2);
int activeNodes = 2;
// verify adjusted cores
Assert.assertEquals(NODE_CORES * activeNodes * coresMultiplier,
resourceManager.getCurrentCapacity().getAggregatedCores());
// verify unadjusted memory size
Assert.assertEquals(NODE_MEMORY_SIZE * activeNodes,
resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize());
// verify state after removing a node.
nodeManager.removeNode(NODE1);
activeNodes = 1;
Assert.assertEquals(NODE_CORES * activeNodes * coresMultiplier,
resourceManager.getCurrentCapacity().getAggregatedCores());
Assert.assertEquals(NODE_MEMORY_SIZE * activeNodes,
resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize());
// verify state after removing last node
nodeManager.removeNode(NODE2);
verifyEmptyCluster(resourceManager, nodeManager);
}

@Test
public void testException() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public enum Option implements IOption {
JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
JOB_QUEUE_CAPACITY(INTEGER, 4096),
JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false),
CORES_MULTIPLIER(INTEGER, 3);

private final IOptionType parser;
private Object defaultValue;
Expand Down Expand Up @@ -161,6 +162,8 @@ public String description() {
case ENFORCE_FRAME_WRITER_PROTOCOL:
return "A flag indicating if runtime should enforce frame writer protocol and detect "
+ "bad behaving operators";
case CORES_MULTIPLIER:
return "Specifies the multiplier to use on the cluster available cores";
default:
throw new IllegalStateException("NYI: " + this);
}
Expand Down Expand Up @@ -363,4 +366,12 @@ public boolean getEnforceFrameWriterProtocol() {
public void setEnforceFrameWriterProtocol(boolean enforce) {
configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
}

public void setCoresMultiplier(int coresMultiplier) {
configManager.set(Option.CORES_MULTIPLIER, coresMultiplier);
}

public int getCoresMultiplier() {
return getAppConfig().getInt(Option.CORES_MULTIPLIER);
}
}

0 comments on commit 65b8070

Please sign in to comment.