-
Notifications
You must be signed in to change notification settings - Fork 21
/
sla.go
115 lines (96 loc) · 2.74 KB
/
sla.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package sla
import (
	"fmt"
	"math"
	"strconv"
)
// String keys naming the individual SLA settings.
// NOTE(review): nothing in this file reads these keys; presumably callers use
// them to select which SLA setting a value applies to — confirm at call sites.
const (
SlaKeyRetentionHours = "retention.hours"
SlaKeyRetentionBytes = "retention.bytes"
SlaKeyPartitions = "partitions"
SlaKeyReplicas = "replicas"
SlaKeyRetryTopic = "retry"
SlaKeyDeadLetterTopic = "dead"
)
// Default values for the fields of a TopicSla, followed by the upper bounds
// that the Dump* methods apply before emitting command-line arguments.
const (
defaultRetentionBytes = -1 // unlimited
defaultRetentionHours = 7 * 24 // 7 days
defaultPartitions = 1
defaultReplicas = 2
defaultMinInsyncReplicas = 1
maxReplicas = 3 // upper bound applied by DumpForCreateTopic
maxPartitions = 20 // upper bound applied by DumpForCreateTopic
maxRetentionHours = 20 * 7 * 24 // 20 weeks; larger values are skipped by DumpForAlterTopic
)
// TopicSla holds the per-topic settings that this package renders into
// kafka-topics.sh arguments.
type TopicSla struct {
// RetentionHours is the retention period in hours; fractional hours are
// allowed. It is emitted as retention.ms by DumpForAlterTopic.
RetentionHours float64
// RetentionBytes is the retention size limit in bytes; the default of -1
// means unlimited.
RetentionBytes int
// Partitions is the partition count emitted as --partitions.
Partitions int
// Replicas is the replication factor emitted as --replication-factor by
// DumpForCreateTopic.
Replicas int
// MinInsyncReplicas is emitted as the min.insync.replicas config by
// DumpForAlterTopic.
MinInsyncReplicas int
}
// DefaultSla returns a newly allocated TopicSla whose every field is set to
// its package default.
//
// All fields are taken from the default* constants (rather than literals) so
// that the result always satisfies IsDefault, which compares against the same
// constants.
func DefaultSla() *TopicSla {
	return &TopicSla{
		RetentionBytes:    defaultRetentionBytes,
		RetentionHours:    defaultRetentionHours,
		Partitions:        defaultPartitions,
		Replicas:          defaultReplicas,
		MinInsyncReplicas: defaultMinInsyncReplicas,
	}
}
// IsDefault reports whether every field of the SLA equals its package
// default value.
func (t *TopicSla) IsDefault() bool {
	// Any single deviation from the defaults is enough to answer false.
	switch {
	case t.Replicas != defaultReplicas,
		t.Partitions != defaultPartitions,
		t.RetentionBytes != defaultRetentionBytes,
		t.RetentionHours != defaultRetentionHours,
		t.MinInsyncReplicas != defaultMinInsyncReplicas:
		return false
	}
	return true
}
// Validate checks the SLA against the package limits.
//
// Only the upper bound on Partitions is checked: it returns
// ErrTooBigPartitions when Partitions exceeds maxPartitions and nil
// otherwise. No other field is validated here.
func (t *TopicSla) Validate() error {
	// Use the shared limit rather than a literal so this check cannot drift
	// from the bound applied by DumpForCreateTopic.
	if t.Partitions > maxPartitions {
		return ErrTooBigPartitions
	}
	return nil
}
// ParseRetentionHours parses s as a decimal number of hours and, on success,
// stores it in RetentionHours.
//
// It returns ErrEmptyArg when s is empty, ErrNotNumber when s is not a
// finite number, and ErrNegative when the value is zero or negative. On any
// error RetentionHours is left unchanged.
func (t *TopicSla) ParseRetentionHours(s string) error {
	if s == "" {
		return ErrEmptyArg
	}

	hours, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return ErrNotNumber
	}
	if hours <= 0 {
		return ErrNegative
	}
	// strconv.ParseFloat accepts "NaN" and "Inf". NaN compares false against
	// everything, so it slips past the sign check above; neither value is a
	// usable retention period.
	if math.IsNaN(hours) || math.IsInf(hours, 1) {
		return ErrNotNumber
	}

	t.RetentionHours = hours
	return nil
}
// DumpForCreateTopic renders the SLA as kafka-topics.sh arguments for topic
// creation.
//
// The result always has two elements, "--partitions N" and
// "--replication-factor N", in that order. As a side effect, a Partitions or
// Replicas value outside [1, max] is reset on the receiver to its default
// before being rendered.
func (t *TopicSla) DumpForCreateTopic() []string {
	// Normalise out-of-range values in place first.
	if t.Partitions < 1 || t.Partitions > maxPartitions {
		t.Partitions = defaultPartitions
	}
	if t.Replicas < 1 || t.Replicas > maxReplicas {
		t.Replicas = defaultReplicas
	}

	return []string{
		fmt.Sprintf("--partitions %d", t.Partitions),
		fmt.Sprintf("--replication-factor %d", t.Replicas),
	}
}
// DumpForAlterTopic renders the SLA as kafka-topics.sh arguments for altering
// an existing topic.
//
// Only settings that differ from their package default are emitted, in this
// order: retention.bytes (when positive), retention.ms (when RetentionHours
// is positive and at most maxRetentionHours), --partitions, and
// min.insync.replicas. The result is a non-nil, possibly empty slice. The
// receiver is not modified.
func (t *TopicSla) DumpForAlterTopic() []string {
	args := make([]string, 0, 4)

	if t.RetentionBytes != defaultRetentionBytes && t.RetentionBytes > 0 {
		args = append(args, fmt.Sprintf("--config retention.bytes=%d", t.RetentionBytes))
	}

	if t.RetentionHours != defaultRetentionHours && t.RetentionHours > 0 && t.RetentionHours <= maxRetentionHours {
		// Convert to int64, not int: the allowed maximum is about 1.2e10 ms,
		// which does not fit in int where int is 32 bits wide. The
		// multiplication order is kept as-is so float rounding is unchanged.
		retentionMs := int64(t.RetentionHours * 1000 * 3600)
		args = append(args, fmt.Sprintf("--config retention.ms=%d", retentionMs))
	}

	if t.Partitions != defaultPartitions {
		args = append(args, fmt.Sprintf("--partitions %d", t.Partitions))
	}

	if t.MinInsyncReplicas != defaultMinInsyncReplicas {
		args = append(args, fmt.Sprintf("--config min.insync.replicas=%d", t.MinInsyncReplicas))
	}

	return args
}