forked from zorkian/kafka
-
Notifications
You must be signed in to change notification settings - Fork 3
/
distributing_producer.go
265 lines (237 loc) · 9.63 KB
/
distributing_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package kafka
import (
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/discord/zorkian-kafka/proto"
"github.com/jpillora/backoff"
)
// ErrNoPartitionsAvailable is returned by Distribute when it could not lease
// any partition for the topic — all partitions are suspended in error-averse
// backoff, the topic is unknown to the partitionManager, or the wait for a
// partition timed out (see partitionManager.GetPartition).
var ErrNoPartitionsAvailable = errors.New("all partitions suspended due to previous failures, refusing to produce")
// DistributingProducer is an interface similar to Producer, but it never
// requires the caller to explicitly specify a partition.
//
// Distribute writes messages to the given topic, automatically choosing a
// partition, and returns the chosen partition, the post-commit offset, and
// any error encountered. The offset of each message is also updated
// accordingly.
type DistributingProducer interface {
	Distribute(topic string, messages ...*proto.Message) (partition int32, offset int64, err error)
}
// PartitionCountSource lets a DistributingProducer determine how many
// partitions exist for a particular topic. Broker fulfills this interface
// but a cache could be used instead.
type PartitionCountSource interface {
	// PartitionCount returns the number of partitions for topic, or an
	// error if the count could not be determined.
	PartitionCount(topic string) (count int32, err error)
}
// errorAverseRRProducerConf controls the behavior of errorAverseRRProducer.
//
//	PartitionCountSource: required.
//	Producer: required.
//	ErrorAverseBackoff: optional. Controls how long we should wait after a
//	    produce fails to a particular partition.
//	PartitionFetchTimeout: optional. Controls how long Distribute will wait
//	    to get a partition in the case where they are all unavailable due to
//	    error averse backoff.
//
// Obtain a populated default via NewErrorAverseRRProducerConf.
type errorAverseRRProducerConf struct {
	PartitionCountSource PartitionCountSource
	Producer             Producer
	ErrorAverseBackoff   *backoff.Backoff
	PartitionFetchTimeout time.Duration
}
// NewErrorAverseRRProducerConf returns a conf pre-populated with default
// backoff (10s..5m, factor 2, jittered) and a 10s partition-fetch timeout.
// The caller must still fill in PartitionCountSource and Producer before
// passing the conf to NewErrorAverseRRProducer.
func NewErrorAverseRRProducerConf() *errorAverseRRProducerConf {
	return &errorAverseRRProducerConf{
		PartitionCountSource: nil,
		Producer:             nil,
		ErrorAverseBackoff: &backoff.Backoff{
			// 10 * time.Second is already a time.Duration; no conversion needed.
			Min:    10 * time.Second,
			Max:    5 * time.Minute,
			Factor: 2, // Avoids race condition, see https://github.com/jpillora/backoff/pull/5
			Jitter: true,
		},
		PartitionFetchTimeout: 10 * time.Second,
	}
}
// errorAverseRRProducer writes to a topic's partitions in order sequentially
// (stateful round robin) but when a produce fails, that partition is set
// aside temporarily using exponential backoff.
type errorAverseRRProducer struct {
	partitionCountSource PartitionCountSource
	producer             Producer
	partitionManager     *partitionManager
}

// NewErrorAverseRRProducer constructs a DistributingProducer that rotates
// through partitions and temporarily suspends partitions whose produces
// fail, per the supplied conf.
func NewErrorAverseRRProducer(conf *errorAverseRRProducerConf) DistributingProducer {
	manager := &partitionManager{
		availablePartitions: make(map[string]chan *partitionData),
		lock:                &sync.RWMutex{},
		sharedRetry:         conf.ErrorAverseBackoff,
		getTimeout:          conf.PartitionFetchTimeout,
	}
	return &errorAverseRRProducer{
		partitionCountSource: conf.PartitionCountSource,
		producer:             conf.Producer,
		partitionManager:     manager,
	}
}
// Distribute implements DistributingProducer. It refreshes the topic's
// partition count, leases the next available partition from the
// partitionManager, produces to it, and reports the outcome back via
// Success/Failure so the partition stays in rotation or enters backoff.
func (d *errorAverseRRProducer) Distribute(topic string, messages ...*proto.Message) (int32, int64, error) {
	if count, err := d.partitionCountSource.PartitionCount(topic); err == nil {
		d.partitionManager.SetPartitionCount(topic, count)
	} else {
		// The partition count is unavailable (e.g. this topic doesn't exist
		// yet), so we pretend it has one partition for now.
		d.partitionManager.SetPartitionCount(topic, 1)
	}

	// Named pd rather than partitionData so the local doesn't shadow the type.
	pd, err := d.partitionManager.GetPartition(topic)
	if err != nil {
		log.Error(err.Error())
		return 0, 0, ErrNoPartitionsAvailable
	}
	// We are now obligated to call Success or Failure on pd.

	offset, err := d.producer.Produce(topic, pd.Partition, messages...)
	if err != nil {
		log.Errorf("Failed to produce [%s:%d]: %s", topic, pd.Partition, err)
		pd.Failure()
		return 0, 0, err
	}
	pd.Success()
	return pd.Partition, offset, nil
}
// partitionData wraps a retry tracker and the partitionManager's chan for
// a particular partition. We have a pointer to the chan instead of the
// partitionManager because the partitionManager will throw away and rebuild
// its availablePartitions chan whenever the partition count changes. This means
// calls to stale partitionData objects will manipulate a defunct
// availablePartitions chan, which is harmless.
//
// successiveFailures is meant to atomically track the most recent count of
// calls to Failure without an intervening call to Success. The implementation
// is somewhat racy because it does not use a mutex, which means that in the
// worst case of a partition which is intermittently unavailable, we may
// suspend the partition for more or less time than we would have in a fully
// synchronized implementation. However, in the steady states of healthy
// or fully down, this implementation is more performant.
type partitionData struct {
	Partition int32
	// reset (capacity 1, see SetPartitionCount) is signaled by Success to
	// cut short a backoff sleep in progress inside reEnqueue.
	reset chan struct{}
	// sharedRetry is the Backoff shared across all partitions of the
	// manager; consulted via ForAttempt in reEnqueue.
	sharedRetry *backoff.Backoff
	// successiveFailures must only be accessed through sync/atomic.
	successiveFailures uint64
	// availablePartitions is the manager chan this partition re-enqueues
	// itself onto after each lease.
	availablePartitions chan *partitionData
	topic               string // Just for debugging
}
// Success clears the partition's consecutive-failure streak and signals any
// in-flight backoff sleep (see reEnqueue) so the partition can return to
// rotation immediately.
func (d *partitionData) Success() {
	// This logging message is quite racy, but is still useful because it indicates
	// a successful produce definitely happened in some sort of proximity to
	// a failed produce.
	prior := atomic.LoadUint64(&d.successiveFailures)
	if prior > 0 {
		log.Infof("Resetting partition successiveFailures for %d of %s, was %d",
			d.Partition, d.topic, prior)
	}
	atomic.StoreUint64(&d.successiveFailures, 0)

	// Non-blocking send: the reset chan has capacity 1, so if a signal is
	// already pending this one is simply dropped.
	select {
	case d.reset <- struct{}{}:
	default:
	}
}
// Failure records one more consecutive failed produce against this partition.
// The running count selects the backoff duration in reEnqueue and is zeroed
// by the next call to Success.
func (d *partitionData) Failure() {
	atomic.AddUint64(&d.successiveFailures, 1)
}
// reEnqueue makes this partition available for another producer thread, either
// immediately or after a sleep corresponding to the number of successiveFailures
// seen.
//
// While the inner function is running/sleeping, there may be other produces
// to this partition in flight, but no new producer threads can get this
// partition out of GetPartition.
func (d *partitionData) reEnqueue() {
	go func() {
		if successiveFailures := atomic.LoadUint64(&d.successiveFailures); successiveFailures > 0 {
			// The interface to ForAttempt is that the first failure should be #0.
			t := d.sharedRetry.ForAttempt(float64(successiveFailures - 1))
			log.Warningf("Suspending partition %d of %s for %s (%d)",
				d.Partition, d.topic, t, successiveFailures)
			// Sleep out the backoff, but let a Success cut the sleep short
			// via the reset chan.
			select {
			case <-time.After(t):
			// This is a race and might see a reset from a call to Success a long time ago, but
			// the successiveFailures count will keep counting upwards so the impact is minimal.
			case <-d.reset:
			}
			log.Warningf("Re-enqueueing partition %d of %s after %s",
				d.Partition, d.topic, t)
		}
		// Non-blocking send back onto the pool. SetPartitionCount fills the
		// chan to capacity exactly once per partition, so the default case
		// should be unreachable unless a partition was double-enqueued; a
		// stale chan (topic resized) just gets a harmless extra entry or
		// drops to default.
		select {
		case d.availablePartitions <- d:
		default:
			log.Errorf("Programmer error in reEnqueue(%s, %d)! This should never happen.",
				d.topic, d.Partition)
		}
	}()
}
// partitionManager wraps the current availablePartitions which it
// rebuilds in response to changes in partition counts.
// The partitionManager also keeps a single Backoff object for shared use
// among all the partitionData objects that will exist.
type partitionManager struct {
	// availablePartitions maps topic -> buffered chan whose capacity equals
	// the topic's partition count; free partitions cycle through this chan.
	availablePartitions map[string]chan *partitionData
	// lock guards the availablePartitions map.
	lock *sync.RWMutex
	// sharedRetry computes suspension durations for failing partitions.
	sharedRetry *backoff.Backoff
	// getTimeout bounds how long GetPartition waits for a free partition.
	getTimeout time.Duration
}
// GetPartitionCount returns the size of a topic's availablePartitions chan,
// i.e. the partition count most recently applied by SetPartitionCount.
// It returns an error for topics the manager has never seen.
func (p *partitionManager) GetPartitionCount(topic string) (int32, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	availablePartitions, ok := p.availablePartitions[topic]
	if !ok {
		return 0, fmt.Errorf("No such topic %s", topic)
	}
	return int32(cap(availablePartitions)), nil
}
// SetPartitionCount resets the partitionManager's state for the given topic
// if the count has changed. Partitions currently in error-averse backoff
// will immediately be available for writing again.
func (p *partitionManager) SetPartitionCount(topic string, partitionCount int32) {
	// Fast path: the count is unchanged, so only the read lock was needed.
	if count, err := p.GetPartitionCount(topic); err == nil && count == partitionCount {
		return
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	// Double-check under the write lock: another goroutine may have rebuilt
	// the chan between our read above and acquiring the write lock.
	availablePartitions, ok := p.availablePartitions[topic]
	if ok && int32(cap(availablePartitions)) == partitionCount {
		log.Errorf("partitionManager(%s) hit slow path on SetPartitionCount but "+
			"there is now no work to do. Count %d", topic, partitionCount)
		return
	}

	// cap of a nil chan is 0, so a brand-new topic logs "0 -> partitionCount".
	log.Infof("partitionManager adjusting partition count for %s: %d -> %d",
		topic, cap(availablePartitions), partitionCount)
	availablePartitions = make(chan *partitionData, partitionCount)
	// Randomize the order of partitions to decorrelate publish partitions when many producers are restarted at once
	for _, i := range rand.Perm(int(partitionCount)) {
		availablePartitions <- &partitionData{
			Partition:           int32(i),
			reset:               make(chan struct{}, 1),
			sharedRetry:         p.sharedRetry,
			availablePartitions: availablePartitions,
			topic:               topic,
		}
	}
	p.availablePartitions[topic] = availablePartitions
}
// GetPartition fetches the next available partitionData object for the
// given topic. The caller must call Success or Failure on this partitionData.
// It blocks for at most p.getTimeout waiting for a partition to leave
// error-averse backoff.
func (p *partitionManager) GetPartition(topic string) (*partitionData, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	availablePartitions, ok := p.availablePartitions[topic]
	if !ok {
		return nil, fmt.Errorf("No such topic %s", topic)
	}

	select {
	// Named pd rather than partitionData so the local doesn't shadow the type.
	case pd, ok := <-availablePartitions:
		if !ok {
			// The chan is never closed, so a closed receive indicates a bug.
			// fmt.Errorf is used directly: wrapping fmt.Sprintf in fmt.Errorf
			// was redundant and would re-interpret % verbs inside topic.
			return nil, fmt.Errorf("Programmer error in GetPartition(%s)! "+
				"This should never happen.", topic)
		}
		// Hand the partition back to the pool (possibly after a backoff
		// sleep) once it is returned to the caller.
		defer pd.reEnqueue()
		return pd, nil
	case <-time.After(p.getTimeout):
		return nil, fmt.Errorf("Timeout waiting for partition for %s.", topic)
	}
}