/
throttles.go
111 lines (94 loc) · 3.31 KB
/
throttles.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"context"
"time"
"github.com/DataDog/kafka-kit/v3/cmd/autothrottle/internal/throttlestore"
"github.com/DataDog/kafka-kit/v3/kafkaadmin"
"github.com/DataDog/kafka-kit/v3/kafkametrics"
"github.com/DataDog/kafka-kit/v3/kafkazk"
)
// ThrottleManager manages Kafka throttle rates.
// It holds the handlers used to talk to external systems alongside the
// override, failure-count and topic-update flags that its methods operate on.
type ThrottleManager struct {
reassignments kafkazk.Reassignments
zk kafkazk.Handler
km kafkametrics.Handler
// ka is the Kafka admin client; it is assigned by InitKafkaAdmin.
ka kafkaadmin.KafkaAdmin
overrideRate int
kafkaNativeMode bool
// kafkaAPIRequestTimeout is the Kafka API request timeout in seconds;
// kafkaRequestContext multiplies it by time.Second.
kafkaAPIRequestTimeout int
// The following three fields are for brokers with static overrides set
// and a topicThrottledReplicas for topics where those brokers are assigned.
brokerOverrides throttlestore.BrokerOverrides
overrideThrottleLists topicThrottledReplicas
// skipOverrideTopicUpdates is toggled by DisableOverrideTopicUpdates and
// EnableOverrideTopicUpdates.
skipOverrideTopicUpdates bool
reassigningBrokers reassigningBrokers
events *DDEventWriter
previouslySetThrottles replicationCapacityByBroker
limits Limits
// failureThreshold is the value the failures count is compared against
// in Failure.
failureThreshold int
// failures is incremented by Failure and zeroed by ResetFailures.
failures int
// skipTopicUpdates is toggled by DisableTopicUpdates and EnableTopicUpdates.
skipTopicUpdates bool
}
// InitKafkaAdmin builds a kafkaadmin client from a csv list of Kafka
// brokers and stores it on the ThrottleManager. If the client cannot be
// created, the error is returned and the existing client field is left
// untouched.
func (tm *ThrottleManager) InitKafkaAdmin(brokers string) error {
	client, err := kafkaadmin.NewClient(kafkaadmin.Config{
		BootstrapServers: brokers,
	})
	if err != nil {
		return err
	}

	tm.ka = client

	return nil
}
// hasActiveOverride reports whether the broker throttle override has a
// non-zero configured rate.
func hasActiveOverride(bto throttlestore.BrokerThrottleOverride) bool {
	rate := bto.Config.Rate
	return rate != 0
}
// notReassignmentParticipant reports whether the broker throttle override
// has a non-zero configured rate and is not flagged as a reassignment
// participant.
func notReassignmentParticipant(bto throttlestore.BrokerThrottleOverride) bool {
	if bto.ReassignmentParticipant {
		return false
	}

	return bto.Config.Rate != 0
}
// Failure increments the failures count and reports whether the count now
// exceeds the failure threshold.
//
// The count is only reset by ResetFailures, so once the threshold has been
// exceeded every subsequent call also returns true until a reset occurs.
func (tm *ThrottleManager) Failure() bool {
	tm.failures++
	return tm.failures > tm.failureThreshold
}
// ResetFailures resets the failures count to zero, clearing whatever
// Failure has accumulated so far.
func (tm *ThrottleManager) ResetFailures() {
tm.failures = 0
}
// DisableTopicUpdates prevents topic throttled replica lists from being
// updated in ZooKeeper. It does so by setting the skipTopicUpdates flag.
func (tm *ThrottleManager) DisableTopicUpdates() {
tm.skipTopicUpdates = true
}
// EnableTopicUpdates allows topic throttled replica lists to be updated in
// ZooKeeper. It does so by clearing the skipTopicUpdates flag.
func (tm *ThrottleManager) EnableTopicUpdates() {
tm.skipTopicUpdates = false
}
// DisableOverrideTopicUpdates prevents topic throttled replica lists for
// topics assigned to override brokers from being updated in ZooKeeper.
// It does so by setting the skipOverrideTopicUpdates flag.
func (tm *ThrottleManager) DisableOverrideTopicUpdates() {
tm.skipOverrideTopicUpdates = true
}
// EnableOverrideTopicUpdates allows topic throttled replica lists for
// topics assigned to override brokers to be updated in ZooKeeper.
// It does so by clearing the skipOverrideTopicUpdates flag.
func (tm *ThrottleManager) EnableOverrideTopicUpdates() {
tm.skipOverrideTopicUpdates = false
}
// kafkaRequestContext derives a context from context.Background that times
// out after the ThrottleManager's Kafka API request timeout (configured in
// seconds). The returned cancel func should be called by the caller to
// release the context's resources.
func (tm *ThrottleManager) kafkaRequestContext() (context.Context, context.CancelFunc) {
	timeout := time.Duration(tm.kafkaAPIRequestTimeout) * time.Second

	return context.WithTimeout(context.Background(), timeout)
}
// ThrottledBrokers is a list of brokers with a throttle applied
// for an ongoing reassignment.
type ThrottledBrokers struct {
// Src is presumably the set of brokers that replicas are being moved
// from (reassignment sources) — confirm against callers.
Src []*kafkametrics.Broker
// Dst is presumably the set of brokers that replicas are being moved
// to (reassignment destinations) — confirm against callers.
Dst []*kafkametrics.Broker
}