diff --git a/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/MetricAnomalyFinder.java b/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/MetricAnomalyFinder.java index 177c5e994..1fd624608 100644 --- a/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/MetricAnomalyFinder.java +++ b/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/MetricAnomalyFinder.java @@ -24,4 +24,12 @@ public interface MetricAnomalyFinder extends CruiseControlConf */ Collection> metricAnomalies(Map metricsHistoryByEntity, Map currentMetricsByEntity); + + /** + * Get the latest number of metric anomalies with the given type detected by this metric anomaly finder. + * + * @param type Metric anomaly type for which the latest number of metric anomalies is queried. + * @return The latest number of metric anomalies with the given type detected by this metric anomaly finder. + */ + int numAnomaliesOfType(MetricAnomalyType type); } diff --git a/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/MetricAnomalyType.java b/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/MetricAnomalyType.java new file mode 100644 index 000000000..434a1eb6e --- /dev/null +++ b/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/MetricAnomalyType.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.cruisecontrol.detector.metricanomaly; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +/** + * Flags to indicate if a {@link MetricAnomalyFinder} identified brokers as anomaly suspects, recent anomalies, or persistent anomalies. + * A {@link MetricAnomalyFinder#metricAnomalies(Map, Map)} may report all or a selected subset of metric anomaly types + * (e.g. {@link #RECENT} and {@link #PERSISTENT}, but not {@link #SUSPECT}). + * + *
    + *
  • {@link #SUSPECT}: The broker is a metric anomaly suspect, but there is not yet enough evidence to conclude either way.
  • + *
  • {@link #RECENT}: The broker has recently been identified with a metric anomaly.
  • + *
  • {@link #PERSISTENT}: The broker continues to be identified with a metric anomaly for a prolonged period.
  • + *
+ */ +public enum MetricAnomalyType { + SUSPECT, RECENT, PERSISTENT; + + private static final List CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values())); + + /** + * Use this instead of values() because values() creates a new array each time. + * @return enumerated values in the same order as values() + */ + public static List cachedValues() { + return CACHED_VALUES; + } +} diff --git a/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/PercentileMetricAnomalyFinder.java b/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/PercentileMetricAnomalyFinder.java index 06758c00f..1d70931b6 100644 --- a/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/PercentileMetricAnomalyFinder.java +++ b/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/detector/metricanomaly/PercentileMetricAnomalyFinder.java @@ -43,9 +43,11 @@ public abstract class PercentileMetricAnomalyFinder implements protected Double _anomalyUpperPercentile; protected Double _anomalyLowerPercentile; protected Set _interestedMetrics; + protected int _numRecentAnomalies; public PercentileMetricAnomalyFinder() { _percentile = new Percentile(); + _numRecentAnomalies = 0; } /** @@ -170,9 +172,16 @@ public Collection> metricAnomalies(Map configs) { PercentileMetricAnomalyFinderConfig internalConfig = new PercentileMetricAnomalyFinderConfig(configs); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java index a947c1f83..cd3ea478c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java @@ -8,6 +8,7 @@ import com.codahale.metrics.MetricRegistry; import com.linkedin.cruisecontrol.detector.Anomaly; import com.linkedin.cruisecontrol.detector.AnomalyType; +import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus; import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; @@ -189,6 +190,13 @@ private void registerGaugeSensors(MetricRegistry dropwizardMetricRegistry) { dropwizardMetricRegistry.register(MetricRegistry.name(METRIC_REGISTRY_NAME, "right-sized"), (Gauge) () -> (_goalViolationDetector.provisionStatus() == ProvisionStatus.RIGHT_SIZED) ? 1 : 0); + + // The number of metric anomalies corresponding to each metric anomaly type. + for (MetricAnomalyType type : MetricAnomalyType.cachedValues()) { + dropwizardMetricRegistry.register(MetricRegistry.name(METRIC_REGISTRY_NAME, + String.format("num-%s-metric-anomalies", type.toString().toLowerCase())), + (Gauge) () -> _metricAnomalyDetector.numAnomaliesOfType(type)); + } } private void scheduleDetectorAtFixedRate(KafkaAnomalyType anomalyType, Runnable anomalyDetector) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MetricAnomalyDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MetricAnomalyDetector.java index 43f0470a1..d787c0ab2 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MetricAnomalyDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MetricAnomalyDetector.java @@ -6,6 +6,7 @@ import com.linkedin.cruisecontrol.detector.Anomaly; import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder; +import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType; import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig; @@ -30,6 +31,7 @@ public class MetricAnomalyDetector extends AbstractAnomalyDetector implements Ru public static final String METRIC_ANOMALY_BROKER_ENTITIES_OBJECT_CONFIG = "metric.anomaly.broker.entities.object"; public static final String METRIC_ANOMALY_FIXABLE_OBJECT_CONFIG = "metric.anomaly.fixable.object"; private final List _kafkaMetricAnomalyFinders; + private boolean _skippedLatestDetection; public MetricAnomalyDetector(Queue anomalies, KafkaCruiseControl kafkaCruiseControl) { super(anomalies, kafkaCruiseControl); @@ -39,6 +41,19 @@ public MetricAnomalyDetector(Queue anomalies, KafkaCruiseControl kafkaC AnomalyDetectorConfig.METRIC_ANOMALY_FINDER_CLASSES_CONFIG, MetricAnomalyFinder.class, configWithCruiseControlObject); + _skippedLatestDetection = true; + } + + /** + * Get the latest total number of metric anomalies with the given type detected by metric anomaly finders, or {@code 0} if the latest + * anomaly detection was skipped. + * + * @param type Metric anomaly type for which the latest total number of metric anomalies is queried. + * @return The latest total number of metric anomalies with the given type detected by metric anomaly finders, or {@code 0} if the latest + * anomaly detection was skipped. + */ + int numAnomaliesOfType(MetricAnomalyType type) { + return _skippedLatestDetection ? 0 : _kafkaMetricAnomalyFinders.stream().mapToInt(finder -> finder.numAnomaliesOfType(type)).sum(); } @Override @@ -46,6 +61,8 @@ public MetricAnomalyDetector(Queue anomalies, KafkaCruiseControl kafkaC public void run() { try { if (getAnomalyDetectionStatus(_kafkaCruiseControl, true, true) != AnomalyDetectionStatus.READY) { + // Skip the latest detection because metric anomaly detector is not ready + _skippedLatestDetection = true; return; } @@ -56,8 +73,9 @@ public void run() { for (MetricAnomalyFinder kafkaMetricAnomalyFinder : _kafkaMetricAnomalyFinders) { _anomalies.addAll(kafkaMetricAnomalyFinder.metricAnomalies(metricsHistoryByBroker, currentMetricsByBroker)); } - + _skippedLatestDetection = false; } catch (Exception e) { + _skippedLatestDetection = true; LOG.warn("Metric Anomaly Detector encountered exception: ", e); } finally { LOG.debug("Metric anomaly detection finished."); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopMetricAnomalyFinder.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopMetricAnomalyFinder.java index 380322c4b..9e3ca8c11 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopMetricAnomalyFinder.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopMetricAnomalyFinder.java @@ -6,6 +6,7 @@ import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly; import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder; +import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType; import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations; import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity; import java.util.Collection; @@ -25,6 +26,11 @@ public Collection> metricAnomalies( return Collections.emptySet(); } + @Override + public int numAnomaliesOfType(MetricAnomalyType type) { + return 0; + } + @Override public void configure(Map configs) { } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinder.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinder.java index 964683e6e..c89e89e97 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinder.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinder.java @@ -6,6 +6,7 @@ import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly; import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder; +import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType; import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues; import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; @@ -140,6 +141,7 @@ public class SlowBrokerFinder implements MetricAnomalyFinder { private final Map _brokerSlownessScore; private final Map _detectedSlowBrokers; private final Percentile _percentile; + private final Map _numSlowBrokersByType; private double _bytesInRateDetectionThreshold; private double _logFlushTimeThresholdMs; private double _metricHistoryPercentile; @@ -154,6 +156,8 @@ public SlowBrokerFinder() { _brokerSlownessScore = new HashMap<>(); _detectedSlowBrokers = new HashMap<>(); _percentile = new Percentile(); + _numSlowBrokersByType = new HashMap<>(MetricAnomalyType.cachedValues().size()); + MetricAnomalyType.cachedValues().forEach(type -> _numSlowBrokersByType.put(type, 0)); } private Set detectMetricAnomalies(Map metricsHistoryByBroker, @@ -347,6 +351,11 @@ public Collection> metricAnomalies(Map detectedMetricAnomalies) { for (BrokerEntity broker : detectedMetricAnomalies) { // Update slow broker detection time and slowness score. @@ -389,10 +398,15 @@ private Set> createSlowBrokerAnomalies(Set clusterSize * _selfHealingUnfixableRatio) { + if (numBrokersToDemoteOrRemove > clusterSize * _selfHealingUnfixableRatio) { brokersToRemove.forEach(brokersToDemote::put); detectedSlowBrokers.add(createSlowBrokersAnomaly(brokersToDemote, false, false)); } else { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskTracker.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskTracker.java index 810a982cb..67054cba9 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskTracker.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskTracker.java @@ -33,17 +33,17 @@ public class ExecutionTaskTracker { private final Time _time; private volatile boolean _stopRequested; - private static final String INTER_BROKER_REPLICA_ACTION = "replica-action"; - private static final String INTRA_BROKER_REPLICA_ACTION = "intra-broker-replica-action"; - private static final String LEADERSHIP_ACTION = "leadership-action"; - private static final String IN_PROGRESS = "in-progress"; - private static final String PENDING = "pending"; - private static final String ABORTING = "aborting"; - private static final String ABORTED = "aborted"; - private static final String DEAD = "dead"; - private static final String COMPLETED = "completed"; - private static final String GAUGE_ONGOING_EXECUTION_IN_KAFKA_ASSIGNER_MODE = "ongoing-execution-kafka_assigner"; - private static final String GAUGE_ONGOING_EXECUTION_IN_NON_KAFKA_ASSIGNER_MODE = "ongoing-execution-non_kafka_assigner"; + public static final String INTER_BROKER_REPLICA_ACTION = "replica-action"; + public static final String INTRA_BROKER_REPLICA_ACTION = "intra-broker-replica-action"; + public static final String LEADERSHIP_ACTION = "leadership-action"; + public static final String IN_PROGRESS = "in-progress"; + public static final String PENDING = "pending"; + public static final String ABORTING = "aborting"; + public static final String ABORTED = "aborted"; + public static final String DEAD = "dead"; + public static final String COMPLETED = "completed"; + public static final String GAUGE_ONGOING_EXECUTION_IN_KAFKA_ASSIGNER_MODE = "ongoing-execution-kafka_assigner"; + public static final String GAUGE_ONGOING_EXECUTION_IN_NON_KAFKA_ASSIGNER_MODE = "ongoing-execution-non_kafka_assigner"; ExecutionTaskTracker(MetricRegistry dropwizardMetricRegistry, Time time) { List states = ExecutionTaskState.cachedValues(); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java index 442a4da52..971e1a662 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java @@ -82,7 +82,7 @@ public class LoadMonitor { // Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers. private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10); private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5); - private static final String LOAD_MONITOR_METRICS_NAME_PREFIX = "LoadMonitor"; + public static final String LOAD_MONITOR_METRICS_NAME_PREFIX = "LoadMonitor"; // The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated. // TODO: Make this configurable. private final long _monitorStateUpdateTimeoutMs; @@ -171,7 +171,7 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr _loadMonitorTaskRunner = new LoadMonitorTaskRunner(config, _partitionMetricSampleAggregator, _brokerMetricSampleAggregator, _metadataClient, metricDef, time, dropwizardMetricRegistry, _brokerCapacityConfigResolver); - _clusterModelCreationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name("LoadMonitor", + _clusterModelCreationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(LOAD_MONITOR_METRICS_NAME_PREFIX, "cluster-model-creation-timer")); _loadMonitorExecutor = Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("LoadMonitorExecutor", true, LOG)); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/KafkaMetricAnomalyFinderTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/KafkaMetricAnomalyFinderTest.java index ba11f5971..1971c7def 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/KafkaMetricAnomalyFinderTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/KafkaMetricAnomalyFinderTest.java @@ -7,6 +7,7 @@ import com.linkedin.cruisecontrol.config.CruiseControlConfig; import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly; import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder; +import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType; import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils; import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig; @@ -50,6 +51,9 @@ public void testMetricAnomalies() { createCurrentMetrics(Collections.singletonMap(METRIC_ID, 40.0), 21, BROKER_ENTITIES.get(0)); Collection> anomalies = anomalyFinder.metricAnomalies(history, currentMetrics); assertEquals("There should be exactly a single metric anomaly", 1, anomalies.size()); + assertEquals("There should be exactly a single recent", 1, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); MetricAnomaly anomaly = anomalies.iterator().next(); assertTrue(anomaly.entities().containsKey(BROKER_ENTITIES.get(0))); assertEquals(ANOMALY_DETECTION_TIME_MS, (long) anomaly.entities().get(BROKER_ENTITIES.get(0))); @@ -64,6 +68,9 @@ public void testInsufficientData() { createCurrentMetrics(Collections.singletonMap(METRIC_ID, 20.0), 20, BROKER_ENTITIES.get(0)); Collection> anomalies = anomalyFinder.metricAnomalies(history, currentMetrics); assertTrue(anomalies.isEmpty()); + assertEquals("There should be no recent anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); } private MetricAnomalyFinder createKafkaMetricAnomalyFinder() { diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinderTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinderTest.java index 78e62859b..422b1bb24 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinderTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinderTest.java @@ -4,6 +4,7 @@ package com.linkedin.kafka.cruisecontrol.detector; import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly; +import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType; import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils; import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig; @@ -54,7 +55,10 @@ public void testDetectingSlowBrokerFromHistory() { NORMAL_LOG_FLUSH_TIME_MS * METRIC_ANOMALY_MULTIPLIER), CURRENT_METRIC_WINDOW, BROKER_ENTITIES.get(0)); Collection> anomalies = slowBrokerFinder.metricAnomalies(history, currentMetrics); - assertTrue("There should be exactly a single slow broker", anomalies.size() == 1); + assertEquals("There should be exactly a single slow broker", 1, anomalies.size()); + assertEquals("There should be exactly a single recent", 1, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); MetricAnomaly anomaly = anomalies.iterator().next(); assertTrue(anomaly.entities().containsKey(BROKER_ENTITIES.get(0))); assertEquals(ANOMALY_DETECTION_TIME_MS, (long) anomaly.entities().get(BROKER_ENTITIES.get(0))); @@ -88,7 +92,10 @@ public void testDetectingSlowBrokerFromPeer() { METRIC_HISTORY_WINDOW_SIZE, BROKER_ENTITIES.get(i))); } Collection> anomalies = slowBrokerFinder.metricAnomalies(history, currentMetrics); - assertTrue("There should be exactly a single slow broker", anomalies.size() == 1); + assertEquals("There should be exactly a single slow broker", 1, anomalies.size()); + assertEquals("There should be exactly a single recent", 1, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); MetricAnomaly anomaly = anomalies.iterator().next(); assertTrue(anomaly.entities().containsKey(BROKER_ENTITIES.get(0))); assertEquals(ANOMALY_DETECTION_TIME_MS, (long) anomaly.entities().get(BROKER_ENTITIES.get(0))); @@ -120,6 +127,9 @@ public void testExcludingSmallTrafficBroker() { } Collection> anomalies = slowBrokerFinder.metricAnomalies(history, currentMetrics); assertTrue(anomalies.isEmpty()); + assertEquals("There should be no recent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); } /** @@ -140,6 +150,9 @@ public void testInsufficientData() { METRIC_HISTORY_WINDOW_SIZE / 2 + 1, BROKER_ENTITIES.get(0)); Collection> anomalies = slowBrokerFinder.metricAnomalies(history, currentMetrics); assertTrue(anomalies.isEmpty()); + assertEquals("There should be no recent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); } /** @@ -159,6 +172,9 @@ public void testNoFalsePositiveDetectionDueToTrafficFluctuation() { CURRENT_METRIC_WINDOW, BROKER_ENTITIES.get(0)); Collection> anomalies = slowBrokerFinder.metricAnomalies(history, currentMetrics); assertTrue(anomalies.isEmpty()); + assertEquals("There should be no recent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); } /** @@ -188,7 +204,10 @@ public void testNoFalsePositiveDetectionOnSmallLogFlushTime() { METRIC_HISTORY_WINDOW_SIZE, BROKER_ENTITIES.get(i))); } Collection> anomalies = slowBrokerFinder.metricAnomalies(history, currentMetrics); - assertEquals(0, anomalies.size()); + assertTrue(anomalies.isEmpty()); + assertEquals("There should be no recent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.RECENT)); + assertEquals("There should be no suspect anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT)); + assertEquals("There should be no persistent anomaly", 0, slowBrokerFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT)); } private Map populateMetricValues(double leaderBytesInRate, double replicationBytesInRate, double logFlushTimeMs) { diff --git a/docs/wiki/User Guide/Sensors.md b/docs/wiki/User Guide/Sensors.md index 52e3f3489..3cd48ba12 100644 --- a/docs/wiki/User Guide/Sensors.md +++ b/docs/wiki/User Guide/Sensors.md @@ -74,6 +74,9 @@ Cruise Control metrics are useful to monitor the state of Cruise Control itself. | Metric anomaly rate | kafka.cruisecontrol:name=AnomalyDetector.metric-anomaly-rate | | Disk failure rate | kafka.cruisecontrol:name=AnomalyDetector.disk-failure-rate | | Topic anomaly rate | kafka.cruisecontrol:name=AnomalyDetector.topic-anomaly-rate | +| The number of brokers that are metric anomaly suspects, pending more evidence to conclude either way | kafka.cruisecontrol:name=AnomalyDetector.num-suspect-metric-anomalies | +| The number of brokers that have recently been identified with a metric anomaly | kafka.cruisecontrol:name=AnomalyDetector.num-recent-metric-anomalies | +| The number of brokers that continue to be identified with a metric anomaly for a prolonged period | kafka.cruisecontrol:name=AnomalyDetector.num-persistent-metric-anomalies | ### GoalOptimizer Sensors