Skip to content

Commit

Permalink
MemberSet
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed May 2, 2021
1 parent 14f90bf commit 53084be
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 78 deletions.
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func New(actorSystem *actor.ActorSystem, config *Config) *Cluster {
/*
private void SubscribeToTopologyEvents() =>
System.EventStream.Subscribe<ClusterTopology>(e => {
System.Metrics.Get<ClusterMetrics>().ClusterTopologyEventGauge.Set(e.Members.Count,
System.Metrics.Get<ClusterMetrics>().ClusterTopologyEventGauge.Set(e.MemberSet.Count,
new[] {System.Id, System.Address, e.TopologyHash().ToString()}
);
Expand Down
86 changes: 25 additions & 61 deletions cluster/member_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ import (
type MemberList struct {
cluster *Cluster
mutex sync.RWMutex
membersByMemberId map[string]*Member
members *MemberSet
memberStrategyByKind map[string]MemberStrategy
bannedMemberIds map[string]bool
topologyHash uint64
chashByKind map[string]chash.ConsistentHash
bannedMembers *MemberSet

chashByKind map[string]chash.ConsistentHash
}

func NewMemberList(cluster *Cluster) *MemberList {
memberList := &MemberList{
cluster: cluster,
membersByMemberId: make(map[string]*Member),
members: emptyMemberSet,
memberStrategyByKind: make(map[string]MemberStrategy),
bannedMemberIds: make(map[string]bool),
bannedMembers: emptyMemberSet,
}
return memberList
}
Expand All @@ -52,52 +52,43 @@ func (ml *MemberList) GetActivatorMember(kind string) string {
}

func (ml *MemberList) Length() int {
ml.mutex.RLock()
defer ml.mutex.RUnlock()
return len(ml.membersByMemberId)
return ml.members.Len()
}

func (ml *MemberList) Members() *MemberSet {
return ml.members
}

func (ml *MemberList) UpdateClusterTopology(members []*Member) {
ml.mutex.Lock()
defer ml.mutex.Unlock()

memberSet := NewMemberSet(members)

//get active members
//(this bit means that we will never allow a member that failed a health check to join back in)
activeMembers := MembersExcept(members, ml.bannedMemberIds)

//get the new topology hash
newTopologyHash := TopologyHash(activeMembers)
newMembers := memberSet.Except(ml.bannedMembers)

//nothing changed? exit
if newTopologyHash == ml.topologyHash {
if newMembers.Equals(ml.members) {
return
}

//remember the new topology hash
ml.topologyHash = newTopologyHash

//membersByMemberId that left
left := ml.getLeftMembers(activeMembers)

//membersByMemberId that joined
joined := ml.getJoinedMembers(activeMembers)

//union membersByMemberId that left into bannedMemberIds set
AddMembersToSet(ml.bannedMemberIds, left)

//replace the member lookup with new data
ml.membersByMemberId = MembersToMap(activeMembers)
left := ml.members.Except(newMembers)
joined := newMembers.Except(ml.members)
ml.bannedMembers = ml.bannedMembers.Union(left)
ml.members = newMembers

//for any member that left, send a endpoint terminate event
for _, m := range left {
for _, m := range left.Members() {
ml.TerminateMember(m)
}

newTopology := &ClusterTopology{
TopologyHash: newTopologyHash,
Members: activeMembers,
Left: left,
Joined: joined,
TopologyHash: newMembers.TopologyHash(),
Members: newMembers.Members(),
Left: left.Members(),
Joined: joined.Members(),
}

//recalculate member strategies
Expand All @@ -106,40 +97,13 @@ func (ml *MemberList) UpdateClusterTopology(members []*Member) {
ml.cluster.ActorSystem.EventStream.Publish(newTopology)

plog.Info("Updated ClusterTopology",
log.Uint64("topologyHash", ml.topologyHash),
log.Uint64("topologyHash", ml.members.TopologyHash()),
log.Int("membersByMemberId", len(members)),
log.Int("joined", len(newTopology.Joined)),
log.Int("left", len(newTopology.Left)),
)
}

func (ml *MemberList) getJoinedMembers(activeMembers []*Member) []*Member {
joinedMembers := make([]*Member, 0)
joinedMemberIds := make(map[string]bool)
for _, m := range activeMembers {
if _, isExisting := ml.membersByMemberId[m.Id]; isExisting {
continue
}
joinedMembers = append(joinedMembers, m)
joinedMemberIds[m.Id] = true
}
return joinedMembers
}

func (ml *MemberList) getLeftMembers(activeMembers []*Member) []*Member {
activeMemberIds := MembersToSet(activeMembers)
leftMembers := make([]*Member, 0)
leftMemberIds := make(map[string]bool)
for _, m := range ml.membersByMemberId {
if _, isActive := activeMemberIds[m.Id]; isActive {
continue
}
leftMembers = append(leftMembers, m)
leftMemberIds[m.Id] = true
}
return leftMembers
}

func (ml *MemberList) TerminateMember(m *Member) {
//tell the world that this endpoint should is no longer relevant
endpointTerminated := &remote.EndpointTerminatedEvent{
Expand Down
4 changes: 2 additions & 2 deletions cluster/member_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func TestMemberList_UpdateClusterToplogy(t *testing.T) {
return (list)[i].Port < (list)[j].Port
})
}
// dumpMembers(tpl.Members)
// dumpMembers(tpl.MemberSet)
_sortMembers(tpl.Members)
// dumpMembers(tpl.Members)
// dumpMembers(tpl.MemberSet)
_sortMembers(tpl.Left)
_sortMembers(tpl.Joined)
}
Expand Down
66 changes: 56 additions & 10 deletions cluster/members.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,76 @@
package cluster

type Members struct {
members []*Member
lookup map[string]*Member
type MemberSet struct {
topologyHash uint64
members []*Member
lookup map[string]*Member
}

func NewMembers(members []*Member) *Members {
var emptyMemberSet = NewMemberSet(make([]*Member, 0))

func NewMemberSet(members []*Member) *MemberSet {
SortMembers(members)
lookup := MembersToMap(members)
ms := &Members{
members: members,
lookup: lookup,
ms := &MemberSet{
topologyHash: TopologyHash(members),
members: members,
lookup: lookup,
}
return ms
}

func (ms *Members) Members() []*Member {
func (ms *MemberSet) Len() int {
return len(ms.members)
}

func (ms *MemberSet) TopologyHash() uint64 {
return ms.topologyHash
}

func (ms *MemberSet) Members() []*Member {
return ms.members
}

func (ms *Members) ContainsId(id string) bool {
func (ms *MemberSet) ContainsId(id string) bool {
_, ok := ms.lookup[id]
return ok
}

func (ms *Members) GetMemberById(id string) *Member {
func (ms *MemberSet) GetMemberById(id string) *Member {
member, _ := ms.lookup[id]
return member
}

func (ms *MemberSet) Except(other *MemberSet) *MemberSet {

res := make([]*Member, 0)
for _, m := range ms.members {
if other.ContainsId(m.Id) {
continue
}

res = append(res, m)
}

return NewMemberSet(res)
}

func (ms *MemberSet) Union(other *MemberSet) *MemberSet {
mapp := make(map[string]*Member, 0)
for _, m := range ms.members {
mapp[m.Id] = m
}
for _, m := range other.members {
mapp[m.Id] = m
}
res := make([]*Member, 0)
for _, m := range mapp {
res = append(res, m)
}

return NewMemberSet(res)
}

func (ms *MemberSet) Equals(other *MemberSet) bool {
return ms.topologyHash == other.topologyHash
}
8 changes: 4 additions & 4 deletions cluster/protos.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 53084be

Please sign in to comment.