Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNS-340 - add ability to publish to specific subject hierarchies #112

Merged
merged 2 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bahamut.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func New(options ...Option) Server {
zap.L().Warn("Push server is enabled but neither dispatching or publishing is. Use bahamut.OptPushPublishHandler() and/or bahamut.OptPushDispatchHandler()")
}

if !c.pushServer.enabled && c.pushServer.subjectHierarchiesEnabled {
zap.L().Warn("Push server subject hierarchies have been enabled, but no push server has been configured. Use bahamut.OptPushServer to configure a Push server.")
}

if (c.restServer.enabled || c.pushServer.enabled) && len(c.model.modelManagers) == 0 {
zap.L().Warn("No elemental.ModelManager is defined. Use bahamut.OptModel()")
}
Expand Down
17 changes: 9 additions & 8 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ type config struct {
}

pushServer struct {
service PubSubClient
topic string
endpoint string
dispatchHandler PushDispatchHandler
publishHandler PushPublishHandler
enabled bool
publishEnabled bool
dispatchEnabled bool
service PubSubClient
topic string
endpoint string
dispatchHandler PushDispatchHandler
publishHandler PushPublishHandler
enabled bool
subjectHierarchiesEnabled bool
publishEnabled bool
dispatchEnabled bool
}

healthServer struct {
Expand Down
17 changes: 17 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,23 @@ func OptPushServer(service PubSubClient, topic string) Option {
}
}

// OptPushServerEnableSubjectHierarchies will cause the push server to push to specific subject hierarchies under the configured
// pub/sub topic you have chosen for your push server. This option has no effect if OptPushServer is not set.
//
// For example:
//
// If the push server topic has been set to "global-events" and the server is about to push a "create" event w/ an identity
// value of "apples", enabling this option, would cause the push server to target a new publication to the subject
// "global-events.apples.create", INSTEAD OF "global-events". Consequently, as a result of this, any upstream push
// servers that are interested in receiving all events you publish to this topic would need to utilize subject wildcards.
//
// See: https://docs.nats.io/nats-concepts/subjects#wildcards for more details.
func OptPushServerEnableSubjectHierarchies() Option {
return func(c *config) {
c.pushServer.subjectHierarchiesEnabled = true
}
}

// OptPushEndpoint sets the endpoint to use for websocket channel.
//
// If unset, it fallsback to the default which is /events. This option
Expand Down
5 changes: 5 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func TestBahamut_Options(t *testing.T) {
So(c.pushServer.publishHandler, ShouldEqual, h)
})

Convey("Calling OptPushServerEnableSubjectHierarchies should work", t, func() {
OptPushServerEnableSubjectHierarchies()(&c)
So(c.pushServer.subjectHierarchiesEnabled, ShouldEqual, true)
})

Convey("Calling OptHealthServer should work", t, func() {
h := func() error { return nil }
OptHealthServer("1.2.3.4:123", h)(&c)
Expand Down
35 changes: 33 additions & 2 deletions websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,27 @@ func (n *pushServer) pushEvents(events ...*elemental.Event) {
}
}

publication := NewPublication(n.cfg.pushServer.topic)
// if subject hierarchies are enabled, for the benefit of subscribers interested in specific identities, we utilize
// a subject hierarchy here to publish to a specific subject under the configured topic of the push server.
//
// for example:
//
// if the push server topic has been set to "global-events" and the server is about to push a "create" event w/ an identity
// value of "apples", enabling this option, would cause the push server to target a new publication to the subject
// "global-events.apples.create", INSTEAD OF "global-events".
//
// consequently, clients interested in receiving events pertaining to the "apples" resource can then subscribe
// on that specific topic, as opposed to ignoring events they don't care about. For clients interested in receiving
// ALL events published to "global-events", they can utilize NATS wildcards and subscribe to "global-events.>"
// ('>' targets all hierarchies) or "global-events.*" ('*' matching a single token).
//
// more details: https://docs.nats.io/nats-concepts/subjects#subject-hierarchies
topic := n.cfg.pushServer.topic
if n.cfg.pushServer.subjectHierarchiesEnabled {
topic = fmt.Sprintf("%s.%s.%s", topic, event.Identity, event.Type)
}

publication := NewPublication(topic)
if err = publication.Encode(event); err != nil {
zap.L().Error("Unable to encode event", zap.Error(err))
break
Expand Down Expand Up @@ -258,7 +278,18 @@ func (n *pushServer) start(ctx context.Context) {

if n.cfg.pushServer.service != nil {
errors := make(chan error, 24000)
defer n.cfg.pushServer.service.Subscribe(n.publications, errors, n.cfg.pushServer.topic)()
subTopic := n.cfg.pushServer.topic
// backwards compatibility: if the push server is using subject hierarchies when publishing events, we must by default
// listen to all child subjects of the configured topic via a wildcard '>'.
//
// see: https://docs.nats.io/nats-concepts/subjects#wildcards for more details.
//
// TODO: in the future, support to subscribing to specific subjects and/or wildcards may be added.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀 @primalmotion 😉

if n.cfg.pushServer.subjectHierarchiesEnabled {
subTopic = fmt.Sprintf("%s.>", subTopic)
}

defer n.cfg.pushServer.service.Subscribe(n.publications, errors, subTopic)()
}

zap.L().Debug("Websocket server started",
Expand Down
36 changes: 36 additions & 0 deletions websocket_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,41 @@ func TestWebsocketServer_pushEvents(t *testing.T) {
So(len(srv.publications), ShouldEqual, 0)
})
})

Convey("When I call pushEvents on a server w/ NATS subject hierarchies enabled", func() {

srv := &mockPubSubServer{}
h := &mockSessionHandler{}
h.shouldPublishOK = true

cfg := config{}
cfg.pushServer.service = srv
cfg.pushServer.enabled = true
cfg.pushServer.publishEnabled = true
cfg.pushServer.dispatchEnabled = true
cfg.pushServer.subjectHierarchiesEnabled = true
cfg.pushServer.topic = "events"
cfg.pushServer.publishHandler = h

wss := newPushServer(cfg, mux, pf)
testEvent := elemental.NewEvent(elemental.EventCreate, testmodel.NewList())
wss.pushEvents(testEvent)

Convey("Then I should find one publication sent to the correct topic", func() {

eventOut := elemental.NewEvent(elemental.EventCreate, testmodel.NewList())
eventOut.Timestamp = testEvent.Timestamp
r, err := elemental.Encode(elemental.EncodingTypeMSGPACK, eventOut)
So(err, ShouldBeNil)
So(len(srv.publications), ShouldEqual, 1)
pub := srv.publications[0]
So(string(pub.Data), ShouldResemble, string(r))
So(pub.Topic, ShouldEqual, fmt.Sprintf("%s.%s.%s",
cfg.pushServer.topic,
testmodel.ListIdentity.Name,
elemental.EventCreate))
})
})
})
}

Expand All @@ -542,6 +577,7 @@ func TestWebsocketServer_start(t *testing.T) {
cfg.pushServer.enabled = true
cfg.pushServer.publishEnabled = true
cfg.pushServer.dispatchEnabled = true
cfg.pushServer.subjectHierarchiesEnabled = true
cfg.pushServer.dispatchHandler = pushHandler

wss := newPushServer(cfg, mux, pf)
Expand Down