/
mq_kafka_producer_impl.go
94 lines (82 loc) · 2.09 KB
/
mq_kafka_producer_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package emq
import (
"strconv"
"sync"
"github.com/Shopify/sarama"
"golang.org/x/net/context"
)
// KafkaProducer wraps a sarama.AsyncProducer and matches each delivery report
// coming back from it with a per-message callback registered by WriteMsg.
type KafkaProducer struct {
// asyncProducer is the underlying sarama producer; messages go in through
// its Input channel and reports come back on Successes/Errors.
asyncProducer sarama.AsyncProducer
// client supplies the topic name (via client.baseConfig.GetMqTopic()).
client *KafkaClient
// lock is taken around every access to callbackMap and idx in this file,
// and is also held for the duration of Close.
lock *sync.Mutex
// callbackMap holds one pending callback per in-flight message, keyed by
// the int64 stored in that message's Metadata field. WriteMsg adds
// entries; InitProducerReturn invokes and removes them.
callbackMap map[int64]func(msg *sarama.ProducerMessage, e error)
// idx is the key that the next WriteMsg call will use; it is incremented
// once per call.
idx int64
// returnCloseCh is closed by Close to make InitProducerReturn return.
returnCloseCh chan struct{}
}
// InitProducerReturn runs the delivery-report loop: it receives from the
// producer's Successes and Errors channels and hands each report to the
// callback that was registered under the message's Metadata index.
//
// The loop blocks and is meant to run in its own goroutine. It returns when
// ctx is done, when returnCloseCh is closed, or when either report channel is
// closed (which means the underlying producer has shut down).
func (p *KafkaProducer) InitProducerReturn(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-p.returnCloseCh:
			return
		case success, ok := <-p.asyncProducer.Successes():
			if !ok {
				// A closed channel yields nil forever; stop instead of
				// dereferencing it.
				return
			}
			p.completeDelivery(success, nil)
		case failure, ok := <-p.asyncProducer.Errors():
			if !ok {
				return
			}
			if failure == nil {
				continue
			}
			p.completeDelivery(failure.Msg, failure)
		}
	}
}

// completeDelivery removes the callback registered for msg and invokes it
// with deliveryErr. Reports that cannot be matched to a callback (nil message,
// Metadata that is not an int64, or no entry in callbackMap) are dropped
// rather than causing a panic.
func (p *KafkaProducer) completeDelivery(msg *sarama.ProducerMessage, deliveryErr error) {
	if msg == nil {
		return
	}
	idx, ok := msg.Metadata.(int64)
	if !ok {
		// Not a message whose Metadata was set to an int64 index.
		return
	}

	p.lock.Lock()
	callback := p.callbackMap[idx]
	delete(p.callbackMap, idx)
	p.lock.Unlock()

	// Run the callback after releasing the lock so that it can never block
	// or re-enter code that needs the same mutex.
	if callback != nil {
		callback(msg, deliveryErr)
	}
}
// WriteMsg publishes one message to the client's configured topic and waits
// for its delivery report.
//
// key and value become the message key and payload; every entry of properties
// is attached as a record header. A callback is registered in callbackMap
// under a fresh index, the same index is stored in the message's Metadata, and
// the call then waits for that callback to fire (InitProducerReturn is the
// code in this file that fires it).
//
// On a delivery report it returns the partition, the offset formatted as a
// decimal string, and the delivery error (nil on success). If ctx is done
// before the message is enqueued or before the report arrives, it returns
// ctx.Err() with zero values; in the latter case the message may still be
// delivered. A nil ctx is treated as context.Background().
func (p *KafkaProducer) WriteMsg(ctx context.Context, key string, value []byte, properties map[string]string) (partition int32, msgId string, err error) {
	if ctx == nil {
		ctx = context.Background()
	}

	headers := make([]sarama.RecordHeader, 0, len(properties))
	for k, v := range properties {
		headers = append(headers, sarama.RecordHeader{
			Key:   []byte(k),
			Value: []byte(v),
		})
	}

	type deliveryResult struct {
		partition int32
		offset    int64
		err       error
	}
	// Buffered so the callback never blocks, even if this call has already
	// given up because ctx was cancelled.
	resultCh := make(chan deliveryResult, 1)

	p.lock.Lock()
	idx := p.idx
	p.idx++
	p.callbackMap[idx] = func(msg *sarama.ProducerMessage, e error) {
		res := deliveryResult{err: e}
		if msg != nil {
			res.partition = msg.Partition
			res.offset = msg.Offset
		}
		resultCh <- res
	}
	p.lock.Unlock()

	msg := &sarama.ProducerMessage{
		Topic:    p.client.baseConfig.GetMqTopic(),
		Key:      sarama.StringEncoder(key),
		Value:    sarama.ByteEncoder(value),
		Headers:  headers,
		Metadata: idx,
	}

	select {
	case p.asyncProducer.Input() <- msg:
	case <-ctx.Done():
		// The message was never enqueued, so no report will ever look this
		// callback up; remove it to avoid leaking the map entry.
		p.lock.Lock()
		delete(p.callbackMap, idx)
		p.lock.Unlock()
		return 0, "", ctx.Err()
	}

	select {
	case res := <-resultCh:
		return res.partition, strconv.FormatInt(res.offset, 10), res.err
	case <-ctx.Done():
		// The callback stays registered so that the eventual delivery report
		// still finds it and cleans up the map entry.
		return 0, "", ctx.Err()
	}
}
// WriteAsyncMsg is currently a placeholder: it is a package-level function
// (not a KafkaProducer method), ignores all of its arguments, sends nothing,
// and always reports success by returning a nil error.
func WriteAsyncMsg(ctx context.Context, key string, value []byte, properties map[string]string) error {
	// No work is performed yet, so there is never an error to report.
	var noErr error
	return noErr
}
// Close stops the delivery-report loop by closing returnCloseCh and then
// closes the underlying async producer, returning its error.
//
// Close is idempotent: a second call finds returnCloseCh already closed and
// returns nil without closing the channel or the producer again (closing an
// already-closed channel would panic). The mutex is held for the whole call,
// so concurrent Close calls are serialized.
func (p *KafkaProducer) Close() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	select {
	case <-p.returnCloseCh:
		// Nothing in this file sends on returnCloseCh, so a successful
		// receive means it has already been closed by an earlier Close.
		return nil
	default:
	}

	close(p.returnCloseCh)
	return p.asyncProducer.Close()
}