forked from weaveworks/mesh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state.go
133 lines (113 loc) · 2.94 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"bytes"
"sync"
"encoding/gob"
"github.com/csghh/mesh"
)
// state is an implementation of a G-counter.
type state struct {
mtx sync.RWMutex
set map[mesh.PeerName]int
self mesh.PeerName
}
// state implements GossipData.
var _ mesh.GossipData = &state{}
// Construct an empty state object, ready to receive updates.
// This is suitable to use at program start.
// Other peers will populate us with data.
func newState(self mesh.PeerName) *state {
return &state{
set: map[mesh.PeerName]int{},
self: self,
}
}
func (st *state) get() (result int) {
st.mtx.RLock()
defer st.mtx.RUnlock()
for _, v := range st.set {
result += v
}
return result
}
func (st *state) incr() (complete *state) {
st.mtx.Lock()
defer st.mtx.Unlock()
st.set[st.self]++
return &state{
set: st.set,
}
}
func (st *state) copy() *state {
st.mtx.RLock()
defer st.mtx.RUnlock()
return &state{
set: st.set,
}
}
// Encode serializes our complete state to a slice of byte-slices.
// In this simple example, we use a single gob-encoded
// buffer: see https://golang.org/pkg/encoding/gob/
func (st *state) Encode() [][]byte {
st.mtx.RLock()
defer st.mtx.RUnlock()
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(st.set); err != nil {
panic(err)
}
return [][]byte{buf.Bytes()}
}
// Merge merges the other GossipData into this one,
// and returns our resulting, complete state.
func (st *state) Merge(other mesh.GossipData) (complete mesh.GossipData) {
return st.mergeComplete(other.(*state).copy().set)
}
// Merge the set into our state, abiding increment-only semantics.
// Return a non-nil mesh.GossipData representation of the received set.
func (st *state) mergeReceived(set map[mesh.PeerName]int) (received mesh.GossipData) {
st.mtx.Lock()
defer st.mtx.Unlock()
for peer, v := range set {
if v <= st.set[peer] {
delete(set, peer) // optimization: make the forwarded data smaller
continue
}
st.set[peer] = v
}
return &state{
set: set, // all remaining elements were novel to us
}
}
// Merge the set into our state, abiding increment-only semantics.
// Return any key/values that have been mutated, or nil if nothing changed.
func (st *state) mergeDelta(set map[mesh.PeerName]int) (delta mesh.GossipData) {
st.mtx.Lock()
defer st.mtx.Unlock()
for peer, v := range set {
if v <= st.set[peer] {
delete(set, peer) // requirement: it's not part of a delta
continue
}
st.set[peer] = v
}
if len(set) <= 0 {
return nil // per OnGossip requirements
}
return &state{
set: set, // all remaining elements were novel to us
}
}
// Merge the set into our state, abiding increment-only semantics.
// Return our resulting, complete state.
func (st *state) mergeComplete(set map[mesh.PeerName]int) (complete mesh.GossipData) {
st.mtx.Lock()
defer st.mtx.Unlock()
for peer, v := range set {
if v > st.set[peer] {
st.set[peer] = v
}
}
return &state{
set: st.set, // n.b. can't .copy() due to lock contention
}
}