/
tag.go
349 lines (287 loc) · 8.44 KB
/
tag.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package server
import (
"errors"
"fmt"
"reflect"
"regexp"
"strings"
pb "github.com/DataDog/kafka-kit/v4/registry/registry"
)
// Sentinel errors shared by the tag handling code.
var (
	// ErrInvalidKafkaObjectType reports an invalid Kafka object type.
	ErrInvalidKafkaObjectType = errors.New("invalid Kafka object type")

	// ErrKafkaObjectDoesNotExist reports that the requested Kafka object does
	// not exist.
	ErrKafkaObjectDoesNotExist = errors.New("requested Kafka object does not exist")

	// ErrNilTagSet reports a nil or empty TagSet where one is required.
	ErrNilTagSet = errors.New("must provide a non-nil or non-empty TagSet")

	// ErrNilTags reports nil or empty tags where they are required.
	ErrNilTags = errors.New("must provide a non-nil or non-empty tags")
)

// defaultProtoFieldRegex matches struct field names that are skipped during
// reserved field discovery: anything starting with "XXX", plus the exact
// names "unknownFields", "state" and "sizeCache".
var defaultProtoFieldRegex = regexp.MustCompile("^XXX|^unknownFields$|^state$|^sizeCache$")
// ErrReservedTag is an error value reporting that a tag is reserved. It
// carries the tag so that the message can name it.
type ErrReservedTag struct {
	t string
}

// Error implements the error interface, embedding the reserved tag in the
// returned message.
func (rt ErrReservedTag) Error() string {
	msg := fmt.Sprintf("tag '%s' is a reserved tag", rt.t)
	return msg
}
// TagHandler provides object filtering by tags along with tag storage and
// retrieval. Use NewTagHandler to obtain one with its Store initialized.
type TagHandler struct {
// Store is the TagStorage backend the handler reads stored tags from and
// consults to decide whether a field name is reserved.
Store TagStorage
}
// TagStorage handles tag persistence to stable storage. NewTagHandler uses the
// implementation returned by NewZKTagStorage.
type TagStorage interface {
// LoadReservedFields receives the reserved field names per object type.
// NewTagHandler calls it once with the result of GetReservedFields.
LoadReservedFields(ReservedFields) error
// FieldReserved reports whether the named field is reserved for the given
// object. The filter methods call it with only the object Type set to
// separate custom tags from default fields.
FieldReserved(KafkaObject, string) bool
// SetTags presumably persists the TagSet for the object; no caller is
// visible in this file, so confirm against the implementation.
SetTags(KafkaObject, TagSet) error
// GetTags returns the tags stored for the object. TagSetFromObject treats
// an ErrKafkaObjectDoesNotExist error as "no stored tags".
GetTags(KafkaObject) (TagSet, error)
// DeleteTags presumably removes the listed tag keys from the object; no
// caller is visible in this file, so confirm against the implementation.
DeleteTags(KafkaObject, []string) error
// GetAllTags presumably returns every stored TagSet keyed by its object;
// no caller is visible in this file, so confirm against the implementation.
GetAllTags() (map[KafkaObject]TagSet, error)
}
// NewTagHandler builds a TagHandler backed by the storage that
// NewZKTagStorage returns for the configured prefix. The reserved fields from
// GetReservedFields are loaded into the storage before the handler is
// returned; an error from either step is returned as-is with a nil handler.
func NewTagHandler(c TagHandlerConfig) (*TagHandler, error) {
	store, err := NewZKTagStorage(ZKTagStorageConfig{Prefix: c.Prefix})
	if err != nil {
		return nil, err
	}

	if err := store.LoadReservedFields(GetReservedFields()); err != nil {
		return nil, err
	}

	// More sophisticated initialization/config passing if additional TagStorage
	// backends are written.
	handler := &TagHandler{Store: store}

	return handler, nil
}
// TagHandlerConfig holds TagHandler configuration.
type TagHandlerConfig struct {
// Prefix is passed by NewTagHandler to NewZKTagStorage as
// ZKTagStorageConfig.Prefix.
Prefix string
}
// Tags is a list of tags, one "key:value" pair per element. The Keys method
// also accepts elements that are bare keys without a value, while the TagSet
// method requires every element to be a full "key:value" pair.
type Tags []string
// TagSet is a map of tag keys to tag values.
type TagSet map[string]string
// KafkaObject identifies a Kafka object by its type ("broker" or "topic") and
// an object identifier (ID or name).
type KafkaObject struct {
	Type string
	ID   string
}

// Valid reports whether the Type field holds one of the recognized object
// types, "broker" or "topic". The ID field is not inspected.
func (o KafkaObject) Valid() bool {
	return o.Type == "broker" || o.Type == "topic"
}

// Complete reports whether the KafkaObject is valid and also carries a
// non-empty ID.
func (o KafkaObject) Complete() bool {
	if o.ID == "" {
		return false
	}
	return o.Valid()
}
// TagSetFromObject takes a protobuf type and returns the default TagSet along
// with any user-defined tags.
//
// For a *pb.Topic the default tags are name, partitions and replication; for a
// *pb.Broker they are id, host, rack, port, logmessageformat and
// interbrokerprotocolversion. Any other input type yields no default tags and
// the store is queried with a zero-value KafkaObject.
//
// Stored tags are merged over the defaults, so a stored tag overrides a
// default tag with the same key. A store error is returned as-is, except for
// ErrKafkaObjectDoesNotExist (matched with errors.Is so wrapped errors count),
// which only means no user-defined tags exist for the object.
func (t *TagHandler) TagSetFromObject(o interface{}) (TagSet, error) {
	var (
		ts = TagSet{}
		ko KafkaObject
	)

	// Populate default tags. Binding the asserted value in the switch avoids
	// repeating the assertion and shadowing the method receiver.
	switch obj := o.(type) {
	case *pb.Topic:
		ko = KafkaObject{Type: "topic", ID: obj.Name}
		ts["name"] = obj.Name
		ts["partitions"] = fmt.Sprintf("%d", obj.Partitions)
		ts["replication"] = fmt.Sprintf("%d", obj.Replication)
	case *pb.Broker:
		id := fmt.Sprintf("%d", obj.Id)
		ko = KafkaObject{Type: "broker", ID: id}
		ts["id"] = id
		ts["host"] = obj.Host
		ts["rack"] = obj.Rack
		ts["port"] = fmt.Sprintf("%d", obj.Port)
		ts["logmessageformat"] = obj.Logmessageformat
		ts["interbrokerprotocolversion"] = obj.Interbrokerprotocolversion
	}

	// Fetch stored tags. ErrKafkaObjectDoesNotExist from TagStorage simply
	// means we do not have any user-defined tags stored for the requested
	// object, so it is not treated as a failure.
	st, err := t.Store.GetTags(ko)
	if err != nil && !errors.Is(err, ErrKafkaObjectDoesNotExist) {
		return nil, err
	}

	// Merge stored tags with default tags.
	for k, v := range st {
		ts[k] = v
	}

	return ts, nil
}
// FilterTopics returns the subset of the input topics whose tag set (default
// fields plus stored tags, as built by TagSetFromObject) matches every
// key/value in the provided tags. Each matched topic additionally has its
// Tags field populated with all of its non-reserved (custom) tags; note that
// this writes to the topic objects held by the input set. A malformed tag or a
// tag lookup failure aborts the filter and is returned as the error.
func (t *TagHandler) FilterTopics(in TopicSet, tags Tags) (TopicSet, error) {
	matched := make(TopicSet)

	// Get tag key/values.
	want, err := tags.TagSet()
	if err != nil {
		return nil, err
	}

	// Only the object type is needed for reserved field lookups.
	kind := KafkaObject{Type: "topic"}

	// Filter input topics.
	for name, topic := range in {
		have, err := t.TagSetFromObject(topic)
		if err != nil {
			return nil, err
		}

		if !have.matchAll(want) {
			continue
		}

		matched[name] = topic

		// Ensure that custom tags fetched from storage are populated into the
		// tags field for the object. Custom tags are any non-reserved object
		// fields.
		for key, value := range have {
			if t.Store.FieldReserved(kind, key) {
				continue
			}
			if topic.Tags == nil {
				topic.Tags = map[string]string{}
			}
			topic.Tags[key] = value
		}
	}

	return matched, nil
}
// FilterBrokers returns the subset of the input brokers whose tag set (default
// fields plus stored tags, as built by TagSetFromObject) matches every
// key/value in the provided tags. Each matched broker additionally has its
// Tags field populated with all of its non-reserved (custom) tags; note that
// this writes to the broker objects held by the input set. A malformed tag or
// a tag lookup failure aborts the filter and is returned as the error.
func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error) {
	matched := make(BrokerSet)

	// Get tag key/values.
	want, err := tags.TagSet()
	if err != nil {
		return nil, err
	}

	// Only the object type is needed for reserved field lookups.
	kind := KafkaObject{Type: "broker"}

	// Filter input brokers.
	for id, broker := range in {
		have, err := t.TagSetFromObject(broker)
		if err != nil {
			return nil, err
		}

		if !have.matchAll(want) {
			continue
		}

		matched[id] = broker

		// Ensure that custom tags fetched from storage are populated into the
		// tags field for the object. Custom tags are any non-reserved object
		// fields.
		for key, value := range have {
			if t.Store.FieldReserved(kind, key) {
				continue
			}
			if broker.Tags == nil {
				broker.Tags = map[string]string{}
			}
			broker.Tags[key] = value
		}
	}

	return matched, nil
}
// Keys returns the key portion of every element in the Tags, in order. An
// element may be a fully formed "key:value" tag or just a bare key; in both
// cases everything before the first ':' is taken as the key. The result is nil
// when the Tags is empty.
func (t Tags) Keys() []string {
	var keys []string

	for i := range t {
		// Only the leading segment matters, so split at most once.
		key := strings.SplitN(t[i], ":", 2)[0]
		keys = append(keys, key)
	}

	return keys
}
// matchAll takes a TagSet and returns true if all of its key/values are
// present in the calling TagSet with equal values. An empty or nil input
// matches everything.
//
// Presence is checked explicitly: a key missing from the calling TagSet never
// matches, even when the requested value is the empty string.
func (t TagSet) matchAll(kv TagSet) bool {
	for k, want := range kv {
		// A plain t[k] lookup would yield "" for a missing key and wrongly
		// match an empty requested value.
		if got, ok := t[k]; !ok || got != want {
			return false
		}
	}

	return true
}
// Equal checks if the input TagSet has the same key:value pairs as the calling
// TagSet. Two empty TagSets are equal regardless of whether either is nil.
//
// Keys must actually be present in both sets: a key holding an empty value in
// one set is not considered equal to a different key holding an empty value in
// the other.
func (t TagSet) Equal(t2 TagSet) bool {
	if len(t) != len(t2) {
		return false
	}

	for k, v := range t {
		// Use the two-value lookup so that a missing key is not mistaken for
		// a present key with an empty value.
		other, ok := t2[k]
		if !ok || other != v {
			return false
		}
	}

	return true
}
// Tags converts the TagSet into a Tags, rendering every entry as a
// "key:value" string. The order of the result follows map iteration and is
// therefore unspecified; an empty TagSet yields a nil Tags.
func (t TagSet) Tags() Tags {
	var pairs Tags

	for key, value := range t {
		pair := fmt.Sprintf("%s:%s", key, value)
		pairs = append(pairs, pair)
	}

	return pairs
}
// Keys returns every tag key held in the TagSet. The order follows map
// iteration and is therefore unspecified; an empty TagSet yields nil.
func (t TagSet) Keys() []string {
	var names []string

	for name := range t {
		names = append(names, name)
	}

	return names
}
// TagSet takes a Tags and returns a TagSet and error for any malformed tags.
// Every element is expected to be a single "key:value" pair, i.e. to contain
// exactly one ':'; the first element that does not is named in the returned
// error and a nil TagSet is returned. If a key appears more than once, the
// last occurrence wins. An empty Tags yields an empty, non-nil TagSet.
// TODO normalize all tag usage to lower case.
func (t Tags) TagSet() (TagSet, error) {
	ts := make(TagSet, len(t))

	for _, tag := range t {
		kv := strings.Split(tag, ":")
		if len(kv) != 2 {
			// Report the offending element rather than the whole list.
			return nil, fmt.Errorf("invalid tag '%s': must be formatted as key:value", tag)
		}

		ts[kv[0]] = kv[1]
	}

	return ts, nil
}
// ReservedFields is a mapping of object types (topic, broker) to a set of fields
// reserved for internal use; these are default fields that become searchable
// through the tags interface. The inner map is used as a set: its keys are the
// field names and its values carry no data.
type ReservedFields map[string]map[string]struct{}
// GetReservedFields returns a map of object types ("topic", "broker") to the
// field names considered reserved for internal use. The names are discovered
// from the pb.Topic and pb.Broker Registry proto messages via
// fieldsFromStruct, so every field of those messages is reserved by default.
func GetReservedFields() ReservedFields {
	return ReservedFields{
		"topic":  fieldsFromStruct(&pb.Topic{}),
		"broker": fieldsFromStruct(&pb.Broker{}),
	}
}
// fieldsFromStruct extracts all user-defined fields from proto messages.
// Discovered fields are returned all lowercase, as the keys of a set.
//
// The input may be a struct or a pointer to a struct. Field names matching
// defaultProtoFieldRegex (proto generated fields) are excluded. Any other
// input, including nil and a nil pointer, yields an empty, non-nil set rather
// than a panic.
func fieldsFromStruct(s interface{}) map[string]struct{} {
	fs := make(map[string]struct{})

	// Indirect dereferences a pointer and leaves any other value untouched,
	// so both T and *T are accepted.
	v := reflect.Indirect(reflect.ValueOf(s))
	if v.Kind() != reflect.Struct {
		return fs
	}

	// Iterate over fields.
	typ := v.Type()
	for i := 0; i < typ.NumField(); i++ {
		name := typ.Field(i).Name

		// Exclude proto generated fields.
		if defaultProtoFieldRegex.MatchString(name) {
			continue
		}

		fs[strings.ToLower(name)] = struct{}{}
	}

	return fs
}