/
rawsocketserver.go
125 lines (107 loc) · 3.44 KB
/
rawsocketserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package router
import (
"crypto/tls"
"fmt"
"io"
"net"
"time"
"github.com/gammazero/nexus/v3/transport"
)
// RawSocketServer handles socket connections.
type RawSocketServer struct {
// RecvLimit is the maximum length of messages the server is willing to
// receive. Defaults to maximum allowed for protocol: 16M.
RecvLimit int
// KeepAlive is the TCP keep-alive period. Default is disable keep-alive.
KeepAlive time.Duration
// OutQueueSize is the maximum number of pending outbound messages, per
// client. The default is defaultOutQueueSize.
OutQueueSize int
router Router
}
// NewRawSocketServer takes a router instance and creates a new socket server.
func NewRawSocketServer(r Router) *RawSocketServer {
return &RawSocketServer{
router: r,
}
}
// ListenAndServe listens on the specified endpoint and starts a goroutine that
// accepts new client connections until the returned io.closer is closed.
func (s *RawSocketServer) ListenAndServe(network, address string) (io.Closer, error) {
l, err := net.Listen(network, address)
if err != nil {
return nil, err
}
// Start request handler loop.
go s.requestHandler(l)
return l, nil
}
// ListenAndServeTLS listens on the specified endpoint and starts a
// goroutine that accepts new TLS client connections until the returned
// io.closer is closed. If tls.Config does not already contain a certificate,
// then certFile and keyFile, if specified, are used to load an X509
// certificate.
func (s *RawSocketServer) ListenAndServeTLS(network, address string, tlscfg *tls.Config, certFile, keyFile string) (io.Closer, error) { //nolint:lll
var hasCert bool
if tlscfg == nil {
tlscfg = &tls.Config{}
} else if len(tlscfg.Certificates) > 0 || tlscfg.GetCertificate != nil {
hasCert = true
}
if !hasCert || certFile != "" || keyFile != "" {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, fmt.Errorf("error loading X509 key pair: %w", err)
}
tlscfg.Certificates = append(tlscfg.Certificates, cert)
}
l, err := tls.Listen(network, address, tlscfg)
if err != nil {
return nil, err
}
// Start request handler loop.
go s.requestHandler(l)
return l, nil
}
func (s *RawSocketServer) requestHandler(l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
// Error normal when listener closed, do not log.
l.Close()
return
}
if tcpConn, ok := conn.(*net.TCPConn); ok {
if s.KeepAlive != 0 {
if err = tcpConn.SetKeepAlive(true); err != nil {
s.router.Logger().Println("Error enabling keepalive:", err)
}
if err = tcpConn.SetKeepAlivePeriod(s.KeepAlive); err != nil {
s.router.Logger().Println("Error setting keepalive period:", err)
}
} else {
if err = tcpConn.SetKeepAlive(false); err != nil {
s.router.Logger().Println("Error disabling keepalive:", err)
}
}
}
go s.handleRawSocket(conn)
}
}
// handleRawSocket accepts a connection from the listening socket, handles the
// client handshake, creates a rawSocketPeer, and then attaches that peer to
// the router.
func (s *RawSocketServer) handleRawSocket(conn net.Conn) {
qsize := s.OutQueueSize
if qsize == 0 {
qsize = defaultOutQueueSize
}
peer, err := transport.AcceptRawSocket(conn, s.router.Logger(), s.RecvLimit, qsize)
if err != nil {
s.router.Logger().Println("Error accepting rawsocket client:", err)
return
}
if err := s.router.Attach(peer); err != nil {
s.router.Logger().Println("Error attaching to router:", err)
}
}