-
Notifications
You must be signed in to change notification settings - Fork 92
/
topics.go
82 lines (72 loc) · 1.79 KB
/
topics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package kafkatarget
// This code is copied from Promtail (https://github.com/grafana/loki/commit/065bee7e72b00d800431f4b70f0d673d6e0e7a2b). The kafkatarget package is used to
// configure and run the targets that can read Kafka entries and forward them
// to other Loki components.
import (
"errors"
"fmt"
"regexp"
"sort"
)
// topicClient is the minimal client surface topicManager depends on: one call
// to refresh metadata and one call to list topic names. Keeping it this small
// lets a fake implementation stand in for the real client.
// NOTE(review): the method set looks like a subset of a Kafka client API
// (e.g. sarama.Client) — confirm against the caller that supplies it.
type topicClient interface {
// RefreshMetadata takes an optional list of topic names and reports an error.
// topicManager.Topics calls it with no arguments right before listing topics.
RefreshMetadata(topics ...string) error
// Topics returns the topic names known to the client, or an error.
Topics() ([]string, error)
}
// topicManager selects, from the topics reported by a topicClient, the ones
// that were requested. Its fields are populated by newTopicManager and read by
// the Topics method.
type topicManager struct {
// client is queried (metadata refresh, then topic listing) on every Topics call.
client topicClient
// patterns holds compiled regular expressions; a topic is selected when any
// of them reports a match via MatchString.
patterns []*regexp.Regexp
// matches holds topic names that are compared for exact string equality.
matches []string
}
// newTopicManager builds a topicManager from the list of requested topics.
//
// Each entry of topics is classified once, up front:
//   - an entry starting with '^' is compiled as a regular expression and can
//     match multiple topics;
//   - any other entry is a literal topic name and only matches a topic with
//     exactly that name.
//
// It returns an error when an entry is empty or when a '^' entry is not a
// valid regular expression. The client is only stored here; it is not called
// until the Topics method is invoked.
func newTopicManager(client topicClient, topics []string) (*topicManager, error) {
	var (
		patterns []*regexp.Regexp
		matches  []string
	)
	for _, t := range topics {
		if len(t) == 0 {
			return nil, errors.New("invalid empty topic")
		}
		if t[0] != '^' {
			// Literal name: record it for exact comparison and stop here.
			// Compiling it as well would turn it into an unanchored pattern
			// ("foo" would also select "foobar", "a.b" would select "aXb")
			// and would reject names that are not valid regexp syntax.
			matches = append(matches, t)
			continue
		}
		re, err := regexp.Compile(t)
		if err != nil {
			return nil, fmt.Errorf("invalid topic pattern: %w", err)
		}
		patterns = append(patterns, re)
	}
	return &topicManager{
		client:   client,
		patterns: patterns,
		matches:  matches,
	}, nil
}
// Topics asks the client to refresh its metadata, lists the topics the client
// reports, and returns the subset that was requested, sorted in ascending
// order.
//
// A reported topic is kept when it equals one of tm.matches or when one of
// tm.patterns matches it; each topic appears at most once in the result. If
// either client call fails, its error is returned unchanged with a nil slice.
// When nothing is selected the result is an empty, non-nil slice.
func (tm *topicManager) Topics() ([]string, error) {
	err := tm.client.RefreshMetadata()
	if err != nil {
		return nil, err
	}

	available, err := tm.client.Topics()
	if err != nil {
		return nil, err
	}

	// wanted reports whether name was requested, checking exact names first
	// and the compiled patterns second.
	wanted := func(name string) bool {
		for i := range tm.matches {
			if tm.matches[i] == name {
				return true
			}
		}
		for i := range tm.patterns {
			if tm.patterns[i].MatchString(name) {
				return true
			}
		}
		return false
	}

	selected := make([]string, 0, len(available))
	for _, name := range available {
		if wanted(name) {
			selected = append(selected, name)
		}
	}

	sort.Strings(selected)
	return selected, nil
}