forked from ethereum/go-ethereum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peer.go
128 lines (101 loc) · 2.24 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package whisper
import (
"fmt"
"io/ioutil"
"time"
"github.com/ethereum/go-ethereum/p2p"
"gopkg.in/fatih/set.v0"
)
// protocolVersion is the whisper protocol version this node announces in its
// status message and requires from the remote peer during the handshake.
const protocolVersion = 0x02
// peer holds the whisper-side state for one remote p2p connection.
//
// NewPeer fills this struct with a positional composite literal, so the field
// order below must stay in sync with it.
type peer struct {
	// host is the Whisper instance whose envelopes are relayed to this peer.
	host *Whisper
	// peer is the underlying p2p peer; in this file it is used for logging.
	peer *p2p.Peer
	// ws is the message stream used for the status handshake and broadcasts.
	ws p2p.MsgReadWriter

	// known records the hashes of envelopes that must not be sent to this
	// peer again (see broadcast and addKnown).
	// XXX: this set grows without bound and will eventually become
	// exceptionally large; entries need an expiry.
	known *set.Set

	// quit is closed by stop to terminate the update loop.
	quit chan struct{}
}
// NewPeer creates the whisper state for the remote peer p, which talks over
// ws and relays envelopes held by host. The returned peer starts with an empty
// set of known envelope hashes and an open quit channel.
func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer {
	return &peer{
		host:  host,
		peer:  p,
		ws:    ws,
		known: set.New(),
		quit:  make(chan struct{}),
	}
}
// init performs the status handshake with the remote peer and returns the
// handshake error, or nil when it succeeded.
func (self *peer) init() error {
	return self.handleStatus()
}
// start launches the relay loop (update) in its own goroutine and then logs
// that whisper has started for this peer.
func (self *peer) start() {
	go self.update()
	self.peer.Infoln("whisper started")
}
// stop logs that whisper has stopped for this peer and closes the quit
// channel, which makes the update loop exit.
//
// NOTE(review): closing an already-closed channel panics, so stop must be
// called at most once per peer — confirm that callers guarantee this.
func (self *peer) stop() {
	self.peer.Infoln("whisper stopped")
	close(self.quit)
}
// update is the peer's relay loop; start runs it in its own goroutine. On
// every relay tick it broadcasts the host's current envelopes to the remote
// peer. The loop ends when a broadcast fails (the error is logged) or when the
// quit channel is closed by stop.
func (self *peer) update() {
	const relayInterval = 300 * time.Millisecond

	relay := time.NewTicker(relayInterval)
	// Release the ticker on every exit path. An unstopped ticker keeps its
	// runtime timer alive after this goroutine has returned (it is not
	// garbage collected on Go versions before 1.23).
	defer relay.Stop()

	for {
		select {
		case <-relay.C:
			if err := self.broadcast(self.host.envelopes()); err != nil {
				self.peer.Infoln("broadcast err:", err)
				return
			}
		case <-self.quit:
			return
		}
	}
}
// broadcast sends, in a single envelopesMsg, every envelope from envelopes
// whose hash is not yet in the peer's known set. Each such envelope is added
// to the known set before the write is attempted, so it is not offered again
// even if the write fails. Nothing is written when no envelope is new.
//
// It returns the error reported by WriteMsg, or nil.
func (self *peer) broadcast(envelopes []*Envelope) error {
	envs := make([]interface{}, 0, len(envelopes))
	for _, envelope := range envelopes {
		// Hash once per envelope and reuse the value for both the lookup
		// and the insert.
		hash := envelope.Hash()
		if self.known.Has(hash) {
			continue
		}
		self.known.Add(hash)
		envs = append(envs, envelope)
	}
	if len(envs) == 0 {
		return nil
	}

	msg := p2p.NewMsg(envelopesMsg, envs...)
	if err := self.ws.WriteMsg(msg); err != nil {
		return err
	}
	self.peer.DebugDetailln("broadcasted", len(envs), "message(s)")
	return nil
}
// addKnown records the hash of envelope in the peer's known set, so that
// broadcast will skip it for this peer.
func (self *peer) addKnown(envelope *Envelope) {
	hash := envelope.Hash()
	self.known.Add(hash)
}
// handleStatus runs the status handshake: it writes our status message, reads
// the peer's first message and validates it. An error is returned when the
// write or read fails, when the first message is not a status message, when
// its payload is empty, or when the first payload byte differs from
// protocolVersion.
func (self *peer) handleStatus() error {
	// Announce our own status before waiting for the peer's.
	if err := self.ws.WriteMsg(self.statusMsg()); err != nil {
		return err
	}

	reply, err := self.ws.ReadMsg()
	if err != nil {
		return err
	}
	if reply.Code != statusMsg {
		return fmt.Errorf("peer send %x before status msg", reply.Code)
	}

	// The first byte of the payload is compared against our protocol version.
	payload, err := ioutil.ReadAll(reply.Payload)
	switch {
	case err != nil:
		return err
	case len(payload) == 0:
		return fmt.Errorf("malformed status. data len = 0")
	case payload[0] != protocolVersion:
		return fmt.Errorf("protocol version mismatch %d != %d", payload[0], protocolVersion)
	}
	return nil
}
// statusMsg builds the status message sent during the handshake; it carries
// the statusMsg code and our protocolVersion.
func (self *peer) statusMsg() p2p.Msg {
	status := p2p.NewMsg(statusMsg, protocolVersion)
	return status
}