From e39988646d9ed3e96d129c85b65a0ddeb4233910 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 7 Sep 2018 12:31:47 -0500 Subject: [PATCH] STORM-3216: Add in RasBlacklistStrategy --- .../java/org/apache/storm/DaemonConfig.java | 3 + .../org/apache/storm/scheduler/Cluster.java | 37 +++++++ .../storm/scheduler/ISchedulingState.java | 21 +++- .../storm/scheduler/TopologyDetails.java | 12 ++ .../strategies/DefaultBlacklistStrategy.java | 86 +++++++++------ .../strategies/RasBlacklistStrategy.java | 103 ++++++++++++++++++ .../NormalizedResourceOffer.java | 22 ++++ .../NormalizedResourceRequest.java | 5 + .../normalization/NormalizedResources.java | 37 +++++++ .../NormalizedResourcesWithMemory.java | 4 + .../scheduling/BaseResourceAwareStrategy.java | 2 +- 11 files changed, 295 insertions(+), 37 deletions(-) create mode 100644 storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index d9869d4fd89..9eaf6f6cf11 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -122,6 +122,9 @@ public class DaemonConfig implements Validated { /** * The class that specifies the eviction strategy to use in blacklist scheduler. + * If you are using the RAS scheduler please set this to + * "org.apache.storm.scheduler.blacklist.strategies.RasBlacklistStrategy" or you may + * get odd behavior when the cluster is full and there are blacklisted nodes. */ @NotNull @isImplementationOfClass(implementsClass = IBlacklistStrategy.class) diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java index d0142368543..e8d771d5a38 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java @@ -374,6 +374,18 @@ public Set getAssignablePorts(SupervisorDetails supervisor) { return supervisor.getAllPorts(); } + @Override + public List getNonBlacklistedAvailableSlots(List blacklistedSupervisorIds) { + List slots = new ArrayList<>(); + for (SupervisorDetails supervisor : this.supervisors.values()) { + if (!isBlackListed(supervisor.getId()) && !blacklistedSupervisorIds.contains(supervisor.getId())) { + slots.addAll(getAvailableSlots(supervisor)); + } + } + + return slots; + } + @Override public List getAvailableSlots() { List slots = new ArrayList<>(); @@ -448,6 +460,19 @@ public int getAssignedNumWorkers(TopologyDetails topology) { return slots.size(); } + @Override + public NormalizedResourceOffer getAvailableResources(SupervisorDetails sd) { + NormalizedResourceOffer ret = new NormalizedResourceOffer(sd.getTotalResources()); + for (SchedulerAssignment assignment: assignments.values()) { + for (Entry entry: assignment.getScheduledResources().entrySet()) { + if (sd.getId().equals(entry.getKey().getNodeId())) { + ret.remove(entry.getValue()); + } + } + } + return ret; + } + private void addResource(Map resourceMap, String resourceName, Double valueToBeAdded) { if (!resourceMap.containsKey(resourceName)) { resourceMap.put(resourceName, 0.0); @@ -781,6 +806,18 @@ public Map getSupervisors() { return this.supervisors; } + @Override + public NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collection blacklistedSupervisorIds) { + NormalizedResourceOffer available = new NormalizedResourceOffer(); + for (SupervisorDetails sup : supervisors.values()) { + if (!isBlackListed(sup.getId()) && !blacklistedSupervisorIds.contains(sup.getId())) { + available.add(sup.getTotalResources()); + available.remove(getAllScheduledResourcesForNode(sup.getId())); + } + } + return available; + } + @Override public double getClusterTotalCpuResource() { double sum = 0.0; diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java index d96109cff4e..41a00b0dc2b 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java @@ -153,6 +153,12 @@ Map> getNeedsSchedulingComponentToExecutors( */ List getAvailableSlots(); + /** + * Get all the available worker slots in the cluster, that are not blacklisted. + * @param blacklistedSupervisorIds list of supervisor ids that should also be considered blacklisted. + */ + List getNonBlacklistedAvailableSlots(List blacklistedSupervisorIds); + /** * Return all non-blacklisted slots on this supervisor. * @@ -187,6 +193,13 @@ Map> getNeedsSchedulingComponentToExecutors( */ int getAssignedNumWorkers(TopologyDetails topology); + /** + * Get the resources on the supervisor that are available to be scheduled. + * @param sd the supervisor. + * @return the resources available to be scheduled. + */ + NormalizedResourceOffer getAvailableResources(SupervisorDetails sd); + /** * Would scheduling exec on ws fit? With a heap <= maxHeap total memory added <= memoryAvailable and cpu added <= cpuAvailable. * @@ -239,9 +252,15 @@ boolean wouldFit( /** * Get all scheduled resources for node. - **/ + */ NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId); + /** + * Get the resources in the cluster that are available for scheduling. + * @param blacklistedSupervisorIds other ids that are tentatively blacklisted. + */ + NormalizedResourceOffer getNonBlacklistedClusterAvailableResources(Collection blacklistedSupervisorIds); + /** * Get the total amount of CPU resources in cluster. */ diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java index 6fd7d389df7..5d781aad8eb 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java @@ -339,6 +339,18 @@ public NormalizedResourceRequest getTotalResources(ExecutorDetails exec) { return null; } + /** + * Get an approximate total resources needed for this topology. + * @return the approximate total resources needed for this topology. + */ + public NormalizedResourceRequest getApproximateTotalResources() { + NormalizedResourceRequest ret = new NormalizedResourceRequest(); + for (NormalizedResourceRequest resources : resourceList.values()) { + ret.add(resources); + } + return ret; + } + /** * Get the total CPU requirement for executor. * diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java index 3332b2d61f0..a0c5b6ca67a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java @@ -12,6 +12,7 @@ package org.apache.storm.scheduler.blacklist.strategies; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -30,11 +31,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * The default strategy used for blacklisting hosts. + */ public class DefaultBlacklistStrategy implements IBlacklistStrategy { public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3; - private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class); private IReporter reporter; private int toleranceCount; @@ -59,7 +63,7 @@ public void prepare(Map conf) { @Override public Set getBlacklist(List>> supervisorsWithFailures, Cluster cluster, Topologies topologies) { - Map countMap = new HashMap(); + Map countMap = new HashMap<>(); for (Map> item : supervisorsWithFailures) { Set supervisors = item.keySet(); @@ -68,25 +72,32 @@ public Set getBlacklist(List>> supervisorsWithF countMap.put(supervisor, supervisorCount + 1); } } + for (Map.Entry entry : countMap.entrySet()) { String supervisor = entry.getKey(); int count = entry.getValue(); if (count >= toleranceCount) { if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config - LOG.debug("add supervisor {} to blacklist", supervisor); + LOG.debug("Added supervisor {} to blacklist", supervisor); LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures); reporter.reportBlacklist(supervisor, supervisorsWithFailures); blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs); } } } - releaseBlacklistWhenNeeded(cluster, topologies); + Set toRelease = releaseBlacklistWhenNeeded(cluster, new ArrayList<>(blacklist.keySet())); + if (toRelease != null) { + LOG.debug("Releasing {} nodes because of low resources", toRelease.size()); + for (String key: toRelease) { + blacklist.remove(key); + } + } return blacklist.keySet(); } @Override public void resumeFromBlacklist() { - Set readyToRemove = new HashSet(); + Set readyToRemove = new HashSet<>(); for (Map.Entry entry : blacklist.entrySet()) { String key = entry.getKey(); int value = entry.getValue() - 1; @@ -102,48 +113,53 @@ public void resumeFromBlacklist() { } } - private void releaseBlacklistWhenNeeded(Cluster cluster, Topologies topologies) { - if (blacklist.size() > 0) { - int totalNeedNumWorkers = 0; - List needSchedulingTopologies = cluster.needsSchedulingTopologies(); - for (TopologyDetails topologyDetails : needSchedulingTopologies) { - int numWorkers = topologyDetails.getNumWorkers(); - int assignedNumWorkers = cluster.getAssignedNumWorkers(topologyDetails); - int unAssignedNumWorkers = numWorkers - assignedNumWorkers; - totalNeedNumWorkers += unAssignedNumWorkers; + /** + * Decide when/if to release blacklisted hosts. + * @param cluster the current state of the cluster. + * @param blacklistedNodeIds the current set of blacklisted node ids sorted by earliest + * @return the set of nodes to be released. + */ + protected Set releaseBlacklistWhenNeeded(Cluster cluster, final List blacklistedNodeIds) { + Set readyToRemove = new HashSet<>(); + if (blacklistedNodeIds.size() > 0) { + int availableSlots = cluster.getNonBlacklistedAvailableSlots(blacklistedNodeIds).size(); + int neededSlots = 0; + + for (TopologyDetails td : cluster.needsSchedulingTopologies()) { + int slots = td.getNumWorkers(); + int assignedSlots = cluster.getAssignedNumWorkers(td); + int tdSlotsNeeded = slots - assignedSlots; + neededSlots += tdSlotsNeeded; } + + //Now we need to free up some resources... Map availableSupervisors = cluster.getSupervisors(); - List availableSlots = cluster.getAvailableSlots(); - int availableSlotsNotInBlacklistCount = 0; - for (WorkerSlot slot : availableSlots) { - if (!blacklist.containsKey(slot.getNodeId())) { - availableSlotsNotInBlacklistCount += 1; - } - } - int shortage = totalNeedNumWorkers - availableSlotsNotInBlacklistCount; + int shortageSlots = neededSlots - availableSlots; + LOG.debug("Need {} slots.", neededSlots); + LOG.debug("Available {} slots.", availableSlots); + LOG.debug("Shortage {} slots.", shortageSlots); - if (shortage > 0) { - LOG.info("total needed num of workers :{}, available num of slots not in blacklist :{}, num blacklist :{}, " + - "will release some blacklist.", totalNeedNumWorkers, availableSlotsNotInBlacklistCount, blacklist.size()); + if (shortageSlots > 0) { + LOG.info("Need {} slots more. Releasing some blacklisted nodes to cover it.", shortageSlots); //release earliest blacklist - Set readyToRemove = new HashSet<>(); - for (String supervisor : blacklist.keySet()) { //blacklist is treeMap sorted by value, minimum value means earliest - if (availableSupervisors.containsKey(supervisor)) { - Set ports = cluster.getAvailablePorts(availableSupervisors.get(supervisor)); + for (String supervisor : blacklistedNodeIds) { + SupervisorDetails sd = availableSupervisors.get(supervisor); + if (sd != null) { + int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); readyToRemove.add(supervisor); - shortage -= ports.size(); - if (shortage <= 0) { //released enough supervisor + shortageSlots -= sdAvailableSlots; + LOG.debug("Releasing {} with {} slots leaving {} slots to go", supervisor, + sdAvailableSlots, shortageSlots); + if (shortageSlots <= 0) { + // we have enough resources now... break; } } } - for (String key : readyToRemove) { - blacklist.remove(key); - LOG.info("release supervisor {} for shortage of worker slots.", key); - } } } + return readyToRemove; } private Object initializeInstance(String className, String representation) { diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java new file mode 100644 index 00000000000..f7abc04dbcb --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.scheduler.blacklist.strategies; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer; +import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest; +import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesWithMemory; +import org.apache.storm.utils.ServerUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Blacklisting strategy just like the default one, but specifically setup for use with the resource aware scheduler. + */ +public class RasBlacklistStrategy extends DefaultBlacklistStrategy { + private static final Logger LOG = LoggerFactory.getLogger(RasBlacklistStrategy.class); + + @Override + protected Set releaseBlacklistWhenNeeded(Cluster cluster, final List blacklistedNodeIds) { + LOG.info("RAS We have {} nodes blacklisted...", blacklistedNodeIds.size()); + Set readyToRemove = new HashSet<>(); + if (blacklistedNodeIds.size() > 0) { + int availableSlots = cluster.getNonBlacklistedAvailableSlots(blacklistedNodeIds).size(); + int neededSlots = 0; + NormalizedResourceOffer available = cluster.getNonBlacklistedClusterAvailableResources(blacklistedNodeIds); + NormalizedResourceOffer needed = new NormalizedResourceOffer(); + + for (TopologyDetails td : cluster.getTopologies()) { + if (cluster.needsSchedulingRas(td)) { + int slots = 0; + try { + slots = ServerUtils.getEstimatedWorkerCountForRASTopo(td.getConf(), td.getTopology()); + } catch (InvalidTopologyException e) { + LOG.warn("Could not guess the number of slots needed for {}", td.getName(), e); + } + int assignedSlots = cluster.getAssignedNumWorkers(td); + int tdSlotsNeeded = slots - assignedSlots; + neededSlots += tdSlotsNeeded; + + NormalizedResourceRequest resources = td.getApproximateTotalResources(); + needed.add(resources); + + LOG.warn("{} needs to be scheduled with {} and {} slots", td.getName(), resources, tdSlotsNeeded); + } + } + + //Now we need to free up some resources... + Map availableSupervisors = cluster.getSupervisors(); + NormalizedResourceOffer shortage = new NormalizedResourceOffer(needed); + shortage.remove(available); + int shortageSlots = neededSlots - availableSlots; + LOG.debug("Need {} and {} slots.", needed, neededSlots); + LOG.debug("Available {} and {} slots.", available, availableSlots); + LOG.debug("Shortage {} and {} slots.", shortage, shortageSlots); + + if (shortage.areAnyOverZero() || shortageSlots > 0) { + LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); + //release earliest blacklist + for (String supervisor : blacklistedNodeIds) { + SupervisorDetails sd = availableSupervisors.get(supervisor); + if (sd != null) { + NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); + int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); + readyToRemove.add(supervisor); + shortage.remove(sdAvailable); + shortageSlots -= sdAvailableSlots; + LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, + sdAvailable, sdAvailableSlots, shortage, shortageSlots); + if (!shortage.areAnyOverZero() && shortageSlots <= 0) { + // we have enough resources now... + break; + } + } + } + } + } + return readyToRemove; + } +} diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java index 2c1f5be6173..c680be718d6 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.storm.Constants; +import org.apache.storm.generated.WorkerResources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +97,22 @@ public boolean remove(NormalizedResourcesWithMemory other) { return negativeResources; } + /** + * Remove the resources in other from this. + * @param other the resources to be removed. + * @return true if one or more resources in other were larger than available resources in this, else false. + */ + public boolean remove(WorkerResources other) { + boolean negativeResources = normalizedResources.remove(other); + totalMemoryMb -= (other.get_mem_off_heap() + other.get_mem_on_heap()); + if (totalMemoryMb < 0.0) { + negativeResources = true; + NormalizedResources.numNegativeResourceEvents.mark(); + totalMemoryMb = 0.0; + } + return negativeResources; + } + /** * Calculate the average percentage used. * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, @@ -153,4 +170,9 @@ public void clear() { this.totalMemoryMb = 0.0; this.normalizedResources.clear(); } + + @Override + public boolean areAnyOverZero() { + return totalMemoryMb > 0 || normalizedResources.areAnyOverZero(); + } } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java index 63a8a562066..14c6846fc7a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java @@ -242,4 +242,9 @@ public void clear() { offHeap = 0.0; onHeap = 0.0; } + + @Override + public boolean areAnyOverZero() { + return onHeap > 0 || offHeap > 0 || normalizedResources.areAnyOverZero(); + } } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java index 4a069994c9a..1e6af6f5ba3 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java @@ -153,6 +153,30 @@ public boolean remove(NormalizedResources other) { return ret; } + /** + * Remove the resources of a worker from this. + * + * @param value the worker resources that should be removed from this. + */ + public boolean remove(WorkerResources value) { + Map workerNormalizedResources = value.get_resources(); + cpu -= workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0); + return remove(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources)) || cpu < 0; + } + + private boolean remove(double[] resourceArray) { + boolean ret = false; + int otherLength = resourceArray.length; + zeroPadOtherResourcesIfNecessary(otherLength); + for (int i = 0; i < otherLength; i++) { + otherResources[i] -= resourceArray[i]; + if (otherResources[i] < 0) { + ret = true; + } + } + return ret; + } + @Override public String toString() { return "Normalized resources: " + toNormalizedMap(); @@ -366,4 +390,17 @@ public void clear() { otherResources[i] = 0.0; } } + + /** + * Are any of the resources positive. + * @return true of any of the resources are positive. False if they are all <= 0. + */ + public boolean areAnyOverZero() { + for (int i = 0; i < otherResources.length; i++) { + if (otherResources[i] > 0) { + return true; + } + } + return cpu > 0; + } } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java index aeb0737643c..2c910bac86d 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java @@ -30,4 +30,8 @@ public interface NormalizedResourcesWithMemory { */ void clear(); + /** + * Return true if any of the resources are > 0. + */ + boolean areAnyOverZero(); } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java index 7d22336b204..113243215b3 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java @@ -603,7 +603,7 @@ private void logClusterInfo() { * @return the ids n that node. */ public List hostnameToNodes(String hostname) { - return hostnameToNodes.get(hostname); + return hostnameToNodes.getOrDefault(hostname, Collections.emptyList()); } /**