From d7d2c62312d2e3ed73273a88359d1e2f35a0ecbc Mon Sep 17 00:00:00 2001
From: Aaron Gresch
Date: Thu, 27 Sep 2018 13:30:59 -0500
Subject: [PATCH] STORM-3237 track Nimbus mkAssignment failures

---
 docs/ClusterMetrics.md                          |   1 +
 .../apache/storm/daemon/nimbus/Nimbus.java      | 108 ++++++++++--------
 2 files changed, 63 insertions(+), 46 deletions(-)

diff --git a/docs/ClusterMetrics.md b/docs/ClusterMetrics.md
index 17c4ca1718a..4e4d0f14189 100644
--- a/docs/ClusterMetrics.md
+++ b/docs/ClusterMetrics.md
@@ -101,6 +101,7 @@ These are metrics that are specific to a nimbus instance. In many instances onl
 | nimbus:num-uploadChunk-calls | meter | calls to uploadChunk thrift method. |
 | nimbus:num-uploadNewCredentials-calls | meter | calls to uploadNewCredentials thrift method. |
 | nimbus:process-worker-metric-calls | meter | calls to processWorkerMetrics thrift method. |
+| nimbus:mkAssignments-Errors | meter | tracks exceptions from mkAssignments |
 | nimbus:topology-scheduling-duration-ms | timer | time it takes to do a scheduling run. |
 | nimbus:total-available-memory-non-negative | gauge | available memory on the cluster MB |
 | nimbuses:uptime-secs | histogram | uptime of nimbuses |
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index d66c0ef859c..98ecfe398e7 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -265,6 +265,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final Meter getOwnerResourceSummariesCalls;
     private final Meter shutdownCalls;
     private final Meter processWorkerMetricsCalls;
+    private final Meter mkAssignmentsErrors;
+
     //Timer
     private final Timer fileUploadDuration;
     private final Timer schedulingDuration;
@@ -511,6 +513,7 @@ public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stor
             "nimbus:num-getOwnerResourceSummaries-calls");
         this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
         this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
+        this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
         this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
         this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
         this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
@@ -2166,64 +2169,77 @@ private void mkAssignments() throws Exception {
     }
 
     private void mkAssignments(String scratchTopoId) throws Exception {
-        if (!isReadyForMKAssignments()) {
-            return;
-        }
-        // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
-        // filter out ones which have a executor timeout
-        // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
-        // should be in each slot (e.g., 4, 4, 4, 5)
-        // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
-        // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
-        // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
+        try {
+            if (!isReadyForMKAssignments()) {
+                return;
+            }
+            // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
+            // filter out ones which have a executor timeout
+            // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
+            // should be in each slot (e.g., 4, 4, 4, 5)
+            // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
+            // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
+            // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around

-        IStormClusterState state = stormClusterState;
-        //read all the topologies
-        Map<String, StormBase> bases;
-        Map<String, TopologyDetails> tds = new HashMap<>();
-        synchronized (submitLock) {
-            // should promote: only fetch storm bases of topologies that need scheduling.
-            bases = state.topologyBases();
+            IStormClusterState state = stormClusterState;
+            //read all the topologies
+            Map<String, StormBase> bases;
+            Map<String, TopologyDetails> tds = new HashMap<>();
+            synchronized (submitLock) {
+                // should promote: only fetch storm bases of topologies that need scheduling.
+                bases = state.topologyBases();

-            for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
-                Entry<String, StormBase> entry = it.next();
-                String id = entry.getKey();
-                try {
-                    tds.put(id, readTopologyDetails(id, entry.getValue()));
-                } catch (KeyNotFoundException e) {
-                    //A race happened and it is probably not running
-                    it.remove();
+                for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
+                    Entry<String, StormBase> entry = it.next();
+                    String id = entry.getKey();
+                    try {
+                        tds.put(id, readTopologyDetails(id, entry.getValue()));
+                    } catch (KeyNotFoundException e) {
+                        //A race happened and it is probably not running
+                        it.remove();
+                    }
                 }
             }
-        }
-        Topologies topologies = new Topologies(tds);
-        List<String> assignedTopologyIds = state.assignments(null);
-        Map<String, Assignment> existingAssignments = new HashMap<>();
-        for (String id : assignedTopologyIds) {
-            //for the topology which wants rebalance (specified by the scratchTopoId)
-            // we exclude its assignment, meaning that all the slots occupied by its assignment
-            // will be treated as free slot in the scheduler code.
-            if (!id.equals(scratchTopoId)) {
-                Assignment currentAssignment = state.assignmentInfo(id, null);
-                if (!currentAssignment.is_set_owner()) {
-                    TopologyDetails td = tds.get(id);
-                    if (td != null) {
-                        currentAssignment.set_owner(td.getTopologySubmitter());
-                        state.setAssignment(id, currentAssignment, td.getConf());
+            List<String> assignedTopologyIds = state.assignments(null);
+            Map<String, Assignment> existingAssignments = new HashMap<>();
+            for (String id : assignedTopologyIds) {
+                //for the topology which wants rebalance (specified by the scratchTopoId)
+                // we exclude its assignment, meaning that all the slots occupied by its assignment
+                // will be treated as free slot in the scheduler code.
+                if (!id.equals(scratchTopoId)) {
+                    Assignment currentAssignment = state.assignmentInfo(id, null);
+                    if (!currentAssignment.is_set_owner()) {
+                        TopologyDetails td = tds.get(id);
+                        if (td != null) {
+                            currentAssignment.set_owner(td.getTopologySubmitter());
+                            state.setAssignment(id, currentAssignment, td.getConf());
+                        }
                     }
+                    existingAssignments.put(id, currentAssignment);
                 }
-                existingAssignments.put(id, currentAssignment);
             }
+
+            // make the new assignments for topologies
+            lockingMkAssignments(existingAssignments, bases, scratchTopoId, assignedTopologyIds, state, tds);
+        } catch (Exception e) {
+            this.mkAssignmentsErrors.mark();
+            throw e;
         }
-        // make the new assignments for topologies
+    }
+
+    private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
+                                      String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
+                                      Map<String, TopologyDetails> tds) throws Exception {
+        Topologies topologies = new Topologies(tds);
+
         synchronized (schedLock) {
             Map<String, SchedulerAssignment> newSchedulerAssignments =
-                computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
+                    computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);

             Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
-                computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
+                    computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
             Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
-                computeTopoToNodePortToResources(newSchedulerAssignments);
+                    computeTopoToNodePortToResources(newSchedulerAssignments);
             int nowSecs = Time.currentTimeSecs();
             Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
             //construct the final Assignments by adding start-times etc into it
@@ -2330,7 +2346,7 @@ private void mkAssignments(String scratchTopoId) throws Exception {
                 totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
             }
             notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
-                basicSupervisorDetailsMap);
+                    basicSupervisorDetailsMap);

             Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
             for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
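
Note (not part of the patch): the change wraps the whole mkAssignments body in try/catch, marks the new meter on any exception, and rethrows so scheduling failures are counted without being swallowed. Below is a minimal standalone sketch of that mark-and-rethrow pattern using the Dropwizard Metrics API that Storm's StormMetricsRegistry wraps; the class name, the simulated failure, and the direct MetricRegistry wiring are illustrative assumptions, while registerMeter/mark semantics and the "nimbus:mkAssignments-Errors" name come from the patch itself.

    import com.codahale.metrics.Meter;
    import com.codahale.metrics.MetricRegistry;

    public class MkAssignmentsErrorMeterSketch {
        private static final MetricRegistry REGISTRY = new MetricRegistry();
        // Hypothetical wiring for illustration; Nimbus obtains its Meter via metricsRegistry.registerMeter(...).
        private static final Meter ERRORS = REGISTRY.meter("nimbus:mkAssignments-Errors");

        // Mirrors the wrap-and-rethrow in mkAssignments(String): count the failure,
        // then propagate it so the caller (the scheduling timer loop) still sees the exception.
        static void doSchedulingRun() throws Exception {
            try {
                throw new IllegalStateException("simulated scheduling failure");
            } catch (Exception e) {
                ERRORS.mark();
                throw e;
            }
        }

        public static void main(String[] args) {
            try {
                doSchedulingRun();
            } catch (Exception expected) {
                // swallowed only for this demo
            }
            System.out.println("mkAssignments error count: " + ERRORS.getCount()); // prints 1
        }
    }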