forked from libp2p/go-libp2p-transport-upgrader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
upgrader.go
127 lines (113 loc) · 3.63 KB
/
upgrader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package stream
import (
"context"
"errors"
"fmt"
"net"
ss "github.com/libp2p/go-conn-security"
pnet "github.com/libp2p/go-libp2p-interface-pnet"
peer "github.com/libp2p/go-libp2p-peer"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
smux "github.com/libp2p/go-stream-muxer"
manet "github.com/multiformats/go-multiaddr-net"
)
var (
	// ErrNilPeer is the error returned when an outbound connection upgrade is
	// requested without a peer ID.
	ErrNilPeer = errors.New("nil peer")

	// AcceptQueueLength is the number of connections that will be fully set up
	// before no further new connections are accepted.
	AcceptQueueLength = 16
)
// Upgrader is a multistream upgrader: it takes an underlying connection and
// turns it into a full transport connection that is both secured and
// multiplexed.
type Upgrader struct {
	// Protector, when non-nil, wraps each connection before security is set up.
	Protector pnet.Protector
	// Secure performs the inbound or outbound security handshake.
	Secure ss.Transport
	// Muxer creates the multiplexed connection on top of the secured one.
	Muxer smux.Transport
	// Filters, when non-nil, rejects connections whose remote multiaddr is blocked.
	Filters *filter.Filters
}
// UpgradeListener wraps the given multiaddr-net listener in a full
// libp2p-transport listener bound to this upgrader and to transport t.
//
// Before returning it starts a goroutine running the new listener's
// handleIncoming method.
func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.Listener {
	lctx, stop := context.WithCancel(context.Background())

	upgraded := new(listener)
	upgraded.Listener = list
	upgraded.upgrader = u
	upgraded.transport = t
	// Limit derived from the package-level AcceptQueueLength.
	upgraded.threshold = newThreshold(AcceptQueueLength)
	upgraded.incoming = make(chan transport.Conn)
	upgraded.cancel = stop
	upgraded.ctx = lctx

	go upgraded.handleIncoming()
	return upgraded
}
// UpgradeOutbound turns an outbound multiaddr-net connection to peer p into a
// full libp2p-transport connection.
//
// An empty p is rejected with ErrNilPeer before anything is done to maconn.
func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.Conn, error) {
	if len(p) == 0 {
		return nil, ErrNilPeer
	}
	return u.upgrade(ctx, t, maconn, p)
}
// UpgradeInbound turns an inbound multiaddr-net connection into a full
// libp2p-transport connection.
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.Conn, error) {
	// No peer ID is supplied for inbound connections; the empty ID is what
	// makes upgrade take the inbound path.
	var unknown peer.ID
	return u.upgrade(ctx, t, maconn, unknown)
}
// upgrade runs the whole upgrade pipeline on maconn: address filtering, the
// optional private-network protector, the security handshake and finally
// stream-muxer setup. An empty p selects the inbound path; a non-empty p
// treats the connection as outbound to that peer.
//
// On every failure path the connection is closed and a nil transport.Conn is
// returned together with the error.
func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.Conn, error) {
	if u.Filters != nil {
		// Read the remote address once, before the connection is closed.
		if raddr := maconn.RemoteMultiaddr(); u.Filters.AddrBlocked(raddr) {
			log.Debugf("blocked connection from %s", raddr)
			maconn.Close()
			return nil, fmt.Errorf("blocked connection from %s", raddr)
		}
	}

	var conn net.Conn = maconn
	if u.Protector != nil {
		pconn, err := u.Protector.Protect(conn)
		if err != nil {
			conn.Close()
			return nil, err
		}
		conn = pconn
	} else if pnet.ForcePrivateNetwork {
		log.Error("tried to dial with no Private Network Protector but usage" +
			" of Private Networks is forced by the enviroment")
		// Every other failure path releases the connection; do the same here
		// so a refused connection is not leaked.
		conn.Close()
		return nil, pnet.ErrNotInPrivateNetwork
	}

	sconn, err := u.setupSecurity(ctx, conn, p)
	if err != nil {
		conn.Close()
		return nil, err
	}

	smconn, err := u.setupMuxer(ctx, sconn, p)
	if err != nil {
		conn.Close()
		return nil, err
	}

	return &transportConn{
		Conn:           smconn,
		ConnMultiaddrs: maconn,
		ConnSecurity:   sconn,
		transport:      t,
	}, nil
}
// setupSecurity runs the security handshake on conn. A non-empty p secures
// the connection as outbound to that peer; an empty p secures it as inbound.
func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (ss.Conn, error) {
	if p != "" {
		return u.Secure.SecureOutbound(ctx, conn, p)
	}
	return u.Secure.SecureInbound(ctx, conn)
}
// setupMuxer creates the multiplexed connection on top of conn, passing
// p == "" (true for inbound connections) as the muxer's second argument.
//
// The muxer call takes no context, so it runs in a helper goroutine. If ctx is
// done first, conn is closed to interrupt the setup and the goroutine is
// awaited so that it does not outlive this call; a muxed connection that was
// still produced is closed before ctx.Err() is returned.
func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p peer.ID) (smux.Conn, error) {
	// TODO: The muxer should take a context.
	done := make(chan struct{})

	var smconn smux.Conn
	var err error
	go func() {
		defer close(done)
		smconn, err = u.Muxer.NewConn(conn, p == "")
	}()

	select {
	case <-done:
		return smconn, err
	case <-ctx.Done():
		// Closing the connection interrupts a NewConn that is blocked on I/O.
		// NOTE(review): assumes NewConn returns once conn is closed — confirm
		// for every muxer in use.
		conn.Close()
		// Wait for the goroutine so smconn and err are safe to read and
		// nothing is left running in the background.
		<-done
		if err == nil && smconn != nil {
			// Setup won the race against the close; release the result.
			smconn.Close()
		}
		return nil, ctx.Err()
	}
}