-
Notifications
You must be signed in to change notification settings - Fork 21
/
topic.go
44 lines (36 loc) · 969 Bytes
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package nats
import (
"github.com/nats-io/nats.go"
)
type topicInterpreter struct {
js nats.JetStreamManager
subjectCalculator SubjectCalculator
queueGroupPrefix string
}
func newTopicInterpreter(js nats.JetStreamManager, formatter SubjectCalculator, queueGroupPrefix string) *topicInterpreter {
if formatter == nil {
// this should always be setup to the default
panic("no subject calculator")
}
return &topicInterpreter{
js: js,
subjectCalculator: formatter,
queueGroupPrefix: queueGroupPrefix,
}
}
func (b *topicInterpreter) ensureStream(topic string) error {
_, err := b.js.StreamInfo(topic)
if err != nil {
// TODO: provision durable as well
// or simply provide override capability
_, err = b.js.AddStream(&nats.StreamConfig{
Name: topic,
Description: "",
Subjects: b.subjectCalculator(b.queueGroupPrefix, topic).All(),
})
if err != nil {
return err
}
}
return err
}