forked from weaveworks/mesh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peer.go
136 lines (118 loc) · 3.26 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"log"
"bytes"
"encoding/gob"
"github.com/csghh/mesh"
)
// peer encapsulates state and implements mesh.Gossiper.
// It should be passed to mesh.Router.NewGossip,
// and the resulting Gossip registered in turn,
// before calling mesh.Router.Start.
type peer struct {
// st is the state this peer reads and merges gossip into; newPeer creates it.
st *state
// send is the outbound gossip handle; it stays nil until register is called.
send mesh.Gossip
// actions carries closures to the loop goroutine, which runs them one at a time.
actions chan<- func()
// quit is closed by stop to make the loop goroutine return.
quit chan struct{}
// logger receives the diagnostic messages printed by the peer's methods.
logger *log.Logger
}
// Compile-time check that *peer satisfies mesh.Gossiper.
var _ mesh.Gossiper = (*peer)(nil)
// newPeer builds a peer whose state starts out as newState(self) and starts
// the goroutine that runs queued actions. No outbound Gossip is attached yet:
// call register afterwards so that updates can be broadcast.
func newPeer(self mesh.PeerName, logger *log.Logger) *peer {
	queue := make(chan func())

	created := new(peer)
	created.st = newState(self)
	created.send = nil // must .register() later
	created.actions = queue
	created.quit = make(chan struct{})
	created.logger = logger

	// The goroutine gets the receiving end; the struct keeps the sending end.
	go created.loop(queue)
	return created
}
// loop runs every closure received on actions, one after another, and
// returns as soon as the quit channel is observed to be closed.
func (p *peer) loop(actions <-chan func()) {
	for {
		select {
		case <-p.quit:
			return
		case action := <-actions:
			action()
		}
	}
}
// register attaches the Gossip returned by mesh.Router.NewGossip so that
// later increments can be broadcast. The assignment is executed on the loop
// goroutine, which keeps it serialized with the other queued actions.
//
// If the peer has already been stopped there is no goroutine left to receive
// the action, so register returns without doing anything instead of blocking
// forever on the unbuffered actions channel.
func (p *peer) register(send mesh.Gossip) {
	select {
	case p.actions <- func() { p.send = send }:
	case <-p.quit:
		// loop has exited (or is exiting); nothing will drain actions again.
	}
}
// get reports the counter value currently held in the peer's state.
func (p *peer) get() int {
	current := p.st.get()
	return current
}
// incr adds one to the counter, broadcasts the resulting state when a Gossip
// has been registered, and returns the new counter value.
//
// The work is queued onto the loop goroutine and incr waits for it to finish.
// If the peer has been stopped, nothing can run the action any more; rather
// than blocking forever, incr leaves the counter untouched and returns its
// current value.
func (p *peer) incr() (result int) {
	done := make(chan struct{})
	action := func() {
		defer close(done) // wake the waiting caller once the action is over
		st := p.st.incr()
		if p.send != nil {
			p.send.GossipBroadcast(st)
		} else {
			p.logger.Printf("no sender configured; not broadcasting update right now")
		}
		result = st.get()
	}

	select {
	case p.actions <- action:
		// The send only completes once loop has taken the action, and loop
		// runs each action to completion, so done is certain to be closed.
		<-done
	case <-p.quit:
		result = p.st.get()
	}
	return result
}
// stop makes the loop goroutine return by closing quit. Calling stop again
// after it has already run is a no-op rather than a "close of closed channel"
// panic. The check and the close are not atomic, so concurrent calls to stop
// still have to be coordinated by the caller.
func (p *peer) stop() {
	select {
	case <-p.quit:
		// Nothing is ever sent on quit, so a successful receive means an
		// earlier stop already closed it.
	default:
		close(p.quit)
	}
}
// Gossip implements mesh.Gossiper. It takes a copy of the peer's state, logs
// the copied set, and hands the copy back as the complete gossip data.
func (p *peer) Gossip() (complete mesh.GossipData) {
	complete = p.st.copy()
	snapshot := complete.(*state)
	p.logger.Printf("Gossip => complete %v", snapshot.set)
	return complete
}
// OnGossip implements mesh.Gossiper. It gob-decodes buf into a per-peer
// counter map, merges that map into the local state via mergeDelta, logs the
// outcome, and returns what mergeDelta reported (nil when it reported none).
// A decoding failure is returned as-is with a nil delta.
func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
	var set map[mesh.PeerName]int
	dec := gob.NewDecoder(bytes.NewReader(buf))
	if err = dec.Decode(&set); err != nil {
		return nil, err
	}

	delta = p.st.mergeDelta(set)

	// Log the underlying set when there is one, otherwise the nil delta itself.
	var shown interface{} = delta
	if delta != nil {
		shown = delta.(*state).set
	}
	p.logger.Printf("OnGossip %v => delta %v", set, shown)
	return delta, nil
}
// OnGossipBroadcast implements mesh.Gossiper. It gob-decodes the broadcast
// payload buf from src into a per-peer counter map, merges it into the local
// state via mergeReceived, logs the outcome, and returns what mergeReceived
// reported (nil when it reported nothing). A decoding failure is returned
// as-is with nil data.
func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (received mesh.GossipData, err error) {
	var set map[mesh.PeerName]int
	dec := gob.NewDecoder(bytes.NewReader(buf))
	if err = dec.Decode(&set); err != nil {
		return nil, err
	}

	received = p.st.mergeReceived(set)

	// Log the underlying set when there is one, otherwise the nil value itself.
	var shown interface{} = received
	if received != nil {
		shown = received.(*state).set
	}
	p.logger.Printf("OnGossipBroadcast %s %v => delta %v", src, set, shown)
	return received, nil
}
// OnGossipUnicast implements mesh.Gossiper. It gob-decodes the unicast
// payload buf from src into a per-peer counter map, merges it into the local
// state via mergeComplete, and logs the result. Only a decoding failure is
// reported to the caller.
func (p *peer) OnGossipUnicast(src mesh.PeerName, buf []byte) error {
	var set map[mesh.PeerName]int
	dec := gob.NewDecoder(bytes.NewReader(buf))
	err := dec.Decode(&set)
	if err != nil {
		return err
	}

	merged := p.st.mergeComplete(set)
	p.logger.Printf("OnGossipUnicast %s %v => complete %v", src, set, merged)
	return nil
}