forked from linkedin/cruise-control
/
AnomalyDetector.java
564 lines (531 loc) · 29.3 KB
/
AnomalyDetector.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
/*
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/
package com.linkedin.kafka.cruisecontrol.detector;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.OPERATION_LOGGER;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.sanityCheckGoals;
import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.anomalyComparator;
import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.getSelfHealingGoalNames;
import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.SHUTDOWN_ANOMALY;
import static com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType.*;
/**
* The anomaly detector class that helps detect and handle anomalies.
*/
public class AnomalyDetector {
  static final String METRIC_REGISTRY_NAME = "AnomalyDetector";
  // Upper bound (exclusive, in ms) of the random jitter added to each periodic detector's initial
  // delay so the detectors do not all fire at the same instant after startup.
  private static final int INIT_JITTER_BOUND = 10000;
  private static final long SCHEDULER_SHUTDOWN_TIMEOUT_MS = 5000L;
  // For each anomaly type, one thread is needed to run corresponding anomaly detector.
  // One more thread is needed to run anomaly handler task.
  private static final int NUM_ANOMALY_DETECTION_THREADS = KafkaAnomalyType.cachedValues().size() + 1;
  private static final int ANOMALY_QUEUE_INITIAL_CAPACITY = 10;
  private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetector.class);
  private static final Logger OPERATION_LOG = LoggerFactory.getLogger(OPERATION_LOGGER);
  private final KafkaCruiseControl _kafkaCruiseControl;
  private final AnomalyNotifier _anomalyNotifier;
  private final AdminClient _adminClient;
  // Detectors
  private final GoalViolationDetector _goalViolationDetector;
  private final BrokerFailureDetector _brokerFailureDetector;
  private final MetricAnomalyDetector _metricAnomalyDetector;
  private final DiskFailureDetector _diskFailureDetector;
  private final TopicAnomalyDetector _topicAnomalyDetector;
  private final ScheduledExecutorService _detectorScheduler;
  // Detection interval per anomaly type; broker failures are event-driven and have no entry here.
  private final Map<KafkaAnomalyType, Long> _anomalyDetectionIntervalMsByType;
  private final long _brokerFailureDetectionBackoffMs;
  // Detected anomalies waiting to be handled -- ordered by priority, then by detection time
  // (see anomalyComparator()); consumed by the AnomalyHandlerTask.
  private final PriorityBlockingQueue<Anomaly> _anomalies;
  private volatile boolean _shutdown;
  private final AnomalyDetectorState _anomalyDetectorState;
  private final List<String> _selfHealingGoals;
  private final ExecutorService _anomalyLoggerExecutor;
  // The anomaly currently being handled by the AnomalyHandlerTask; null when the handler is idle.
  private volatile Anomaly _anomalyInProgress;
  // Number of anomalies for which a delayed re-check was scheduled. Declared final: the reference
  // is assigned exactly once in each constructor and never re-assigned.
  private final AtomicLong _numCheckedWithDelay;
  // Guards _shutdown so that a fix is never started while a shutdown request is being recorded.
  private final Object _shutdownLock;

  /**
   * Create an anomaly detector with detectors and notifier built from the Cruise Control config.
   *
   * @param kafkaCruiseControl The Kafka Cruise Control instance that owns this detector.
   * @param time The time abstraction used by the anomaly detector state.
   * @param dropwizardMetricRegistry Registry to which gauge sensors are registered.
   */
  public AnomalyDetector(KafkaCruiseControl kafkaCruiseControl,
                         Time time,
                         MetricRegistry dropwizardMetricRegistry) {
    // For anomalies of different types, prioritize handling anomaly of higher priority;
    // otherwise, handle anomaly in order of detected time.
    _anomalies = new PriorityBlockingQueue<>(ANOMALY_QUEUE_INITIAL_CAPACITY, anomalyComparator());
    KafkaCruiseControlConfig config = kafkaCruiseControl.config();
    _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
    // The generic detection interval is the fallback for any type lacking a type-specific interval.
    Long anomalyDetectionIntervalMs = config.getLong(AnomalyDetectorConfig.ANOMALY_DETECTION_INTERVAL_MS_CONFIG);
    _anomalyDetectionIntervalMsByType = new HashMap<>(KafkaAnomalyType.cachedValues().size() - 1);
    Long goalViolationDetectionIntervalMs = config.getLong(AnomalyDetectorConfig.GOAL_VIOLATION_DETECTION_INTERVAL_MS_CONFIG);
    _anomalyDetectionIntervalMsByType.put(GOAL_VIOLATION, goalViolationDetectionIntervalMs == null ? anomalyDetectionIntervalMs
                                                                                                   : goalViolationDetectionIntervalMs);
    Long metricAnomalyDetectionIntervalMs = config.getLong(AnomalyDetectorConfig.METRIC_ANOMALY_DETECTION_INTERVAL_MS_CONFIG);
    _anomalyDetectionIntervalMsByType.put(METRIC_ANOMALY, metricAnomalyDetectionIntervalMs == null ? anomalyDetectionIntervalMs
                                                                                                   : metricAnomalyDetectionIntervalMs);
    Long topicAnomalyDetectionIntervalMs = config.getLong(AnomalyDetectorConfig.TOPIC_ANOMALY_DETECTION_INTERVAL_MS_CONFIG);
    _anomalyDetectionIntervalMsByType.put(TOPIC_ANOMALY, topicAnomalyDetectionIntervalMs == null ? anomalyDetectionIntervalMs
                                                                                                 : topicAnomalyDetectionIntervalMs);
    Long diskFailureDetectionIntervalMs = config.getLong(AnomalyDetectorConfig.DISK_FAILURE_DETECTION_INTERVAL_MS_CONFIG);
    _anomalyDetectionIntervalMsByType.put(DISK_FAILURE, diskFailureDetectionIntervalMs == null ? anomalyDetectionIntervalMs
                                                                                               : diskFailureDetectionIntervalMs);
    _brokerFailureDetectionBackoffMs = config.getLong(AnomalyDetectorConfig.BROKER_FAILURE_DETECTION_BACKOFF_MS_CONFIG);
    _anomalyNotifier = config.getConfiguredInstance(AnomalyDetectorConfig.ANOMALY_NOTIFIER_CLASS_CONFIG,
                                                    AnomalyNotifier.class);
    _kafkaCruiseControl = kafkaCruiseControl;
    _selfHealingGoals = getSelfHealingGoalNames(config);
    sanityCheckGoals(_selfHealingGoals, false, config);
    _goalViolationDetector = new GoalViolationDetector(_anomalies, _kafkaCruiseControl);
    _brokerFailureDetector = new BrokerFailureDetector(_anomalies, _kafkaCruiseControl);
    _metricAnomalyDetector = new MetricAnomalyDetector(_anomalies, _kafkaCruiseControl);
    _diskFailureDetector = new DiskFailureDetector(_adminClient, _anomalies, _kafkaCruiseControl);
    _topicAnomalyDetector = new TopicAnomalyDetector(_anomalies, _kafkaCruiseControl);
    _detectorScheduler = Executors.newScheduledThreadPool(NUM_ANOMALY_DETECTION_THREADS,
                                                          new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG));
    _shutdown = false;
    // Add anomaly detector state
    int numCachedRecentAnomalyStates = config.getInt(AnomalyDetectorConfig.NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG);
    _anomalyLoggerExecutor =
        Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
    _anomalyInProgress = null;
    _numCheckedWithDelay = new AtomicLong();
    _shutdownLock = new Object();
    // Register gauge sensors.
    registerGaugeSensors(dropwizardMetricRegistry);
    _anomalyDetectorState = new AnomalyDetectorState(time,
                                                     _anomalyNotifier.selfHealingEnabled(),
                                                     numCachedRecentAnomalyStates,
                                                     dropwizardMetricRegistry);
  }

  /**
   * Package private constructor for unit test.
   */
  AnomalyDetector(PriorityBlockingQueue<Anomaly> anomalies,
                  AdminClient adminClient,
                  long anomalyDetectionIntervalMs,
                  KafkaCruiseControl kafkaCruiseControl,
                  AnomalyNotifier anomalyNotifier,
                  GoalViolationDetector goalViolationDetector,
                  BrokerFailureDetector brokerFailureDetector,
                  MetricAnomalyDetector metricAnomalyDetector,
                  DiskFailureDetector diskFailureDetector,
                  TopicAnomalyDetector topicAnomalyDetector,
                  ScheduledExecutorService detectorScheduler) {
    _anomalies = anomalies;
    _adminClient = adminClient;
    _anomalyDetectionIntervalMsByType = new HashMap<>(KafkaAnomalyType.cachedValues().size() - 1);
    // In tests, every periodic anomaly type shares the same detection interval.
    KafkaAnomalyType.cachedValues().stream().filter(type -> type != BROKER_FAILURE)
                    .forEach(type -> _anomalyDetectionIntervalMsByType.put(type, anomalyDetectionIntervalMs));
    _brokerFailureDetectionBackoffMs = anomalyDetectionIntervalMs;
    _anomalyNotifier = anomalyNotifier;
    _goalViolationDetector = goalViolationDetector;
    _brokerFailureDetector = brokerFailureDetector;
    _metricAnomalyDetector = metricAnomalyDetector;
    _diskFailureDetector = diskFailureDetector;
    _topicAnomalyDetector = topicAnomalyDetector;
    _kafkaCruiseControl = kafkaCruiseControl;
    _detectorScheduler = detectorScheduler;
    _shutdown = false;
    _selfHealingGoals = Collections.emptyList();
    _anomalyLoggerExecutor =
        Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
    _anomalyInProgress = null;
    _numCheckedWithDelay = new AtomicLong();
    _shutdownLock = new Object();
    // Add anomaly detector state
    _anomalyDetectorState = new AnomalyDetectorState(new SystemTime(), new HashMap<>(KafkaAnomalyType.cachedValues().size()), 10, null);
  }

  /**
   * Register gauge sensors: the balancedness score, and one 1/0 gauge per anomaly type that
   * reports whether self-healing is enabled for that type.
   */
  private void registerGaugeSensors(MetricRegistry dropwizardMetricRegistry) {
    dropwizardMetricRegistry.register(MetricRegistry.name(METRIC_REGISTRY_NAME, "balancedness-score"),
                                      (Gauge<Double>) _goalViolationDetector::balancednessScore);
    // Self-Healing is turned on/off. 1/0 metric for each of the self-healing options.
    for (KafkaAnomalyType anomalyType : KafkaAnomalyType.cachedValues()) {
      dropwizardMetricRegistry.register(MetricRegistry.name(METRIC_REGISTRY_NAME,
                                                            String.format("%s-self-healing-enabled", anomalyType.toString().toLowerCase())),
                                        (Gauge<Integer>) () -> _anomalyNotifier.selfHealingEnabled().get(anomalyType) ? 1 : 0);
    }
  }

  /**
   * Start each anomaly detector. Periodic detectors start after half their detection interval plus
   * a random jitter, then run at their configured interval. Finally, the anomaly handler task is
   * submitted to consume detected anomalies from the queue.
   */
  public void startDetection() {
    LOG.info("Starting anomaly detector.");
    _brokerFailureDetector.startDetection();
    // A single Random suffices for all jitter values (was one instance per detector).
    Random random = new Random();
    int jitter = random.nextInt(INIT_JITTER_BOUND);
    LOG.debug("Starting goal violation detector with delay of {} ms", jitter);
    long goalViolationDetectionIntervalMs = _anomalyDetectionIntervalMsByType.get(GOAL_VIOLATION);
    _detectorScheduler.scheduleAtFixedRate(_goalViolationDetector,
                                           goalViolationDetectionIntervalMs / 2 + jitter,
                                           goalViolationDetectionIntervalMs,
                                           TimeUnit.MILLISECONDS);
    jitter = random.nextInt(INIT_JITTER_BOUND);
    long metricAnomalyDetectionIntervalMs = _anomalyDetectionIntervalMsByType.get(METRIC_ANOMALY);
    LOG.debug("Starting metric anomaly detector with delay of {} ms", jitter);
    _detectorScheduler.scheduleAtFixedRate(_metricAnomalyDetector,
                                           metricAnomalyDetectionIntervalMs / 2 + jitter,
                                           metricAnomalyDetectionIntervalMs,
                                           TimeUnit.MILLISECONDS);
    jitter = random.nextInt(INIT_JITTER_BOUND);
    long topicAnomalyDetectionIntervalMs = _anomalyDetectionIntervalMsByType.get(TOPIC_ANOMALY);
    LOG.debug("Starting topic anomaly detector with delay of {} ms", jitter);
    _detectorScheduler.scheduleAtFixedRate(_topicAnomalyDetector,
                                           topicAnomalyDetectionIntervalMs / 2 + jitter,
                                           topicAnomalyDetectionIntervalMs,
                                           TimeUnit.MILLISECONDS);
    jitter = random.nextInt(INIT_JITTER_BOUND);
    long diskFailureDetectionIntervalMs = _anomalyDetectionIntervalMsByType.get(DISK_FAILURE);
    LOG.debug("Starting disk failure detector with delay of {} ms", jitter);
    _detectorScheduler.scheduleAtFixedRate(_diskFailureDetector,
                                           diskFailureDetectionIntervalMs / 2 + jitter,
                                           diskFailureDetectionIntervalMs,
                                           TimeUnit.MILLISECONDS);
    _detectorScheduler.submit(new AnomalyHandlerTask());
  }

  /**
   * Shutdown the anomaly detector.
   * Note that if a fix is being started as shutdown is requested, shutdown will wait until the fix is initiated.
   */
  public void shutdown() {
    LOG.info("Shutting down anomaly detector.");
    synchronized (_shutdownLock) {
      _shutdown = true;
    }
    // SHUTDOWN_ANOMALY is a broker failure with detection time set to 0ms. Here we expect it is added to the front of the
    // priority queue and notify anomaly handler immediately.
    _anomalies.add(SHUTDOWN_ANOMALY);
    _detectorScheduler.shutdown();
    KafkaCruiseControlUtils.closeAdminClientWithTimeout(_adminClient);
    try {
      _detectorScheduler.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
      if (!_detectorScheduler.isTerminated()) {
        // Fixed: use SLF4J parameterized logging instead of string concatenation.
        LOG.warn("The sampling scheduler failed to shutdown in {} ms.", SCHEDULER_SHUTDOWN_TIMEOUT_MS);
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
      // Restore the interrupt status so callers up the stack can observe the interruption.
      Thread.currentThread().interrupt();
    }
    _brokerFailureDetector.shutdown();
    _anomalyLoggerExecutor.shutdownNow();
    LOG.info("Anomaly detector shutdown completed.");
  }

  /**
   * @return Anomaly detector state, with its metrics refreshed from the notifier and detector.
   */
  public synchronized AnomalyDetectorState anomalyDetectorState() {
    _anomalyDetectorState.refreshMetrics(_anomalyNotifier.selfHealingEnabledRatio(), _goalViolationDetector.balancednessScore());
    return _anomalyDetectorState;
  }

  /**
   * @return Number of anomaly fixes started by the anomaly detector for self healing.
   */
  long numSelfHealingStarted() {
    return _anomalyDetectorState.numSelfHealingStarted();
  }

  /**
   * @return Number of anomaly fixes failed to start despite the anomaly in progress being ready to fix. This typically
   * indicates the need for expanding the cluster or relaxing the constraints of self-healing goals.
   */
  long numSelfHealingFailedToStart() {
    return _anomalyDetectorState.numSelfHealingFailedToStart();
  }

  /**
   * See {@link AnomalyDetectorState#maybeClearOngoingAnomalyDetectionTimeMs}.
   */
  public void maybeClearOngoingAnomalyDetectionTimeMs() {
    _anomalyDetectorState.maybeClearOngoingAnomalyDetectionTimeMs();
  }

  /**
   * See {@link AnomalyDetectorState#resetHasUnfixableGoals}.
   */
  public void resetHasUnfixableGoals() {
    _anomalyDetectorState.resetHasUnfixableGoals();
  }

  /**
   * (1) Enable or disable self healing for the given anomaly type and (2) update the cached anomaly detector state.
   *
   * @param anomalyType Type of anomaly for which to enable or disable self healing.
   * @param isSelfHealingEnabled True if self healing is enabled, false otherwise.
   * @return The old value of self healing for the given anomaly type.
   */
  public boolean setSelfHealingFor(AnomalyType anomalyType, boolean isSelfHealingEnabled) {
    boolean oldSelfHealingEnabled = _anomalyNotifier.setSelfHealingFor(anomalyType, isSelfHealingEnabled);
    _anomalyDetectorState.setSelfHealingFor(anomalyType, isSelfHealingEnabled);
    return oldSelfHealingEnabled;
  }

  /**
   * @return Number of anomalies checked with delay.
   */
  public long numCheckedWithDelay() {
    return _numCheckedWithDelay.get();
  }

  /**
   * Update anomaly status once associated self-healing operation has finished.
   *
   * @param anomalyId Unique id of anomaly which triggered self-healing operation.
   */
  public void markSelfHealingFinished(String anomalyId) {
    LOG.debug("Self healing with id {} has finished.", anomalyId);
    _anomalyDetectorState.markSelfHealingFinished(anomalyId);
  }

  /**
   * A class that handles all the anomalies: it blocks on the anomaly priority queue, dispatches
   * each anomaly to the notifier, and starts / defers / ignores fixes based on the notification
   * result. The loop exits when it dequeues {@code SHUTDOWN_ANOMALY}.
   */
  class AnomalyHandlerTask implements Runnable {
    @Override
    public void run() {
      LOG.info("Starting anomaly handler");
      while (true) {
        // In case handling the anomaly in progress fails, do some post processing.
        boolean postProcessAnomalyInProgress = false;
        _anomalyInProgress = null;
        try {
          _anomalyInProgress = _anomalies.take();
          LOG.trace("Processing anomaly {}.", _anomalyInProgress);
          if (_anomalyInProgress == SHUTDOWN_ANOMALY) {
            // Service has shutdown.
            _anomalyInProgress = null;
            break;
          }
          handleAnomalyInProgress();
        } catch (InterruptedException e) {
          LOG.debug("Received interrupted exception.", e);
          postProcessAnomalyInProgress = true;
        } catch (OptimizationFailureException ofe) {
          LOG.warn("Encountered optimization failure when trying to fix the anomaly {}.", _anomalyInProgress, ofe);
          // If self-healing failed due to an optimization failure, that indicates a hard goal violation; hence there is
          // no further processing anomaly detector can do without human intervention for the anomaly (i.e. other than
          // what has already been done in the {@link #handlePostFixAnomaly(boolean, boolean, String)}).
          postProcessAnomalyInProgress = false;
        } catch (IllegalStateException ise) {
          LOG.warn("Unexpected state prevents anomaly detector from handling the anomaly {}.", _anomalyInProgress, ise);
          // An illegal state may indicate a transient process blocking self-healing (e.g. an ongoing execution not
          // started by Cruise Control).
          postProcessAnomalyInProgress = false;
        } catch (Throwable t) {
          LOG.error("Uncaught exception in anomaly handler.", t);
          postProcessAnomalyInProgress = true;
        }
        if (postProcessAnomalyInProgress) {
          LOG.info("Post processing anomaly {}.", _anomalyInProgress);
          postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs);
        }
      }
      LOG.info("Anomaly handler exited.");
    }

    /**
     * Record the detection in the detector state, then either process the anomaly or -- if the
     * executor is busy -- defer it via post processing.
     */
    private void handleAnomalyInProgress() throws Exception {
      // Add anomaly detection to anomaly detector state.
      AnomalyType anomalyType = _anomalyInProgress.anomalyType();
      _anomalyDetectorState.addAnomalyDetection(anomalyType, _anomalyInProgress);
      // We schedule a delayed check if the executor is doing some work.
      ExecutorState.State executionState = _kafkaCruiseControl.executionState();
      if (executionState != ExecutorState.State.NO_TASK_IN_PROGRESS) {
        LOG.info("Post processing anomaly {} because executor is in {} state.", _anomalyInProgress, executionState);
        postProcessAnomalyInProgress(_brokerFailureDetectionBackoffMs);
      } else {
        processAnomalyInProgress(anomalyType);
      }
    }

    /**
     * Processes the anomaly based on the notification result.
     *
     * @param anomalyType The type of the ongoing anomaly
     */
    private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception {
      _anomalyDetectorState.markAnomalyRate(anomalyType);
      // Call the anomaly notifier to see if an action is requested.
      AnomalyNotificationResult notificationResult = notifyAnomalyInProgress(anomalyType);
      if (notificationResult != null) {
        _anomalyDetectorState.maybeSetOngoingAnomalyDetectionTimeMs();
        switch (notificationResult.action()) {
          case FIX:
            fixAnomalyInProgress(anomalyType);
            break;
          case CHECK:
            LOG.info("Post processing anomaly {} for {}.", _anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
            postProcessAnomalyInProgress(notificationResult.delay());
            break;
          case IGNORE:
            _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.IGNORED);
            break;
          default:
            throw new IllegalStateException("Unrecognized anomaly notification result.");
        }
      }
    }

    /**
     * Call the {@link AnomalyNotifier} handler corresponding to the type of {@link #_anomalyInProgress} to get the
     * notification result.
     *
     * @param anomalyType The type of the {@link #_anomalyInProgress}.
     * @return The notification result corresponding to the {@link #_anomalyInProgress}.
     */
    private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyType) {
      // Call the anomaly notifier to see if a fix is desired.
      AnomalyNotificationResult notificationResult;
      switch ((KafkaAnomalyType) anomalyType) {
        case GOAL_VIOLATION:
          GoalViolations goalViolations = (GoalViolations) _anomalyInProgress;
          notificationResult = _anomalyNotifier.onGoalViolation(goalViolations);
          _anomalyDetectorState.refreshHasUnfixableGoal(goalViolations);
          break;
        case BROKER_FAILURE:
          BrokerFailures brokerFailures = (BrokerFailures) _anomalyInProgress;
          notificationResult = _anomalyNotifier.onBrokerFailure(brokerFailures);
          break;
        case METRIC_ANOMALY:
          KafkaMetricAnomaly metricAnomaly = (KafkaMetricAnomaly) _anomalyInProgress;
          notificationResult = _anomalyNotifier.onMetricAnomaly(metricAnomaly);
          break;
        case DISK_FAILURE:
          DiskFailures diskFailures = (DiskFailures) _anomalyInProgress;
          notificationResult = _anomalyNotifier.onDiskFailure(diskFailures);
          break;
        case TOPIC_ANOMALY:
          TopicAnomaly topicAnomaly = (TopicAnomaly) _anomalyInProgress;
          notificationResult = _anomalyNotifier.onTopicAnomaly(topicAnomaly);
          break;
        default:
          throw new IllegalStateException("Unrecognized anomaly type.");
      }
      LOG.debug("Received notification result {}", notificationResult);
      return notificationResult;
    }

    /**
     * Updates the state of the anomaly in progress and if the anomaly is a {@link KafkaAnomalyType#BROKER_FAILURE}, then it
     * schedules a broker failure detection after the given delay.
     *
     * @param delayMs The delay for broker failure detection.
     */
    private void postProcessAnomalyInProgress(long delayMs) {
      // Anomaly detector does delayed check for broker failures, otherwise it ignores the anomaly.
      if (_anomalyInProgress.anomalyType() == KafkaAnomalyType.BROKER_FAILURE) {
        synchronized (_shutdownLock) {
          if (_shutdown) {
            LOG.debug("Skip delayed checking anomaly {}, because anomaly detector is shutting down.", _anomalyInProgress);
          } else {
            LOG.debug("Scheduling broker failure detection with delay of {} ms", delayMs);
            _numCheckedWithDelay.incrementAndGet();
            _detectorScheduler.schedule(() -> _brokerFailureDetector.detectBrokerFailures(false), delayMs, TimeUnit.MILLISECONDS);
            _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
          }
        }
      } else {
        _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.IGNORED);
      }
    }

    /**
     * Check whether the anomaly in progress is ready for fix. An anomaly is ready if it (1) meets completeness
     * requirements and (2) load monitor is not in an unexpected state.
     *
     * @return True if ready for a fix, false otherwise.
     */
    private boolean isAnomalyInProgressReadyToFix(AnomalyType anomalyType) {
      LoadMonitorTaskRunner.LoadMonitorTaskRunnerState loadMonitorTaskRunnerState = _kafkaCruiseControl.getLoadMonitorTaskRunnerState();
      // Fixing anomalies is possible only when (1) the state is not in and unavailable state ( e.g. loading or
      // bootstrapping) and (2) the completeness requirements are met for all goals.
      if (!AnomalyUtils.isLoadMonitorReady(loadMonitorTaskRunnerState)) {
        LOG.info("Skipping {} fix because load monitor is in {} state.", anomalyType, loadMonitorTaskRunnerState);
        _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.LOAD_MONITOR_NOT_READY);
      } else {
        if (_kafkaCruiseControl.meetCompletenessRequirements(_selfHealingGoals)) {
          return true;
        } else {
          LOG.warn("Skipping {} fix because load completeness requirement is not met for goals.", anomalyType);
          _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, AnomalyState.Status.COMPLETENESS_NOT_READY);
        }
      }
      return false;
    }

    /**
     * Log the outcome of a self-healing start attempt to the operation log.
     *
     * @param anomalyId Unique id of the anomaly whose fix was attempted.
     * @param ofe The optimization failure, or null if optimization did not fail with an exception.
     * @param optimizationResult The optimization result, or null if self-healing failed to start.
     */
    private void logSelfHealingOperation(String anomalyId, OptimizationFailureException ofe, String optimizationResult) {
      if (optimizationResult != null) {
        OPERATION_LOG.info("[{}] Self-healing started successfully:\n{}", anomalyId, optimizationResult);
      } else if (ofe != null) {
        OPERATION_LOG.warn("[{}] Self-healing failed to start:\n{}", anomalyId, ofe);
      } else {
        OPERATION_LOG.warn("[{}] Self-healing failed to start due to inability to optimize combined self-healing goals ({}).",
                           anomalyId, _selfHealingGoals);
      }
    }

    /**
     * Attempt to start a fix for the anomaly in progress, unless the detector is shutting down.
     * Post-fix handling always runs (see the finally block), whether or not the fix started.
     */
    private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception {
      synchronized (_shutdownLock) {
        if (_shutdown) {
          LOG.info("Skip fixing anomaly {}, because anomaly detector is shutting down.", _anomalyInProgress);
        } else {
          boolean isReadyToFix = isAnomalyInProgressReadyToFix(anomalyType);
          boolean fixStarted = false;
          String anomalyId = _anomalyInProgress.anomalyId();
          // Upon post-handling the anomaly, skip reporting broker failure if the failed brokers have not changed.
          boolean skipReportingIfNotUpdated = false;
          try {
            if (isReadyToFix) {
              LOG.info("Generating a fix for the anomaly {}.", _anomalyInProgress);
              fixStarted = _anomalyInProgress.fix();
              LOG.info("{} the anomaly {}.", fixStarted ? "Fixing" : "Cannot fix", _anomalyInProgress);
              String optimizationResult = fixStarted ? _anomalyInProgress.optimizationResult(false) : null;
              _anomalyLoggerExecutor.submit(() -> logSelfHealingOperation(anomalyId, null, optimizationResult));
            }
          } catch (OptimizationFailureException ofe) {
            _anomalyLoggerExecutor.submit(() -> logSelfHealingOperation(anomalyId, ofe, null));
            skipReportingIfNotUpdated = anomalyType == KafkaAnomalyType.BROKER_FAILURE;
            throw ofe;
          } finally {
            handlePostFixAnomaly(isReadyToFix, fixStarted, anomalyId, skipReportingIfNotUpdated);
          }
        }
      }
    }

    /**
     * Record the fix outcome in the detector state, clear the anomaly queue, and schedule an
     * explicit broker failure detection so broker failures cleared from the queue are not lost.
     *
     * @param isReadyToFix True if the anomaly was ready to fix (see {@link #isAnomalyInProgressReadyToFix}).
     * @param fixStarted True if the fix was successfully started.
     * @param anomalyId Unique id of the anomaly.
     * @param skipReportingIfNotUpdated True to skip reporting broker failures if the failed brokers have not changed.
     */
    private void handlePostFixAnomaly(boolean isReadyToFix, boolean fixStarted, String anomalyId, boolean skipReportingIfNotUpdated) {
      if (isReadyToFix) {
        _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, fixStarted ? AnomalyState.Status.FIX_STARTED
                                                                             : AnomalyState.Status.FIX_FAILED_TO_START);
        if (fixStarted) {
          _anomalyDetectorState.incrementNumSelfHealingStarted();
          LOG.info("[{}] Self-healing started successfully.", anomalyId);
        } else {
          _anomalyDetectorState.incrementNumSelfHealingFailedToStart();
          LOG.warn("[{}] Self-healing failed to start.", anomalyId);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Clearing {} anomalies and scheduling a broker failure detection in {}ms.", _anomalies.size(),
                  isReadyToFix ? 0L : _brokerFailureDetectionBackoffMs);
      }
      _anomalies.clear();
      // Explicitly detect broker failures after clearing the queue. This ensures that anomaly detector does not miss
      // broker failures upon (1) fixing another anomaly, or (2) having broker failures that are not yet ready for fix.
      // We don't need to worry about other anomaly types because they run periodically.
      // If there has not been any failed brokers at the time of detecting broker failures, this is a no-op. Otherwise,
      // the call will create a broker failure anomaly. Depending on the time of the first broker failure in that anomaly,
      // it will trigger either a delayed check or a fix.
      _detectorScheduler.schedule(() -> _brokerFailureDetector.detectBrokerFailures(skipReportingIfNotUpdated),
                                  isReadyToFix ? 0L : _brokerFailureDetectionBackoffMs, TimeUnit.MILLISECONDS);
    }
  }
}