Permalink
Browse files

Implementation of MYRIAD-229, MYRIAD-237, MYRIAD-238, MYRIAD-225

JIRA:
	[MYRIAD-225] https://issues.apache.org/jira/browse/MYRIAD-225
        [MYRIAD-229] https://issues.apache.org/jira/browse/MYRIAD-229
        [MYRIAD-237] https://issues.apache.org/jira/browse/MYRIAD-237
	[MYRIAD-238] https://issues.apache.org/jira/browse/MYRIAD-238
Pull Request:
  Closes #91

Author:    hokiegeek2 <hokiegeek2@gmail.com>
Date:      Wed Aug 17 15:21:56 2016 -0400
  • Loading branch information...
hokiegeek2 authored and darinj committed Aug 17, 2016
1 parent 7aea259 commit 577c30b1abdaa30313251c887c014aebac3fd93c
Showing with 1,590 additions and 1,036 deletions.
  1. +0 −1 build.gradle
  2. +0 −46 gradle/spock.gradle
  3. +3 −15 myriad-scheduler/src/main/java/org/apache/myriad/Main.java
  4. +1 −1 myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
  5. +1 −1 myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
  6. +15 −3 myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
  7. +7 −2 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
  8. +2 −2 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
  9. +1 −1 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
  10. +1 −1 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
  11. +2 −2 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
  12. +6 −2 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
  13. +1 −1 ...-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
  14. +50 −17 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
  15. +46 −21 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
  16. +6 −4 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
  17. +5 −0 myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
  18. +7 −7 myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
  19. +7 −15 ...java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
  20. +74 −3 myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
  21. +22 −10 myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
  22. +206 −84 myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
  23. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
  24. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
  25. +3 −2 myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
  26. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
  27. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
  28. +54 −43 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
  29. +0 −90 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
  30. +89 −0 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsTest.java
  31. +1 −2 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java
  32. +0 −93 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintSpec.groovy
  33. +86 −0 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/constraints/LikeConstraintTest.java
  34. +0 −175 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
  35. +0 −114 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
  36. +240 −0 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerTest.java
  37. +20 −3 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java
  38. +89 −0 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeTest.java
  39. +20 −2 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/OfferLifeCycleManagerTest.java
  40. +0 −143 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
  41. +86 −63 myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java
  42. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/ClusterTest.java
  43. +23 −5 myriad-scheduler/src/test/java/org/apache/myriad/state/MockDispatcher.java
  44. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/MockFuture.java
  45. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMApp.java
  46. +24 −1 myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMContext.java
  47. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java
  48. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/MockState.java
  49. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/MockVariable.java
  50. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/MyriadStateTest.java
  51. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/NodeTaskTest.java
  52. +117 −42 myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java
  53. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/state/utils/ByteBufferSupportTest.java
  54. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/webapp/HttpConnectorProviderTest.java
  55. +17 −0 myriad-scheduler/src/test/java/org/apache/myriad/webapp/MyriadWebServerTest.java
  56. +20 −19 myriad-scheduler/src/test/resources/myriad-config-test-default.yml
View
@@ -57,7 +57,6 @@ subprojects {
apply plugin: 'java'
apply plugin: 'application'
apply from: "$rootDir/gradle/spock.gradle"
apply from: "$rootDir/gradle/quality.gradle"
sourceCompatibility = '1.7'
View

This file was deleted.

Oops, something went wrong.
@@ -79,18 +79,6 @@
private static Injector injector;
/**
* Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of
* the following components:
*
* 1. MyriadDriverManager
* 2. MyriadWebServer
* 3. TaskTerminator
* 4. HealthCheckRegistry
*
* Main uses the Guice Injector framework to manage the Myriad object graph and is
* configured by myriad-config-default.yml
*/
public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext,
InterceptorRegistry registry) throws Exception {
MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", hadoopConf, yarnScheduler, rmContext, registry);
@@ -217,19 +205,19 @@ private void startNMInstances(Injector injector) {
SchedulerState schedulerState = injector.getInstance(SchedulerState.class);
Set<org.apache.myriad.state.NodeTask> launchedNMTasks = new HashSet<>();
launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
if (!launchedNMTasks.isEmpty()) {
LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size());
return;
}
launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
if (!launchedNMTasks.isEmpty()) {
LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size());
return;
}
launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
if (!launchedNMTasks.isEmpty()) {
LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size());
return;
@@ -102,7 +102,7 @@ protected void configure() {
bind(NMHeartBeatHandler.class).asEagerSingleton();
MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
mapBinder.addBinding(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
@@ -51,7 +51,7 @@ public SchedulerStateResource(final MyriadConfiguration cfg, final org.apache.my
@GET
public GetSchedulerStateResponse getState() {
return new GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), toStringCollection(
state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTasks()));
state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTaskIds()));
}
private Collection<String> toStringCollection(Collection<Protos.TaskID> collection) {
@@ -56,8 +56,13 @@
/**
* Default NodeManager Mesos task prefix
*/
public static final String NM_TASK_PREFIX = "nm";
public static final String DEFAULT_NM_TASK_PREFIX = "nm";
/**
* Default max CPU cores for NodeManager JVM
*/
public static final double DEFAULT_NM_MAX_CPUS = 24;
/**
* Translates to -Xmx for the NodeManager JVM.
*/
@@ -85,7 +90,10 @@
*/
@JsonProperty
private Boolean cgroups;
@JsonProperty
private Double maxCpus;
/**
 * Derives a NodeManager memory figure from the class-level defaults.
 *
 * @return DEFAULT_JVM_MAX_MEMORY_MB multiplied by (1 + JVM_OVERHEAD)
 */
private Double generateNodeManagerMemory() {
  final double overheadMultiplier = 1 + NodeManagerConfiguration.JVM_OVERHEAD;
  return NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB * overheadMultiplier;
}
@@ -117,4 +125,8 @@ public Double getCpus() {
/**
 * Returns the cgroups setting, falling back to DEFAULT_NM_CGROUPS when the
 * field has not been set (is null).
 *
 * @return the effective cgroups flag
 */
public boolean getCgroups() {
  final Optional<Boolean> configured = Optional.fromNullable(cgroups);
  return configured.or(DEFAULT_NM_CGROUPS);
}
/**
 * Returns the maxCpus setting, falling back to DEFAULT_NM_MAX_CPUS when the
 * field has not been set (is null).
 *
 * @return the effective maximum CPU value
 */
public Double getMaxCpus() {
  final Optional<Double> configured = Optional.fromNullable(maxCpus);
  return configured.or(DEFAULT_NM_MAX_CPUS);
}
}
@@ -40,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
@@ -49,7 +50,6 @@
public class MyriadOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class);
private final SchedulerState schedulerState;
private MyriadConfiguration cfg;
private NodeScaleDownPolicy nodeScaleDownPolicy;
private MyriadDriverManager driverManager;
@@ -69,12 +69,17 @@ public MyriadOperations(MyriadConfiguration cfg, SchedulerState schedulerState,
myriadStateStore = (MyriadStateStore) rmContext.getStateStore();
}
}
/**
 * Exposes the SchedulerState held by this instance; marked visible for testing only.
 *
 * @return the schedulerState field
 */
@VisibleForTesting
protected SchedulerState getSchedulerState() {
  return this.schedulerState;
}
public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) {
Collection<NodeTask> nodes = new HashSet<>();
for (int i = 0; i < instances; i++) {
NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
nodeTask.setTaskPrefix(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
nodes.add(nodeTask);
}
@@ -46,8 +46,8 @@ public Rebalancer(SchedulerState schedulerState, MyriadOperations myriadOperatio
@Override
public void run() {
final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
if (activeIds.size() < 1 && pendingIds.size() < 1) {
myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);
@@ -59,7 +59,7 @@ public static boolean isUniqueHostname(Protos.OfferOrBuilder offer, NodeTask tas
* @return
*/
public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) {
for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) {
for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)) {
if (activeNMTask.getProfile().getCpus() == 0 &&
activeNMTask.getProfile().getMemory() == 0 &&
activeNMTask.getHostname().equals(hostName)) {
@@ -150,7 +150,7 @@ public TaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommand
* Simple helper to convert Mesos Range Resource to a list of longs.
*/
protected List<Long> rangesConverter(List<Protos.Resource> rangeResources) {
List<Long> ret = new ArrayList();
List<Long> ret = new ArrayList<Long>();
for (Protos.Resource range : rangeResources) {
ret.add(range.getRanges().getRange(0).getBegin());
}
@@ -62,12 +62,12 @@ public TaskTerminator(SchedulerState schedulerState, MyriadDriverManager driverM
@Override
public void run() {
//If there are 1..n killable tasks, proceed; otherwise, simply return
if (CollectionUtils.isNotEmpty(schedulerState.getKillableTasks())) {
if (CollectionUtils.isNotEmpty(schedulerState.getKillableTaskIds())) {
/*
* Clone the killable task collection, iterate through all tasks, and
* process any pending and/or non-pending tasks
*/
Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTasks());
Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTaskIds());
Status driverStatus = driverManager.getDriverStatus();
//TODO (hokiegeek2) Can the DriverManager be restarted? If not, should the ResourceManager stop?
@@ -46,7 +46,11 @@ public TaskUtils(MyriadConfiguration cfg) {
/**
 * Reads the JVM max memory (MB) value from the NodeManager configuration.
 *
 * @return the value of getJvmMaxMemoryMB() on the NodeManager configuration
 */
public double getNodeManagerMemory() {
  final double jvmMaxMemoryMb = cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB();
  return jvmMaxMemoryMb;
}
/**
 * Reads the max CPU value from the NodeManager configuration.
 *
 * @return the value of getMaxCpus() on the NodeManager configuration
 */
public double getNodeManagerMaxCpus() {
  final double maxCpus = cfg.getNodeManagerConfiguration().getMaxCpus();
  return maxCpus;
}
/**
 * Reads the CPU value from the NodeManager configuration.
 *
 * @return the value of getCpus() on the NodeManager configuration
 */
public double getNodeManagerCpus() {
  final double cpus = cfg.getNodeManagerConfiguration().getCpus();
  return cpus;
}
@@ -80,7 +84,7 @@ public TaskUtils() {
*/
public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) {
String role = cfg.getFrameworkRole();
List<Protos.Resource> resources = new ArrayList<>();
List<Protos.Resource> resources = new ArrayList<Protos.Resource>();
double resourceDifference = 0; //used to determine the resource difference of value and the resources requested from role *
//Find role by name, must loop through resources
@@ -124,6 +124,6 @@ private void cleanupTask(TaskID taskId, NodeTask task, String stopReason) {
LOGGER.info("Removed {} task with id {}", stopReason, taskId);
}
private boolean taskIsKillable(TaskID taskId) {
return schedulerState.getKillableTasks().contains(taskId);
return schedulerState.getKillableTaskIds().contains(taskId);
}
}
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Offer;
import org.apache.myriad.configuration.NodeManagerConfiguration;
import org.apache.myriad.scheduler.MyriadDriver;
import org.apache.myriad.scheduler.SchedulerUtils;
import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
@@ -56,11 +57,12 @@
private final OfferLifecycleManager offerLifecycleMgr;
private final NodeStore nodeStore;
private final SchedulerState state;
private final NodeManagerConfiguration conf;
@Inject
public NMHeartBeatHandler(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, MyriadDriver myriadDriver,
YarnNodeCapacityManager yarnNodeCapacityMgr, OfferLifecycleManager offerLifecycleMgr,
NodeStore nodeStore, SchedulerState state) {
NodeStore nodeStore, SchedulerState state, NodeManagerConfiguration conf) {
if (registry != null) {
registry.register(this);
@@ -72,6 +74,7 @@ public NMHeartBeatHandler(InterceptorRegistry registry, AbstractYarnScheduler ya
this.offerLifecycleMgr = offerLifecycleMgr;
this.nodeStore = nodeStore;
this.state = state;
this.conf = conf;
}
@Override
@@ -88,9 +91,11 @@ public boolean allowCallBacksForNode(NodeId nodeManager) {
public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
switch (event.getType()) {
case STARTED:
// Since the RMNode was just started, it should not have a non-zero capacity
RMNode rmNode = context.getRMNodes().get(event.getNodeId());
Resource totalCapability = rmNode.getTotalCapability();
if (totalCapability.getMemory() != 0 || totalCapability.getVirtualCores() != 0) {
if (isNonZeroCapacityNode(rmNode)) {
Resource totalCapability = rmNode.getTotalCapability();
logger.warn(
"FineGrainedScaling feature got invoked for a NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the " +
"NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), totalCapability.getVirtualCores());
@@ -108,6 +113,12 @@ public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
}
}
/**
 * Tells whether the node reports any capacity at all.
 *
 * @param node the node whose total capability is inspected
 * @return true when either the memory or the virtual-core count of the
 *         node's total capability is non-zero; false when both are zero
 */
@VisibleForTesting
protected boolean isNonZeroCapacityNode(RMNode node) {
  final Resource capacity = node.getTotalCapability();
  final boolean hasNoCapacity = capacity.getMemory() == 0 && capacity.getVirtualCores() == 0;
  return !hasNoCapacity;
}
@VisibleForTesting
protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
if (!(event instanceof RMNodeStatusEvent)) {
@@ -124,25 +135,42 @@ protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
host.snapshotRunningContainers();
}
// New capacity of the node =
// resources under use on the node (due to previous offers) +
// new resources offered by mesos for the node
yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos(
hostName)));
/*
* Set the new node capacity which is the sum of the current node resources plus those offered by Mesos.
* If the sum is greater than the max capacity of the node, reject the offer.
*/
Resource offeredResources = getNewResourcesOfferedByMesos(hostName);
Resource currentResources = getResourcesUnderUse(statusEvent);
if (offerWithinResourceLimits(currentResources, offeredResources)) {
yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(currentResources, offeredResources));
logger.info("Updated resources for {} with {} cores and {} memory", rmNode.getNode().getName(),
offeredResources.getVirtualCores(), offeredResources.getMemory());
} else {
logger.info("Did not update {} with {} cores and {} memory, over max cpu cores and/or max memory",
rmNode.getNode().getName(), offeredResources.getVirtualCores(), offeredResources.getMemory());
}
}
private Resource getNewResourcesOfferedByMesos(String hostname) {
/**
 * Checks whether adding the offered resources to the current ones stays within
 * the limits read from the NodeManager configuration.
 *
 * @param currentResources resources already counted for the node
 * @param offeredResources resources being offered in addition
 * @return true when the summed memory is at most conf.getJvmMaxMemoryMB() and the
 *         summed virtual cores are at most conf.getMaxCpus(); false otherwise
 */
@VisibleForTesting
protected boolean offerWithinResourceLimits(Resource currentResources, Resource offeredResources) {
  final int totalMemory = currentResources.getMemory() + offeredResources.getMemory();
  final int totalCores = currentResources.getVirtualCores() + offeredResources.getVirtualCores();

  // Memory is checked first; the CPU limit is only consulted when memory fits.
  if (!(totalMemory <= conf.getJvmMaxMemoryMB())) {
    return false;
  }
  return totalCores <= conf.getMaxCpus();
}
@VisibleForTesting
protected Resource getNewResourcesOfferedByMesos(String hostname) {
OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
if (feed == null) {
logger.debug("No offer feed for: {}", hostname);
return Resource.newInstance(0, 0);
}
List<Offer> offers = new ArrayList<>();
Protos.Offer offer;
while ((offer = feed.poll()) != null) {
offers.add(offer);
offers.add(offer);
offerLifecycleMgr.markAsConsumed(offer);
}
Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers);
if (logger.isDebugEnabled()) {
@@ -153,10 +181,11 @@ private Resource getNewResourcesOfferedByMesos(String hostname) {
return fromMesosOffers;
}
private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
@VisibleForTesting
protected Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
Resource usedResources = Resource.newInstance(0, 0);
for (ContainerStatus status : statusEvent.getContainers()) {
if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING) {
if (containerInUse(status)) {
RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId());
// (sdaingade) This check is needed as RMContainer information may not be populated
// immediately after a RM restart.
@@ -167,4 +196,8 @@ private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
}
return usedResources;
}
/**
 * Tells whether a container counts as in use, i.e. its state is NEW or RUNNING.
 *
 * @param status the container status to inspect
 * @return true for ContainerState.NEW or ContainerState.RUNNING, false for any other state
 */
private boolean containerInUse(ContainerStatus status) {
  if (status.getState() == ContainerState.NEW) {
    return true;
  }
  return status.getState() == ContainerState.RUNNING;
}
}
Oops, something went wrong.

0 comments on commit 577c30b

Please sign in to comment.