forked from childe/healer
/
producer.go
123 lines (109 loc) · 3.04 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package healer
import (
"errors"
"math/rand"
"time"
"github.com/aviddiviner/go-murmur"
"github.com/golang/glog"
)
type Producer struct {
config *ProducerConfig
topic string
simpleProducers map[int32]*SimpleProducer
currentProducer *SimpleProducer
currentPartitionID int32
brokers *Brokers
topicMeta *TopicMetadata
}
func NewProducer(topic string, config *ProducerConfig) *Producer {
var err error
err = config.checkValid()
if err != nil {
glog.Errorf("producer config error: %s", err)
return nil
}
p := &Producer{
config: config,
topic: topic,
simpleProducers: make(map[int32]*SimpleProducer),
}
brokerConfig := getBrokerConfigFromProducerConfig(config)
p.brokers, err = NewBrokersWithConfig(config.BootstrapServers, brokerConfig)
if err != nil {
glog.Errorf("init brokers error: %s", err)
return nil
}
err = p.refreshTopicMeta()
if err != nil {
glog.Error(err)
return nil
}
rand.Seed(time.Now().Unix())
p.refreshCurrentProducer()
if p.currentProducer == nil {
return nil
}
go func() {
for range time.NewTicker(time.Duration(config.MetadataMaxAgeMS) * time.Millisecond).C {
err := p.refreshTopicMeta()
if err != nil {
glog.Error(err)
}
p.refreshCurrentProducer()
}
}()
return p
}
func (p *Producer) refreshTopicMeta() error {
for i := 0; i < p.config.FetchTopicMetaDataRetrys; i++ {
metadataResponse, err := p.brokers.RequestMetaData(p.config.ClientID, []string{p.topic})
if err != nil {
glog.Errorf("get topic metadata error: %s", err)
continue
}
if len(metadataResponse.TopicMetadatas) == 0 {
glog.Errorf("get topic metadata error: %s", zeroTopicMetadata)
continue
}
p.topicMeta = metadataResponse.TopicMetadatas[0]
return nil
}
return errors.New("failed to get topic meta after all tries")
}
func (p *Producer) refreshCurrentProducer() {
var validPartitionID []int32
for _, partition := range p.topicMeta.PartitionMetadatas {
if partition.PartitionErrorCode == 0 {
validPartitionID = append(validPartitionID, partition.PartitionID)
}
}
if len(validPartitionID) == 0 {
return
}
partitionID := validPartitionID[rand.Int31n(int32(len(validPartitionID)))]
glog.V(5).Infof("current partitionID is %d", partitionID)
sp := NewSimpleProducer(p.topic, partitionID, p.config)
if sp == nil {
glog.Error("could not referesh current simple producer")
}
p.currentProducer = sp
p.simpleProducers[partitionID] = p.currentProducer
}
func (p *Producer) AddMessage(key []byte, value []byte) error {
if len(key) == 0 {
return p.currentProducer.AddMessage(key, value)
}
partitionID := int32(murmur.MurmurHash2(key, 0) % uint32(len(p.topicMeta.PartitionMetadatas)))
if s, ok := p.simpleProducers[partitionID]; ok {
return s.AddMessage(key, value)
} else {
simpleProducer := NewSimpleProducer(p.topic, partitionID, p.config)
p.simpleProducers[partitionID] = simpleProducer
return simpleProducer.AddMessage(key, value)
}
}
func (p *Producer) Close() {
for _, sp := range p.simpleProducers {
sp.Close()
}
}