-
Notifications
You must be signed in to change notification settings - Fork 3
/
producer.go
62 lines (49 loc) · 1.38 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package nsq
import (
"context"
lzap "github.com/alexfalkowski/go-service/transport/nsq/logger/zap"
"github.com/alexfalkowski/go-service/transport/nsq/message"
"github.com/alexfalkowski/go-service/transport/nsq/meta"
"github.com/alexfalkowski/go-service/transport/nsq/producer"
"github.com/alexfalkowski/go-service/transport/nsq/trace/opentracing"
"github.com/nsqio/go-nsq"
"go.uber.org/fx"
"go.uber.org/zap"
)
// ProducerParams for NSQ.
type ProducerParams struct {
Config *Config
Logger *zap.Logger
}
// NewProducer for NSQ.
// nolint:ireturn
func NewProducer(lc fx.Lifecycle, params *ProducerParams) (producer.Producer, error) {
cfg := nsq.NewConfig()
p, err := nsq.NewProducer(params.Config.Host, cfg)
if err != nil {
return nil, err
}
p.SetLogger(lzap.NewLogger(params.Logger), nsq.LogLevelInfo)
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
p.Stop()
return nil
},
})
var pr producer.Producer = &nsqProducer{Producer: p}
pr = lzap.NewProducer(params.Logger, pr)
pr = opentracing.NewProducer(pr)
pr = meta.NewProducer(params.Config.UserAgent, pr)
return pr, nil
}
type nsqProducer struct {
*nsq.Producer
}
// Publish a message to a topic.
func (p *nsqProducer) Publish(ctx context.Context, topic string, msg *message.Message) error {
bytes, err := message.Marshal(msg)
if err != nil {
return err
}
return p.Producer.Publish(topic, bytes)
}