/
lmux.go
152 lines (138 loc) · 3.29 KB
/
lmux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package lmux
import (
"errors"
"net"
"sync/atomic"
"time"
)
// event is the unit handed from an accept goroutine to a ChanListener: the
// outcome of one Accept call on the underlying listener.
type event struct {
// err is the error reported by the underlying Accept, or nil on success.
err error
// conn is the accepted connection that ChanListener.Accept returns.
conn net.Conn
}
// listenerAB pairs the two ChanListeners that Mux creates for one underlying
// net.Listener.
type listenerAB struct {
// a receives connections while the online count of A stays within
// maxOnlineA; b receives the rest.
a, b *ChanListener
}
// New returns a ListenerMux that dispatches at most maxOnlineA concurrent
// connections to listener A before overflowing to listener B.
//
// The limit is stored as an int32, so maxOnlineA is clamped to the range
// [0, math.MaxInt32] instead of being truncated: without the clamp a large
// "unlimited" value such as math.MaxInt would wrap to a negative limit and
// silently route every connection to B. Negative values behave like 0
// (everything goes to B).
func New(maxOnlineA int) *ListenerMux {
	const maxInt32 = 1<<31 - 1

	limit := maxOnlineA
	switch {
	case limit < 0:
		limit = 0
	case limit > maxInt32:
		limit = maxInt32
	}

	return &ListenerMux{
		listeners:  map[net.Listener]listenerAB{},
		chClose:    make(chan struct{}),
		maxOnlineA: int32(limit),
	}
}
// ListenerMux manages listeners and handles the connection dispatching logic.
type ListenerMux struct {
// shutdown is set by Stop and cleared by Start; the accept goroutines poll it.
// NOTE(review): this is a plain bool that is written by Start/Stop and read
// by the accept goroutines without synchronization — consider an atomic flag.
shutdown bool
// listeners maps every underlying listener registered through Mux to its
// A/B ChanListener pair.
listeners map[net.Listener]listenerAB
// chClose is shared with every ChanListener created by Mux and is closed by
// Stop so that their pending Accept calls return.
chClose chan struct{}
// onlineA counts the connections currently attributed to A; it is updated
// with sync/atomic.
onlineA int32
// maxOnlineA is the limit on onlineA above which new connections go to B.
maxOnlineA int32
}
// Mux creates and returns ChanListener A and B for the underlying listener l:
// if the online num of A is less than ListenerMux.maxOnlineA, a new connection
// is dispatched to A; otherwise it is dispatched to B.
//
// It returns (nil, nil) when either lm or l is nil. Registering the same
// listener again replaces the previously returned pair. Mux only registers
// the listener; connections start flowing once Start is called.
func (lm *ListenerMux) Mux(l net.Listener) (*ChanListener, *ChanListener) {
	if l == nil || lm == nil {
		return nil, nil
	}

	// Support a zero-value ListenerMux: both the registry and the shared close
	// channel are created on demand. Without the channel, the ChanListeners
	// would never be unblocked by Stop and Stop itself would panic on
	// close(nil).
	if lm.listeners == nil {
		lm.listeners = map[net.Listener]listenerAB{}
	}
	if lm.chClose == nil {
		lm.chClose = make(chan struct{})
	}

	// eventBacklog is the number of pending events each ChanListener can
	// buffer before the accept goroutine has to wait for a consumer.
	const eventBacklog = 1024 * 64

	addr := l.Addr()
	ab := listenerAB{
		a: &ChanListener{
			addr:    addr,
			chClose: lm.chClose,
			chEvent: make(chan event, eventBacklog),
			// Only A is counted against maxOnlineA, so only A can decrease it.
			decrease: lm.DecreaseOnlineA,
		},
		b: &ChanListener{
			addr:    addr,
			chClose: lm.chClose,
			chEvent: make(chan event, eventBacklog),
		},
	}
	lm.listeners[l] = ab
	return ab.a, ab.b
}
// Start starts to accept and dispatch the connections to ChanListener A or B.
//
// One goroutine is launched per listener registered through Mux. Each
// goroutine accepts from its underlying listener until Stop is called or
// Accept fails with a non-timeout error; that error is forwarded to both A
// and B so that their Accept callers observe it.
//
// Every hand-off to a ChanListener also watches the mux's close channel, so a
// goroutine can no longer block forever on a full event queue after Stop; a
// connection that cannot be delivered because the mux was closed is closed
// instead of being leaked.
//
// NOTE(review): lm.shutdown is a plain bool shared with Stop without
// synchronization; fixing that requires changing the field's type.
func (lm *ListenerMux) Start() {
	if lm == nil {
		return
	}
	lm.shutdown = false
	// A nil done channel (mux not built by New) never fires, which degrades
	// the selects below to plain blocking sends.
	done := lm.chClose
	for k, v := range lm.listeners {
		go func(l net.Listener, listenerA *ChanListener, listenerB *ChanListener) {
			// deliver queues ev on dst unless the mux is closed first and
			// reports whether the event was queued.
			deliver := func(dst *ChanListener, ev event) bool {
				select {
				case dst.chEvent <- ev:
					return true
				case <-done:
					return false
				}
			}

			for !lm.shutdown {
				c, err := l.Accept()
				if err != nil {
					var ne net.Error
					if errors.As(err, &ne) && ne.Timeout() {
						// Transient condition: back off briefly and retry.
						time.Sleep(time.Second / 20)
						continue
					}
					//logging.Error("Accept failed: %v, exit...", err)
					// Fatal accept error: report it to both listeners and stop.
					ev := event{err: err, conn: c}
					deliver(listenerA, ev)
					deliver(listenerB, ev)
					return
				}

				// Optimistically reserve a slot on A; roll the reservation
				// back if A is already at its limit.
				if atomic.AddInt32(&lm.onlineA, 1) <= lm.maxOnlineA {
					if !deliver(listenerA, event{conn: c}) {
						atomic.AddInt32(&lm.onlineA, -1)
						// Shutting down: nobody will ever own this connection.
						_ = c.Close()
						return
					}
					continue
				}
				atomic.AddInt32(&lm.onlineA, -1)
				if !deliver(listenerB, event{conn: c}) {
					// Shutting down: nobody will ever own this connection.
					_ = c.Close()
					return
				}
			}
		}(k, v.a, v.b)
	}
}
// Stop stops all the listeners: it marks the mux as shut down, closes every
// underlying listener registered through Mux and closes the shared close
// channel so that pending ChanListener.Accept calls return.
//
// Stop is safe to call more than once from the same goroutine and on a mux
// whose close channel was never created; previously both cases panicked in
// close. It is not safe to call Stop concurrently from several goroutines.
func (lm *ListenerMux) Stop() {
	if lm == nil {
		return
	}
	lm.shutdown = true
	for l, ab := range lm.listeners {
		// Best effort: a listener that fails to close (for example because it
		// was already closed) must not prevent the remaining ones from being
		// shut down, so close errors are deliberately ignored.
		_ = l.Close()
		_ = ab.a.Close()
		_ = ab.b.Close()
	}

	if lm.chClose == nil {
		return
	}
	select {
	case <-lm.chClose:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(lm.chClose)
	}
}
// DecreaseOnlineA decreases the online num of ChanListener A by one.
//
// It is installed as the decrease callback of listener A by Mux. The update
// is atomic, so it may be called from any goroutine. Like the other
// ListenerMux methods it is a no-op on a nil receiver.
func (lm *ListenerMux) DecreaseOnlineA() {
	if lm == nil {
		return
	}
	atomic.AddInt32(&lm.onlineA, -1)
}
// ChanListener is a listener front end whose Accept returns the connections
// (or accept errors) that a ListenerMux delivers to it over a channel.
type ChanListener struct {
// addr is the address of the underlying listener; it is reported by Addr.
addr net.Addr
// chEvent carries accepted connections and accept errors from the mux.
chEvent chan event
// chClose is the close signal shared with the mux; once it is closed,
// Accept can return net.ErrClosed.
chClose chan struct{}
// decrease, when non-nil, is invoked by Decrease. Mux sets it on listener A
// only.
decrease func()
}
// Accept blocks until the mux delivers an event or the mux is closed.
//
// A delivered event yields its connection and error unchanged; once the
// shared close channel is closed, Accept may instead return net.ErrClosed.
// When both are ready, select picks one of them at random.
func (l *ChanListener) Accept() (net.Conn, error) {
	select {
	case <-l.chClose:
		return nil, net.ErrClosed
	case ev := <-l.chEvent:
		return ev.conn, ev.err
	}
}
// Close does nothing and always returns nil; it exists only to implement
// net.Listener.Close.
// Call ListenerMux.Stop to close the underlying listeners and to unblock
// pending Accept calls.
func (l *ChanListener) Close() error {
return nil
}
// Addr returns the listener's network address, i.e. the address of the
// underlying listener captured when Mux created this ChanListener.
func (l *ChanListener) Addr() net.Addr {
return l.addr
}
// Decrease invokes the listener's decrease callback when one is installed.
//
// Mux installs the callback (ListenerMux.DecreaseOnlineA) on listener A only,
// so calling Decrease on listener B has no effect.
func (l *ChanListener) Decrease() {
	if l.decrease == nil {
		return
	}
	l.decrease()
}