forked from twmb/franz-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer_direct.go
159 lines (145 loc) · 4.23 KB
/
consumer_direct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package kgo
type directConsumer struct {
cfg *cfg
tps *topicsPartitions // data for topics that the user assigned
using mtmps // topics we are currently using
m mtmps // mirrors cfg.topics and cfg.partitions, but can change with Purge or Add
ps map[string]map[int32]Offset // mirrors cfg.partitions, changed in Purge or Add
reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
}
func (c *consumer) initDirect() {
d := &directConsumer{
cfg: &c.cl.cfg,
tps: newTopicsPartitions(),
reSeen: make(map[string]bool),
using: make(mtmps),
m: make(mtmps),
ps: make(map[string]map[int32]Offset),
}
c.d = d
if d.cfg.regex {
return
}
var topics []string
for topic, partitions := range d.cfg.partitions {
topics = append(topics, topic)
for partition := range partitions {
d.m.add(topic, partition)
}
p := make(map[int32]Offset, len(partitions))
for partition, offset := range partitions {
p[partition] = offset
}
d.ps[topic] = p
}
for topic := range d.cfg.topics {
topics = append(topics, topic)
d.m.addt(topic)
}
d.tps.storeTopics(topics) // prime topics to load if non-regex (this is of no benefit if regex)
}
// For SetOffsets, unlike the group consumer, we just blindly translate the
// input EpochOffsets into Offsets, and those will be set directly.
func (*directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset) (assigns map[string]map[int32]Offset) {
assigns = make(map[string]map[int32]Offset)
for topic, partitions := range setOffsets {
set := make(map[int32]Offset)
for partition, eo := range partitions {
set[partition] = Offset{
at: eo.Offset,
epoch: eo.Epoch,
}
}
assigns[topic] = set
}
return assigns
}
// findNewAssignments returns new partitions to consume at given offsets
// based off the current topics.
func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
topics := d.tps.load()
var rns reNews
if d.cfg.regex {
defer rns.log(d.cfg)
}
toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
var useTopic bool
if d.cfg.regex {
want, seen := d.reSeen[topic]
if !seen {
for rawRe, re := range d.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
d.reSeen[topic] = want
}
useTopic = want
} else {
useTopic = d.m.onlyt(topic)
}
// If the above detected that we want to keep this topic, we
// set all partitions as usable.
//
// For internal partitions, we only allow consuming them if
// the topic is explicitly specified.
if useTopic {
partitions := topicPartitions.load()
if d.cfg.regex && partitions.isInternal {
continue
}
toUseTopic := make(map[int32]Offset, len(partitions.partitions))
for partition := range partitions.partitions {
toUseTopic[int32(partition)] = d.cfg.resetOffset
}
toUse[topic] = toUseTopic
}
// Lastly, if this topic has some specific partitions pinned,
// we set those. We only use partitions from topics that have
// not been purged.
for topic := range d.m {
for partition, offset := range d.ps[topic] {
toUseTopic, exists := toUse[topic]
if !exists {
toUseTopic = make(map[int32]Offset, 10)
toUse[topic] = toUseTopic
}
toUseTopic[partition] = offset
}
}
}
// With everything we want to consume, remove what we are already.
for topic, partitions := range d.using {
toUseTopic, exists := toUse[topic]
if !exists {
continue // metadata update did not return this topic (regex or failing load)
}
for partition := range partitions {
delete(toUseTopic, partition)
}
if len(toUseTopic) == 0 {
delete(toUse, topic)
}
}
if len(toUse) == 0 {
return nil
}
// Finally, toUse contains new partitions that we must consume.
// Add them to our using map and assign them.
for topic, partitions := range toUse {
topicUsing, exists := d.using[topic]
if !exists {
topicUsing = make(map[int32]struct{})
d.using[topic] = topicUsing
}
for partition := range partitions {
topicUsing[partition] = struct{}{}
}
}
return toUse
}