Added new metric to report real time missing top state for partition #2344

Closed · wants to merge 7 commits
@@ -85,6 +85,9 @@ private void updateTopStateStatus(ResourceControllerDataProvider cache,
if (cache.getClusterConfig() != null) {
durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
}
if (clusterStatusMonitor != null) {
clusterStatusMonitor.updateMissingTopStateDurationThreshold(durationThreshold);
Contributor: If durationThreshold is Long.MAX_VALUE, our monitoring will never finish; the thread will keep spinning without taking any action.

Contributor Author: That's correct; if the threshold is not set, it's a no-op thread. But in that case most of the top-state metrics are not reported anyway, so I'm assuming this config is commonly set.

Contributor: I am not sure we can assume that. Normally we have a default value, and the user can set -1 if they want to disable this.
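
A hedged sketch of that suggestion (the -1 sentinel and the skip logic are assumptions, not part of this PR): only arm the async monitor when a finite, enabled threshold is configured, so it never spins against Long.MAX_VALUE.

    // Hypothetical guard, assuming -1 (or any non-positive value) means "monitoring disabled".
    long durationThreshold = cache.getClusterConfig() != null
        ? cache.getClusterConfig().getMissTopStateDurationThreshold()
        : -1L;
    if (clusterStatusMonitor != null && durationThreshold > 0 && durationThreshold < Long.MAX_VALUE) {
      clusterStatusMonitor.updateMissingTopStateDurationThreshold(durationThreshold);
    }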

}

// Remove any resource records that no longer exist
missingTopStateMap.keySet().retainAll(resourceMap.keySet());
@@ -206,7 +209,7 @@ private void reportTopStateExistence(ResourceControllerDataProvider cache, Curre

if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
.containsKey(partition.getPartitionName())) {
// We previously recorded a top state missing, and it's not coming back
// We previously recorded a top state missing, and it has not come back
reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
resourceName, partition, clusterStatusMonitor, durationThreshold,
stateModelDef.getTopState());
@@ -319,12 +322,17 @@ private void reportTopStateHandoffFailIfNecessary(ResourceControllerDataProvider
String partitionName = partition.getPartitionName();
MissingTopStateRecord record = missingTopStateMap.get(resourceName).get(partitionName);
long startTime = record.getStartTimeStamp();
if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold && !record
if (startTime > 0 && !record
.isFailed()) {
record.setFailed();
missingTopStateMap.get(resourceName).put(partitionName, record);
if (clusterStatusMonitor != null) {
clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 0L, false, false);
clusterStatusMonitor.updateMissingTopStateResourceMap(resourceName, partitionName, true, startTime);
}
if (System.currentTimeMillis() - startTime > durationThreshold) {
record.setFailed();
missingTopStateMap.get(resourceName).put(partitionName, record);
if (clusterStatusMonitor != null) {
clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 0L, false, false);
}
}
}
}
@@ -488,6 +496,10 @@ private void reportTopStateComesBack(ResourceControllerDataProvider cache, Map<S
}
}
removeFromStatsMap(missingTopStateMap, resourceName, partition);
if (clusterStatusMonitor != null) {
// Update recovered top state for partition in resource map.
clusterStatusMonitor.updateMissingTopStateResourceMap(resourceName, partition.getPartitionName(), false, 0L);
}
}

private void removeFromStatsMap(
@@ -55,6 +55,54 @@
import org.slf4j.LoggerFactory;

public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private class AsyncMissingTopStateMonitor extends Thread {
Contributor: This will only be meaningful if our pipeline speed is very fast; otherwise it will not help, since the cached state is refreshed once per pipeline. I don't believe we need to build this thread; we can just rely on the pipeline call as we did before.

But we need to remove the constraint of doing only final reporting.

We can discuss it tomorrow f2f.
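
A rough sketch of the pipeline-driven alternative the reviewer describes (the method name and the partitionToMissingSince parameter are illustrative, not part of this PR): the gauge is updated directly from the per-pipeline top-state check instead of from a background thread.

    // Hypothetical pipeline hook, invoked once per pipeline run for each resource.
    private void reportMissingTopStateFromPipeline(String resourceName,
        Map<String, Long> partitionToMissingSince, long durationThreshold) {
      ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
      long now = System.currentTimeMillis();
      boolean anyOverThreshold = partitionToMissingSince.values().stream()
          .anyMatch(start -> now - start > durationThreshold);
      if (anyOverThreshold) {
        resourceMonitor.updateOneOrManyPartitionsMissingTopStateRealTimeGuage();
      } else {
        resourceMonitor.resetOneOrManyPartitionsMissingTopStateRealTimeGuage();
      }
    }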

private final Map<String, Map<String, Long>> _missingTopStateResourceMap;
private long _missingTopStateDurationThreshold = Long.MAX_VALUE;

public AsyncMissingTopStateMonitor(Map<String, Map<String, Long>> missingTopStateResourceMap) {
_missingTopStateResourceMap = missingTopStateResourceMap;
}

public void setMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
_missingTopStateDurationThreshold = missingTopStateDurationThreshold;
}

@Override
public void run() {
try {
synchronized (this) {
while (true) {
while (_missingTopStateResourceMap.size() == 0) {
this.wait();
}
for (Iterator<Map.Entry<String, Map<String, Long>>> resourcePartitionIt =
_missingTopStateResourceMap.entrySet().iterator(); resourcePartitionIt.hasNext(); ) {
Map.Entry<String, Map<String, Long>> resourcePartitionEntry = resourcePartitionIt.next();
// Iterate over all partitions; if any partition has been missing its top state for longer than the
// threshold, report it.
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourcePartitionEntry.getKey());
// If all partitions of the resource have recovered their top state, reset the gauge
if (resourcePartitionEntry.getValue().isEmpty()) {
resourceMonitor.resetOneOrManyPartitionsMissingTopStateRealTimeGuage();
resourcePartitionIt.remove();
} else {
for (Long missingTopStateStartTime : resourcePartitionEntry.getValue().values()) {
if (_missingTopStateDurationThreshold < Long.MAX_VALUE && System.currentTimeMillis() - missingTopStateStartTime > _missingTopStateDurationThreshold) {
Contributor: You could use Java's parallel compute lambda feature here.

Contributor Author: That's a good suggestion, but even if we parallelize this loop, all of those threads would be updating the same gauge value, so it would effectively be sequential, right?

Contributor: Yes, but it would save our sequential computation time, as this update is synchronized in Helix's regular pipelines.
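
For illustration only (the PR keeps the plain loop): a parallel-stream variant would scan the start times in parallel while still bumping the gauge once per resource.

        long now = System.currentTimeMillis();
        boolean anyOverThreshold = resourcePartitionEntry.getValue().values().parallelStream()
            .anyMatch(start -> _missingTopStateDurationThreshold < Long.MAX_VALUE
                && now - start > _missingTopStateDurationThreshold);
        if (anyOverThreshold) {
          // Only the scan is parallelized; the gauge update itself remains a single call.
          resourceMonitor.updateOneOrManyPartitionsMissingTopStateRealTimeGuage();
        }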

resourceMonitor.updateOneOrManyPartitionsMissingTopStateRealTimeGuage();
}
}
}
}
// TODO: Check whether this sleep time is correct. The thread should keep increasing the counter until the top
// state is recovered, but it can sleep for a reasonable amount of time in between.
sleep(100);
}
}
} catch (InterruptedException e) {
LOG.error("AsyncMissingTopStateMonitor has been interrupted.", e);
}
}
};
private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusMonitor.class);

static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
@@ -109,9 +157,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {

private final Map<String, JobMonitor> _perTypeJobMonitorMap = new ConcurrentHashMap<>();

/**
* Missing top state resource map: resourceName-><PartitionName->startTimeOfMissingTopState>
*/
private final Map<String, Map<String, Long>> _missingTopStateResourceMap = new ConcurrentHashMap<>();
Contributor: Can this line and the next one be part of the ClusterStatusMonitor constructor? Just declare the variable here, but do the instantiation in the constructor.

Contributor Author: That's a good point! The reason for instantiating it here is that the main process (ClusterStatusMonitor) is the one updating this map, while the async thread only reads from it.
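
For reference, the variant the reviewer is asking about would look roughly like this; it is behaviorally equivalent here, since both fields are final and only the references are shared with the monitor thread.

    // Sketch of the suggested alternative: declare the fields, instantiate in the constructor.
    private final Map<String, Map<String, Long>> _missingTopStateResourceMap;
    private final AsyncMissingTopStateMonitor _asyncMissingTopStateMonitor;

    public ClusterStatusMonitor(String clusterName) {
      _clusterName = clusterName;
      _beanServer = ManagementFactory.getPlatformMBeanServer();
      _missingTopStateResourceMap = new ConcurrentHashMap<>();
      _asyncMissingTopStateMonitor = new AsyncMissingTopStateMonitor(_missingTopStateResourceMap);
      _asyncMissingTopStateMonitor.start();
    }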

private final AsyncMissingTopStateMonitor _asyncMissingTopStateMonitor = new AsyncMissingTopStateMonitor(_missingTopStateResourceMap);

public ClusterStatusMonitor(String clusterName) {
_clusterName = clusterName;
_beanServer = ManagementFactory.getPlatformMBeanServer();
/**
 * Start an async thread per cluster that keeps monitoring missing top states for the cluster's resources.
 * The thread keeps iterating over resources and reporting their missing-top-state duration as long as at
 * least one resource in the cluster has a partition missing its top state.
 */
_asyncMissingTopStateMonitor.start();
Contributor: Do we need to join this thread eventually?

Contributor Author: Good question! It has to be stopped/interrupted by ClusterStatusMonitor when it cleans up its state, but the main process (ClusterStatusMonitor) does not have to wait for it to finish: it's a long-running async thread, metrics reporting is a never-ending job, and its lifecycle can be tied to the main process. Let me know if that makes sense.

Contributor: Sorry, I did not follow. When ClusterStatusMonitor terminates, we need to join this thread, don't we?

Contributor Author: I checked the flow, and the async thread should be tied to the ClusterStatusMonitor lifecycle. Currently all beans are registered in the active() method and unregistered or reset in the reset() method. So whenever the Helix controller activates the cluster status monitor it should start this async thread, and whenever the controller stops/resets the cluster status monitor it should stop it. To reiterate: after the cluster status monitor is reset(), it has to be activated again by the caller, which ensures the async thread is restarted.

Contributor: Does this mean that after reset() we should kill this metrics reporting thread?

Contributor Author: Yup, that's what is done here. In reset() we unregister everything, clear the map, and then stop/interrupt the thread. The async thread is started again in the active() call.
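
A hedged sketch of that lifecycle (active() is not part of this diff, so its body below is an assumption; note that a terminated java.lang.Thread cannot be restarted with start(), so this sketch recreates the monitor on each activation, which would also require making the field non-final):

    public void active() {
      // ... register MBeans ...
      // Recreate the monitor rather than restarting the old Thread instance.
      _asyncMissingTopStateMonitor = new AsyncMissingTopStateMonitor(_missingTopStateResourceMap);
      _asyncMissingTopStateMonitor.start();
    }

    public void reset() {
      // ... unregister MBeans, reset gauges and counters ...
      _asyncMissingTopStateMonitor.interrupt();
      _missingTopStateResourceMap.clear();
    }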

}

public ObjectName getObjectName(String name) throws MalformedObjectNameException {
@@ -583,6 +643,33 @@ public void updateMissingTopStateDurationStats(String resourceName, long totalDu
}
}

public void updateMissingTopStateDurationThreshold(long missingTopStateDurationThreshold) {
_asyncMissingTopStateMonitor.setMissingTopStateDurationThreshold(missingTopStateDurationThreshold);
}

public void updateMissingTopStateResourceMap(String resourceName, String partitionName, boolean isTopStateMissing, long startTime) {
// Top state started missing
if (isTopStateMissing) {
// Wake up asyncMissingTopStateMonitor thread on first resource being added to map
if (_missingTopStateResourceMap.isEmpty()) {
synchronized (_asyncMissingTopStateMonitor) {
_asyncMissingTopStateMonitor.notify();
}
}
if (!_missingTopStateResourceMap.containsKey(resourceName)) {
_missingTopStateResourceMap.put(resourceName, new HashMap<String, Long>());
}
_missingTopStateResourceMap.get(resourceName).put(partitionName, startTime);
} else { // top state recovered
// Remove partitions whose top state has recovered from the resource map; this puts the
// asyncMissingTopStateMonitor thread back to sleep once there are no resources left to monitor.
Map<String, Long> entry = _missingTopStateResourceMap.get(resourceName);
if (entry != null) {
entry.remove(partitionName);
}
}
}

public void updateRebalancerStats(String resourceName, long numPendingRecoveryRebalancePartitions,
long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions,
long numLoadRebalanceThrottledPartitions) {
@@ -652,6 +739,8 @@ public void reset() {
_rebalanceFailureCount.set(0L);
_continuousResourceRebalanceFailureCount.set(0L);
_continuousTaskRebalanceFailureCount.set(0L);
_asyncMissingTopStateMonitor.interrupt();
_missingTopStateResourceMap.clear();
} catch (Exception e) {
LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
}
@@ -65,6 +65,9 @@ public enum RebalanceStatus {
private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledReplicas;
private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledReplicas;
private SimpleDynamicMetric<Long> _numPendingStateTransitions;
// Real-time gauge that keeps increasing while the resource has at least one partition whose top state has been
// missing for longer than the threshold. The gauge is reset once no partition of the resource is missing its top state.
private SimpleDynamicMetric<Long> _oneOrManyPartitionsMissingTopStateRealTimeGuage;

// Counters
private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter;
@@ -108,7 +111,6 @@ public ResourceMonitor(String clusterName, String resourceName, ObjectName objec
_resourceName = resourceName;
_initObjectName = objectName;
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();

_externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
_numPendingRecoveryRebalanceReplicas =
new SimpleDynamicMetric("PendingRecoveryRebalanceReplicaGauge", 0L);
@@ -128,6 +130,7 @@ public ResourceMonitor(String clusterName, String resourceName, ObjectName objec
_numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0L);
_numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L);
_numPendingStateTransitions = new SimpleDynamicMetric("PendingStateTransitionGauge", 0L);
_oneOrManyPartitionsMissingTopStateRealTimeGuage = new SimpleDynamicMetric<>("OneOrManyPartitionsMissingTopStateRealTimeGuage", 0L);

_partitionTopStateHandoffDurationGauge =
new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
@@ -206,6 +209,8 @@ public long getTotalMessageReceived() {
return _totalMessageReceived.getValue();
}

public long getOneOrManyPartitionsMissingTopStateRealTimeGuage() { return _oneOrManyPartitionsMissingTopStateRealTimeGuage.getValue(); }

@Deprecated
public synchronized void increaseMessageCount(long messageReceived) {
_totalMessageReceived.updateValue(_totalMessageReceived.getValue() + messageReceived);
@@ -335,12 +340,21 @@ private void resetResourceStateGauges() {
_numOfPartitionsInExternalView.updateValue(0L);
_numLessMinActiveReplicaPartitions.updateValue(0L);
_numLessReplicaPartitions.updateValue(0L);
_oneOrManyPartitionsMissingTopStateRealTimeGuage.updateValue(0L);
}

public void updatePendingStateTransitionMessages(int messageCount) {
_numPendingStateTransitions.updateValue((long) messageCount);
}

public void resetOneOrManyPartitionsMissingTopStateRealTimeGuage() {
_oneOrManyPartitionsMissingTopStateRealTimeGuage.updateValue(0L);
}

public void updateOneOrManyPartitionsMissingTopStateRealTimeGuage() {
_oneOrManyPartitionsMissingTopStateRealTimeGuage.updateValue(_oneOrManyPartitionsMissingTopStateRealTimeGuage.getValue() + 1);
}

public void updateStateHandoffStats(MonitorState monitorState, long totalDuration,
long helixLatency, boolean isGraceful, boolean succeeded) {
switch (monitorState) {
@@ -171,9 +171,12 @@ public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(Test
// - S->M from 8000 to 9000
// Therefore the recorded latency should be 9000 - 7500 = 1500, though original master crashed,
// since this is a single top state handoff observed within 1 pipeline, we treat it as graceful,
// and only record user latency for transiting to master
// and only record user latency for transiting to master.
// The real-time metric for tracking the missing top state starts at 7000 and is reset at 9000. Since the
// top state recovered under the threshold (7500), the metric is not incremented.
Range<Long> expectedDuration = Range.closed(1500L, 1500L);
Range<Long> expectedHelixLatency = Range.closed(500L, 500L);
// Even if the handoff took longer than the threshold, the real-time metric should be correctly reset to 0 after a successful handoff.
runStageAndVerify(
cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
new MissingStatesDataCacheInject() {
@@ -183,7 +186,7 @@ public void doInject(ResourceControllerDataProvider cache) {
liMap.remove("localhost_1");
cache.setLiveInstances(new ArrayList<>(liMap.values()));
}
}, 1, 0,
}, 1, 0, 0,
expectedDuration,
DURATION_ZERO,
expectedDuration, expectedHelixLatency
@@ -212,7 +215,44 @@ public void doInject(ResourceControllerDataProvider cache) {
cache.getInstanceOfflineTimeMap().put("localhost_0", lastOfflineTime);
cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
}
}, 1, 0,
},
1,
0,
0, // the real-time metric would have been incremented, but it is reset to 0 after a successful handoff.
DURATION_ZERO, // graceful handoff duration should be 0
expectedDuration, // we should have a record for non-graceful handoff
expectedDuration, // max handoff should be same as non-graceful handoff
DURATION_ZERO // we don't record user latency for non-graceful transition
);
}

@Test(dataProvider = "failedYetRecoveredTopStateNonGraceful")
public void testTopStateFailedYetNonGracefulHandoffCompleted(TestCaseConfig cfg) {
// localhost_0 crashed at 15000
// localhost_1 slave -> master started 20000, ended 22000, top state handoff = 7000
preSetup();
final String downInstance = "localhost_0";
final Long lastOfflineTime = 15000L;
Range<Long> expectedDuration = Range.closed(7000L, 7000L);
ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
clusterConfig.setMissTopStateDurationThreshold(500L);
setClusterConfig(clusterConfig);
runStageAndVerify(
cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
new MissingStatesDataCacheInject() {
@Override
public void doInject(ResourceControllerDataProvider cache) {
accessor.removeProperty(accessor.keyBuilder().liveInstance(downInstance));
Map<String, LiveInstance> liMap = new HashMap<>(cache.getLiveInstances());
liMap.remove("localhost_0");
cache.setLiveInstances(new ArrayList<>(liMap.values()));
cache.getInstanceOfflineTimeMap().put("localhost_0", lastOfflineTime);
cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
}
},
0, // since the handoff could not complete within the threshold, it is counted as failed.
1,
0, // the real-time metric would have been incremented, but it is reset to 0 once the handoff COMPLETES.
DURATION_ZERO, // graceful handoff duration should be 0
expectedDuration, // we should have a record for non-graceful handoff
expectedDuration, // max handoff should be same as non-graceful handoff
@@ -222,6 +262,8 @@ public void doInject(ResourceControllerDataProvider cache) {

@Test(dataProvider = "failedCurrentStateInput")
public void testTopStateFailedHandoff(TestCaseConfig cfg) {
// Since the resource has at least one partition with a missing top state, the real-time gauge should have a
// non-zero value.
ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
clusterConfig.setMissTopStateDurationThreshold(5000L);
setClusterConfig(clusterConfig);
@@ -254,7 +296,7 @@ public void testHandoffDurationWithDefaultStartTime(final TestCaseConfig cfg) {
// actual timestamp when running the stage will be later than current time, so the expected
// helix latency will be less than the mocked helix latency
runStageAndVerify(Collections.EMPTY_MAP, cfg.currentStateWithMissingTopState,
cfg.finalCurrentState, null, 1, 0,
cfg.finalCurrentState, null, 1, 0, 0,
Range.closed(0L, helixLatency + userLatency),
DURATION_ZERO,
Range.closed(0L, helixLatency + userLatency),
@@ -312,7 +354,7 @@ public void testHandoffDurationWithPendingMessage(final TestCaseConfig cfg) {
cache.cacheMessages(Collections.singletonList(message));
}
}
}, 1, 0,
}, 1, 0, 0,
Range.closed(durationToVerify, durationToVerify),
DURATION_ZERO,
Range.closed(durationToVerify, durationToVerify),
@@ -355,7 +397,7 @@ private void runTestWithNoInjection(TestCaseConfig cfg, boolean expectFail) {
Range<Long> expectedHelixLatency =
cfg.isGraceful ? Range.closed(cfg.helixLatency, cfg.helixLatency) : DURATION_ZERO;
runStageAndVerify(cfg.initialCurrentStates, cfg.currentStateWithMissingTopState,
cfg.finalCurrentState, null, expectFail ? 0 : 1, expectFail ? 1 : 0, expectedDuration, expectedNonGracefulDuration,
cfg.finalCurrentState, null, expectFail ? 0 : 1, expectFail ? 1 : 0, expectFail ? 1 : 0, expectedDuration, expectedNonGracefulDuration,
expectedDuration, expectedHelixLatency);
}

@@ -412,6 +454,7 @@ private void runStageAndVerify(
MissingStatesDataCacheInject inject,
int successCnt,
int failCnt,
int oneOrManyPartitionsMissingTopStateCnt,
Range<Long> expectedDuration,
Range<Long> expectedNonGracefulDuration,
Range<Long> expectedMaxDuration,
@@ -427,6 +470,8 @@ private void runStageAndVerify(
Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), successCnt);
Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), failCnt);

Assert.assertTrue(monitor.getOneOrManyPartitionsMissingTopStateRealTimeGuage() >= oneOrManyPartitionsMissingTopStateCnt);

long graceful = monitor.getPartitionTopStateHandoffDurationGauge()
.getAttributeValue(GRACEFUL_HANDOFF_DURATION).longValue();
long nonGraceful = monitor.getPartitionTopStateNonGracefulHandoffDurationGauge()