-
Notifications
You must be signed in to change notification settings - Fork 200
/
p2p.go
200 lines (160 loc) · 6.88 KB
/
p2p.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package p2p
import (
"context"
"io"
"github.com/mr-tron/base58/base58"
)
// MessageProcessor is the interface used to describe what a receive message processor should do
// All implementations that will be called from Messenger implementation will need to satisfy this interface
// If the function returns a non nil value, the received message will not be propagated to its connected peers
type MessageProcessor interface {
ProcessReceivedMessage(message MessageP2P, broadcastHandler func(buffToSend []byte)) error
IsInterfaceNil() bool
}
// BroadcastCallbackHandler will be implemented by those message processor instances that need to send back
// a subset of received message (after filtering occurs)
type BroadcastCallbackHandler interface {
SetBroadcastCallback(callback func(buffToSend []byte))
IsInterfaceNil() bool
}
// SendableData represents the struct used in data throttler implementation
type SendableData struct {
Buff []byte
Topic string
}
// PeerID is a p2p peer identity.
type PeerID string
// Bytes returns the peer ID as byte slice
func (pid PeerID) Bytes() []byte {
return []byte(pid)
}
// Pretty returns a b58-encoded string of the peer id
func (pid PeerID) Pretty() string {
return base58.Encode(pid.Bytes())
}
// ContextProvider defines an interface for providing context to various messenger components
type ContextProvider interface {
Context() context.Context
IsInterfaceNil() bool
}
// PeerDiscoverer defines the behaviour of a peer discovery mechanism
type PeerDiscoverer interface {
Bootstrap() error
Name() string
ApplyContext(ctxProvider ContextProvider) error
IsInterfaceNil() bool
}
// Reconnecter defines the behaviour of a network reconnection mechanism
type Reconnecter interface {
ReconnectToNetwork() <-chan struct{}
Pause() // pause the peer discovery
Resume() // resume the peer discovery
IsInterfaceNil() bool
}
// Messenger is the main struct used for communication with other peers
type Messenger interface {
io.Closer
// ID is the Messenger's unique peer identifier across the network (a
// string). It is derived from the public key of the P2P credentials.
ID() PeerID
// Peers is the list of IDs of peers known to the Messenger.
Peers() []PeerID
// Addresses is the list of addresses that the Messenger is currently bound
// to and listening to.
Addresses() []string
// ConnectToPeer explicitly connect to a specific peer with a known address (note that the
// address contains the peer ID). This function is usually not called
// manually, because any underlying implementation of the Messenger interface
// should be keeping connections to peers open.
ConnectToPeer(address string) error
// IsConnected returns true if the Messenger are connected to a specific peer.
IsConnected(peerID PeerID) bool
// ConnectedPeers returns the list of IDs of the peers the Messenger is
// currently connected to.
ConnectedPeers() []PeerID
// ConnectedAddresses returns the list of addresses of the peers to which the
// Messenger is currently connected.
ConnectedAddresses() []string
// PeerAddress builds an address for the given peer ID, e.g.
// ConnectToPeer(PeerAddress(somePeerID)).
PeerAddress(pid PeerID) string
// ConnectedPeersOnTopic returns the IDs of the peers to which the Messenger
// is currently connected, but filtered by a topic they are registered to.
ConnectedPeersOnTopic(topic string) []PeerID
// TrimConnections tries to optimize the number of open connections, closing
// those that are considered expendable.
TrimConnections()
// Bootstrap runs the initialization phase which includes peer discovery,
// setting up initial connections and self-announcement in the network.
Bootstrap() error
// CreateTopic defines a new topic for sending messages, and optionally
// creates a channel in the LoadBalancer for this topic (otherwise, the topic
// will use a default channel).
CreateTopic(name string, createChannelForTopic bool) error
// HasTopic returns true if the Messenger has declared interest in a topic
// and it is listening to messages referencing it.
HasTopic(name string) bool
// HasTopicValidator returns true if the Messenger has registered a custom
// validator for a given topic name.
HasTopicValidator(name string) bool
// RegisterMessageProcessor adds the provided MessageProcessor to the list
// of handlers that are invoked whenever a message is received on the
// specified topic.
RegisterMessageProcessor(topic string, handler MessageProcessor) error
// UnregisterMessageProcessor removes the MessageProcessor set by the
// Messenger from the list of registered handlers for the messages on the
// given topic.
UnregisterMessageProcessor(topic string) error
// OutgoingChannelLoadBalancer returns the ChannelLoadBalancer instance
// through which the Messenger is sending messages to the network.
OutgoingChannelLoadBalancer() ChannelLoadBalancer
// BroadcastOnChannelBlocking asynchronously waits until it can send a
// message on the channel, but once it is able to, it synchronously sends the
// message, blocking until sending is completed.
BroadcastOnChannelBlocking(channel string, topic string, buff []byte) error
// BroadcastOnChannel asynchronously sends a message on a given topic
// through a specified channel.
BroadcastOnChannel(channel string, topic string, buff []byte)
// Broadcast is a convenience function that calls BroadcastOnChannelBlocking,
// but implicitly sets the channel to be identical to the specified topic.
Broadcast(topic string, buff []byte)
// SendToConnectedPeer asynchronously sends a message to a peer directly,
// bypassing pubsub and topics. It opens a new connection with the given
// peer, but reuses a connection and a stream if possible.
SendToConnectedPeer(topic string, buff []byte, peerID PeerID) error
IsConnectedToTheNetwork() bool
ThresholdMinConnectedPeers() int
SetThresholdMinConnectedPeers(minConnectedPeers int) error
// IsInterfaceNil returns true if there is no value under the interface
IsInterfaceNil() bool
}
// MessageP2P defines what a p2p message can do (should return)
type MessageP2P interface {
From() []byte
Data() []byte
SeqNo() []byte
TopicIDs() []string
Signature() []byte
Key() []byte
Peer() PeerID
IsInterfaceNil() bool
}
// ChannelLoadBalancer defines what a load balancer that uses chans should do
type ChannelLoadBalancer interface {
AddChannel(channel string) error
RemoveChannel(channel string) error
GetChannelOrDefault(channel string) chan *SendableData
CollectOneElementFromChannels() *SendableData
IsInterfaceNil() bool
}
// DirectSender defines a component that can send direct messages to connected peers
type DirectSender interface {
NextSeqno(counter *uint64) []byte
Send(topic string, buff []byte, peer PeerID) error
IsInterfaceNil() bool
}
// PeerDiscoveryFactory defines the factory for peer discoverer implementation
type PeerDiscoveryFactory interface {
CreatePeerDiscoverer() (PeerDiscoverer, error)
IsInterfaceNil() bool
}