forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
partitioner.go
217 lines (187 loc) · 7.33 KB
/
partitioner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package sarama
import (
"hash"
"hash/fnv"
"math/rand"
"time"
)
// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
// decides to which partition to send the message. Random, round-robin, manual and hash-based
// implementations are provided as simple defaults; see NewRandomPartitioner, NewRoundRobinPartitioner,
// NewManualPartitioner and NewHashPartitioner.
type Partitioner interface {
// Partition takes a message and the partition count and returns the chosen
// partition, or a non-nil error when a partition could not be chosen.
Partition(message *ProducerMessage, numPartitions int32) (int32, error)
// RequiresConsistency indicates to the user of the partitioner whether the
// mapping of key->partition is consistent or not. Specifically, if a
// partitioner requires consistency then it must be allowed to choose from all
// partitions (even ones known to be unavailable), and its choice must be
// respected by the caller. The obvious example is the hash partitioner
// returned by NewHashPartitioner.
RequiresConsistency() bool
}
// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
// in order to allow more flexibility than is originally allowed by the
// RequiresConsistency method in the Partitioner interface. It lets a
// partitioner require consistency for some messages but not for others. This
// is useful for, e.g., the hash partitioner in this file, which does not
// require consistency when the message key is nil.
type DynamicConsistencyPartitioner interface {
// Partitioner is embedded, so every DynamicConsistencyPartitioner is also a
// regular Partitioner.
Partitioner
// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
// but takes in the message being partitioned so that the partitioner can
// make a per-message determination.
MessageRequiresConsistency(message *ProducerMessage) bool
}
// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
// It receives a topic name and returns the Partitioner to use; NewHashPartitioner,
// NewRandomPartitioner, NewRoundRobinPartitioner and NewManualPartitioner all have this signature.
type PartitionerConstructor func(topic string) Partitioner
// manualPartitioner is the stateless Partitioner returned by NewManualPartitioner.
// Its Partition method returns the partition already set on the message.
type manualPartitioner struct{}
// HashPartitionerOption lets you modify the default values of the hash
// partitioner. Each option is a function that mutates the hashPartitioner it is
// given; NewCustomPartitioner applies its options in order after setting the defaults.
type HashPartitionerOption func(*hashPartitioner)
// WithAbsFirst returns an option that switches the hash partitioner to the
// absolute-value handling used by the reference Java implementation, by
// setting its referenceAbs flag.
func WithAbsFirst() HashPartitionerOption {
	enableReferenceAbs := func(target *hashPartitioner) {
		target.referenceAbs = true
	}
	return enableReferenceAbs
}
// WithCustomHashFunction returns an option that replaces the hash function
// used for partitioning. The hasher factory is invoked every time the option
// is applied, so each configured partitioner receives its own hash.Hash32.
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
	return HashPartitionerOption(func(target *hashPartitioner) {
		target.hasher = hasher()
	})
}
// WithCustomFallbackPartitioner lets you specify which partitioner the hash
// partitioner falls back to when a message has no key (message.Key is nil).
//
// randomHP is installed as the fallback of the partitioner being configured.
// A nil randomHP leaves the already configured fallback untouched: storing a
// nil *hashPartitioner in the Partitioner interface field would produce a
// non-nil interface that panics on first use.
func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption {
	return func(hp *hashPartitioner) {
		if randomHP == nil {
			return
		}
		// Install the caller-supplied partitioner. Assigning hp itself here would
		// make Partition recurse forever for every message without a key.
		hp.random = randomHP
	}
}
// NewManualPartitioner returns a Partitioner that produces to whatever
// partition the caller has already stored in the ProducerMessage's Partition
// field. The topic argument is not used.
func NewManualPartitioner(topic string) Partitioner {
	return &manualPartitioner{}
}
// Partition returns the partition stored on the message itself. numPartitions
// is not consulted and the returned error is always nil.
func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	chosen := message.Partition
	return chosen, nil
}
// RequiresConsistency always reports true: the partition was chosen explicitly
// on the message, so the caller has to respect it.
func (p *manualPartitioner) RequiresConsistency() bool {
	const consistent = true
	return consistent
}
// randomPartitioner is the Partitioner returned by NewRandomPartitioner; it
// draws every partition from its own pseudo-random generator.
type randomPartitioner struct {
// generator is the random source, seeded from the current time in
// NewRandomPartitioner.
// NOTE(review): a *rand.Rand created with rand.New is not safe for concurrent
// use — confirm each partitioner instance is only used from one goroutine.
generator *rand.Rand
}
// NewRandomPartitioner returns a Partitioner which chooses a random partition
// each time. Every call creates an independent generator seeded from the
// current time in nanoseconds. The topic argument is not used.
func NewRandomPartitioner(topic string) Partitioner {
	seed := time.Now().UTC().UnixNano()
	return &randomPartitioner{
		generator: rand.New(rand.NewSource(seed)),
	}
}
// Partition asks the generator for a value in [0, numPartitions) and returns
// it as the partition. The message is not inspected and the returned error is
// always nil.
func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	limit := int(numPartitions)
	pick := p.generator.Intn(limit)
	return int32(pick), nil
}
// RequiresConsistency always reports false: a randomly chosen partition
// carries no key->partition mapping that the caller would have to preserve.
func (p *randomPartitioner) RequiresConsistency() bool {
	const consistent = false
	return consistent
}
// roundRobinPartitioner is the Partitioner returned by NewRoundRobinPartitioner;
// it hands out partitions in increasing order and wraps back to zero.
type roundRobinPartitioner struct {
// partition is the index the next Partition call will return, unless it has
// reached numPartitions, in which case that call restarts from zero.
partition int32
}
// NewRoundRobinPartitioner returns a Partitioner which walks through the
// available partitions one at a time, starting at partition zero. The topic
// argument is not used.
func NewRoundRobinPartitioner(topic string) Partitioner {
	return new(roundRobinPartitioner)
}
// Partition returns the next partition in sequence and advances the internal
// cursor. When the cursor has reached numPartitions (for example after the
// partition count shrank) the sequence restarts at zero. The message is not
// inspected and the returned error is always nil.
func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	current := p.partition
	if current >= numPartitions {
		current = 0
	}
	p.partition = current + 1
	return current, nil
}
// RequiresConsistency always reports false: the round-robin choice does not
// depend on the message, so there is no key->partition mapping to preserve.
func (p *roundRobinPartitioner) RequiresConsistency() bool {
	const consistent = false
	return consistent
}
// hashPartitioner picks a partition from the hash of the encoded message key
// and delegates to a fallback partitioner when the message has no key.
type hashPartitioner struct {
// random is the fallback consulted by Partition when message.Key is nil.
random Partitioner
// hasher computes the 32-bit hash of the key; Partition resets it before
// every use.
hasher hash.Hash32
// referenceAbs selects how a negative hash is made non-negative. When true
// the sign bit is masked off before taking the modulus (the behaviour this
// file describes as compatible with the reference Java client); when false
// the modulus is taken first and a negative result is negated.
referenceAbs bool
}
// NewCustomHashPartitioner builds a PartitionerConstructor that behaves like
// NewHashPartitioner but hashes keys with a caller-supplied hash function.
// The argument is a factory rather than an instance: it is called once per
// constructed partitioner so that every partitioner owns its own hash.Hash32
// and no hasher is shared between them.
func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
	return func(topic string) Partitioner {
		return &hashPartitioner{
			random:       NewRandomPartitioner(topic),
			hasher:       hasher(),
			referenceAbs: false,
		}
	}
}
// NewCustomPartitioner returns a PartitionerConstructor for a hash partitioner
// whose components can be overridden through options. Each constructed
// partitioner starts with the defaults (random fallback, FNV-1a hasher,
// referenceAbs disabled) and then has the options applied in the order given.
func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
	return func(topic string) Partitioner {
		hp := &hashPartitioner{
			random: NewRandomPartitioner(topic),
			hasher: fnv.New32a(),
		}
		for i := range options {
			options[i](hp)
		}
		return hp
	}
}
// NewHashPartitioner returns a Partitioner which behaves as follows. If the
// message's key is nil, a random partition is chosen. Otherwise the FNV-1a
// hash of the encoded bytes of the message key is taken modulo the number of
// partitions, so messages with the same key always end up on the same
// partition. Negative hash values use the original Sarama handling
// (referenceAbs disabled); see NewReferenceHashPartitioner for the alternative.
func NewHashPartitioner(topic string) Partitioner {
	return &hashPartitioner{
		random:       NewRandomPartitioner(topic),
		hasher:       fnv.New32a(),
		referenceAbs: false,
	}
}
// NewReferenceHashPartitioner is like NewHashPartitioner except that it
// handles absolute values in the same way as the reference Java
// implementation (referenceAbs enabled). NewHashPartitioner was supposed to do
// that but it had a mistake, and now there are people depending on both
// behaviours. This will all go away on the next major version bump.
func NewReferenceHashPartitioner(topic string) Partitioner {
	return &hashPartitioner{
		random:       NewRandomPartitioner(topic),
		hasher:       fnv.New32a(),
		referenceAbs: true,
	}
}
// Partition chooses the partition for message out of numPartitions.
//
// A message without a key is handed to the fallback partitioner. Otherwise the
// key is encoded, hashed with the configured hasher, and reduced modulo
// numPartitions. If encoding the key or feeding the hasher fails, -1 and the
// error are returned.
func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if message.Key == nil {
		return p.random.Partition(message, numPartitions)
	}

	keyBytes, err := message.Key.Encode()
	if err != nil {
		return -1, err
	}

	// The hasher is reused between calls, so clear the previous key first.
	p.hasher.Reset()
	if _, err = p.hasher.Write(keyBytes); err != nil {
		return -1, err
	}

	sum := int32(p.hasher.Sum32())

	// Turns out the absolute value used to be computed in a subtly different
	// way from the upstream implementation, and backwards compatibility has to
	// be kept for users of the old behaviour. With referenceAbs set the result
	// is compatible with the reference Java client (sign bit masked before the
	// modulus) but not with past Sarama versions.
	if p.referenceAbs {
		return (sum & 0x7fffffff) % numPartitions, nil
	}

	// Legacy behaviour: take the modulus first, then flip a negative result.
	remainder := sum % numPartitions
	if remainder < 0 {
		remainder = -remainder
	}
	return remainder, nil
}
// RequiresConsistency always reports true. MessageRequiresConsistency gives
// the per-message answer, which is false for messages without a key.
func (p *hashPartitioner) RequiresConsistency() bool {
	const consistent = true
	return consistent
}
// MessageRequiresConsistency reports whether this particular message needs a
// consistent partition choice: true when the message carries a key, false
// when its key is nil.
func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
	hasKey := message.Key != nil
	return hasKey
}