-
Notifications
You must be signed in to change notification settings - Fork 1
/
local.go
121 lines (94 loc) · 2.48 KB
/
local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package p2p
import (
"context"
"time"
"github.com/ipsn/go-ipfs/gxlibs/github.com/libp2p/go-libp2p-net"
"github.com/ipsn/go-ipfs/gxlibs/github.com/multiformats/go-multiaddr-net"
ma "github.com/ipsn/go-ipfs/gxlibs/github.com/multiformats/go-multiaddr"
tec "github.com/jbenet/go-temp-err-catcher"
"github.com/ipsn/go-ipfs/gxlibs/github.com/libp2p/go-libp2p-peer"
"github.com/ipsn/go-ipfs/gxlibs/github.com/libp2p/go-libp2p-protocol"
)
// localListener accepts manet connections on a local address and proxies
// each of them to a libp2p service on a remote peer.
type localListener struct {
	// ctx is the context supplied to ForwardLocal; it is the parent context
	// for every dial to the remote peer.
	ctx context.Context

	// p2p is the owning P2P instance (source of the peer host and the
	// stream registry).
	p2p *P2P

	// proto is the protocol ID used when opening streams to the remote peer.
	proto protocol.ID
	// laddr is the multiaddr reported by the underlying listener.
	laddr ma.Multiaddr
	// peer is the remote peer that accepted connections are forwarded to.
	peer peer.ID

	// listener is the underlying manet listener accepting local connections.
	listener manet.Listener
}
// ForwardLocal starts listening on bindAddr and forwards every accepted
// connection to peer over a new stream using proto.
//
// The listener is registered in p2p.ListenersLocal and an accept loop is
// started in its own goroutine. ctx is retained by the listener and used as
// the parent context for dialing the remote peer.
//
// An error is returned if listening on bindAddr fails or if the listener
// cannot be registered; in the latter case the freshly bound listener is
// closed so that the address is not leaked.
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) {
	maListener, err := manet.Listen(bindAddr)
	if err != nil {
		return nil, err
	}

	listener := &localListener{
		ctx:      ctx,
		p2p:      p2p,
		proto:    proto,
		peer:     peer,
		listener: maListener,
		laddr:    maListener.Multiaddr(),
	}

	if err := p2p.ListenersLocal.Register(listener); err != nil {
		// Nothing will ever accept from or close this listener once we
		// return, so release it here. Best-effort: the registration error
		// is the one worth reporting to the caller.
		maListener.Close()
		return nil, err
	}

	go listener.acceptConns()

	return listener, nil
}
// dial opens a new stream to the remote peer for this listener's protocol.
// The attempt is bounded by a 30 second timeout derived from ctx.
func (l *localListener) dial(ctx context.Context) (net.Stream, error) {
	// TODO: configurable?
	dialCtx, done := context.WithTimeout(ctx, 30*time.Second)
	defer done()

	return l.p2p.peerHost.NewStream(dialCtx, l.peer, l.proto)
}
// acceptConns is the accept loop of the listener. Every accepted connection
// is handed to setupStream in its own goroutine. Temporary accept errors are
// retried; any other error terminates the loop.
func (l *localListener) acceptConns() {
	for {
		conn, err := l.listener.Accept()
		switch {
		case err == nil:
			go l.setupStream(conn)
		case tec.ErrIsTemporary(err):
			// Transient failure: try accepting again.
		default:
			return
		}
	}
}
// setupStream pairs one accepted local connection with a newly dialed stream
// to the remote peer and hands the pair to the p2p stream registry.
//
// If the remote peer cannot be dialed, the local connection is closed and a
// warning (including the dial error) is logged.
func (l *localListener) setupStream(local manet.Conn) {
	remote, err := l.dial(l.ctx)
	if err != nil {
		// Without a remote end there is nothing to proxy to; drop the
		// local connection (best-effort close).
		local.Close()
		log.Warningf("failed to dial to remote %s/%s: %s", l.peer.Pretty(), l.proto, err)
		return
	}

	stream := &Stream{
		Protocol: l.proto,

		OriginAddr: local.RemoteMultiaddr(),
		TargetAddr: l.TargetAddress(),
		peer:       l.peer,

		Local:  local,
		Remote: remote,

		Registry: l.p2p.Streams,
	}

	l.p2p.Streams.Register(stream)
}
// close shuts down the underlying manet listener. The error returned by
// Close is intentionally discarded.
func (l *localListener) close() {
	_ = l.listener.Close()
}
// Protocol returns the protocol ID used for streams opened by this listener.
func (l *localListener) Protocol() protocol.ID {
	return l.proto
}
// ListenAddress returns the multiaddr recorded from the underlying listener
// when it was created.
func (l *localListener) ListenAddress() ma.Multiaddr {
	return l.laddr
}
// TargetAddress returns the multiaddr of the remote peer, built from
// maPrefix followed by the peer's pretty-printed ID. It panics if that
// string cannot be parsed as a multiaddr.
func (l *localListener) TargetAddress() ma.Multiaddr {
	raw := maPrefix + l.peer.Pretty()

	target, err := ma.NewMultiaddr(raw)
	if err != nil {
		panic(err)
	}

	return target
}
// key returns the string form of the listen address; it identifies this
// listener.
func (l *localListener) key() string {
	return l.laddr.String()
}