Skip to content

Commit

Permalink
support multiple brokers
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 9, 2024
1 parent f41c0bf commit e1d2a03
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ type Config struct {
// This function is called each time new channel appears on the Node.
// See the doc comment for ChannelMediumOptions for more details about channel medium concept.
GetChannelMediumOptions func(channel string) ChannelMediumOptions

// GetBroker when set allows returning a custom Broker to use for a specific channel. If not set
// then the default Node's Broker is always used for all channels. Also, Node's default Broker is
// always used for control channels. It's the responsibility of an application to call Broker.Run
// method of all brokers except the default one (called automatically inside Node.Run). Also, a
// proper Broker shutdown is the responsibility of application because Node does not know about
// custom Broker instances. When GetBroker returns false as the second argument then Node will
// use the default Broker for the channel.
GetBroker func(channel string) (Broker, bool)
}

const (
Expand Down
24 changes: 17 additions & 7 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func (n *Node) publish(ch string, data []byte, opts ...PublishOption) (PublishRe
opt(pubOpts)
}
n.metrics.incMessagesSent("publication")
streamPos, fromCache, err := n.broker.Publish(ch, data, *pubOpts)
streamPos, fromCache, err := n.getBroker(ch).Publish(ch, data, *pubOpts)
if err != nil {
return PublishResult{}, err
}
Expand Down Expand Up @@ -767,14 +767,14 @@ func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (Publ
// or leave message when someone unsubscribes from channel.
func (n *Node) publishJoin(ch string, info *ClientInfo) error {
n.metrics.incMessagesSent("join")
return n.broker.PublishJoin(ch, info)
return n.getBroker(ch).PublishJoin(ch, info)
}

// publishLeave allows publishing join message into channel when someone subscribes on it
// or leave message when someone unsubscribes from channel.
func (n *Node) publishLeave(ch string, info *ClientInfo) error {
n.metrics.incMessagesSent("leave")
return n.broker.PublishLeave(ch, info)
return n.getBroker(ch).PublishLeave(ch, info)
}

var errNotificationHandlerNotRegistered = errors.New("notification handler not registered")
Expand Down Expand Up @@ -996,7 +996,7 @@ func (n *Node) addSubscription(ch string, sub subInfo) error {
}
}

err := n.broker.Subscribe(ch)
err := n.getBroker(ch).Subscribe(ch)
if err != nil {
_, _ = n.hub.removeSub(ch, sub.client)
if n.config.GetChannelMediumOptions != nil {
Expand Down Expand Up @@ -1035,7 +1035,7 @@ func (n *Node) removeSubscription(ch string, c *Client) error {
defer mu.Unlock()
empty := n.hub.NumSubscribers(ch) == 0
if empty {
err := n.broker.Unsubscribe(ch)
err := n.getBroker(ch).Unsubscribe(ch)
if err != nil {
// Cool down a bit since broker is not ready to process unsubscription.
time.Sleep(500 * time.Millisecond)
Expand Down Expand Up @@ -1294,11 +1294,21 @@ type HistoryResult struct {
Publications []*Publication
}

func (n *Node) getBroker(ch string) Broker {
if n.config.GetBroker != nil {
if broker, ok := n.config.GetBroker(ch); ok {
return broker
}
}
return n.broker
}

func (n *Node) history(ch string, opts *HistoryOptions) (HistoryResult, error) {
if opts.Filter.Reverse && opts.Filter.Since != nil && opts.Filter.Since.Offset == 0 {
return HistoryResult{}, ErrorBadRequest
}
pubs, streamTop, err := n.broker.History(ch, *opts)

pubs, streamTop, err := n.getBroker(ch).History(ch, *opts)
if err != nil {
return HistoryResult{}, err
}
Expand Down Expand Up @@ -1420,7 +1430,7 @@ func (n *Node) checkPosition(ch string, clientPosition StreamPosition, historyMe
// RemoveHistory removes channel history.
func (n *Node) RemoveHistory(ch string) error {
n.metrics.incActionCount("history_remove")
return n.broker.RemoveHistory(ch)
return n.getBroker(ch).RemoveHistory(ch)
}

type nodeRegistry struct {
Expand Down
24 changes: 24 additions & 0 deletions node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,3 +1385,27 @@ func TestNodeCheckPosition(t *testing.T) {
require.NoError(t, err)
require.False(t, isValid)
}

func TestGetBroker(t *testing.T) {
node := defaultTestNode()
customBroker := NewTestBroker()
node.config.GetBroker = func(channel string) (Broker, bool) {
if channel == "test" {
return nil, false
}
return customBroker, true
}
defer func() { _ = node.Shutdown(context.Background()) }()

broker := NewTestBroker()
node.SetBroker(broker)

_, err := node.Publish("test", []byte("{}"))
require.NoError(t, err)
require.Equal(t, int32(1), broker.publishCount)

_, err = node.Publish("test2", []byte("{}"))
require.NoError(t, err)
require.Equal(t, int32(1), broker.publishCount)
require.Equal(t, int32(1), customBroker.publishCount)
}

0 comments on commit e1d2a03

Please sign in to comment.