/
kafka.go
120 lines (106 loc) · 2.75 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package bridge
import (
"encoding/json"
"errors"
"io/ioutil"
"regexp"
"github.com/Shopify/sarama"
"go.uber.org/zap"
)
// kafakConfig mirrors ./plugins/mq/kafka/kafka.json: the Kafka broker
// address list plus one target topic per MQTT lifecycle event. An empty
// topic string disables forwarding for that event. RegexpMap maps
// MQTT-topic-matching regular expressions to extra Kafka topics that
// Publish events are additionally routed to.
// NOTE(review): "kafak" is a typo of "kafka"; kept as-is because the
// identifier is package-visible and may be referenced from other files.
type kafakConfig struct {
	Addr             []string          `json:"addr"`          // Kafka broker addresses
	ConnectTopic     string            `json:"onConnect"`     // topic for client connect events
	SubscribeTopic   string            `json:"onSubscribe"`   // topic for subscribe events
	PublishTopic     string            `json:"onPublish"`     // topic for publish events
	UnsubscribeTopic string            `json:"onUnsubscribe"` // topic for unsubscribe events
	DisconnectTopic  string            `json:"onDisconnect"`  // topic for disconnect events
	RegexpMap        map[string]string `json:"regexpMap"`     // pattern -> extra topic (Publish only)
}
// kafka is the MQTT-to-Kafka bridge: the configuration loaded at init
// time plus the shared async Sarama producer all events are sent through.
type kafka struct {
	kafakConfig kafakConfig
	kafkaClient sarama.AsyncProducer
}
// InitKafka loads ./plugins/mq/kafka/kafka.json, builds the bridge and
// connects the async producer. Any read or parse failure is fatal — the
// process cannot run without a valid Kafka configuration.
func InitKafka() *kafka {
	log.Info("start connect kafka....")

	raw, err := ioutil.ReadFile("./plugins/mq/kafka/kafka.json")
	if err != nil {
		log.Fatal("Read config file error: ", zap.Error(err))
	}

	var cfg kafakConfig
	if err = json.Unmarshal(raw, &cfg); err != nil {
		log.Fatal("Unmarshal config file error: ", zap.Error(err))
	}

	bridge := &kafka{kafakConfig: cfg}
	bridge.connect()
	return bridge
}
// connect creates the async Sarama producer for the configured broker
// addresses and starts a goroutine that logs delivery failures. A failed
// producer creation aborts the process via log.Fatal.
func (k *kafka) connect() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_1_1_0

	producer, err := sarama.NewAsyncProducer(k.kafakConfig.Addr, cfg)
	if err != nil {
		log.Fatal("create kafka async producer failed: ", zap.Error(err))
	}

	// Drain the error channel for the lifetime of the producer; leaving it
	// unread would eventually block the producer.
	go func() {
		for e := range producer.Errors() {
			log.Error("send msg to kafka failed: ", zap.Error(e))
		}
	}()

	k.kafkaClient = producer
}
// Publish routes a bridge event to the Kafka topic configured for its
// action type. Publish events are additionally fanned out to every topic
// whose RegexpMap pattern matches the event's MQTT topic.
//
// An action with no configured topic is a silent no-op; an unknown
// action returns an error. The client ID is used as the message key so
// all events from one client land on the same partition.
func (k *kafka) Publish(e *Elements) error {
	config := k.kafakConfig
	key := e.ClientID
	var topics []string
	switch e.Action {
	case Connect:
		if config.ConnectTopic != "" {
			topics = append(topics, config.ConnectTopic)
		}
	case Publish:
		if config.PublishTopic != "" {
			topics = append(topics, config.PublishTopic)
		}
		// Fan out to any extra topics whose pattern matches the MQTT topic.
		// NOTE: patterns are recompiled on every call; precompile them at
		// init time if this becomes a hot path.
		for reg, topic := range config.RegexpMap {
			match, err := regexp.MatchString(reg, e.Topic)
			if err != nil {
				// Previously the error was discarded, so a malformed
				// pattern in regexpMap failed invisibly. Surface it and
				// skip just that entry.
				log.Error("invalid regexp in regexpMap: "+reg, zap.Error(err))
				continue
			}
			if match {
				topics = append(topics, topic)
			}
		}
	case Subscribe:
		if config.SubscribeTopic != "" {
			topics = append(topics, config.SubscribeTopic)
		}
	case Unsubscribe:
		if config.UnsubscribeTopic != "" {
			topics = append(topics, config.UnsubscribeTopic)
		}
	case Disconnect:
		if config.DisconnectTopic != "" {
			topics = append(topics, config.DisconnectTopic)
		}
	default:
		return errors.New("error action: " + e.Action)
	}
	return k.publish(topics, key, e)
}
// publish marshals msg to JSON once and enqueues it on the async
// producer for each topic in the list. Enqueueing is fire-and-forget:
// delivery failures surface on the producer's Errors channel, not here.
// Only a marshalling failure is returned.
func (k *kafka) publish(topics []string, key string, msg *Elements) error {
	data, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	// Key and value encoders are identical for every topic; build them once.
	keyEnc := sarama.ByteEncoder(key)
	valEnc := sarama.ByteEncoder(data)
	for i := range topics {
		k.kafkaClient.Input() <- &sarama.ProducerMessage{
			Topic: topics[i],
			Key:   keyEnc,
			Value: valEnc,
		}
	}
	return nil
}