-
Notifications
You must be signed in to change notification settings - Fork 9
/
listener.go
223 lines (192 loc) · 5.87 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package p2p
import (
"fmt"
"net"
"strconv"
"strings"
"time"
log "github.com/sirupsen/logrus"
cmn "github.com/tendermint/tmlibs/common"
cfg "github.com/vapor/config"
"github.com/vapor/errors"
"github.com/vapor/p2p/upnp"
)
const (
numBufferedConnections = 10
defaultExternalPort = 8770
tryListenTimes = 5
)
//Listener subset of the methods of DefaultListener
type Listener interface {
Connections() <-chan net.Conn
InternalAddress() *NetAddress
ExternalAddress() *NetAddress
String() string
Stop() bool
}
// Defaults to tcp
func protocolAndAddress(listenAddr string) (string, string) {
p, address := "tcp", listenAddr
parts := strings.SplitN(address, "://", 2)
if len(parts) == 2 {
p, address = parts[0], parts[1]
}
return p, address
}
// GetListener get listener and listen address.
func GetListener(config *cfg.P2PConfig) (Listener, string) {
p, address := protocolAndAddress(config.ListenAddress)
l, listenerStatus := NewDefaultListener(p, address, config.SkipUPNP)
// We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP,
// except of course if the rpc is only bound to localhost
if listenerStatus {
return l, cmn.Fmt("%v:%v", l.ExternalAddress().IP.String(), l.ExternalAddress().Port)
}
return l, cmn.Fmt("%v:%v", l.InternalAddress().IP.String(), l.InternalAddress().Port)
}
//getUPNPExternalAddress UPNP external address discovery & port mapping
func getUPNPExternalAddress(externalPort, internalPort int) (*NetAddress, error) {
nat, err := upnp.Discover()
if err != nil {
return nil, errors.Wrap(err, "could not perform UPNP discover")
}
ext, err := nat.GetExternalAddress()
if err != nil {
return nil, errors.Wrap(err, "could not perform UPNP external address")
}
if externalPort == 0 {
externalPort = defaultExternalPort
}
externalPort, err = nat.AddPortMapping("tcp", externalPort, internalPort, "vapord tcp", 0)
if err != nil {
return nil, errors.Wrap(err, "could not add tcp UPNP port mapping")
}
externalPort, err = nat.AddPortMapping("udp", externalPort, internalPort, "vapord udp", 0)
if err != nil {
return nil, errors.Wrap(err, "could not add udp UPNP port mapping")
}
return NewNetAddressIPPort(ext, uint16(externalPort)), nil
}
func splitHostPort(addr string) (host string, port int) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
cmn.PanicSanity(err)
}
port, err = strconv.Atoi(portStr)
if err != nil {
cmn.PanicSanity(err)
}
return host, port
}
//DefaultListener Implements vapord server Listener
type DefaultListener struct {
cmn.BaseService
listener net.Listener
intAddr *NetAddress
extAddr *NetAddress
connections chan net.Conn
}
//NewDefaultListener create a default listener
func NewDefaultListener(protocol string, lAddr string, skipUPNP bool) (Listener, bool) {
// Local listen IP & port
lAddrIP, lAddrPort := splitHostPort(lAddr)
listener, err := net.Listen(protocol, lAddr)
for i := 0; i < tryListenTimes && err != nil; i++ {
time.Sleep(time.Second * 1)
listener, err = net.Listen(protocol, lAddr)
}
if err != nil {
log.Panic(err)
}
intAddr, err := NewNetAddressString(lAddr)
if err != nil {
log.Panic(err)
}
// Actual listener local IP & port
listenerIP, listenerPort := splitHostPort(listener.Addr().String())
log.Info("Local listener", " ip:", listenerIP, " port:", listenerPort)
// Determine external address...
var extAddr *NetAddress
var upnpMap bool
if !skipUPNP && (lAddrIP == "" || lAddrIP == "0.0.0.0") {
extAddr, err = getUPNPExternalAddress(lAddrPort, listenerPort)
upnpMap = err == nil
log.WithFields(log.Fields{"module": logModule, "err": err}).Info("get UPNP external address")
}
// Get the IPv4 available
if extAddr == nil {
if ip, err := ExternalIPv4(); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("get ipv4 external address")
log.Panic("get ipv4 external address fail!")
} else {
extAddr = NewNetAddressIPPort(net.ParseIP(ip), uint16(lAddrPort))
log.WithFields(log.Fields{"module": logModule, "addr": extAddr}).Info("get ipv4 external address success")
}
}
dl := &DefaultListener{
listener: listener,
intAddr: intAddr,
extAddr: extAddr,
connections: make(chan net.Conn, numBufferedConnections),
}
dl.BaseService = *cmn.NewBaseService(nil, "DefaultListener", dl)
dl.Start() // Started upon construction
if upnpMap {
return dl, true
}
conn, err := net.DialTimeout("tcp", extAddr.String(), 3*time.Second)
if err != nil {
return dl, false
}
conn.Close()
return dl, true
}
//OnStart start listener
func (l *DefaultListener) OnStart() error {
l.BaseService.OnStart()
go l.listenRoutine()
return nil
}
//OnStop stop listener
func (l *DefaultListener) OnStop() {
l.BaseService.OnStop()
l.listener.Close()
}
//listenRoutine Accept connections and pass on the channel
func (l *DefaultListener) listenRoutine() {
for {
conn, err := l.listener.Accept()
if !l.IsRunning() {
break // Go to cleanup
}
// listener wasn't stopped,
// yet we encountered an error.
if err != nil {
log.Panic(err)
}
l.connections <- conn
}
// Cleanup
close(l.connections)
}
//Connections a channel of inbound connections. It gets closed when the listener closes.
func (l *DefaultListener) Connections() <-chan net.Conn {
return l.connections
}
//InternalAddress listener internal address
func (l *DefaultListener) InternalAddress() *NetAddress {
return l.intAddr
}
//ExternalAddress listener external address for remote peer dial
func (l *DefaultListener) ExternalAddress() *NetAddress {
return l.extAddr
}
// NetListener the returned listener is already Accept()'ing. So it's not suitable to pass into http.Serve().
func (l *DefaultListener) NetListener() net.Listener {
return l.listener
}
//String string of default listener
func (l *DefaultListener) String() string {
return fmt.Sprintf("Listener(@%v)", l.extAddr)
}