-
Notifications
You must be signed in to change notification settings - Fork 255
/
connectionGater.go
288 lines (245 loc) · 6.65 KB
/
connectionGater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package manage
import (
"container/list"
"context"
"runtime"
"sort"
"sync"
"time"
"github.com/33cn/chain33/types"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
//net "github.com/multiformats/go-multiaddr-net"
net "github.com/multiformats/go-multiaddr/net"
)
const (
// limit for rate limiter when processing new inbound dials.
ipLimit = 4
// burst limit for inbound dials.
ipBurst = 8
//缓存的临时的节点连接数量,虽然达到了最大限制,但是有的节点连接是查询需要,开辟缓冲区
)
//CacheLimit cachebuffer
var CacheLimit int32 = 20
//Conngater gater struct data
type Conngater struct {
host *host.Host
maxConnectNum int32
ipLimiter *leakybucket.Collector
blacklist *TimeCache
whitPeerList map[peer.ID]multiaddr.Multiaddr
}
//NewConnGater connect gater
func NewConnGater(h *host.Host, limit int32, cache *TimeCache, whitPeers []*peer.AddrInfo) *Conngater {
gater := &Conngater{}
gater.host = h
if limit == 0 {
limit = 4096
}
gater.maxConnectNum = limit
gater.blacklist = cache
if gater.blacklist == nil {
gater.blacklist = NewTimeCache(context.Background(), time.Minute*5)
}
gater.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, true)
for _, pr := range whitPeers {
if gater.whitPeerList == nil {
gater.whitPeerList = make(map[peer.ID]multiaddr.Multiaddr)
}
gater.whitPeerList[pr.ID] = pr.Addrs[0]
}
return gater
}
// InterceptPeerDial tests whether we're permitted to Dial the specified peer.
func (s *Conngater) InterceptPeerDial(p peer.ID) (allow bool) {
//具体的拦截策略
//黑名单检查
//1.增加校验p2p白名单节点列表
if !s.checkWhitePeerList(p) {
return false
}
if s.blacklist.Has(p.Pretty()) {
return false
}
return !s.isPeerAtLimit(network.DirOutbound)
}
func (s *Conngater) checkWhitePeerList(p peer.ID) bool {
if s.whitPeerList != nil {
if _, ok := s.whitPeerList[p]; !ok {
return false
}
}
return true
}
// InterceptAddrDial tests whether we're permitted to dial the specified
// multiaddr for the given peer.
func (s *Conngater) InterceptAddrDial(p peer.ID, m multiaddr.Multiaddr) (allow bool) {
return !s.blacklist.Has(p.Pretty())
}
// InterceptAccept tests whether an incipient inbound connection is allowed.
func (s *Conngater) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {
if !s.validateDial(n.RemoteMultiaddr()) { //对连进来的节点进行速率限制
log.Error("InterceptAccept: query too frequent", "multiAddr", n.RemoteMultiaddr())
// Allow other go-routines to run in the event
// we receive a large amount of junk connections.
runtime.Gosched()
return false
}
//增加校验p2p白名单节点列表
if !s.checkWhitAddr(n.RemoteMultiaddr()) {
return false
}
return !s.isPeerAtLimit(network.DirInbound)
}
func (s *Conngater) checkWhitAddr(addr multiaddr.Multiaddr) bool {
if s.whitPeerList == nil {
return true
}
iswhiteIP := false
checkIP, _ := net.ToIP(addr)
for _, maddr := range s.whitPeerList {
ip, err := net.ToIP(maddr)
if err != nil {
continue
}
if ip.String() == checkIP.String() {
iswhiteIP = true
}
}
return iswhiteIP
}
// InterceptSecured tests whether a given connection, now authenticated,
// is allowed.
func (s *Conngater) InterceptSecured(_ network.Direction, p peer.ID, n network.ConnMultiaddrs) (allow bool) {
return !s.blacklist.Has(p.Pretty())
}
// InterceptUpgraded tests whether a fully capable connection is allowed.
func (s *Conngater) InterceptUpgraded(n network.Conn) (allow bool, reason control.DisconnectReason) {
if n == nil {
return false, 0
}
return !s.blacklist.Has(n.RemotePeer().Pretty()), 0
}
func (s *Conngater) validateDial(addr multiaddr.Multiaddr) bool {
ip, err := net.ToIP(addr)
if err != nil {
return false
}
remaining := s.ipLimiter.Remaining(ip.String())
if remaining <= 0 {
return false
}
s.ipLimiter.Add(ip.String(), 1)
return true
}
func (s *Conngater) isPeerAtLimit(direction network.Direction) bool {
if s.maxConnectNum == 0 { //不对连接节点数量进行限制
return false
}
host := (*s.host)
if host == nil {
return false
}
var inboundNum, outboundNum int32
for _, conn := range host.Network().Conns() {
if conn.Stat().Direction == network.DirInbound {
inboundNum++
} else {
outboundNum++
}
}
if direction == network.DirInbound { //inbound connect
return inboundNum >= s.maxConnectNum+CacheLimit
}
return outboundNum >= s.maxConnectNum+CacheLimit
}
//TimeCache data struct
type TimeCache struct {
cacheLock sync.Mutex
Q *list.List
M map[string]time.Time
ctx context.Context
span time.Duration
}
//对系统的连接时长按照从大到小的顺序排序
type blacklist []*types.BlackInfo
//Len return size of blackinfo
func (b blacklist) Len() int { return len(b) }
//Swap swap data between i,j
func (b blacklist) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
//Less check lifetime
func (b blacklist) Less(i, j int) bool { //从小到大排序,即index=0 ,表示数值最大
return b[i].Lifetime < b[j].Lifetime
}
//NewTimeCache new time cache obj.
func NewTimeCache(ctx context.Context, span time.Duration) *TimeCache {
cache := &TimeCache{
Q: list.New(),
M: make(map[string]time.Time),
span: span,
ctx: ctx,
}
go cache.sweep()
return cache
}
//Add add key
func (tc *TimeCache) Add(s string, lifetime time.Duration) {
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
if lifetime == 0 {
lifetime = tc.span
}
tc.M[s] = time.Now().Add(lifetime) //update lifetime
tc.Q.PushFront(s)
}
func (tc *TimeCache) sweep() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
tc.checkOvertimekey()
for {
select {
case <-ticker.C:
tc.checkOvertimekey()
case <-tc.ctx.Done():
return
}
}
}
func (tc *TimeCache) checkOvertimekey() {
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
for e := tc.Q.Front(); e != nil; e = e.Next() {
v := e.Value.(string)
t, ok := tc.M[v]
if !ok {
tc.Q.Remove(e)
continue
}
if time.Now().After(t) {
tc.Q.Remove(e)
delete(tc.M, v)
}
}
}
//Has check key
func (tc *TimeCache) Has(s string) bool {
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
_, ok := tc.M[s]
return ok
}
//List show all peers
func (tc *TimeCache) List() *types.Blacklist {
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
var list blacklist
for pid, p := range tc.M {
list = append(list, &types.BlackInfo{PeerName: pid, Lifetime: int64(^(time.Since(p) / time.Second) + 1)})
}
sort.Sort(list)
return &types.Blacklist{Blackinfo: list}
}