Skip to content

Commit

Permalink
Add sensors to report the number of slow brokers (linkedin#1463)
Browse files Browse the repository at this point in the history
  • Loading branch information
efeg authored and Adem Efe Gencer committed Jan 26, 2021
1 parent 4e287e4 commit edacb39
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,12 @@ public interface MetricAnomalyFinder<E extends Entity> extends CruiseControlConf
*/
Collection<MetricAnomaly<E>> metricAnomalies(Map<E, ValuesAndExtrapolations> metricsHistoryByEntity,
Map<E, ValuesAndExtrapolations> currentMetricsByEntity);

/**
* Get the latest number of metric anomalies with the given type detected by this metric anomaly finder.
* An implementation that does not classify anomalies under the given type is expected to report {@code 0} for it.
*
* @param type Metric anomaly type for which the latest number of metric anomalies is queried.
* @return The latest number of metric anomalies with the given type detected by this metric anomaly finder.
*/
int numAnomaliesOfType(MetricAnomalyType type);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.cruisecontrol.detector.metricanomaly;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;


/**
 * Classification of the brokers that a {@link MetricAnomalyFinder} has flagged: anomaly suspects, recent anomalies,
 * or persistent anomalies. A {@link MetricAnomalyFinder#metricAnomalies(Map, Map)} implementation is free to report
 * every type or only some of them (e.g. {@link #RECENT} and {@link #PERSISTENT}, but not {@link #SUSPECT}).
 *
 * <ul>
 *   <li>{@link #SUSPECT}: The broker is a metric anomaly suspect, but there is not yet enough evidence to conclude either way.</li>
 *   <li>{@link #RECENT}: The broker has recently been identified with a metric anomaly.</li>
 *   <li>{@link #PERSISTENT}: The broker continues to be identified with a metric anomaly for a prolonged period.</li>
 * </ul>
 */
public enum MetricAnomalyType {
  SUSPECT,
  RECENT,
  PERSISTENT;

  // Read-only snapshot of values(), taken once when the enum class is initialized.
  private static final List<MetricAnomalyType> CACHED_VALUES;

  static {
    MetricAnomalyType[] declaredTypes = values();
    CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(declaredTypes));
  }

  /**
   * Prefer this over values(), which allocates a fresh array on every call.
   * @return an unmodifiable list of the enumerated values, in the same order as values()
   */
  public static List<MetricAnomalyType> cachedValues() {
    return CACHED_VALUES;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public abstract class PercentileMetricAnomalyFinder<E extends Entity> implements
protected Double _anomalyUpperPercentile;
protected Double _anomalyLowerPercentile;
protected Set<String> _interestedMetrics;
protected int _numRecentAnomalies;

public PercentileMetricAnomalyFinder() {
  // Nothing has been detected before the first call to metricAnomalies, so start the recent count at zero.
  _numRecentAnomalies = 0;
  _percentile = new Percentile();
}

/**
Expand Down Expand Up @@ -170,9 +172,16 @@ public Collection<MetricAnomaly<E>> metricAnomalies(Map<E, ValuesAndExtrapolatio
}
}

_numRecentAnomalies = metricAnomalies.size();
return metricAnomalies;
}

@Override
public int numAnomaliesOfType(MetricAnomalyType type) {
  // This finder only tracks the size of its latest result, which it reports as RECENT anomalies.
  if (type == MetricAnomalyType.RECENT) {
    return _numRecentAnomalies;
  }
  // Any other type (or null) is never reported by this finder.
  return 0;
}

@Override
public void configure(Map<String, ?> configs) {
PercentileMetricAnomalyFinderConfig internalConfig = new PercentileMetricAnomalyFinderConfig(configs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.codahale.metrics.MetricRegistry;
import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
Expand Down Expand Up @@ -189,6 +190,13 @@ private void registerGaugeSensors(MetricRegistry dropwizardMetricRegistry) {
dropwizardMetricRegistry.register(MetricRegistry.name(METRIC_REGISTRY_NAME, "right-sized"),
(Gauge<Integer>) () -> (_goalViolationDetector.provisionStatus() == ProvisionStatus.RIGHT_SIZED)
? 1 : 0);

// The number of metric anomalies corresponding to each metric anomaly type.
for (MetricAnomalyType type : MetricAnomalyType.cachedValues()) {
dropwizardMetricRegistry.register(MetricRegistry.name(METRIC_REGISTRY_NAME,
String.format("num-%s-metric-anomalies", type.toString().toLowerCase())),
(Gauge<Integer>) () -> _metricAnomalyDetector.numAnomaliesOfType(type));
}
}

private void scheduleDetectorAtFixedRate(KafkaAnomalyType anomalyType, Runnable anomalyDetector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
Expand All @@ -30,6 +31,7 @@ public class MetricAnomalyDetector extends AbstractAnomalyDetector implements Ru
public static final String METRIC_ANOMALY_BROKER_ENTITIES_OBJECT_CONFIG = "metric.anomaly.broker.entities.object";
public static final String METRIC_ANOMALY_FIXABLE_OBJECT_CONFIG = "metric.anomaly.fixable.object";
private final List<MetricAnomalyFinder> _kafkaMetricAnomalyFinders;
private boolean _skippedLatestDetection;

public MetricAnomalyDetector(Queue<Anomaly> anomalies, KafkaCruiseControl kafkaCruiseControl) {
super(anomalies, kafkaCruiseControl);
Expand All @@ -39,13 +41,28 @@ public MetricAnomalyDetector(Queue<Anomaly> anomalies, KafkaCruiseControl kafkaC
AnomalyDetectorConfig.METRIC_ANOMALY_FINDER_CLASSES_CONFIG,
MetricAnomalyFinder.class,
configWithCruiseControlObject);
_skippedLatestDetection = true;
}

/**
 * Sum the latest number of metric anomalies with the given type over all configured metric anomaly finders.
 * If the latest anomaly detection was skipped, the result is {@code 0} without querying any finder.
 *
 * @param type Metric anomaly type for which the latest total number of metric anomalies is queried.
 * @return The latest total number of metric anomalies with the given type detected by metric anomaly finders, or {@code 0} if the
 * latest anomaly detection was skipped.
 */
int numAnomaliesOfType(MetricAnomalyType type) {
  if (_skippedLatestDetection) {
    // The finders' counts do not reflect the latest detection round, so report nothing.
    return 0;
  }
  return _kafkaMetricAnomalyFinders.stream()
                                   .mapToInt(anomalyFinder -> anomalyFinder.numAnomaliesOfType(type))
                                   .sum();
}

@Override
@SuppressWarnings("unchecked")
public void run() {
try {
if (getAnomalyDetectionStatus(_kafkaCruiseControl, true, true) != AnomalyDetectionStatus.READY) {
// Skip the latest detection because metric anomaly detector is not ready
_skippedLatestDetection = true;
return;
}

Expand All @@ -56,8 +73,9 @@ public void run() {
for (MetricAnomalyFinder<BrokerEntity> kafkaMetricAnomalyFinder : _kafkaMetricAnomalyFinders) {
_anomalies.addAll(kafkaMetricAnomalyFinder.metricAnomalies(metricsHistoryByBroker, currentMetricsByBroker));
}

_skippedLatestDetection = false;
} catch (Exception e) {
_skippedLatestDetection = true;
LOG.warn("Metric Anomaly Detector encountered exception: ", e);
} finally {
LOG.debug("Metric anomaly detection finished.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity;
import java.util.Collection;
Expand All @@ -25,6 +26,11 @@ public Collection<MetricAnomaly<BrokerEntity>> metricAnomalies(
return Collections.emptySet();
}

@Override
public int numAnomaliesOfType(MetricAnomalyType type) {
// The reported count is always zero, regardless of the requested type.
return 0;
}

@Override
public void configure(Map<String, ?> configs) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
Expand Down Expand Up @@ -140,6 +141,7 @@ public class SlowBrokerFinder implements MetricAnomalyFinder<BrokerEntity> {
private final Map<BrokerEntity, Integer> _brokerSlownessScore;
private final Map<BrokerEntity, Long> _detectedSlowBrokers;
private final Percentile _percentile;
private final Map<MetricAnomalyType, Integer> _numSlowBrokersByType;
private double _bytesInRateDetectionThreshold;
private double _logFlushTimeThresholdMs;
private double _metricHistoryPercentile;
Expand All @@ -154,6 +156,8 @@ public SlowBrokerFinder() {
_brokerSlownessScore = new HashMap<>();
_detectedSlowBrokers = new HashMap<>();
_percentile = new Percentile();
_numSlowBrokersByType = new HashMap<>(MetricAnomalyType.cachedValues().size());
MetricAnomalyType.cachedValues().forEach(type -> _numSlowBrokersByType.put(type, 0));
}

private Set<BrokerEntity> detectMetricAnomalies(Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker,
Expand Down Expand Up @@ -347,6 +351,11 @@ public Collection<MetricAnomaly<BrokerEntity>> metricAnomalies(Map<BrokerEntity,
return Collections.emptySet();
}

@Override
public int numAnomaliesOfType(MetricAnomalyType type) {
  // The map is seeded with 0 for every type at construction and overwritten whenever slow broker anomalies are created.
  Integer numSlowBrokers = _numSlowBrokersByType.get(type);
  return numSlowBrokers;
}

private void updateBrokerSlownessScore(Set<BrokerEntity> detectedMetricAnomalies) {
for (BrokerEntity broker : detectedMetricAnomalies) {
// Update slow broker detection time and slowness score.
Expand Down Expand Up @@ -389,10 +398,15 @@ private Set<MetricAnomaly<BrokerEntity>> createSlowBrokerAnomalies(Set<BrokerEnt
brokersToDemote.put(broker, _detectedSlowBrokers.get(broker));
}
}
// Update the number of slow brokers for each metric anomaly type.
int numBrokersToDemoteOrRemove = brokersToDemote.size() + brokersToRemove.size();
_numSlowBrokersByType.put(MetricAnomalyType.PERSISTENT, brokersToRemove.size());
_numSlowBrokersByType.put(MetricAnomalyType.RECENT, brokersToDemote.size());
_numSlowBrokersByType.put(MetricAnomalyType.SUSPECT, _detectedSlowBrokers.size() - numBrokersToDemoteOrRemove);

// If too many brokers in the cluster are detected as slow brokers, report anomaly as not fixable.
// Otherwise report anomaly with brokers to be removed/demoted.
if (brokersToDemote.size() + brokersToRemove.size() > clusterSize * _selfHealingUnfixableRatio) {
if (numBrokersToDemoteOrRemove > clusterSize * _selfHealingUnfixableRatio) {
brokersToRemove.forEach(brokersToDemote::put);
detectedSlowBrokers.add(createSlowBrokersAnomaly(brokersToDemote, false, false));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ public class ExecutionTaskTracker {
private final Time _time;
private volatile boolean _stopRequested;

private static final String INTER_BROKER_REPLICA_ACTION = "replica-action";
private static final String INTRA_BROKER_REPLICA_ACTION = "intra-broker-replica-action";
private static final String LEADERSHIP_ACTION = "leadership-action";
private static final String IN_PROGRESS = "in-progress";
private static final String PENDING = "pending";
private static final String ABORTING = "aborting";
private static final String ABORTED = "aborted";
private static final String DEAD = "dead";
private static final String COMPLETED = "completed";
private static final String GAUGE_ONGOING_EXECUTION_IN_KAFKA_ASSIGNER_MODE = "ongoing-execution-kafka_assigner";
private static final String GAUGE_ONGOING_EXECUTION_IN_NON_KAFKA_ASSIGNER_MODE = "ongoing-execution-non_kafka_assigner";
public static final String INTER_BROKER_REPLICA_ACTION = "replica-action";
public static final String INTRA_BROKER_REPLICA_ACTION = "intra-broker-replica-action";
public static final String LEADERSHIP_ACTION = "leadership-action";
public static final String IN_PROGRESS = "in-progress";
public static final String PENDING = "pending";
public static final String ABORTING = "aborting";
public static final String ABORTED = "aborted";
public static final String DEAD = "dead";
public static final String COMPLETED = "completed";
public static final String GAUGE_ONGOING_EXECUTION_IN_KAFKA_ASSIGNER_MODE = "ongoing-execution-kafka_assigner";
public static final String GAUGE_ONGOING_EXECUTION_IN_NON_KAFKA_ASSIGNER_MODE = "ongoing-execution-non_kafka_assigner";

ExecutionTaskTracker(MetricRegistry dropwizardMetricRegistry, Time time) {
List<ExecutionTaskState> states = ExecutionTaskState.cachedValues();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class LoadMonitor {
// Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers.
private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10);
private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5);
private static final String LOAD_MONITOR_METRICS_NAME_PREFIX = "LoadMonitor";
public static final String LOAD_MONITOR_METRICS_NAME_PREFIX = "LoadMonitor";
// The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated.
// TODO: Make this configurable.
private final long _monitorStateUpdateTimeoutMs;
Expand Down Expand Up @@ -171,7 +171,7 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr
_loadMonitorTaskRunner =
new LoadMonitorTaskRunner(config, _partitionMetricSampleAggregator, _brokerMetricSampleAggregator, _metadataClient,
metricDef, time, dropwizardMetricRegistry, _brokerCapacityConfigResolver);
_clusterModelCreationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name("LoadMonitor",
_clusterModelCreationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(LOAD_MONITOR_METRICS_NAME_PREFIX,
"cluster-model-creation-timer"));
_loadMonitorExecutor = Executors.newScheduledThreadPool(2,
new KafkaCruiseControlThreadFactory("LoadMonitorExecutor", true, LOG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.cruisecontrol.config.CruiseControlConfig;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyType;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
Expand Down Expand Up @@ -50,6 +51,9 @@ public void testMetricAnomalies() {
createCurrentMetrics(Collections.singletonMap(METRIC_ID, 40.0), 21, BROKER_ENTITIES.get(0));
Collection<MetricAnomaly<BrokerEntity>> anomalies = anomalyFinder.metricAnomalies(history, currentMetrics);
assertEquals("There should be exactly a single metric anomaly", 1, anomalies.size());
assertEquals("There should be exactly a single recent", 1, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.RECENT));
assertEquals("There should be no suspect anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT));
assertEquals("There should be no persistent anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT));
MetricAnomaly<BrokerEntity> anomaly = anomalies.iterator().next();
assertTrue(anomaly.entities().containsKey(BROKER_ENTITIES.get(0)));
assertEquals(ANOMALY_DETECTION_TIME_MS, (long) anomaly.entities().get(BROKER_ENTITIES.get(0)));
Expand All @@ -64,6 +68,9 @@ public void testInsufficientData() {
createCurrentMetrics(Collections.singletonMap(METRIC_ID, 20.0), 20, BROKER_ENTITIES.get(0));
Collection<MetricAnomaly<BrokerEntity>> anomalies = anomalyFinder.metricAnomalies(history, currentMetrics);
assertTrue(anomalies.isEmpty());
assertEquals("There should be no recent anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.RECENT));
assertEquals("There should be no suspect anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.SUSPECT));
assertEquals("There should be no persistent anomaly", 0, anomalyFinder.numAnomaliesOfType(MetricAnomalyType.PERSISTENT));
}

private MetricAnomalyFinder<BrokerEntity> createKafkaMetricAnomalyFinder() {
Expand Down
Loading

0 comments on commit edacb39

Please sign in to comment.