-
Notifications
You must be signed in to change notification settings - Fork 6
/
worker_nsq.go
87 lines (77 loc) · 1.8 KB
/
worker_nsq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package pubsub
import (
"log"
"time"
"github.com/febytanzil/gobroker"
"github.com/nsqio/go-nsq"
)
// nsqWorker is the NSQ-backed subscriber worker. It wraps a single
// nsq.Consumer together with the settings needed to connect it and to
// translate NSQ messages into gobroker messages.
type nsqWorker struct {
	// channel is the NSQ consumer created in newNSQWorker for the
	// subscription's topic and channel name.
	channel *nsq.Consumer
	// concurrent is the number of concurrent handlers registered on the
	// consumer; Consume treats a non-positive value as 1.
	concurrent int
	// lookupd is the address handed to ConnectToNSQLookupd (taken from the
	// config's serverURL).
	lookupd string
	// retry is the connection retry limit read by Consume; 0 is treated as
	// "no limit".
	retry int
	// contentType is copied into every gobroker.Message passed to the handler.
	contentType string
}
// newNSQWorker builds the NSQ worker for the subscription described by s.
//
// An nsq.Consumer is created for topic s.Topic on channel s.Name, configured
// with s.MaxRequeue (max_attempts), s.Timeout (msg_timeout) and s.MaxInFlight
// (max_in_flight). An option rejected by the NSQ config is logged and skipped
// instead of being silently ignored, so the value set by nsq.NewConfig stays
// in effect for that option.
//
// The connection settings (lookupd address, retry limit, content type) are
// copied from c. If the consumer cannot be created the process is terminated
// through log.Fatal, because the signature offers no way to report the error.
func newNSQWorker(c *config, s *SubHandler) *nsqWorker {
	cfg := nsq.NewConfig()

	// Applied in the same order as before; each failure is reported on its own.
	options := []struct {
		name  string
		value interface{}
	}{
		{"max_attempts", s.MaxRequeue},
		{"msg_timeout", s.Timeout},
		{"max_in_flight", s.MaxInFlight},
	}
	for _, opt := range options {
		if err := cfg.Set(opt.name, opt.value); err != nil {
			log.Printf("nsq consumer option %q rejected: %s\n", opt.name, err)
		}
	}

	con, err := nsq.NewConsumer(s.Topic, s.Name, cfg)
	if err != nil {
		log.Fatal("failed to initialize nsq consumer:", err)
	}

	return &nsqWorker{
		channel:     con,
		concurrent:  s.Concurrent,
		lookupd:     c.serverURL,
		retry:       c.retry,
		contentType: c.contentType,
	}
}
// Consume registers handler on the NSQ consumer and connects the consumer to
// the configured nsqlookupd address, then blocks until the consumer has
// stopped.
//
// name, topic and maxRequeue are accepted to keep the worker signature but are
// not used here: the consumer was already bound to its topic/channel and
// configured in newNSQWorker.
//
// A non-positive concurrency is normalised to 1 before the handlers are
// registered. If connecting fails, the attempt is logged and retried after a
// short pause. With n.retry == 0 retries are unlimited; otherwise Consume gives
// up and returns once n.retry retries have failed. Consume also returns as soon
// as the consumer's StopChan is closed, so a stopped worker does not keep
// retrying.
func (n *nsqWorker) Consume(name, topic string, maxRequeue int, handler gobroker.Handler) {
	// Pause between failed connection attempts so a persistent failure does
	// not turn into a busy loop that floods the log.
	const retryDelay = time.Second

	if n.concurrent <= 0 {
		n.concurrent = 1
	}
	n.channel.AddConcurrentHandlers(n.nsqMiddleware(handler), n.concurrent)

	for retries := 0; ; retries++ {
		err := n.channel.ConnectToNSQLookupd(n.lookupd)
		if err == nil {
			// Connected: the consumer now runs on its own goroutines. Park
			// here until it is stopped instead of calling connect again in a
			// tight loop.
			<-n.channel.StopChan
			return
		}

		log.Printf("worker failed to initialize retried [%d] %s \n", retries, err)
		if n.retry != 0 && retries >= n.retry {
			// Retry budget exhausted.
			return
		}

		select {
		case <-n.channel.StopChan:
			// Consumer was stopped while waiting; retrying is pointless.
			return
		case <-time.After(retryDelay):
		}
	}
}
// Stop asks the underlying NSQ consumer to stop. It does not report a failure:
// the returned error is always nil.
func (n *nsqWorker) Stop() error {
	consumer := n.channel
	consumer.Stop()

	return nil
}
// nsqMiddleware adapts a gobroker.Handler to an nsq.HandlerFunc.
//
// Each NSQ message is wrapped in a gobroker.Message carrying its body, its
// attempt count and the worker's content type, and passed to h. When h
// succeeds the message is finished and nil is returned. When h fails the
// message is requeued without backoff and the error is returned: the delay is
// the one reported by a *gobroker.DeferredError, or one second for any other
// error.
func (n *nsqWorker) nsqMiddleware(h gobroker.Handler) nsq.HandlerFunc {
	return func(m *nsq.Message) error {
		msg := &gobroker.Message{
			Body:        m.Body,
			Attempts:    int(m.Attempts),
			ContentType: n.contentType,
		}

		handleErr := h(msg)
		if handleErr == nil {
			m.Finish()
			return nil
		}

		// Default delay unless the handler asked for a specific one.
		delay := time.Second
		if deferred, ok := handleErr.(*gobroker.DeferredError); ok {
			delay = deferred.GetDelay()
		}
		m.RequeueWithoutBackoff(delay)

		return handleErr
	}
}