-
Notifications
You must be signed in to change notification settings - Fork 189
/
listener.go
128 lines (108 loc) · 2.92 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package router
import (
"crypto/tls"
"errors"
"io"
"net"
"net/http"
"strconv"
"strings"
"sync"
"github.com/eolinker/eosc/log"
"github.com/eolinker/apinto/certs"
"github.com/eolinker/eosc/common/bean"
"github.com/eolinker/eosc/config"
"github.com/eolinker/eosc/traffic"
"github.com/eolinker/eosc/traffic/mixl"
"github.com/soheilhy/cmux"
)
type RouterType int
const (
GRPC RouterType = iota
Http
Dubbo2
TslTCP
AnyTCP
depth
)
var (
handlers = make([]RouterServerHandler, depth)
//matchers = make([][]cmux.Matcher, depth)
matchWriters = make([][]cmux.MatchWriter, depth)
ErrorDuplicateRouterType = errors.New("duplicate")
)
func Register(tp RouterType, handler RouterServerHandler) error {
if handlers[tp] != nil {
return ErrorDuplicateRouterType
}
handlers[tp] = handler
return nil
}
type RouterServerHandler func(port int, listener net.Listener)
func init() {
matchWriters[AnyTCP] = matchersToMatchWriters(cmux.Any())
matchWriters[TslTCP] = matchersToMatchWriters(cmux.TLS())
matchWriters[Http] = matchersToMatchWriters(cmux.HTTP1Fast(http.MethodPatch))
matchWriters[Dubbo2] = matchersToMatchWriters(cmux.PrefixMatcher(string([]byte{0xda, 0xbb})))
matchWriters[GRPC] = []cmux.MatchWriter{cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc")}
var tf traffic.ITraffic
var listenCfg *config.ListenUrl
bean.Autowired(&tf, &listenCfg)
bean.AddInitializingBeanFunc(func() {
initListener(tf, listenCfg)
})
}
func initListener(tf traffic.ITraffic, listenCfg *config.ListenUrl) {
if tf.IsStop() {
return
}
wg := sync.WaitGroup{}
tcp, ssl := tf.Listen(listenCfg.ListenUrls...)
listenerByPort := make(map[int][]net.Listener)
for _, l := range tcp {
port := readPort(l.Addr())
listenerByPort[port] = append(listenerByPort[port], l)
}
if len(ssl) > 0 {
tlsConfig := &tls.Config{GetCertificate: certs.GetCertificateFunc()}
for _, l := range ssl {
log.Debug("ssl listen: ", l.Addr().String())
port := readPort(l.Addr())
listenerByPort[port] = append(listenerByPort[port], tls.NewListener(l, tlsConfig))
}
}
for port, lns := range listenerByPort {
var ln net.Listener = mixl.NewMixListener(port, lns...)
wg.Add(1)
go func(ln net.Listener, p int) {
wg.Done()
cMux := cmux.New(ln)
for i, handler := range handlers {
log.Debug("i is ", i, " handler is ", handler)
if handler != nil {
go handler(p, cMux.MatchWithWriters(matchWriters[i]...))
}
}
cMux.Serve()
}(ln, port)
}
wg.Wait()
return
}
func readPort(addr net.Addr) int {
ipPort := addr.String()
i := strings.LastIndex(ipPort, ":")
port := ipPort[i+1:]
pv, _ := strconv.Atoi(port)
return pv
}
func matchersToMatchWriters(matchers ...cmux.Matcher) []cmux.MatchWriter {
mws := make([]cmux.MatchWriter, 0, len(matchers))
for _, m := range matchers {
cm := m
mws = append(mws, func(w io.Writer, r io.Reader) bool {
return cm(r)
})
}
return mws
}