-
Notifications
You must be signed in to change notification settings - Fork 0
/
nat_manager.go
217 lines (178 loc) · 4.97 KB
/
nat_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package messenger
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/dnerochain/dnero/common"
"github.com/dnerochain/dnero/p2p/nat"
pr "github.com/dnerochain/dnero/p2p/peer"
"github.com/dnerochain/dnero/p2p/types"
"github.com/dnerochain/dnero/rlp"
)
const (
natMappingPulseInterval = 1 * time.Minute
)
// NatMappingMessage defines the structure of the NAT mapping message
type NATMappingMessage struct {
EPort uint16
}
type NATManager struct {
port int
eport int
natDevice nat.NAT
messenger *Messenger
peerTable *pr.PeerTable
// Life cycle
wg *sync.WaitGroup
quit chan struct{}
ctx context.Context
cancel context.CancelFunc
stopped bool
}
func CreateNATManager(port int) *NATManager {
nmgr := &NATManager{
natDevice: nil,
port: port,
wg: &sync.WaitGroup{},
}
return nmgr
}
// DiscoverGateway discovers the gateway for the NAT mapping
func (nmgr *NATManager) DiscoverGateway() error {
logger.Infof("Discovering NAT gateway...")
natDevice, err := nat.DiscoverGateway()
if err != nil {
nmgr.natDevice = nil
logger.Warnf("Failed to detect the NAT device: %v", err)
return err
}
logger.Infof("NAT type: %s", natDevice.Type())
nmgr.natDevice = natDevice
return nil
}
// SetMessenger sets the Messenger for the NATManager
func (nmgr *NATManager) SetMessenger(msgr *Messenger) {
nmgr.messenger = msgr
nmgr.peerTable = &msgr.peerTable
}
// Start is called when the NATManager instance starts
func (nmgr *NATManager) Start(ctx context.Context) error {
c, cancel := context.WithCancel(ctx)
nmgr.ctx = c
nmgr.cancel = cancel
if nmgr.natDevice != nil {
nmgr.wg.Add(1)
go nmgr.maintainNATMappingRoutine()
}
return nil
}
// Wait suspends the caller goroutine
func (nmgr *NATManager) Wait() {
nmgr.wg.Wait()
}
// Stop is called when the NATManager instance stops
func (nmgr *NATManager) Stop() {
nmgr.cancel()
}
func (nmgr *NATManager) maintainNATMappingRoutine() {
defer nmgr.wg.Done()
natMappingPulse := time.NewTicker(natMappingPulseInterval)
for {
select {
case <-natMappingPulse.C:
nmgr.maintainNATMapping()
}
}
}
func (nmgr *NATManager) maintainNATMapping() {
eport, err := nmgr.NatMapping(nmgr.port)
if err != nil {
logger.Warnf("Failed to perform NAT mapping: %v", err)
}
if nmgr.eport != eport {
// notify peers
content := NATMappingMessage{
EPort: uint16(eport),
}
message := types.Message{
ChannelID: common.ChannelIDNATMapping,
Content: content,
}
nmgr.messenger.Broadcast(message)
nmgr.eport = eport
logger.Debugf("Notify peers with the new external port: %v", eport)
}
}
func (nmgr *NATManager) NatMapping(port int) (eport int, err error) {
if nmgr.natDevice == nil {
return port, fmt.Errorf("No available NAT device")
}
iaddr, err := nmgr.natDevice.GetInternalAddress()
if err != nil {
return port, err
}
logger.Infof("Internal address: %s", iaddr)
eaddr, err := nmgr.natDevice.GetExternalAddress()
if err != nil {
return port, err
}
logger.Infof("External address: %s", eaddr)
eport, err = nmgr.natDevice.AddPortMapping("tcp", port, "tcp", 60*time.Second)
if err != nil {
return port, err
}
logger.Infof("External port for %v is %v", port, eport)
return eport, nil
}
// GetChannelIDs implements the p2p.MessageHandler interface
func (nmgr *NATManager) GetChannelIDs() []common.ChannelIDEnum {
return []common.ChannelIDEnum{
common.ChannelIDNATMapping,
}
}
// EncodeMessage implements the p2p.MessageHandler interface
func (nmgr *NATManager) EncodeMessage(message interface{}) (common.Bytes, error) {
return rlp.EncodeToBytes(message)
}
// ParseMessage implements the p2p.MessageHandler interface
func (nmgr *NATManager) ParseMessage(peerID string,
channelID common.ChannelIDEnum, rawMessageBytes common.Bytes) (types.Message, error) {
mappingMsg, err := decodeNATMappingMessage(rawMessageBytes)
message := types.Message{
PeerID: peerID,
ChannelID: channelID,
Content: mappingMsg,
}
if err != nil {
logger.Errorf("Error decoding NATMappingMessage: %v", err)
return message, err
}
return message, nil
}
// HandleMessage implements the p2p.MessageHandler interface
func (nmgr *NATManager) HandleMessage(msg types.Message) error {
if msg.ChannelID != common.ChannelIDNATMapping {
errMsg := fmt.Sprintf("Invalid channelID for the NATMappingMessageHandler: %v", msg.ChannelID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
peerID := msg.PeerID
peer := nmgr.peerTable.GetPeer(peerID)
if peer == nil {
errMsg := fmt.Sprintf("Cannot find peer %v in the peer table", peerID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
natMsg := (msg.Content).(NATMappingMessage)
peerAddr := peer.NetAddress()
peerAddr.Port = natMsg.EPort
peer.SetNetAddress(peerAddr)
logger.Debugf("Update peer address for %v - external port: %v, peerAddr: %v", peer.ID(), peerAddr.Port, peerAddr.String())
return nil
}
func decodeNATMappingMessage(msgBytes common.Bytes) (message NATMappingMessage, err error) {
err = rlp.DecodeBytes(msgBytes, &message)
return
}