forked from lovoo/goka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
builders.go
80 lines (67 loc) · 3.07 KB
/
builders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package kafka
import (
"hash"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
// ConsumerBuilder creates a Kafka consumer.
type ConsumerBuilder func(brokers []string, group, clientID string) (Consumer, error)
// DefaultConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error) {
config := NewConfig()
config.ClientID = clientID
return NewSaramaConsumer(brokers, group, config)
}
// ConsumerBuilderWithConfig creates a Kafka consumer using the Sarama library.
func ConsumerBuilderWithConfig(config *cluster.Config) ConsumerBuilder {
return func(brokers []string, group, clientID string) (Consumer, error) {
config.ClientID = clientID
return NewSaramaConsumer(brokers, group, config)
}
}
// ProducerBuilder create a Kafka producer.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)
// DefaultProducerBuilder creates a Kafka producer using the Sarama library.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config := NewConfig()
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config.Config)
}
// ProducerBuilderWithConfig creates a Kafka consumer using the Sarama library.
func ProducerBuilderWithConfig(config *cluster.Config) ProducerBuilder {
return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config.Config)
}
}
// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)
// DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
// This topic manager cannot create topics.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
return NewSaramaTopicManager(brokers, sarama.NewConfig())
}
// TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
// This topic manager cannot create topics.
func TopicManagerBuilderWithConfig(config *cluster.Config) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
return NewSaramaTopicManager(brokers, &config.Config)
}
}
// ZKTopicManagerBuilder creates a TopicManager that connects with ZooKeeper to
// check partition counts and create tables.
func ZKTopicManagerBuilder(servers []string) TopicManagerBuilder {
return func([]string) (TopicManager, error) {
return NewTopicManager(servers, NewTopicManagerConfig())
}
}
// ZKTopicManagerBuilderWithConfig creates a TopicManager that connects with ZooKeeper to
// check partition counts and create tables given a topic configuration.
func ZKTopicManagerBuilderWithConfig(servers []string, config *TopicManagerConfig) TopicManagerBuilder {
return func([]string) (TopicManager, error) {
return NewTopicManager(servers, config)
}
}