Skip to content

Commit

Permalink
added query tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Nov 16, 2017
1 parent f62c673 commit 204fa11
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
7 changes: 5 additions & 2 deletions broker/cluster/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Swarm struct {
state *subscriptionState // The state to synchronise.
router *mesh.Router // The mesh router.
gossip mesh.Gossip // The gossip protocol.
members *sync.Map // The map of members in the peer set.
members sync.Map // The map of members in the peer set.

OnSubscribe func(message.Ssid, message.Subscriber) bool // Delegate to invoke when the subscription event is received.
OnUnsubscribe func(message.Ssid, message.Subscriber) bool // Delegate to invoke when the subscription event is received.
Expand All @@ -57,7 +57,6 @@ func NewSwarm(cfg *config.ClusterConfig, closing chan bool) *Swarm {
closing: closing,
config: cfg,
state: newSubscriptionState(),
members: new(sync.Map),
}

// Get the cluster binding address
Expand Down Expand Up @@ -215,6 +214,10 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) {

// NumPeers returns the number of connected peers.
func (s *Swarm) NumPeers() int {
if s.router == nil {
return 0
}

for _, peer := range s.router.Peers.Descriptions() {
if peer.Self {
return peer.NumConnections
Expand Down
66 changes: 66 additions & 0 deletions broker/query_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,67 @@
package broker

import (
"errors"
"testing"
"time"

"github.com/emitter-io/emitter/broker/cluster"
"github.com/emitter-io/emitter/broker/message"
"github.com/stretchr/testify/assert"
)

func Test_newQueryManager(t *testing.T) {
s := new(Service)
q := newQueryManager(s)

assert.Equal(t, s, q.service)
assert.Equal(t, uint32(0), q.next)
assert.NotEqual(t, "", q.ID())
assert.Equal(t, message.SubscriberDirect, q.Type())
}

func TestQuerySend_noSSID(t *testing.T) {
q := newQueryManager(nil)

err := q.Send(&message.Message{})
assert.Error(t, errors.New("Invalid query received"), err)
}

func TestQuerySend_Response(t *testing.T) {
q := newQueryManager(nil)

err := q.Send(&message.Message{
Channel: []byte("response"),
Ssid: []uint32{0, 1, 2},
})

// There should be no awaiter, hence this should just pass
assert.NoError(t, err)
}

func TestQuerySend_Request(t *testing.T) {
q := newQueryManager(&Service{
cluster: &cluster.Swarm{},
})

err := q.Send(&message.Message{
Channel: []byte("request/12345/"),
Ssid: []uint32{0, 1, 2},
})

assert.Equal(t, "No query handler found for request/12345/", err.Error())
}

func TestQuery_Query(t *testing.T) {
q := newQueryManager(&Service{
subscriptions: message.NewTrie(),
cluster: &cluster.Swarm{},
})

awaiter, err := q.Query("test", nil)
assert.NoError(t, err)
assert.NotNil(t, awaiter)

result := awaiter.Gather(1 * time.Millisecond)
assert.Empty(t, result)
}

0 comments on commit 204fa11

Please sign in to comment.