-
Notifications
You must be signed in to change notification settings - Fork 0
/
link.go
125 lines (112 loc) · 2.33 KB
/
link.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package lib
import (
"fmt"
"io"
"sync"
)
// Represents a link between a local and a directly connected remote node.
type Link struct {
name string
readChan, writeChan io.Closer
reader ServerProtocolReader
writer ServerProtocolWriter
sq *SendQ
recv chan<- LinkMessage
trans chan LinkMessage
exit chan bool
// Ensures the link can only be closed once.
closed bool
Silence bool
}
func NewLink(reader io.ReadCloser, writer io.WriteCloser, sendBufferSize int, protoFactory ServerProtocolFactory, recv chan<- LinkMessage, wg *sync.WaitGroup) *Link {
l := &Link{
name: "unnamed",
readChan: reader,
writeChan: writer,
recv: recv,
trans: make(chan LinkMessage, 1),
exit: make(chan bool, 1),
}
l.sq = NewSendQ(writer, sendBufferSize, wg)
l.reader = protoFactory.Reader(reader)
l.writer = protoFactory.Writer(l.sq)
wg.Add(2)
go l.readLoop(wg)
go l.controlLoop(wg)
return l
}
func (l *Link) SetName(name string) {
l.name = name
l.sq.name = fmt.Sprintf("sq:%s", name)
}
func (l *Link) WriteMessage(msg SSMessage) error {
return l.writer.WriteMessage(msg)
}
func (l *Link) Close() {
l.readChan.Close()
if !l.closed {
l.closed = true
close(l.exit)
}
l.sq.Close()
}
func (l *Link) Shutdown() {
l.readChan.Close()
if !l.closed {
l.closed = true
close(l.exit)
}
l.sq.FlushAndClose()
}
func (l *Link) controlLoop(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-l.exit:
trans := l.trans
if trans != nil {
for _ = range trans {
}
}
close(l.recv)
return
case msg, ok := <-l.trans:
if !ok {
l.trans = nil
} else {
l.recv <- msg
}
case sqErr := <-l.sq.ErrChan():
if sqErr != nil {
l.recv <- LinkMessage{l, nil, sqErr}
}
}
}
}
func (l *Link) readLoop(wg *sync.WaitGroup) {
defer wg.Done()
for {
// Look for an exit signal. Nothing will actually be received,
// but the channel can be closed.
select {
case <-l.exit:
close(l.trans)
return
default:
// Attempt to read.
msg, err := l.reader.ReadMessage()
l.trans <- LinkMessage{l, msg, err}
// Don't attempt to read anymore.
if err != nil {
close(l.trans)
return
}
}
}
}
// A message sent pertaining to a particular link.
type LinkMessage struct {
link *Link
msg SSMessage
err error
}