-
Notifications
You must be signed in to change notification settings - Fork 3
/
peerManager.go
122 lines (97 loc) · 2.47 KB
/
peerManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package tss
import (
"fmt"
"sync"
"github.com/getamis/sirius/log"
"google.golang.org/protobuf/proto"
)
// Message is one queued outbound payload together with the ID of the peer it
// is addressed to.
type Message struct {
	// peerID identifies the peer the payload is destined for.
	peerID string

	// message is the payload itself. GetNextMessageToSend rejects any value
	// that does not implement proto.Message.
	message interface{}
}
// PeerManager tracks the set of known peer IDs for one node and buffers
// outbound messages until they are fetched with GetNextMessageToSend.
type PeerManager struct {
	// id is this node's own identifier, as returned by SelfID.
	id string
	// peers is the set of known peer IDs; AddPeer stores true for each member.
	peers map[string]bool
	// handleMessageFunction is the callback HandleMessage forwards to. It is
	// nil until RegisterHandleMessage has been called.
	handleMessageFunction func([]byte) error

	// mu is held by MustSend and GetNextMessageToSend while they modify
	// outwardMessages.
	mu sync.Mutex
	// outwardMessages is the queue of messages waiting to be sent, oldest
	// first.
	outwardMessages []Message
}
// NewPeerManager returns a PeerManager whose own ID is id. The peer set is
// allocated and empty; the outbound queue and the message handler are left at
// their zero values.
func NewPeerManager(id string) *PeerManager {
	pm := new(PeerManager)
	pm.id = id
	pm.peers = map[string]bool{}
	return pm
}
// NumPeers reports how many peer IDs are currently in the peer set.
func (p *PeerManager) NumPeers() uint32 {
	count := len(p.peers)
	return uint32(count)
}
// SelfID returns the identifier this PeerManager was given for its own node.
func (p *PeerManager) SelfID() string {
	return p.id
}
// PeerIDs returns a newly allocated slice holding every ID in the peer set.
// The order follows Go's map iteration and is therefore unspecified. The
// result is never nil, even when there are no peers.
func (p *PeerManager) PeerIDs() []string {
	ids := make([]string, 0, len(p.peers))
	for peerID := range p.peers {
		ids = append(ids, peerID)
	}
	return ids
}
// MustSend queues message for delivery to peerID. Nothing is transmitted
// here: the entry is appended to the outbound queue under the mutex and stays
// there until GetNextMessageToSend removes it.
func (p *PeerManager) MustSend(peerID string, message interface{}) {
	queued := Message{peerID: peerID, message: message}

	p.mu.Lock()
	p.outwardMessages = append(p.outwardMessages, queued)
	p.mu.Unlock()
}
// // EnsureAllConnected connects the host to every known peer and waits until all connection attempts have finished.
// func (p *peerManager) EnsureAllConnected() {
// var wg sync.WaitGroup
// for _, peerAddr := range p.peers {
// wg.Add(1)
// go connectToPeer(p.host, peerAddr, &wg)
// }
// wg.Wait()
// }
// AddPeer records peerID in the peer set. Adding an ID that is already
// present changes nothing. The set is created on first use, so a PeerManager
// that was not built with NewPeerManager accepts peers instead of panicking on
// a nil-map write.
func (p *PeerManager) AddPeer(peerID string) {
	if p.peers == nil {
		p.peers = make(map[string]bool)
	}
	p.peers[peerID] = true
}
// GetNextMessageToSend removes the oldest queued message addressed to peerID
// and returns it in protobuf wire format. Messages for other peers, and any
// later messages for the same peer, stay queued in their original order.
//
// It returns (nil, nil) when nothing is queued for peerID. If the oldest
// message for peerID does not implement proto.Message, or cannot be
// marshalled, an error is returned and the queue is left unchanged.
//
// The whole operation runs under the mutex so that a concurrent MustSend
// cannot append between the scan and the removal and have its message lost.
func (p *PeerManager) GetNextMessageToSend(peerID string) ([]byte, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i, el := range p.outwardMessages {
		if el.peerID != peerID {
			continue
		}

		msg, ok := el.message.(proto.Message)
		if !ok {
			return nil, fmt.Errorf("invalid proto message")
		}
		bs, err := proto.Marshal(msg)
		if err != nil {
			log.Warn("Cannot marshal message", "err", err)
			return nil, err
		}

		// Remove exactly this entry. Returning here, rather than testing
		// len(bs), keeps a message that marshals to zero bytes from being
		// mistaken for "nothing found yet" and swallowing its successors.
		last := len(p.outwardMessages) - 1
		copy(p.outwardMessages[i:], p.outwardMessages[i+1:])
		// Clear the vacated tail slot so the backing array does not keep the
		// removed payload reachable.
		p.outwardMessages[last] = Message{}
		p.outwardMessages = p.outwardMessages[:last]
		return bs, nil
	}
	return nil, nil
}
// RegisterHandleMessage installs handleFunc as the callback that
// HandleMessage forwards incoming messages to. A later call replaces the
// previously registered callback.
func (p *PeerManager) RegisterHandleMessage(handleFunc func([]byte) error) {
	p.handleMessageFunction = handleFunc
}
// HandleMessage passes msg to the callback installed with
// RegisterHandleMessage and returns whatever that callback returns.
//
// If no callback has been registered it returns an error instead of
// panicking on a nil function call.
func (p *PeerManager) HandleMessage(msg []byte) error {
	handle := p.handleMessageFunction
	if handle == nil {
		return fmt.Errorf("no message handler registered")
	}
	return handle(msg)
}
// func remove(slice []Message, s int) []Message {
// return append(slice[:s], slice[s+1:]...)
// }