forked from shenghui0779/yiigo
/
nsq.go
202 lines (160 loc) · 4.17 KB
/
nsq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package yiigo
import (
"time"
"github.com/nsqio/go-nsq"
"go.uber.org/zap"
)
// producer is the package-level NSQ producer used by NSQPublish and
// NSQDeferredPublish. It is assigned by initProducer; within this file it
// keeps its zero value (nil) until that call succeeds.
var producer *nsq.Producer
// NSQLogger is a stateless logger type that is handed to the nsq producer
// and consumers through SetLogger. Its Output method forwards each log line
// to the package logger.
type NSQLogger struct{}
// Output implements the NSQ logger interface. The line s is written to the
// package logger at error level together with the reported call depth, and
// the method always returns nil.
func (l *NSQLogger) Output(calldepth int, s string) error {
	depth := zap.Int("call_depth", calldepth)
	logger.Error(s, depth)

	return nil
}
// initProducer creates an NSQ producer for the given nsqd address with a
// default nsq config, routes its error-level logs through NSQLogger and
// stores it in the package-level producer variable.
//
// A creation failure is logged and returned; producer is left untouched in
// that case.
func initProducer(nsqd string) error {
	cfg := nsq.NewConfig()

	created, err := nsq.NewProducer(nsqd, cfg)
	if err != nil {
		logger.Error("init producer error", zap.Error(err))

		return err
	}

	created.SetLogger(&NSQLogger{}, nsq.LogLevelError)
	producer = created

	return nil
}
// NSQMessage is the contract for messages passed to NSQPublish and
// NSQDeferredPublish.
type NSQMessage interface {
// Bytes returns the message body to publish. A non-nil error aborts the
// publish and is returned to the caller unchanged.
Bytes() ([]byte, error)
// Do performs the message processing. It is not called anywhere in this
// file; presumably consumer handlers invoke it after decoding a message
// (confirm against callers).
Do() error
}
// NSQPublish synchronously publishes a message body to the specified topic.
//
// The body is obtained from msg.Bytes(); if that fails, its error is returned
// and nothing is published. If the package producer has not been initialised
// yet (initProducer has not completed successfully), nsq.ErrNotConnected is
// returned instead of panicking on a nil producer.
func NSQPublish(topic string, msg NSQMessage) error {
	body, err := msg.Bytes()
	if err != nil {
		return err
	}

	// Guard against use before initialisation: calling Publish on a nil
	// *nsq.Producer dereferences the nil receiver and panics.
	if producer == nil {
		return nsq.ErrNotConnected
	}

	return producer.Publish(topic, body)
}
// NSQDeferredPublish synchronously publishes a message body to the specified topic
// where the message will queue at the channel level until the timeout expires.
//
// The body is obtained from msg.Bytes(); if that fails, its error is returned
// and nothing is published. If the package producer has not been initialised
// yet (initProducer has not completed successfully), nsq.ErrNotConnected is
// returned instead of panicking on a nil producer.
func NSQDeferredPublish(topic string, msg NSQMessage, duration time.Duration) error {
	body, err := msg.Bytes()
	if err != nil {
		return err
	}

	// Guard against use before initialisation: calling DeferredPublish on a
	// nil *nsq.Producer dereferences the nil receiver and panics.
	if producer == nil {
		return nsq.ErrNotConnected
	}

	return producer.DeferredPublish(topic, duration, body)
}
// NSQConsumer describes a consumer that setConsumers creates and connects.
type NSQConsumer interface {
// Handler processes incoming messages; the NSQConsumer itself is passed
// to the nsq consumer's AddHandler.
nsq.Handler
// Topic returns the topic passed to nsq.NewConsumer.
Topic() string
// Channel returns the channel passed to nsq.NewConsumer.
Channel() string
// AttemptCount returns the value applied to the "max_attempts" config
// option. A value of 0 leaves the nsq config default in place.
AttemptCount() uint16
}
// nsqSetting collects the values that NSQOption functions may override
// before setConsumers builds the nsq config for each consumer.
type nsqSetting struct {
// lookupdPollInterval is copied to the nsq config's LookupdPollInterval.
lookupdPollInterval time.Duration
// rdyRedistributeInterval is copied to the nsq config's RDYRedistributeInterval.
rdyRedistributeInterval time.Duration
// maxInFlight is copied to the nsq config's MaxInFlight.
maxInFlight int
// consumers lists the consumers registered through WithNSQConsumer.
consumers []NSQConsumer
}
// NSQOption configures how we set up the nsq config. Each option mutates the
// nsqSetting it is given; setConsumers applies the options in the order they
// are passed.
type NSQOption func(s *nsqSetting)
// WithLookupdPollInterval returns an option that sets the
// `LookupdPollInterval` used for every consumer's nsq config to t.
func WithLookupdPollInterval(t time.Duration) NSQOption {
	apply := func(target *nsqSetting) {
		target.lookupdPollInterval = t
	}

	return apply
}
// WithRDYRedistributeInterval returns an option that sets the
// `RDYRedistributeInterval` used for every consumer's nsq config to t.
func WithRDYRedistributeInterval(t time.Duration) NSQOption {
	apply := func(target *nsqSetting) {
		target.rdyRedistributeInterval = t
	}

	return apply
}
// WithMaxInFlight returns an option that sets the `MaxInFlight` used for
// every consumer's nsq config to n.
func WithMaxInFlight(n int) NSQOption {
	apply := func(target *nsqSetting) {
		target.maxInFlight = n
	}

	return apply
}
// WithNSQConsumer returns an option that registers consumer for nsq. The
// option may be supplied several times; each consumer is appended to the
// list in the order the options are applied.
func WithNSQConsumer(consumer NSQConsumer) NSQOption {
	register := func(target *nsqSetting) {
		target.consumers = append(target.consumers, consumer)
	}

	return register
}
// setConsumers creates one nsq consumer per registered NSQConsumer and
// connects it to the given nsqlookupd addresses.
//
// Defaults (1s lookupd poll interval, 1s RDY redistribute interval and a
// max-in-flight of 1000) are applied first and then overridden by options.
// Each consumer gets its own nsq config, logs at error level through
// NSQLogger and uses the NSQConsumer itself as the message handler.
//
// If configuring, creating or connecting any consumer fails, every consumer
// created by this call is stopped before the error is returned, so a failed
// setup does not leave earlier consumers running in the background.
func setConsumers(lookupd []string, options ...NSQOption) error {
	setting := &nsqSetting{
		lookupdPollInterval:     time.Second,
		rdyRedistributeInterval: time.Second,
		maxInFlight:             1000,
	}

	for _, f := range options {
		f(setting)
	}

	started := make([]*nsq.Consumer, 0, len(setting.consumers))

	// fail stops everything created so far and hands the error back.
	fail := func(err error) error {
		for _, nc := range started {
			nc.Stop()
		}

		return err
	}

	for _, c := range setting.consumers {
		cfg := nsq.NewConfig()

		cfg.LookupdPollInterval = setting.lookupdPollInterval
		cfg.RDYRedistributeInterval = setting.rdyRedistributeInterval
		cfg.MaxInFlight = setting.maxInFlight

		// set attempt count, default: 5
		if n := c.AttemptCount(); n > 0 {
			if err := cfg.Set("max_attempts", n); err != nil {
				return fail(err)
			}
		}

		nc, err := nsq.NewConsumer(c.Topic(), c.Channel(), cfg)
		if err != nil {
			return fail(err)
		}

		// Track the consumer right away so it is stopped if a later step fails.
		started = append(started, nc)

		nc.SetLogger(&NSQLogger{}, nsq.LogLevelError)
		nc.AddHandler(c)

		if err := nc.ConnectToNSQLookupds(lookupd); err != nil {
			return fail(err)
		}
	}

	return nil
}
// initNSQ initialises the package producer for nsqd and then sets up the
// consumers configured through options against the lookupd addresses.
//
// A failure in either step is reported through logger.Panic; the consumers
// are only set up when the producer was initialised successfully. On success
// an info line is logged.
func initNSQ(nsqd string, lookupd []string, options ...NSQOption) {
	// init producer first, consumers only if that worked
	err := initProducer(nsqd)
	if err == nil {
		err = setConsumers(lookupd, options...)
	}

	if err != nil {
		logger.Panic("[yiigo] init nsq error", zap.Error(err))
	}

	logger.Info("[yiigo] nsq is OK")
}
// NextAttemptDuration is a helper that maps an attempt number to a delay.
//
// Attempts 0 through 9 yield 5s, 10s, 15s, 30s, 1m, 2m, 5m, 10m, 15m and 30m
// respectively; attempt 10 and anything above it yield 1h.
func NextAttemptDuration(attempts uint16) time.Duration {
	// Indexed by attempt number; anything past the end is capped at one hour.
	schedule := [...]time.Duration{
		5 * time.Second,
		10 * time.Second,
		15 * time.Second,
		30 * time.Second,
		time.Minute,
		2 * time.Minute,
		5 * time.Minute,
		10 * time.Minute,
		15 * time.Minute,
		30 * time.Minute,
		time.Hour,
	}

	if int(attempts) >= len(schedule) {
		return time.Hour
	}

	return schedule[attempts]
}