forked from go-micro/go-micro
/
default.go
1420 lines (1207 loc) · 31.5 KB
/
default.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package tunnel
import (
"errors"
"math/rand"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/transport"
)
var (
// DiscoverTime is the period of the per-link ticker in manageLink that
// triggers sending a "discover" message to the link. A random delay is
// added before each send.
DiscoverTime = 30 * time.Second
// KeepAliveTime is the period of the per-link ticker in manageLink that
// triggers sending a "keepalive" message to the link. A random delay is
// added before each send.
KeepAliveTime = 30 * time.Second
// ReconnectTime defines the time interval at which we periodically attempt
// to reconnect dead links.
ReconnectTime = 5 * time.Second
)
// tun represents a network tunnel.
// The embedded sync.RWMutex is the lock the methods in this file take
// around accesses to the options, sessions and links fields.
type tun struct {
options Options
sync.RWMutex
// the unique id for this tunnel
id string
// tunnel token for session encryption; newSession combines it with the
// channel and session id to build the per-session key
token string
// to indicate if we're connected or not
connected bool
// the send channel for all messages; every session created by newSession
// shares it and process drains it
send chan *message
// close channel; the manage, manageLink and process loops return once
// a receive on it succeeds
closed chan bool
// a map of sessions keyed by Micro-Tunnel-Channel concatenated with
// Micro-Tunnel-Session
sessions map[string]*session
// links keyed by node address (dialled in manageLinks/setupLinks) or by
// the link's remote address (saved by listen on a "connect" message)
links map[string]*link
// listener
listener transport.Listener
}
// newTunnel builds a tunnel from DefaultOptions with opts applied on top.
// The tunnel id and token are copied from the resulting options, the send
// channel is buffered (128 messages) and the session and link maps start
// out empty. As a side effect it seeds the global math/rand source.
func newTunnel(opts ...Option) *tun {
	rand.Seed(time.Now().UnixNano())

	cfg := DefaultOptions()
	for _, apply := range opts {
		apply(&cfg)
	}

	t := new(tun)
	t.options = cfg
	t.id = cfg.Id
	t.token = cfg.Token
	t.send = make(chan *message, 128)
	t.closed = make(chan bool)
	t.sessions = make(map[string]*session)
	t.links = make(map[string]*link)

	return t
}
// Init applies opts to the tunnel options while holding the write lock.
// It always returns nil.
func (t *tun) Init(opts ...Option) error {
	t.Lock()
	defer t.Unlock()

	for i := range opts {
		opts[i](&t.options)
	}

	return nil
}
// getSession looks up a session in the internal session map under the read
// lock. The map key is the Micro-Tunnel-Channel value concatenated with the
// Micro-Tunnel-Session value. The boolean reports whether an entry exists.
func (t *tun) getSession(channel, session string) (*session, bool) {
	key := channel + session

	t.RLock()
	defer t.RUnlock()

	found, ok := t.sessions[key]
	return found, ok
}
// delSession closes the session stored for channel+session, if there is
// one, and removes the entry from the session map. Both steps happen while
// holding the write lock.
func (t *tun) delSession(channel, session string) {
	key := channel + session

	t.Lock()
	defer t.Unlock()

	if existing, found := t.sessions[key]; found {
		existing.Close()
	}
	delete(t.sessions, key)
}
// listChannels returns the channel names of all sessions whose session id
// is "listener". The result is nil when there are none; the order follows
// map iteration and is therefore unspecified.
func (t *tun) listChannels() []string {
	t.RLock()
	defer t.RUnlock()

	//nolint:prealloc
	var names []string

	for _, s := range t.sessions {
		if s.session == "listener" {
			names = append(names, s.channel)
		}
	}

	return names
}
// newSession creates a session for channel/sessionId and stores it in the
// session map under the key channel+sessionId.
//
// It returns:
//   - (nil, false, err) when the cipher for the session key cannot be created,
//   - (nil, false, nil) when a session with that key already exists,
//   - (s, true, nil) when the new session was created and saved.
func (t *tun) newSession(channel, sessionId string) (*session, bool, error) {
	// the encryption key is derived from the tunnel token, channel and session
	secret := []byte(t.token + channel + sessionId)

	gcm, err := newCipher(secret)
	if err != nil {
		return nil, false, err
	}

	s := &session{
		tunnel:  t.id,
		channel: channel,
		session: sessionId,
		token:   t.token,
		closed:  make(chan bool),
		recv:    make(chan *message, 128),
		// outbound messages go straight onto the tunnel's send channel
		send:    t.send,
		errChan: make(chan error, 1),
		key:     secret,
		gcm:     gcm,
	}

	mapKey := channel + sessionId

	t.Lock()
	defer t.Unlock()

	// never replace a session that is already registered
	if _, taken := t.sessions[mapKey]; taken {
		return nil, false, nil
	}
	t.sessions[mapKey] = s

	return s, true, nil
}
// newSessionId returns a freshly generated UUID in string form.
// TODO: use tunnel id as part of the session
func (t *tun) newSessionId() string {
	id := uuid.New()
	return id.String()
}
// announce tells the other side of link which channel(s) we are listening on
// by sending it an "announce" message.
//
// With an empty channel every listening channel is announced as a comma
// separated list; nothing is sent if there are no listeners. With a
// non-empty channel the announcement is only sent when a "listener" session
// exists for it. Send failures are logged and otherwise ignored.
func (t *tun) announce(channel, session string, link *link) {
	header := map[string]string{
		"Micro-Tunnel":         "announce",
		"Micro-Tunnel-Id":      t.id,
		"Micro-Tunnel-Channel": channel,
		"Micro-Tunnel-Session": session,
		"Micro-Tunnel-Link":    link.id,
	}
	msg := &transport.Message{Header: header}

	if len(channel) > 0 {
		// a single channel was asked for: we must be listening on it
		if _, listening := t.getSession(channel, "listener"); !listening {
			return
		}
	} else {
		// no channel given: we've been asked to discover all channels
		all := t.listChannels()
		if len(all) == 0 {
			return
		}
		channel = strings.Join(all, ",")
		msg.Header["Micro-Tunnel-Channel"] = channel
	}

	if logger.V(logger.TraceLevel, log) {
		log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel)
	}

	// send back the announcement
	err := link.Send(msg)
	if err == nil {
		return
	}
	if logger.V(logger.DebugLevel, log) {
		log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err)
	}
}
// manage calls manageLinks every reconnect interval until a receive on the
// tunnel's closed channel succeeds.
func (t *tun) manage(reconnect time.Duration) {
	ticker := time.NewTicker(reconnect)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			t.manageLinks()
		case <-t.closed:
			return
		}
	}
}
// manageLink periodically sends "discover" and "keepalive" messages to link.
//
// Discover messages go out every DiscoverTime and keepalives every
// KeepAliveTime, each delayed by a random jitter. The loop ends when the
// tunnel or the link is closed. A failed discover is only logged; a failed
// keepalive deletes the link from the tunnel and ends the loop.
func (t *tun) manageLink(link *link) {
	keepalive := time.NewTicker(KeepAliveTime)
	defer keepalive.Stop()
	discover := time.NewTicker(DiscoverTime)
	defer discover.Stop()

	// wait sleeps for a random whole number of seconds in [0, d/2).
	// Note the sleep is not interrupted by the closed channels.
	wait := func(d time.Duration) {
		half := int64(d.Seconds() / 2.0)
		// rand.Int63n panics when its argument is <= 0, which is the case
		// for any interval shorter than two seconds: skip the jitter then
		if half <= 0 {
			return
		}
		j := rand.Int63n(half)
		time.Sleep(time.Duration(j) * time.Second)
	}

	for {
		select {
		case <-t.closed:
			return
		case <-link.closed:
			return
		case <-discover.C:
			// wait a random time of up to half the discover time
			wait(DiscoverTime)

			// send a discovery message to the link
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel sending discover to link: %v", link.Remote())
			}
			if err := t.sendMsg("discover", link); err != nil {
				if logger.V(logger.DebugLevel, log) {
					log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
				}
			}
		case <-keepalive.C:
			// wait a random time of up to half the keepalive time
			wait(KeepAliveTime)

			// send keepalive message
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
			}
			if err := t.sendMsg("keepalive", link); err != nil {
				if logger.V(logger.DebugLevel, log) {
					log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
				}
				// the link is unusable: drop it and stop managing it
				t.delLink(link.Remote())
				return
			}
		}
	}
}
// manageLinks performs one round of link maintenance. It can be called at
// any time to set links up immediately.
//
// Links whose state is "closed" or "error" are closed and removed, then a
// new link is dialled concurrently for every configured node that has no
// live link. The call returns once all dial attempts have finished.
func (t *tun) manageLinks() {
	dead := make(map[*link]string)
	alive := make(map[string]bool)

	// snapshot the configured nodes and sort the links by health
	t.RLock()
	nodes := t.options.Nodes
	for node, l := range t.links {
		switch l.State() {
		case "closed", "error":
			dead[l] = node
		default:
			alive[node] = true
		}
	}
	t.RUnlock()

	// every configured node without a live link needs (re)connecting
	var pending []string
	for _, node := range nodes {
		if !alive[node] {
			pending = append(pending, node)
		}
	}

	// purge the dead links
	if len(dead) > 0 {
		t.Lock()
		for stale, node := range dead {
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel deleting dead link for %s", node)
			}

			// close and delete whatever is currently stored for the node
			current, found := t.links[node]
			if found {
				current.Close()
				delete(t.links, node)
			}

			// the stored link may have been swapped since the snapshot;
			// close the one we saw just in case
			if current != stale {
				stale.Close()
			}
		}
		t.Unlock()
	}

	var wg sync.WaitGroup

	// dial sets up one link and saves it unless another one appeared meanwhile
	dial := func(node string) {
		defer wg.Done()

		// if we're using quic it should be a max 10 second handshake period
		fresh, err := t.setupLink(node)
		if err != nil {
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
			}
			return
		}

		t.Lock()
		defer t.Unlock()

		// just check nothing else was setup in the interim
		if _, taken := t.links[node]; taken {
			fresh.Close()
			return
		}
		t.links[node] = fresh
	}

	// establish new links
	for _, node := range pending {
		wg.Add(1)
		go dial(node)
	}

	// wait for all goroutines to finish
	wg.Wait()
}
// process drains the tunnel's send channel, into which all local sessions
// put their outgoing messages, until the tunnel is closed.
//
// For each message it selects the links the message may travel on and hands
// them to sendTo in a new goroutine. When no link qualifies, the reason
// recorded for the last rejected link (possibly nil) is reported back via
// respond.
func (t *tun) process() {
	// pick returns the links msg can be sent on, plus the most recent
	// reason a link was rejected
	pick := func(msg *message) ([]*link, error) {
		var (
			chosen  []*link
			lastErr error
		)

		t.RLock()
		defer t.RUnlock()

		for _, l := range t.links {
			// get the values we need
			l.RLock()
			id, connected, loopback := l.id, l.connected, l.loopback
			_, mapped := l.channels[msg.channel]
			l.RUnlock()

			switch {
			case !connected:
				// skip links that are not connected
				if logger.V(logger.DebugLevel, log) {
					log.Debugf("Link for node %s not connected", id)
				}
				lastErr = ErrLinkDisconnected
			case loopback && msg.outbound:
				// the link is a loopback accepted connection and the
				// message is going outbound via a dialled connection
				lastErr = ErrLinkLoopback
			case msg.loopback && !loopback:
				// messages returned by the loopback listener only go
				// back up the loopback link
				lastErr = ErrLinkRemote
			case msg.mode == Multicast && !mapped:
				// multicast: the link has no mapping for the channel
			case msg.mode != Multicast && len(msg.link) > 0 && id != msg.link:
				// the message explicitly names a different link
				lastErr = ErrLinkNotFound
			default:
				chosen = append(chosen, l)
			}
		}

		return chosen, lastErr
	}

	for {
		select {
		case <-t.closed:
			return
		case msg := <-t.send:
			targets, err := pick(msg)

			// no links to send to
			if len(targets) == 0 {
				if logger.V(logger.DebugLevel, log) {
					log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel)
				}
				t.respond(msg, err)
				continue
			}

			// send the message
			go t.sendTo(targets, msg)
		}
	}
}
// respond reports err (which may be nil) to the sender of msg through the
// message's error channel. The send never blocks: if it cannot proceed
// immediately the result is dropped.
func (t *tun) respond(msg *message, err error) {
	out := msg.errChan

	select {
	case out <- err:
		// delivered to the caller
	default:
		// the send cannot proceed right now; drop the result
	}
}
// sendTo sends msg over the chosen links and reports the outcome to the
// sender via respond.
//
// In unicast mode the links are tried one after another and the call
// returns nil on the first successful send. In any other mode a private
// copy of the message is sent on every link in parallel; the result is nil
// as soon as one send succeeds, otherwise it is the last error collected.
// A link whose send fails is deleted from the tunnel.
func (t *tun) sendTo(links []*link, msg *message) error {
	// deliver pushes one transport message down one link and removes the
	// link from the tunnel when that fails
	deliver := func(l *link, m *transport.Message) error {
		err := l.Send(m)
		if err == nil {
			return nil
		}
		if logger.V(logger.DebugLevel, log) {
			log.Debugf("Tunnel error sending %+v to %s: %v", m.Header, l.Remote(), err)
		}
		t.delLink(l.Remote())
		return err
	}

	out := &transport.Message{Header: make(map[string]string)}

	// carry over the payload, if any
	if data := msg.data; data != nil {
		for k, v := range data.Header {
			out.Header[k] = v
		}
		out.Body = data.Body
	}

	// tunnel headers: message type, tunnel id, channel and session id
	out.Header["Micro-Tunnel"] = msg.typ
	out.Header["Micro-Tunnel-Id"] = msg.tunnel
	out.Header["Micro-Tunnel-Channel"] = msg.channel
	out.Header["Micro-Tunnel-Session"] = msg.session

	// one slot per link so that no sender ever blocks on the result
	results := make(chan error, len(links))

	for _, l := range links {
		if logger.V(logger.TraceLevel, log) {
			log.Tracef("Tunnel sending %+v to %s", out.Header, l.Remote())
		}

		// multicast/broadcast: blast a copy out in its own goroutine
		if msg.mode > Unicast {
			dup := &transport.Message{
				Header: make(map[string]string),
				Body:   make([]byte, len(out.Body)),
			}
			copy(dup.Body, out.Body)
			for k, v := range out.Header {
				dup.Header[k] = v
			}

			go func(l *link, m *transport.Message) {
				results <- deliver(l, m)
			}(l, dup)
			continue
		}

		// unicast: the first link that takes the message wins
		err := deliver(l, out)
		if err == nil {
			t.respond(msg, nil)
			return nil
		}
		// remember the failure and try the next link
		results <- err
	}

	// either all unicast attempts failed or we're
	// checking the multicast/broadcast attempts
	var err error
	for range links {
		if err = <-results; err == nil {
			// success
			break
		}
	}

	// report the outcome; respond is non blocking
	t.respond(msg, err)
	return err
}
// delLink closes and removes every link whose id equals remote, holding the
// write lock for the whole scan.
func (t *tun) delLink(remote string) {
	t.Lock()
	defer t.Unlock()

	for node, l := range t.links {
		if l.id == remote {
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel deleting link node: %s remote: %s", node, l.Remote())
			}
			// close and delete
			l.Close()
			delete(t.links, node)
		}
	}
}
// listen processes incoming messages on link until receiving fails, the
// remote side sends a link-level "close", or a message other than "connect"
// arrives while the link is not yet connected. On exit the link is removed
// from the tunnel.
//
// Control messages (connect, keepalive, announce, discover and some
// close/accept messages) are handled here. Everything else (open, accept,
// session, and close messages that were not consumed) has its Micro-Tunnel
// headers stripped and is queued on the recv channel of the matching
// session or channel listener. Messages for unknown sessions are dropped,
// as are messages whose target recv buffer is full.
func (t *tun) listen(link *link) {
	// remove the link on exit
	defer func() {
		t.delLink(link.Remote())
	}()

	// let us know if its a loopback
	var loopback bool
	var connected bool

	// set the connected value
	link.RLock()
	connected = link.connected
	link.RUnlock()

	for {
		// process anything via the net interface
		msg := new(transport.Message)
		if err := link.Recv(msg); err != nil {
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err)
			}
			return
		}

		// TODO: figure out network authentication
		// for now we use tunnel token to encrypt/decrypt
		// session communication, but we will probably need
		// some sort of network authentication (token) to avoid
		// having rogue actors spamming the network

		// message type
		mtype := msg.Header["Micro-Tunnel"]
		// the tunnel id
		id := msg.Header["Micro-Tunnel-Id"]
		// the tunnel channel
		channel := msg.Header["Micro-Tunnel-Channel"]
		// the session id
		sessionId := msg.Header["Micro-Tunnel-Session"]

		// if its not connected throw away the link
		// the first message we process needs to be connect
		if !connected && mtype != "connect" {
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel link %s not connected", link.id)
			}
			return
		}

		// this state machine block handles the only message types
		// that we know or care about; connect, close, open, accept,
		// discover, announce, session, keepalive
		switch mtype {
		case "connect":
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel link %s received connect message", link.Remote())
			}

			link.Lock()

			// check if we're connecting to ourselves?
			if id == t.id {
				link.loopback = true
				loopback = true
			}

			// set to remote node
			link.id = link.Remote()
			// set as connected
			link.connected = true
			connected = true

			link.Unlock()

			// save the link once connected
			t.Lock()
			t.links[link.Remote()] = link
			t.Unlock()

			// send back an announcement of our channels discovery
			go t.announce("", "", link)
			// ask for the things on the other side
			go t.sendMsg("discover", link)
			// nothing more to do
			continue
		case "close":
			// if there is no channel then we close the link
			// as its a signal from the other side to close the connection
			if len(channel) == 0 {
				if logger.V(logger.DebugLevel, log) {
					log.Debugf("Tunnel link %s received close message", link.Remote())
				}
				return
			}

			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel link %s received close message for %s", link.Remote(), channel)
			}

			// the entire listener was closed by the remote side so we need to
			// remove the channel mapping for it. should we also close sessions?
			if sessionId == "listener" {
				link.delChannel(channel)
				// TODO: find all the non listener unicast sessions
				// and close them. think about edge cases first
				continue
			}

			// assuming there's a channel and session
			// try get the dialing socket
			s, exists := t.getSession(channel, sessionId)
			if exists && !loopback {
				// only delete the session if its unicast
				// otherwise ignore close on the multicast
				if s.mode == Unicast {
					// only delete this if its unicast
					// but not if its a loopback conn
					t.delSession(channel, sessionId)
					continue
				}
			}
			// otherwise its a session mapping of sorts
		case "keepalive":
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel link %s received keepalive", link.Remote())
			}
			// save the keepalive
			link.keepalive()
			continue
		// a new connection dialled outbound
		case "open":
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel link %s received open %s %s", link.id, channel, sessionId)
			}
			// we just let it pass through to be processed
		// an accept returned by the listener
		case "accept":
			s, exists := t.getSession(channel, sessionId)
			// just set accepted on anything not unicast
			if exists && s.mode > Unicast {
				s.accepted = true
				continue
			}
			// if its already accepted move on
			if exists && s.accepted {
				continue
			}
			// otherwise we're going to process to accept
		// a continued session
		case "session":
			// process message
			if logger.V(logger.TraceLevel, log) {
				log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
			}
		// an announcement of a channel listener
		case "announce":
			if logger.V(logger.TraceLevel, log) {
				log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
			}

			// process the announcement
			channels := strings.Split(channel, ",")

			// update mapping in the link
			link.setChannel(channels...)

			// this was an announcement not intended for anything
			// if the dialing side sent "discover" then a session
			// id would be present. We skip in case of multicast.
			switch sessionId {
			case "listener", "multicast", "":
				continue
			}

			// get the session that asked for the discovery
			s, exists := t.getSession(channel, sessionId)
			if exists {
				// don't bother it's already discovered
				if s.discovered {
					continue
				}

				msg := &message{
					typ:     "announce",
					tunnel:  id,
					channel: channel,
					session: sessionId,
					link:    link.id,
				}

				// send the announce back to the caller
				select {
				case <-s.closed:
				case s.recv <- msg:
				}
			}
			continue
		case "discover":
			// send back an announcement
			go t.announce(channel, sessionId, link)
			continue
		default:
			// blackhole it
			continue
		}

		// strip tunnel message header
		for k := range msg.Header {
			if strings.HasPrefix(k, "Micro-Tunnel") {
				delete(msg.Header, k)
			}
		}

		// if the session id is blank there's nothing we can do
		// TODO: check this is the case, is there any reason
		// why we'd have a blank session? Is the tunnel
		// used for some other purpose?
		if len(channel) == 0 || len(sessionId) == 0 {
			continue
		}

		var s *session
		var exists bool

		// If its a loopback connection then we've enabled link direction
		// listening side is used for listening, the dialling side for dialling
		switch {
		case loopback, mtype == "open":
			s, exists = t.getSession(channel, "listener")
		// only return accept to the session
		case mtype == "accept":
			if logger.V(logger.DebugLevel, log) {
				log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId)
			}
			s, exists = t.getSession(channel, sessionId)
			if exists && s.accepted {
				continue
			}
		default:
			// get the session based on the tunnel id and session
			// this could be something we dialed in which case
			// we have a session for it otherwise its a listener
			s, exists = t.getSession(channel, sessionId)
			if !exists {
				// try get it based on just the tunnel id
				// the assumption here is that a listener
				// has no session but its set a listener session
				s, exists = t.getSession(channel, "listener")
			}
		}

		// bail if no session or listener has been found
		if !exists {
			if logger.V(logger.TraceLevel, log) {
				log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId)
			}
			// drop it, we don't care about
			// messages we don't know about
			continue
		}

		// is the session closed?
		select {
		case <-s.closed:
			// closed: forget the session. The session map is keyed by
			// channel+session and guarded by the tunnel lock, so delete
			// under the lock with the full key, and only if the entry is
			// still the session we looked up.
			key := s.channel + s.session
			t.Lock()
			if cur, ok := t.sessions[key]; ok && cur == s {
				delete(t.sessions, key)
			}
			t.Unlock()
			continue
		default:
			// otherwise process
		}

		if logger.V(logger.TraceLevel, log) {
			log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype)
		}

		// construct a new transport message
		tmsg := &transport.Message{
			Header: msg.Header,
			Body:   msg.Body,
		}

		// construct the internal message
		imsg := &message{
			tunnel:   id,
			typ:      mtype,
			channel:  channel,
			session:  sessionId,
			mode:     s.mode,
			data:     tmsg,
			link:     link.id,
			loopback: loopback,
			errChan:  make(chan error, 1),
		}

		// append to recv backlog
		// we don't block if we can't pass it on
		select {
		case s.recv <- imsg:
		default:
		}
	}
}
// sendMsg sends a body-less tunnel control message of the given method
// (e.g. "connect", "discover", "keepalive") over link. The message carries
// only the method and this tunnel's id as headers. It returns the error
// from link.Send.
func (t *tun) sendMsg(method string, link *link) error {
	header := map[string]string{
		"Micro-Tunnel":    method,
		"Micro-Tunnel-Id": t.id,
	}
	msg := &transport.Message{Header: header}

	return link.Send(msg)
}
// setupLink dials node, wraps the connection in a link and sends the
// initial "connect" message over it.
//
// On success the link is marked connected, goroutines are started to
// process its incoming messages (listen) and to send keepalive and discover
// messages (manageLink), and the link is returned. The link is NOT stored in
// the tunnel's link map; that is left to the caller. An error is returned
// when dialling fails or the connect message cannot be sent; in the latter
// case the link is closed first.
func (t *tun) setupLink(node string) (*link, error) {
	if logger.V(logger.DebugLevel, log) {
		log.Debugf("Tunnel setting up link: %s", node)
	}

	c, err := t.options.Transport.Dial(node)
	if err != nil {
		if logger.V(logger.DebugLevel, log) {
			log.Debugf("Tunnel failed to connect to %s: %v", node, err)
		}
		return nil, err
	}

	if logger.V(logger.DebugLevel, log) {
		log.Debugf("Tunnel connected to %s", node)
	}

	// create a new link
	link := newLink(c)

	// set link id to remote side
	link.Lock()
	link.id = c.Remote()
	link.Unlock()

	// send the first connect message
	if err := t.sendMsg("connect", link); err != nil {
		link.Close()
		return nil, err
	}

	// we made the outbound connection and sent the connect message.
	// connected is read under the link lock elsewhere (listen, process),
	// so write it under the lock as well, just like the id above
	link.Lock()
	link.connected = true
	link.Unlock()

	// process incoming messages
	go t.listen(link)

	// manage keepalives and discovery messages
	go t.manageLink(link)

	return link, nil
}
// setupLinks dials every configured node that has no entry in the link map
// yet and stores the links that could be established. The nodes are dialled
// concurrently; the call returns once all attempts have finished. Failed
// attempts are only logged.
//
// The link map is only touched from the calling goroutine: the dialling
// goroutines hand their result back through a per-node slot instead of
// writing to the map themselves, because unsynchronised concurrent map
// writes crash the program.
//
// NOTE(review): this method does not take the tunnel lock itself (the
// original did not either). The callers are not visible here; presumably
// they already hold it — confirm before adding locking.
func (t *tun) setupLinks() {
	nodes := t.options.Nodes

	// one result slot per node, written by at most one goroutine each
	results := make([]*link, len(nodes))

	var wg sync.WaitGroup

	for i, node := range nodes {
		// we're not trying to fix existing links
		if _, ok := t.links[node]; ok {
			continue
		}

		wg.Add(1)

		go func(i int, node string) {
			defer wg.Done()

			// create new link
			l, err := t.setupLink(node)
			if err != nil {
				if logger.V(logger.DebugLevel, log) {
					log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
				}
				return
			}

			results[i] = l
		}(i, node)
	}

	// wait for all goroutines to finish
	wg.Wait()

	// save the links
	for i, l := range results {
		if l == nil {
			continue
		}

		node := nodes[i]

		// a link for this node was stored in the meantime (e.g. the node
		// is configured twice): keep it and close the spare one, as
		// manageLinks does
		if _, ok := t.links[node]; ok {
			l.Close()
			continue
		}

		t.links[node] = l
	}
}
// connect the tunnel to all the nodes and listen for incoming tunnel connections
func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address)
if err != nil {
return err
}
// save the listener
t.listener = l
go func() {
// accept inbound connections
err := l.Accept(func(sock transport.Socket) {
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel accepted connection from %s", sock.Remote())
}
// create a new link
link := newLink(sock)
// manage the link
go t.manageLink(link)
// listen for inbound messages.
// only save the link once connected.
// we do this inside liste
t.listen(link)