/
main.go
454 lines (390 loc) · 15.9 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/DataDog/kafka-kit/v4/cmd/autothrottle/internal/api"
"github.com/DataDog/kafka-kit/v4/cmd/autothrottle/internal/throttlestore"
"github.com/DataDog/kafka-kit/v4/kafkametrics"
"github.com/DataDog/kafka-kit/v4/kafkametrics/datadog"
"github.com/DataDog/kafka-kit/v4/kafkazk"
"github.com/jamiealquiza/envy"
)
var (
	// version is the build version string; it can be overridden at link
	// time with -ldflags "-X main.version=x.x.x".
	version = "0.0.0"
	// Config holds all runtime configuration parameters. Fields are
	// populated in main from command-line flags, with environment
	// variable overrides (AUTOTHROTTLE_*) applied via envy.
	Config struct {
		KafkaNativeMode        bool   // favor native Kafka RPCs over ZooKeeper metadata access
		KafkaAPIRequestTimeout int    // Kafka API request timeout (seconds)
		APIKey                 string // Datadog API key
		AppKey                 string // Datadog app key
		NetworkTXQuery         string // Datadog query for broker outbound bandwidth by host
		NetworkRXQuery         string // Datadog query for broker inbound bandwidth by host
		BrokerIDTag            string // Datadog host tag holding the broker ID
		InstanceTypeTag        string // Datadog tag holding the instance type
		MetricsWindow          int    // time span of metrics required (seconds)
		BootstrapServers       string // Kafka bootstrap servers
		ZKAddr                 string // ZooKeeper connect string
		ZKPrefix               string // ZooKeeper namespace prefix
		Interval               int    // autothrottle check interval (seconds)
		APIListen              string // admin API listen address:port
		ConfigZKPrefix         string // ZooKeeper prefix for autothrottle's own config
		DDEventTags            string // comma-delimited extra Datadog event tags
		MinRate                float64 // minimum replication throttle rate (MB/s)
		SourceMaxRate          float64 // max outbound throttle (percent of available capacity)
		DestinationMaxRate     float64 // max inbound throttle (percent of available capacity)
		ChangeThreshold        float64 // percent change required to trigger a throttle update
		FailureThreshold       int     // failed iterations tolerated before reverting to MinRate
		CapMap                 map[string]float64 // instance type -> network capacity in MB/s
		CleanupAfter           int64   // intervals after which to unset throttles if idle
	}
	// Misc.
	// topicsRegex matches all topics. It is not referenced in this file's
	// visible code; presumably consumed elsewhere in the package to scope
	// throttle application — TODO confirm against sibling files.
	topicsRegex = []*regexp.Regexp{regexp.MustCompile(".*")}
)
// main wires together configuration (flags + AUTOTHROTTLE_* env vars),
// the ZooKeeper handler, the Datadog metrics fetcher, the event writer,
// and the admin API, then runs the autothrottle control loop until the
// process is terminated. Each iteration discovers ongoing reassignments,
// computes and applies replication throttles (including broker-specific
// overrides), and eventually clears throttles once replication is done.
func main() {
	v := flag.Bool("version", false, "version")
	flag.BoolVar(&Config.KafkaNativeMode, "kafka-native-mode", false, "Favor native Kafka RPCs over ZooKeeper metadata access")
	flag.IntVar(&Config.KafkaAPIRequestTimeout, "kafka-api-request-timeout", 15, "Kafka API request timeout (seconds)")
	flag.StringVar(&Config.APIKey, "api-key", "", "Datadog API key")
	flag.StringVar(&Config.AppKey, "app-key", "", "Datadog app key")
	flag.StringVar(&Config.NetworkTXQuery, "net-tx-query", "avg:system.net.bytes_sent{service:kafka} by {host}", "Datadog query for broker outbound bandwidth by host")
	flag.StringVar(&Config.NetworkRXQuery, "net-rx-query", "avg:system.net.bytes_rcvd{service:kafka} by {host}", "Datadog query for broker inbound bandwidth by host")
	flag.StringVar(&Config.BrokerIDTag, "broker-id-tag", "broker_id", "Datadog host tag for broker ID")
	flag.StringVar(&Config.InstanceTypeTag, "instance-type-tag", "instance-type", "Datadog tag for instance type")
	flag.IntVar(&Config.MetricsWindow, "metrics-window", 120, "Time span of metrics required (seconds)")
	flag.StringVar(&Config.BootstrapServers, "bootstrap-servers", "localhost:9092", "Kafka bootstrap servers")
	flag.StringVar(&Config.ZKAddr, "zk-addr", "localhost:2181", "ZooKeeper connect string (for broker metadata or rebuild-topic lookups)")
	flag.StringVar(&Config.ZKPrefix, "zk-prefix", "", "ZooKeeper namespace prefix")
	flag.IntVar(&Config.Interval, "interval", 180, "Autothrottle check interval (seconds)")
	flag.StringVar(&Config.APIListen, "api-listen", "localhost:8080", "Admin API listen address:port")
	flag.StringVar(&Config.ConfigZKPrefix, "zk-config-prefix", "autothrottle", "ZooKeeper prefix to store autothrottle configuration")
	flag.StringVar(&Config.DDEventTags, "dd-event-tags", "", "Comma-delimited list of Datadog event tags")
	flag.Float64Var(&Config.MinRate, "min-rate", 10, "Minimum replication throttle rate (MB/s)")
	flag.Float64Var(&Config.SourceMaxRate, "max-tx-rate", 90, "Maximum outbound replication throttle rate (as a percentage of available capacity)")
	flag.Float64Var(&Config.DestinationMaxRate, "max-rx-rate", 90, "Maximum inbound replication throttle rate (as a percentage of available capacity)")
	flag.Float64Var(&Config.ChangeThreshold, "change-threshold", 10, "Required change in replication throttle to trigger an update (percent)")
	flag.IntVar(&Config.FailureThreshold, "failure-threshold", 1, "Number of iterations that throttle determinations can fail before reverting to the min-rate")
	m := flag.String("cap-map", "", "JSON map of instance types to network capacity in MB/s")
	flag.Int64Var(&Config.CleanupAfter, "cleanup-after", 60, "Number of intervals after which to issue a global throttle unset if no replication is running")

	envy.Parse("AUTOTHROTTLE")
	flag.Parse()

	if *v {
		fmt.Println(version)
		os.Exit(0)
	}

	// Deserialize instance-type capacity map.
	Config.CapMap = map[string]float64{}
	if len(*m) > 0 {
		if err := json.Unmarshal([]byte(*m), &Config.CapMap); err != nil {
			fmt.Printf("Error parsing cap-map flag: %s\n", err)
			os.Exit(1)
		}
	}

	log.Println("Autothrottle Running")
	// Lazily prevent a tight restart loop from thrashing ZK.
	time.Sleep(1 * time.Second)

	// Init ZK.
	zk, err := kafkazk.NewHandler(&kafkazk.Config{
		Connect: Config.ZKAddr,
		Prefix:  Config.ZKPrefix,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer zk.Close()

	// Init the admin API.
	apiConfig := &api.APIConfig{
		Listen:   Config.APIListen,
		ZKPrefix: Config.ConfigZKPrefix,
	}
	trigger := make(chan struct{}, 1)
	api.Init(apiConfig, zk, trigger)
	log.Printf("Admin API: %s\n", Config.APIListen)

	// Init a Kafka metrics fetcher.
	km, err := datadog.NewHandler(&datadog.Config{
		APIKey:          Config.APIKey,
		AppKey:          Config.AppKey,
		NetworkTXQuery:  Config.NetworkTXQuery,
		NetworkRXQuery:  Config.NetworkRXQuery,
		BrokerIDTag:     Config.BrokerIDTag,
		InstanceTypeTag: Config.InstanceTypeTag,
		MetricsWindow:   Config.MetricsWindow,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Get optional Datadog event tags. Guard against splitting the empty
	// string: strings.Split("", ",") returns [""], which would otherwise
	// append a bogus empty tag to every event.
	tags := []string{"name:kafka-autothrottle"}
	if Config.DDEventTags != "" {
		tags = append(tags, strings.Split(Config.DDEventTags, ",")...)
	}

	// Init the Datadog event writer.
	echan := make(chan *kafkametrics.Event, 100)
	go eventWriter(km, echan)

	// Init an DDEventWriter.
	events := &DDEventWriter{
		c:           echan,
		titlePrefix: eventTitlePrefix,
		tags:        tags,
	}

	// Default to true on startup in case throttles were set in an autothrottle
	// process other than the current one.
	knownThrottles := true

	var reassignments kafkazk.Reassignments

	// Track topic replication states across intervals.
	var topicsReplicatingNow = newSet()
	var topicsReplicatingPreviously = newSet()

	// Track override broker states.
	var brokersThrottledPreviously = newSet()

	// Params for the updateReplicationThrottle request.
	newLimitsConfig := NewLimitsConfig{
		Minimum:            Config.MinRate,
		SourceMaximum:      Config.SourceMaxRate,
		DestinationMaximum: Config.DestinationMaxRate,
		CapacityMap:        Config.CapMap,
	}

	lim, err := NewLimits(newLimitsConfig)
	if err != nil {
		log.Fatal(err)
	}

	// tm (rather than a local named ThrottleManager, which would shadow
	// the type of the same name) coordinates all throttle state changes.
	tm := &ThrottleManager{
		zk:                     zk,
		km:                     km,
		kafkaNativeMode:        Config.KafkaNativeMode,
		kafkaAPIRequestTimeout: Config.KafkaAPIRequestTimeout,
		events:                 events,
		previouslySetThrottles: make(replicationCapacityByBroker),
		limits:                 lim,
		failureThreshold:       Config.FailureThreshold,
	}

	// Init a KafkaAdmin Client if needed.
	if Config.KafkaNativeMode {
		if err := tm.InitKafkaAdmin(Config.BootstrapServers); err != nil {
			log.Fatal(err)
		}
		log.Printf("Connected to Kafka: %s\n", Config.BootstrapServers)
	}

	// Run.
	var interval int64
	var ticker = time.NewTicker(time.Duration(Config.Interval) * time.Second)
	defer ticker.Stop()

	// TODO(jamie): refactor this loop.
	for {
		// Get topics undergoing reassignment.
		if !Config.KafkaNativeMode {
			reassignments = zk.GetReassignments()
		} else {
			// KIP-455 compatible reassignments lookup.
			reassignments, err = zk.ListReassignments()
			if err != nil {
				log.Printf("error fetching reassignments: %s\n", err)
				// Wait for the next tick (or an API trigger) before
				// retrying. A bare continue here would bypass the select
				// at the bottom of the loop and spin in a tight retry
				// loop against the cluster.
				select {
				case <-ticker.C:
					interval++
				case <-trigger:
				}
				continue
			}
		}
		topicsReplicatingNow = newSet()
		for t := range reassignments {
			topicsReplicatingNow.add(t)
		}

		// Check for topics that were previously seen replicating, but are no
		// longer in this interval.
		topicsDoneReplicating := topicsReplicatingPreviously.diff(topicsReplicatingNow)

		// Log and write event.
		if len(topicsDoneReplicating) > 0 {
			m := fmt.Sprintf("Topics done reassigning: %s", topicsDoneReplicating.keys())
			log.Println(m)
			events.Write("Topics done reassigning", m)
		}

		// If all of the currently replicating topics are a subset
		// of the previously replicating topics, we can stop updating
		// the Kafka topic throttled replicas list. This minimizes
		// state that must be propagated through the cluster.
		if topicsReplicatingNow.isSubSet(topicsReplicatingPreviously) {
			tm.DisableTopicUpdates()
		} else {
			tm.EnableTopicUpdates()
			// Unset any previously stored throttle rates. This is done to avoid a
			// scenario that results in autothrottle being unaware of externally
			// specified throttles and failing to override them. The condition can be
			// triggered when two subsequent reassignments involving the same broker
			// set are handled by autothrottle. The error condition is as follows:
			//
			// - Autothrottle sees reassignment 1 involving brokers 1001, 1002
			//   and determines a throttle rate of 100MB/s.
			// - Reassignment 1 completes, reassignment 2 is started in-between
			//   autothrottle intervals and a manual rate of 25MB/s is specified from
			//   the reassignment tool.
			// - Autothrottle sees reassignment 2, revisits throughput and determines
			//   the rate for brokers 1001 and 1002 should be 105MB/s, below the
			//   ChangeThreshold of 10% when compared to the last known rates set;
			//   throttle updates are skipped.
			// - The reassignment is now stuck at 25MB/s.
			//
			// There's two solutions considered to reconcile the stale state:
			// - Reset all previously stored rates when the current reassigning
			//   topic list is not a subset of the previous reassigning topic list.
			// - Force throttle updates every so many intervals, regardless of the
			//   required ChangeThreshold.
			//
			// Ensure we're doing option 1 right here:
			tm.previouslySetThrottles.reset()
		}

		// Rebuild topicsReplicatingPreviously with the current replications
		// for the next check iteration.
		topicsReplicatingPreviously = topicsReplicatingNow.copy()

		// Check if a global throttle override was configured. On error,
		// overrideCfg is whatever FetchThrottleOverride returned; presumably
		// a usable zero-value config — TODO confirm against throttlestore.
		overrideCfg, err := throttlestore.FetchThrottleOverride(zk, api.OverrideRateZnodePath)
		if err != nil {
			log.Println(err)
		}

		// Fetch all broker-specific overrides.
		bo, err := throttlestore.FetchBrokerOverrides(zk, api.OverrideRateZnodePath)
		if err != nil {
			log.Println(err)
		}

		// Get the maps of brokers handling reassignments.
		rb, err := getReassigningBrokers(reassignments, zk)
		if err != nil {
			log.Println(err)
		}

		tm.brokerOverrides = bo
		tm.reassigningBrokers = rb

		// If topics are being reassigned, update the replication throttle.
		if len(topicsReplicatingNow) > 0 {
			log.Printf("Topics with ongoing reassignments: %s\n", topicsReplicatingNow.keys())

			// Update the ThrottleManager.
			tm.overrideRate = overrideCfg.Rate
			tm.reassignments = reassignments

			err = tm.updateReplicationThrottle()
			if err != nil {
				log.Println(err)
			} else {
				// Set knownThrottles.
				knownThrottles = true
			}
		}

		// Get brokers with active overrides, ie where the override rate is non-0,
		// that are also not part of a reassignment.
		activeOverrideBrokers := tm.brokerOverrides.Filter(notReassignmentParticipant)

		// Apply any additional broker-specific throttles that were not applied as
		// part of a reassignment.
		if len(tm.brokerOverrides) > 0 {
			// Find all topics that include brokers with static overrides
			// configured that aren't being reassigned. In order for broker-specific
			// throttles to be applied, topics being replicated by those brokers
			// must include them in the follower.replication.throttled.replicas
			// dynamic configuration parameter. It's clumsy, but this is the way
			// Kafka was designed.
			// TODO(jamie): is there a scenario where we should exclude topics
			// have also have a reassignment? We're discovering topics here by
			// reverse lookup of brokers that are not reassignment participants.
			var err error
			tm.overrideThrottleLists, err = tm.getTopicsWithThrottledBrokers()
			if err != nil {
				log.Printf("Error fetching topic states: %s\n", err)
			}

			// Determine whether we need to propagate topic throttle replica
			// list configs. If the brokers with overrides remains the same,
			// we don't need to need to update those configs.
			var brokersThrottledNow = newSet()
			for broker := range activeOverrideBrokers {
				brokersThrottledNow.add(strconv.Itoa(broker))
			}

			if brokersThrottledNow.equal(brokersThrottledPreviously) {
				tm.DisableOverrideTopicUpdates()
			} else {
				tm.EnableOverrideTopicUpdates()
			}

			brokersThrottledPreviously = brokersThrottledNow.copy()

			// Update throttles.
			if err := tm.updateOverrideThrottles(); err != nil {
				log.Println(err)
			}

			// If we're updating throttles and the active count (those not marked for
			// removal) is > 0, we should set the knownThrottles to true.
			if len(activeOverrideBrokers) > 0 {
				knownThrottles = true
			}
		}

		// Remove and delete any broker-specific overrides set to 0.
		if errs := tm.purgeOverrideThrottles(); errs != nil {
			log.Println("Error removing persisted broker throttle overrides")
			for i := range errs {
				log.Println(errs[i])
			}
		}

		// If there's no topics being reassigned, clear any throttles marked
		// for automatic removal. Also, check if there's any broker throttles set.
		// There's a somewhat complicated state problem here; if we previously
		// set a broker throttle override but there's no reassignment, we'll
		// immediately clear it here. There's two options:
		//
		// 1) Simply hold up clearing throttles if there's a broker throttle
		// override set.
		// 2) Fetch all topics where any brokers with overrides are assigned
		// replicas, fetch all topic ISR states, diff the ISR states and the
		// replica assignments to track under-replicated topics, then adding
		// an under-replicated == 0 condition here.
		//
		// We're going with option 1 for now.

		// Capture all the current conditions:

		// Are there throttles eligible to be cleared? Use >= rather than ==
		// so the cleanup isn't skipped forever if the exact CleanupAfter tick
		// happened to coincide with an ongoing reassignment.
		var throttlesToClear = knownThrottles || interval >= Config.CleanupAfter
		// Are any topics being reassigned?
		var topicsReassigning bool
		if len(topicsReplicatingNow) > 0 {
			topicsReassigning = true
		}
		// Do any brokers have throttle overrides set?
		var brokerOverridesSet bool
		if len(activeOverrideBrokers) > 0 {
			brokerOverridesSet = true
		}

		// Next steps according to the various conditions:

		if !topicsReassigning {
			log.Println("No topics undergoing reassignment")
		}

		if !topicsReassigning && throttlesToClear && brokerOverridesSet {
			log.Println("One or more brokers level override are set; automatic throttle removal will be skipped")
		}

		// If there's previously set throttles but no topics reassigning nor
		// broker overrides set, we can issue a global throttle removal.
		if throttlesToClear && !topicsReassigning && !brokerOverridesSet {
			// Reset the interval count.
			interval = 0

			// Remove all the broker + topic throttle configs.
			err := tm.removeAllThrottles()
			if err != nil {
				log.Printf("Error removing throttles: %s\n", err.Error())
			} else {
				// Only set knownThrottles to false if we've removed all
				// without error.
				knownThrottles = false
			}

			// Ensure topic throttle updates are re-enabled.
			tm.EnableTopicUpdates()
			tm.EnableOverrideTopicUpdates()

			// Remove any configured throttle overrides if AutoRemove is true.
			if overrideCfg.AutoRemove {
				err := throttlestore.StoreThrottleOverride(zk, api.OverrideRateZnodePath, throttlestore.ThrottleOverrideConfig{})
				if err != nil {
					log.Println(err)
				} else {
					log.Println("Global throttle override removed")
				}
			}
		}

		// Block until the next interval tick or an on-demand trigger from
		// the admin API.
		select {
		case <-ticker.C:
			interval++
		case <-trigger:
		}
	}
}