Skip to content

Commit

Permalink
Fixed subscribing to ourselves
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Dec 1, 2018
1 parent d0d921f commit 85b1f69
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 50 deletions.
15 changes: 14 additions & 1 deletion internal/broker/cluster/events.go
Expand Up @@ -27,9 +27,9 @@ import (

// SubscriptionEvent represents a subscription event.
type SubscriptionEvent struct {
Ssid message.Ssid // The SSID for the subscription.
Peer mesh.PeerName // The name of the peer.
Conn security.ID // The connection identifier.
Ssid message.Ssid // The SSID for the subscription.
}

// Encode encodes the event to string representation.
Expand Down Expand Up @@ -135,6 +135,19 @@ func (st *subscriptionState) Remove(ev string) {
(*collection.LWWSet)(st).Remove(ev)
}

// RemoveAll removes all of the subscription events by prefix.
func (st *subscriptionState) RemoveAll(name mesh.PeerName) {
buffer := make([]byte, 10, 10)
offset := bin.PutUvarint(buffer, uint64(name))
prefix := buffer[:offset]

for ev, v := range st.All() {
if bytes.HasPrefix([]byte(ev), prefix) && v.IsAdded() {
st.Remove(ev)
}
}
}

// All ...
func (st *subscriptionState) All() collection.LWWState {
return (*collection.LWWSet)(st).All()
Expand Down
33 changes: 32 additions & 1 deletion internal/broker/cluster/events_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/emitter-io/emitter/internal/collection"
"github.com/emitter-io/emitter/internal/message"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/mesh"
)

func TestEncodeSubscriptionState(t *testing.T) {
Expand Down Expand Up @@ -67,5 +68,35 @@ func restoreClock(clk func() int64) {
// SetClock sets the clock time for testing
func setClock(t int64) {
collection.Now = func() int64 { return t }
println("clock set to", collection.Now())
}

func TestEncodeSubscriptionState_RemoveFor(t *testing.T) {
defer restoreClock(collection.Now)

setClock(0)
state := newSubscriptionState()

for i := 1; i <= 10; i++ {
ev := SubscriptionEvent{Ssid: message.Ssid{1}, Peer: mesh.PeerName(i % 3), Conn: 777}
setClock(int64(i))
state.Add(ev.Encode())
}

// Must have 3 keys alive
setClock(int64(20))
assert.Equal(t, 3, countAdded(state))

// Must have 2 keys alive after removal
setClock(int64(21))
state.RemoveAll(mesh.PeerName(1))
assert.Equal(t, 2, countAdded(state))
}

func countAdded(state *subscriptionState) (added int) {
for _, v := range state.All() {
if v.IsAdded() {
added++
}
}
return
}
72 changes: 72 additions & 0 deletions internal/broker/cluster/memberlist.go
@@ -0,0 +1,72 @@
/**********************************************************************************
* Copyright (c) 2009-2017 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package cluster

import (
"sync"
"sync/atomic"
"time"

"github.com/weaveworks/mesh"
)

// memberlist represents a peer cache
type memberlist struct {
list sync.Map
ctor func(mesh.PeerName) *Peer
}

// newMemberlist creates a new memberlist
func newMemberlist(ctor func(mesh.PeerName) *Peer) *memberlist {
return &memberlist{
ctor: ctor,
}
}

// GetOrAdd gets or adds a peer, returns a peer and whether a new peer was added or not
func (m *memberlist) GetOrAdd(name mesh.PeerName) (*Peer, bool) {
if p, ok := m.list.Load(name); ok {
return p.(*Peer), false
}

// Create new peer and store it
peer := m.ctor(name)
v, loaded := m.list.LoadOrStore(name, peer)
return v.(*Peer), !loaded
}

// Touch updates the last activity time
func (m *memberlist) Touch(name mesh.PeerName) {
peer, _ := m.GetOrAdd(name)
atomic.StoreInt64(&peer.activity, time.Now().Unix())
}

// Contains checks if a peer is in the memberlist
func (m *memberlist) Contains(name mesh.PeerName) bool {
_, ok := m.list.Load(name)
return ok
}

// Remove removes the peer from the memberlist
func (m *memberlist) Remove(name mesh.PeerName) (*Peer, bool) {
if p, ok := m.list.Load(name); ok {
peer := p.(*Peer)
m.list.Delete(peer.name)
atomic.StoreInt64(&peer.activity, 0)
return peer, true
}

return nil, false
}
15 changes: 15 additions & 0 deletions internal/broker/cluster/memberlist_test.go
@@ -0,0 +1,15 @@
/**********************************************************************************
* Copyright (c) 2009-2017 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package cluster
5 changes: 0 additions & 5 deletions internal/broker/cluster/peer.go
Expand Up @@ -118,11 +118,6 @@ func (p *Peer) swap() (swapped message.Frame) {
return
}

// Touch updates the activity time of the peer.
func (p *Peer) touch() {
atomic.StoreInt64(&p.activity, time.Now().Unix())
}

// processSendQueue flushes the current frame to the remote server
func (p *Peer) processSendQueue() {
if len(p.frame) == 0 {
Expand Down
77 changes: 43 additions & 34 deletions internal/broker/cluster/swarm.go
Expand Up @@ -39,7 +39,7 @@ type Swarm struct {
state *subscriptionState // The state to synchronise.
router *mesh.Router // The mesh router.
gossip mesh.Gossip // The gossip protocol.
members sync.Map // The map of members in the peer set.
members *memberlist // The memberlist of peers.

OnSubscribe func(message.Ssid, message.Subscriber) bool // Delegate to invoke when the subscription event is received.
OnUnsubscribe func(message.Ssid, message.Subscriber) bool // Delegate to invoke when the subscription event is received.
Expand Down Expand Up @@ -98,44 +98,47 @@ func NewSwarm(cfg *config.ClusterConfig) *Swarm {
//Store the gossip and the router
swarm.gossip = gossip
swarm.router = router
swarm.members = newMemberlist(swarm.newPeer)
return swarm
}

// onPeerOnline occurs when a new peer is created.
func (s *Swarm) onPeerOnline(peer *Peer) {
logging.LogTarget("swarm", "peer created", peer.name)

// Subscribe to all of its subscriptions
for _, c := range peer.subs.All() {
s.OnSubscribe(c.Ssid, peer)
}
}

// Occurs when a peer is garbage collected.
func (s *Swarm) onPeerOffline(name mesh.PeerName) {
if v, ok := s.members.Load(name); ok {
peer := v.(*Peer)
if peer, deleted := s.members.Remove(name); deleted {
logging.LogTarget("swarm", "unreachable peer removed", peer.name)
peer.Close() // Close the peer on our end

// We also need to remove the peer from our set, so next time a new peer can be created.
s.members.Delete(peer.name)

// Unsubscribe from all active subscriptions
// Unsubscribe from all active subscriptions and also broadcast the fact
// that the peer has gone offline.
for _, c := range peer.subs.All() {
s.OnUnsubscribe(c.Ssid, peer)
}

// We also need to broadcast the fact that the peer is offline
op := newSubscriptionState()
op.RemoveAll(name)
s.gossip.GossipBroadcast(op)
}
}

// FindPeer retrieves a peer.
func (s *Swarm) FindPeer(name mesh.PeerName) (*Peer, bool) {
if p, ok := s.members.Load(name); ok {
return p.(*Peer), true
}

// Only add a peer if such exists
if exists := s.router.Peers.Fetch(name); exists == nil {
return nil, false
func (s *Swarm) FindPeer(name mesh.PeerName) *Peer {
peer, added := s.members.GetOrAdd(name)
if added {
s.onPeerOnline(peer)
}

// Create new peer and store it
peer := s.newPeer(name)
v, ok := s.members.LoadOrStore(name, peer)
if !ok {
logging.LogTarget("swarm", "peer created", peer.name)
}
return v.(*Peer), true
return peer
}

// ID returns the local node ID.
Expand All @@ -160,10 +163,11 @@ func (s *Swarm) update() {
desc := s.router.Peers.Descriptions()
for _, peer := range desc {
if !peer.Self {

// Mark the peer as active, so even if there's no messages being exchanged
// we still keep the peer, since we know that the peer is live.
if p, ok := s.FindPeer(peer.Name); ok {
p.touch()
if exists := s.router.Peers.Fetch(peer.Name); exists != nil {
s.members.Touch(peer.Name)
}

// reinforce structure
Expand Down Expand Up @@ -227,19 +231,24 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) {
return nil, err
}

// Get the peer to use
if peer, ok := s.FindPeer(ev.Peer); ok {
// Skip ourselves
if ev.Peer == s.router.Ourself.Name {
continue
}

// If the subscription is added, notify (TODO: use channels)
if v.IsAdded() && peer.onSubscribe(k, ev.Ssid) {
s.OnSubscribe(ev.Ssid, peer)
}
// Find the active peer for this subscription event
peer := s.FindPeer(ev.Peer)

// If the subscription is removed, notify (TODO: use channels)
if v.IsRemoved() && peer.onUnsubscribe(k, ev.Ssid) {
s.OnUnsubscribe(ev.Ssid, peer)
}
// If the subscription is added, notify (TODO: use channels)
if v.IsAdded() && peer.onSubscribe(k, ev.Ssid) && peer.IsActive() {
s.OnSubscribe(ev.Ssid, peer)
}

// If the subscription is removed, notify (TODO: use channels)
if v.IsRemoved() && peer.onUnsubscribe(k, ev.Ssid) && peer.IsActive() {
s.OnUnsubscribe(ev.Ssid, peer)
}

}

return delta, nil
Expand Down
11 changes: 5 additions & 6 deletions internal/broker/cluster/swarm_test.go
Expand Up @@ -72,14 +72,12 @@ func TestNewSwarm_Scenario(t *testing.T) {
assert.Equal(t, io.EOF, err)

// Find peer
peer, hasPeer := s.FindPeer(123)
assert.False(t, hasPeer)
assert.Nil(t, peer)
peer := s.FindPeer(123)
assert.NotNil(t, peer)

// Remove that peer, it should not be there
s.onPeerOffline(123)
_, ok := s.members.Load(mesh.PeerName(123))
assert.False(t, ok)
assert.False(t, s.members.Contains(mesh.PeerName(123)))

// Close the swarm
err = s.Close()
Expand Down Expand Up @@ -131,9 +129,10 @@ func Test_merge(t *testing.T) {
}
defer s.Close()

s.members.Touch(2)
_, err := s.merge(in.Encode()[0])
assert.NoError(t, err)
assert.False(t, subscribed)
assert.True(t, subscribed)
}

func TestJoin(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/broker/query.go
Expand Up @@ -126,9 +126,9 @@ func (c *QueryManager) onRequest(ssid message.Ssid, channel string, payload []by
}

// Get the peer to reply to
peer, ok := c.service.cluster.FindPeer(replyAddr)
if !ok {
return errors.New("unable to reply to a request, peer does not exist")
peer := c.service.cluster.FindPeer(replyAddr)
if !peer.IsActive() {
return errors.New("unable to reply to a request, peer is not active")
}

// Go through all the handlers and execute the first matching one
Expand Down

0 comments on commit 85b1f69

Please sign in to comment.