-
Notifications
You must be signed in to change notification settings - Fork 0
/
asyncProducer.go
188 lines (168 loc) · 5.23 KB
/
asyncProducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package kafkax
import (
"strconv"
"time"
"go.uber.org/zap"
"github.com/chenxinqun/ginWarpPkg/convert"
"github.com/chenxinqun/ginWarpPkg/loggerx"
"github.com/Shopify/sarama"
)
// AsyncProducer wraps a sarama.AsyncProducer and re-exposes its success and
// error streams using the package's own ProducerMessage/ProducerError types.
type AsyncProducer struct {
	// Client is the underlying sarama async producer.
	Client sarama.AsyncProducer
	// successes forwards sarama success messages; lazily created by Successes().
	successes chan *ProducerMessage
	// errors forwards sarama producer errors; lazily created by Errors().
	errors chan *ProducerError
}
// newAsyncProducer builds a sarama.AsyncProducer from cfg.
//
// For the access log we want AP semantics with high throughput: by creating
// batches of compressed messages we reduce network I/O at the cost of more
// latency. RequiredAcks defaults to WaitForLocal and compression to Snappy
// when not configured; FlushFrequency falls back to DefaultFlushFrequency.
func newAsyncProducer(cfg Info, optionHandlers ...OptionHandler) (sarama.AsyncProducer, error) { //nolint:nolintlint,cyclop
	opt := new(option)
	for _, handler := range optionHandlers {
		handler(opt)
	}
	config := sarama.NewConfig()
	tlsConfig, err := createTLSConfiguration(cfg)
	if err != nil {
		return nil, err
	}
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	// Required acks: parse the configured value, defaulting to WaitForLocal.
	requiredAcks := sarama.WaitForLocal
	if cfg.Producer.RequiredAcks != "" {
		r, err := strconv.Atoi(cfg.Producer.RequiredAcks) //nolint:govet
		if err != nil {
			return nil, err
		}
		requiredAcks = sarama.RequiredAcks(r)
	}
	// Compression codec: parse the configured value, defaulting to Snappy.
	compression := sarama.CompressionSnappy
	if cfg.Producer.Compression != "" {
		r, err := strconv.Atoi(cfg.Producer.Compression) //nolint:govet
		if err != nil {
			return nil, err
		}
		compression = sarama.CompressionCodec(r)
	}
	if cfg.Producer.FlushFrequency <= 0 {
		cfg.Producer.FlushFrequency = DefaultFlushFrequency
	}
	config.Producer.RequiredAcks = requiredAcks
	config.Producer.Compression = compression
	config.Producer.Flush.Frequency = time.Duration(cfg.Producer.FlushFrequency) * time.Millisecond
	config.Producer.Return.Errors = true
	producer, err := sarama.NewAsyncProducer(cfg.BrokerList, config)
	if err != nil {
		return nil, err
	}
	// Log if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			loggerx.Default().Error("kafkax async producer 客户端异常", zap.Error(err))
		}
	}()
	return producer, nil
}
// AsyncClose triggers a shutdown of the underlying producer without waiting.
// The shutdown completes once the input, error and success channels are fully
// drained by their consumers.
func (p *AsyncProducer) AsyncClose() {
	p.Client.AsyncClose()
}
// GetClient returns the wrapper itself, allowing callers holding an interface
// to recover the concrete *AsyncProducer.
func (p *AsyncProducer) GetClient() *AsyncProducer {
	return p
}
// Close shuts down the producer and waits for any buffered messages to be
// flushed. You must call this function before a producer object passes out of
// scope, as it may otherwise leak memory. You must call this before process
// shutting down, or you may lose messages. You must call this before calling
// Close on the underlying client.
func (p *AsyncProducer) Close() error {
	return p.Client.Close()
}
// SendMessage JSON-encodes value and enqueues it asynchronously on topic under
// the given key. It returns the enqueued message, or an error if either the
// JSON encoding or the enqueue step fails.
func (p *AsyncProducer) SendMessage(topic Topic, key string, value interface{}) (msg *ProducerMessage, err error) {
	var byteVal []byte
	byteVal, err = convert.StructToJSON(value)
	if err != nil {
		return nil, err
	}
	// Propagate the enqueue error; the original discarded it with `return msg, nil`.
	msg, err = p.SendMessageByte(topic, key, byteVal)
	return msg, err
}
// SendMessageByte enqueues a raw byte payload asynchronously on topic under
// the given key and returns a copy of the enqueued message.
func (p *AsyncProducer) SendMessageByte(topic Topic, key string, value []byte) (msg *ProducerMessage, err error) {
	saramaMsg := &sarama.ProducerMessage{
		Topic: topic.String(),
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	}
	// TODO handle ctx and the closed-producer case.
	p.Client.Input() <- saramaMsg
	return &ProducerMessage{*saramaMsg}, nil
}
// SendMessages JSON-encodes each value and enqueues them all asynchronously on
// topic under the same key. It returns the enqueued messages, or an error if
// any encoding or the enqueue step fails.
func (p *AsyncProducer) SendMessages(topic Topic,
	key string, values ...interface{}) (msgList []*ProducerMessage, err error) {
	// Pre-size: one encoded payload per value.
	mList := make([][]byte, 0, len(values))
	for _, value := range values {
		var byteVal []byte
		byteVal, err = convert.StructToJSON(value)
		if err != nil {
			return nil, err
		}
		mList = append(mList, byteVal)
	}
	// Propagate the enqueue error; the original discarded it with `return msgList, nil`.
	msgList, err = p.SendMessagesByte(topic, key, mList...)
	return msgList, err
}
// SendMessagesByte enqueues each raw byte payload asynchronously on topic
// under the same key and returns copies of the enqueued messages. For no
// values it returns a nil slice, matching the original behavior.
func (p *AsyncProducer) SendMessagesByte(topic Topic, key string, values ...[]byte) (msgList []*ProducerMessage, err error) {
	if len(values) == 0 {
		return nil, nil
	}
	// Pre-size once instead of nil-checking inside the loop.
	msgList = make([]*ProducerMessage, 0, len(values))
	for _, value := range values {
		m := &sarama.ProducerMessage{
			Topic: topic.String(),
			Key:   sarama.StringEncoder(key),
			Value: sarama.ByteEncoder(value),
		}
		p.Client.Input() <- m
		msgList = append(msgList, &ProducerMessage{*m})
	}
	return msgList, nil
}
// Successes is the success output channel back to the user when Return.Successes is
// enabled. If Return.Successes is true, you MUST read from this channel or the
// Producer will deadlock. It is suggested that you send and read messages
// together in a single select statement.
//
// The forwarding goroutine is started only on the first call; the original
// spawned a new goroutine on every call, leaking goroutines and splitting the
// success stream between them.
// NOTE(review): lazy init is not goroutine-safe — concurrent first calls could
// still race on p.successes; consider sync.Once if that can happen.
func (p *AsyncProducer) Successes() <-chan *ProducerMessage {
	if p.successes == nil {
		p.successes = make(chan *ProducerMessage)
		go func() {
			for succ := range p.Client.Successes() {
				p.successes <- &ProducerMessage{*succ}
			}
		}()
	}
	return p.successes
}
// Errors is the error output channel back to the user. You MUST read from this
// channel or the Producer will deadlock when the channel is full. Alternatively,
// you can set Producer.Return.Errors in your config to false, which prevents
// errors to be returned.
//
// The forwarding goroutine is started only on the first call; the original
// spawned a new goroutine on every call, leaking goroutines and splitting the
// error stream between them.
// NOTE(review): lazy init is not goroutine-safe — concurrent first calls could
// still race on p.errors; consider sync.Once if that can happen.
func (p *AsyncProducer) Errors() <-chan *ProducerError {
	if p.errors == nil {
		p.errors = make(chan *ProducerError)
		go func() {
			for perr := range p.Client.Errors() {
				p.errors <- &ProducerError{*perr}
			}
		}()
	}
	return p.errors
}