/
nsq.go
117 lines (102 loc) · 2.34 KB
/
nsq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
* @Author: lwnmengjing
* @Date: 2021/5/30 7:30 下午
* @Last Modified by: lwnmengjing
* @Last Modified time: 2021/5/30 7:30 下午
*/
package queue
import (
"github.com/Hzhenyong/go-admin-core/storage"
json "github.com/json-iterator/go"
"github.com/nsqio/go-nsq"
)
// NewNSQ nsq模式 只能监听一个channel
func NewNSQ(addresses []string, cfg *nsq.Config, channelPrefix string) (*NSQ, error) {
n := &NSQ{
addresses: addresses,
cfg: cfg,
channelPrefix: channelPrefix,
}
var err error
n.producer, err = n.newProducer()
return n, err
}
type NSQ struct {
addresses []string
cfg *nsq.Config
producer *nsq.Producer
consumer *nsq.Consumer
channelPrefix string
}
// String 字符串类型
func (NSQ) String() string {
return "nsq"
}
// switchAddress ⚠️生产环境至少配置三个节点
func (e *NSQ) switchAddress() {
if len(e.addresses) > 1 {
e.addresses[0], e.addresses[len(e.addresses)-1] =
e.addresses[1],
e.addresses[0]
}
}
func (e *NSQ) newProducer() (*nsq.Producer, error) {
if e.cfg == nil {
e.cfg = nsq.NewConfig()
}
return nsq.NewProducer(e.addresses[0], e.cfg)
}
func (e *NSQ) newConsumer(topic string, h nsq.Handler) (err error) {
if e.cfg == nil {
e.cfg = nsq.NewConfig()
}
if e.consumer == nil {
e.consumer, err = nsq.NewConsumer(topic, e.channelPrefix+topic, e.cfg)
if err != nil {
return err
}
}
e.consumer.AddHandler(h)
err = e.consumer.ConnectToNSQDs(e.addresses)
return err
}
// Append 消息入生产者
func (e *NSQ) Append(message storage.Messager) error {
rb, err := json.Marshal(message.GetValues())
if err != nil {
return err
}
return e.producer.Publish(message.GetStream(), rb)
}
// Register 监听消费者
func (e *NSQ) Register(name string, f storage.ConsumerFunc) {
h := &nsqConsumerHandler{f}
err := e.newConsumer(name, h)
if err != nil {
//目前不支持动态注册
panic(err)
}
}
func (e *NSQ) Run() {
}
func (e *NSQ) Shutdown() {
if e.producer != nil {
e.producer.Stop()
}
if e.consumer != nil {
e.consumer.Stop()
}
}
type nsqConsumerHandler struct {
f storage.ConsumerFunc
}
func (e nsqConsumerHandler) HandleMessage(message *nsq.Message) error {
m := new(Message)
data := make(map[string]interface{})
err := json.Unmarshal(message.Body, &data)
if err != nil {
return err
}
m.SetValues(data)
return e.f(m)
}