-
Notifications
You must be signed in to change notification settings - Fork 6
/
publisher.go
181 lines (142 loc) · 5.13 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package jetstream
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
)
// PublisherConfig is the configuration to create a publisher
type PublisherConfig struct {
// URL is the NATS URL.
URL string
// NatsOptions are custom options for a connection.
NatsOptions []nats.Option
// JetstreamOptions are custom Jetstream options for a connection.
JetstreamOptions []nats.JSOpt
// Marshaler is marshaler used to marshal messages between watermill and wire formats
Marshaler Marshaler
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
SubjectCalculator SubjectCalculator
// AutoProvision bypasses client validation and provisioning of streams
AutoProvision bool
// PublishOptions are custom publish option to be used on all publication
PublishOptions []nats.PubOpt
// TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication
TrackMsgId bool
}
// PublisherPublishConfig is the configuration subset needed for an individual publish call
type PublisherPublishConfig struct {
// Marshaler is marshaler used to marshal messages between watermill and wire formats
Marshaler Marshaler
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
SubjectCalculator SubjectCalculator
// AutoProvision bypasses client validation and provisioning of streams
AutoProvision bool
// JetstreamOptions are custom Jetstream options for a connection.
JetstreamOptions []nats.JSOpt
// PublishOptions are custom publish option to be used on all publication
PublishOptions []nats.PubOpt
// TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication
TrackMsgId bool
}
func (c *PublisherConfig) setDefaults() {
if c.SubjectCalculator == nil {
c.SubjectCalculator = defaultSubjectCalculator
}
}
// Validate ensures configuration is valid before use
func (c PublisherConfig) Validate() error {
if c.Marshaler == nil {
return errors.New("PublisherConfig.Marshaler is missing")
}
if c.SubjectCalculator == nil {
return errors.New("PublisherConfig.SubjectCalculator is missing")
}
return nil
}
// GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established
func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig {
return PublisherPublishConfig{
Marshaler: c.Marshaler,
SubjectCalculator: c.SubjectCalculator,
AutoProvision: c.AutoProvision,
JetstreamOptions: c.JetstreamOptions,
PublishOptions: c.PublishOptions,
TrackMsgId: c.TrackMsgId,
}
}
// Publisher provides the jetstream implementation for watermill publish operations
type Publisher struct {
conn *nats.Conn
config PublisherPublishConfig
logger watermill.LoggerAdapter
js nats.JetStream
topicInterpreter *topicInterpreter
}
// NewPublisher creates a new Publisher.
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
config.setDefaults()
if err := config.Validate(); err != nil {
return nil, err
}
conn, err := nats.Connect(config.URL, config.NatsOptions...)
if err != nil {
return nil, errors.Wrap(err, "cannot connect to nats")
}
return NewPublisherWithNatsConn(conn, config.GetPublisherPublishConfig(), logger)
}
// NewPublisherWithNatsConn creates a new Publisher with the provided nats connection.
func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
if logger == nil {
logger = watermill.NopLogger{}
}
js, err := conn.JetStream(config.JetstreamOptions...)
if err != nil {
return nil, err
}
return &Publisher{
conn: conn,
config: config,
logger: logger,
js: js,
topicInterpreter: newTopicInterpreter(js, config.SubjectCalculator),
}, nil
}
// Publish publishes message to NATS.
//
// Publish will not return until an ack has been received from JetStream.
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
if p.config.AutoProvision {
err := p.topicInterpreter.ensureStream(topic)
if err != nil {
return err
}
}
for _, msg := range messages {
messageFields := watermill.LogFields{
"message_uuid": msg.UUID,
"topic_name": topic,
}
p.logger.Trace("Publishing message", messageFields)
natsMsg, err := p.config.Marshaler.Marshal(topic, msg)
if err != nil {
return err
}
publishOpts := p.config.PublishOptions
if p.config.TrackMsgId {
publishOpts = append(publishOpts, nats.MsgId(msg.UUID))
}
if _, err := p.js.PublishMsg(natsMsg, publishOpts...); err != nil {
return errors.Wrap(err, "sending message failed")
}
}
return nil
}
// Close closes the publisher and the underlying connection
func (p *Publisher) Close() error {
p.logger.Trace("Closing publisher", nil)
defer p.logger.Trace("Publisher closed", nil)
p.conn.Close()
return nil
}