-
Notifications
You must be signed in to change notification settings - Fork 21
/
publisher.go
166 lines (132 loc) · 4.67 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package nats
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
)
// PublisherConfig is the configuration to create a publisher.
//
// It is consumed by NewPublisher, which applies defaults, validates the result,
// opens the NATS connection and then builds the Publisher.
type PublisherConfig struct {
// URL is the NATS URL. NewPublisher passes it to nats.Connect.
URL string
// NatsOptions are custom options for a connection. NewPublisher passes them
// to nats.Connect together with URL.
NatsOptions []nats.Option
// Marshaler is marshaler used to marshal messages between watermill and wire formats.
// When nil, setDefaults replaces it with &NATSMarshaler{}.
Marshaler Marshaler
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation
// (defaults to topic as Primary and queueGroupPrefix as QueueGroup).
// When nil, setDefaults replaces it with DefaultSubjectCalculator.
SubjectCalculator SubjectCalculator
// JetStream holds JetStream specific settings.
JetStream JetStreamConfig
}
// PublisherPublishConfig is the configuration subset needed for an individual publish call.
//
// It carries the same Marshaler, SubjectCalculator and JetStream settings as
// PublisherConfig but none of the connection parameters; it is what
// NewPublisherWithNatsConn accepts when the caller already owns a connection.
type PublisherPublishConfig struct {
// Marshaler is marshaler used to marshal messages between watermill and wire formats.
Marshaler Marshaler
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation
// (defaults to topic as Primary and queueGroupPrefix as QueueGroup).
SubjectCalculator SubjectCalculator
// JetStream holds JetStream specific settings.
JetStream JetStreamConfig
}
// setDefaults fills in the optional fields the caller left unset: a nil
// SubjectCalculator becomes DefaultSubjectCalculator and a nil Marshaler
// becomes a fresh NATSMarshaler. Fields that are already set are left alone.
func (c *PublisherConfig) setDefaults() {
	if c.SubjectCalculator == nil {
		c.SubjectCalculator = DefaultSubjectCalculator
	}

	if c.Marshaler == nil {
		c.Marshaler = &NATSMarshaler{}
	}
}
// Validate reports whether the configuration can be used to build a publisher.
// It returns an error naming the first required field that is nil (Marshaler
// is checked before SubjectCalculator) and nil when both are present.
func (c PublisherConfig) Validate() error {
	switch {
	case c.Marshaler == nil:
		return errors.New("PublisherConfig.Marshaler is missing")
	case c.SubjectCalculator == nil:
		return errors.New("PublisherConfig.SubjectCalculator is missing")
	}

	return nil
}
// GetPublisherPublishConfig extracts the per-publish settings (Marshaler,
// SubjectCalculator and JetStream) from the full configuration, leaving the
// connection parameters behind. The result is what a Publisher needs once a
// connection has been established.
func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig {
	var publishConfig PublisherPublishConfig

	publishConfig.Marshaler = c.Marshaler
	publishConfig.SubjectCalculator = c.SubjectCalculator
	publishConfig.JetStream = c.JetStream

	return publishConfig
}
// Publisher provides the nats implementation for watermill publish operations.
type Publisher struct {
// conn is the connection messages are published through: the plain NATS
// connection, or a jsConnection wrapping it when JetStream is not disabled.
conn Connection
// config holds the marshaler, subject calculator and JetStream settings used by Publish.
config PublisherPublishConfig
// logger receives trace output from Publish and Close.
logger watermill.LoggerAdapter
// topicInterpreter is used by Publish to ensure a stream exists for a topic.
// It is only created when JetStream is not disabled and is nil otherwise.
topicInterpreter *topicInterpreter
}
// NewPublisher creates a new Publisher.
//
// It applies defaults to config, validates it, connects to NATS at config.URL
// using config.NatsOptions and then builds the publisher on top of that
// connection via NewPublisherWithNatsConn.
//
// An error is returned when the configuration is invalid, when the connection
// cannot be established, or when the publisher cannot be built. In the last
// case the connection opened here is closed before returning, so a failed call
// does not leak it.
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
	config.setDefaults()

	if err := config.Validate(); err != nil {
		return nil, err
	}

	conn, err := nats.Connect(config.URL, config.NatsOptions...)
	if err != nil {
		return nil, errors.Wrap(err, "cannot connect to nats")
	}

	publisher, err := NewPublisherWithNatsConn(conn, config.GetPublisherPublishConfig(), logger)
	if err != nil {
		// This function opened the connection and the caller never sees it,
		// so it has to be released here or it would stay open forever.
		conn.Close()
		return nil, err
	}

	return publisher, nil
}
// NewPublisherWithNatsConn creates a new Publisher with the provided nats connection.
//
// A nil logger is replaced with watermill.NopLogger, and a nil Marshaler or
// SubjectCalculator in config is replaced with the same defaults NewPublisher
// uses (NATSMarshaler and DefaultSubjectCalculator), so a hand-built config
// cannot cause a nil dereference on publish.
//
// Unless config.JetStream.Disabled is set, a JetStream context is created from
// conn with config.JetStream.ConnectOptions; publishing then goes through a
// jsConnection and a topic interpreter is prepared for stream provisioning.
//
// An error is returned when conn is nil or when the JetStream context cannot
// be created. The caller keeps ownership of conn on error.
func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
	// A nil *nats.Conn stored in the Connection interface would be a non-nil
	// interface value and only blow up later, so reject it up front.
	if conn == nil {
		return nil, errors.New("nats connection is nil")
	}

	if logger == nil {
		logger = watermill.NopLogger{}
	}

	// Mirror PublisherConfig.setDefaults for callers that build the publish
	// config by hand instead of going through NewPublisher.
	if config.Marshaler == nil {
		config.Marshaler = &NATSMarshaler{}
	}
	if config.SubjectCalculator == nil {
		config.SubjectCalculator = DefaultSubjectCalculator
	}

	var connection Connection = conn
	var interpreter *topicInterpreter

	if !config.JetStream.Disabled {
		js, err := conn.JetStream(config.JetStream.ConnectOptions...)
		if err != nil {
			return nil, err
		}

		// Only wrap the connection once the JetStream context is known to be valid.
		connection = &jsConnection{conn, js, config.JetStream}
		interpreter = newTopicInterpreter(js, config.SubjectCalculator, "")
	}

	return &Publisher{
		conn:             connection,
		config:           config,
		logger:           logger,
		topicInterpreter: interpreter,
	}, nil
}
// Publish publishes messages to NATS under the given topic.
//
// Publish will not return until an ack has been received from JetStream.
// Messages are sent one at a time in the order given; the first marshal or
// send failure stops the call and is returned, leaving later messages unsent.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
	// TODO: should we auto provision on publish? Need durable on publish options...
	// The result should also be cached to minimize chatter to the broker.
	if p.config.JetStream.ShouldAutoProvision() {
		if err := p.topicInterpreter.ensureStream(topic); err != nil {
			return err
		}
	}

	for i := range messages {
		current := messages[i]

		p.logger.Trace("Publishing message", watermill.LogFields{
			"message_uuid": current.UUID,
			"topic_name":   topic,
		})

		wireMsg, marshalErr := p.config.Marshaler.Marshal(topic, current)
		if marshalErr != nil {
			return marshalErr
		}

		sendErr := p.conn.PublishMsg(wireMsg)
		if sendErr != nil {
			return errors.Wrap(sendErr, "sending message failed")
		}
	}

	return nil
}
// Close closes the publisher and the underlying connection.
// It traces the start and end of the shutdown and always returns nil.
func (p *Publisher) Close() error {
	logger := p.logger

	logger.Trace("Closing publisher", nil)
	// Deferred so the final trace is emitted after the connection is closed.
	defer logger.Trace("Publisher closed", nil)

	p.conn.Close()

	return nil
}