cluster.go

/*
 * portal - marshal
 *
 * a library that implements an algorithm for doing consumer coordination within Kafka, rather
 * than using Zookeeper or another external system.
 *
 */

package marshal

import (
    "crypto/md5"
    "encoding/binary"
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"

    "github.com/dropbox/kafka"
)

// KafkaCluster is a user-agnostic view of the world. It connects to a Kafka cluster
// and runs rationalizers to observe the complete world state.
type KafkaCluster struct {
    // These members are not protected by the lock and can be read at any
    // time as they're write-once or only ever atomically updated. They must
    // never be overwritten once a KafkaCluster is created.
    quit       *int32
    name       string
    broker     *kafka.Broker
    producer   kafka.Producer
    partitions int
    jitters    chan time.Duration

    // Lock protects the following members; you must have this lock in order to
    // read from or write to these.
    lock       sync.RWMutex
    marshalers []*Marshaler
    topics     map[string]int
    groups     map[string]map[string]*topicState

    // This WaitGroup is used for signalling when all of the rationalizers have
    // finished processing.
    rationalizers sync.WaitGroup

    // rsteps is updated whenever a rationalizer processes a log entry; this is
    // used mainly by the test suite.
    rsteps *int32

    // This is for testing only. When this is non-zero, the rationalizer will answer
    // queries based on THIS time instead of the current, actual time.
    ts int64
}

// Dial returns a new cluster object which can be used to instantiate a number of Marshalers
// that all use the same cluster.
func Dial(name string, brokers []string) (*KafkaCluster, error) {
    brokerConf := kafka.NewBrokerConf("PortalMarshal")
    broker, err := kafka.Dial(brokers, brokerConf)
    if err != nil {
        return nil, err
    }

    c := &KafkaCluster{
        quit:     new(int32),
        rsteps:   new(int32),
        name:     name,
        broker:   broker,
        producer: broker.Producer(kafka.NewProducerConf()),
        topics:   make(map[string]int),
        groups:   make(map[string]map[string]*topicState),
        jitters:  make(chan time.Duration, 100),
    }

    // Do an initial metadata fetch; this will block a bit.
    err = c.refreshMetadata()
    if err != nil {
        return nil, fmt.Errorf("Failed to get metadata: %s", err)
    }

    // If there is no marshal topic, then we can't run. The admins must go create the topic
    // before they can use this library. Please see the README.
    c.partitions = c.getTopicPartitions(MarshalTopic)
    if c.partitions == 0 {
        return nil, errors.New("Marshalling topic not found. Please see the documentation.")
    }

    // Now start a goroutine to consume each of the partitions in the marshal
    // topic. Note that this doesn't handle increasing the partition count on that topic
    // without stopping all consumers.
    c.rationalizers.Add(c.partitions)
    for id := 0; id < c.partitions; id++ {
        go c.rationalize(id, c.kafkaConsumerChannel(id))
    }

    // A jitter calculator; it just fills a channel with random durations so that other
    // goroutines don't have to build their own random generator.
    go func() {
        rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
        for {
            jitter := rnd.Intn(HeartbeatInterval/2) + (HeartbeatInterval / 2)
            c.jitters <- time.Duration(jitter) * time.Second
        }
    }()

    // Now start the metadata refreshing goroutine.
    go func() {
        for !c.Terminated() {
            time.Sleep(<-c.jitters)
            log.Infof("[%s] Refreshing topic metadata.", c.name)
            c.refreshMetadata()

            // See if the number of partitions in the marshal topic changed. This is bad if
            // it happens, since it means we can no longer coordinate correctly.
            if c.getTopicPartitions(MarshalTopic) != c.partitions {
                log.Errorf("[%s] Marshal topic partition count changed. Terminating!", c.name)
                c.Terminate()
            }
        }
    }()

    // Wait for all rationalizers to come alive.
    log.Infof("[%s] Waiting for all rationalizers to come alive.", c.name)
    c.rationalizers.Wait()
    log.Infof("[%s] All rationalizers alive, KafkaCluster now alive.", c.name)

    return c, nil
}
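
// A minimal usage sketch for Dial (the broker addresses are hypothetical, and the marshal
// coordination topic must already exist before this library can be used):
//
//     cluster, err := marshal.Dial("my-cluster", []string{"kafka01:9092", "kafka02:9092"})
//     if err != nil {
//         // The brokers were unreachable or the marshal topic is missing.
//         return err
//     }
//     defer cluster.Terminate()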

// NewMarshaler creates a Marshaler off of an existing cluster. This is more efficient
// if you're creating multiple instances, since they can share the same underlying cluster.
func (c *KafkaCluster) NewMarshaler(clientID, groupID string) (*Marshaler, error) {
    if c.Terminated() {
        return nil, errors.New("Cluster is terminated.")
    }

    // Get offset coordinator so we can look up (and save) committed offsets later.
    coordinator, err := c.getOffsetCoordinator(groupID)
    if err != nil {
        return nil, err
    }

    m := &Marshaler{
        quit:       new(int32),
        cluster:    c,
        instanceID: newInstanceID(),
        clientID:   clientID,
        groupID:    groupID,
        offsets:    coordinator,
    }

    c.lock.Lock()
    defer c.lock.Unlock()

    // Remove any dead marshalers from our slice and add the new one
    filtered := make([]*Marshaler, 0)
    for _, marshaler := range c.marshalers {
        if !marshaler.Terminated() {
            filtered = append(filtered, marshaler)
        }
    }
    filtered = append(filtered, m)
    c.marshalers = filtered

    return m, nil
}
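
// A sketch of sharing one KafkaCluster across several Marshalers (the client and group IDs
// below are illustrative placeholders):
//
//     m1, err := cluster.NewMarshaler("client-a", "my-group")
//     if err != nil {
//         return err
//     }
//     m2, err := cluster.NewMarshaler("client-b", "my-group")
//     if err != nil {
//         return err
//     }
//     // ... use m1 and m2; cluster.Terminate() will terminate both when everything is done.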

// refreshMetadata is periodically used to update our internal state with topic information
// about the world.
func (c *KafkaCluster) refreshMetadata() error {
    md, err := c.broker.Metadata()
    if err != nil {
        return err
    }

    newTopics := make(map[string]int)
    for _, topic := range md.Topics {
        newTopics[topic.Name] = len(topic.Partitions)
    }

    c.lock.Lock()
    defer c.lock.Unlock()
    c.topics = newTopics

    return nil
}

// getOffsetCoordinator returns a kafka.OffsetCoordinator for a specific group.
func (c *KafkaCluster) getOffsetCoordinator(groupID string) (kafka.OffsetCoordinator, error) {
    return c.broker.OffsetCoordinator(
        kafka.NewOffsetCoordinatorConf(groupID))
}

// getClaimPartition calculates which partition of the marshal topic a given topic should use
// for coordination. It hashes the topic name to predictably partition the topic space.
func (c *KafkaCluster) getClaimPartition(topicName string) int {
    // We use MD5 because it's a fast, well-distributed hash and we don't need any cryptographic
    // properties. We take the first 8 bytes, treat them as a little-endian uint64, and take
    // that modulo the number of partitions we have.
    hash := md5.Sum([]byte(topicName))
    uval := binary.LittleEndian.Uint64(hash[0:8])
    return int(uval % uint64(c.partitions))
}
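
// The same mapping can be reproduced outside this package when inspecting the marshal topic
// by hand; a minimal sketch (the topic name and partitionCount are illustrative placeholders
// for the values a real cluster would have):
//
//     sum := md5.Sum([]byte("some-topic"))
//     claim := binary.LittleEndian.Uint64(sum[:8]) % uint64(partitionCount)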

// getPartitionState returns a topicState for the given group and topic, creating it (and the
// partition state within it) if necessary.
func (c *KafkaCluster) getPartitionState(groupID, topicName string, partID int) *topicState {
    c.lock.Lock()
    defer c.lock.Unlock()

    group, ok := c.groups[groupID]
    if !ok {
        group = make(map[string]*topicState)
        c.groups[groupID] = group
    }

    topic, ok := group[topicName]
    if !ok {
        topic = &topicState{
            claimPartition: c.getClaimPartition(topicName),
            partitions:     make([]PartitionClaim, partID+1),
        }
        group[topicName] = topic
    }

    // Take the topic lock while we adjust the partition slice.
    topic.lock.Lock()
    defer topic.lock.Unlock()

    // The caller might be referring to a partition we don't know about yet; if so, extend
    // the slice to cover it.
    if len(topic.partitions) < partID+1 {
        for i := len(topic.partitions); i <= partID; i++ {
            topic.partitions = append(topic.partitions, PartitionClaim{})
        }
    }

    return topic
}

// getTopics returns the list of known topics.
func (c *KafkaCluster) getTopics() []string {
    c.lock.RLock()
    defer c.lock.RUnlock()

    topics := make([]string, 0, len(c.topics))
    for topic := range c.topics {
        topics = append(topics, topic)
    }
    return topics
}

// getTopicPartitions returns how many partitions a given topic has. Returns 0 if the
// topic is unknown.
func (c *KafkaCluster) getTopicPartitions(topicName string) int {
    c.lock.RLock()
    defer c.lock.RUnlock()
    return c.topics[topicName]
}

// removeMarshal removes a terminated Marshaler from the cluster's list.
func (c *KafkaCluster) removeMarshal(m *Marshaler) {
    c.lock.Lock()
    defer c.lock.Unlock()

    for i, ml := range c.marshalers {
        if ml == m {
            c.marshalers = append(c.marshalers[:i], c.marshalers[i+1:]...)
            break
        }
    }
}

// waitForRsteps is used by the test suite to wait until the rationalizer has processed at
// least the given number of events. It returns the current rsteps value.
func (c *KafkaCluster) waitForRsteps(steps int) int {
    for {
        cval := atomic.LoadInt32(c.rsteps)
        if cval >= int32(steps) {
            return int(cval)
        }
        time.Sleep(5 * time.Millisecond)
    }
}

// Terminate is called when we're done with the cluster and want to shut everything down.
func (c *KafkaCluster) Terminate() {
    if !atomic.CompareAndSwapInt32(c.quit, 0, 1) {
        return
    }

    c.lock.Lock()
    defer c.lock.Unlock()

    // Terminate all Marshalers, which will in turn terminate all Consumers and
    // let everybody know we're all done.
    for _, marshaler := range c.marshalers {
        marshaler.terminateAndCleanup(false)
    }
    c.marshalers = nil

    // Close the broker!
    c.broker.Close()
}

// Terminated returns whether or not we have been terminated.
func (c *KafkaCluster) Terminated() bool {
    return atomic.LoadInt32(c.quit) == 1
}