Skip to content
Permalink
Browse files
Add: listen on random local port
  • Loading branch information
AlexStocks authored and watermelo committed Jul 18, 2020
1 parent 0683f35 commit ab472d9d5ba8c67f0f98f8dcd7bb6d0db1428d67
Showing 4 changed files with 70 additions and 62 deletions.
2 go.sum
@@ -6,6 +6,8 @@ github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
@@ -17,6 +17,7 @@ import (
"io/ioutil"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
@@ -67,9 +68,9 @@ func newServer(t EndPointType, opts ...ServerOption) *server {

s.init(opts...)

if s.addr == "" {
panic(fmt.Sprintf("@addr:%s", s.addr))
}
//if len(s.addr) == 0 {
// panic(fmt.Sprintf("@addr:%s", s.addr))
//}

return s
}
@@ -163,9 +164,16 @@ func (s *server) listenTCP() error {
streamListener net.Listener
)

streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr)
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
streamListener, err = gxnet.ListenOnTCPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnTCPRandomPort(addr:%s)", s.addr)
}
} else {
streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s)", s.addr)
}
}

s.streamListener = streamListener
@@ -180,13 +188,20 @@ func (s *server) listenUDP() error {
pktListener *net.UDPConn
)

localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
pktListener, err = gxnet.ListenOnUDPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnUDPRandomPort(addr:%s)", s.addr)
}
} else {
localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
}
}

s.pktListener = pktListener
@@ -256,7 +271,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
}
continue
}
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, err)
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, perrors.WithStack(err))
continue
}
delay = 0
@@ -357,7 +372,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, perrors.WithStack(err))
// panic(err)
}
}()
@@ -381,7 +396,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {

if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}",
s.cert, s.privateKey, err))
s.cert, s.privateKey, perrors.WithStack(err)))
return
}
config = &tls.Config{
@@ -394,7 +409,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, err))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, perrors.WithStack(err)))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
@@ -419,7 +434,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, perrors.WithStack(err))
panic(err)
}
}()
@@ -429,7 +444,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%+v", err))
panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err)))
}

switch s.endPointType {
@@ -9,28 +9,29 @@ import (
"github.com/stretchr/testify/assert"
)

func TestTCPServer(t *testing.T) {
func testTCPServer(t *testing.T, address string) {
var (
server *server
serverMsgHandler MessageHandler
)
addr := "127.0.0.1:0"

func() {
server = newServer(
TCP_SERVER,
WithLocalAddress(addr),
WithLocalAddress(address),
)
newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler)
}
server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == TCP_SERVER)
assert.NotNil(t, server.streamListener)
}()
time.Sleep(500e6)

addr = server.streamListener.Addr().String()
t.Logf("server addr: %v", addr)
addr := server.streamListener.Addr().String()
t.Logf("@address:%s, tcp server addr: %v", address, addr)
clt := newClient(TCP_CLIENT,
WithServerAddress(addr),
WithReconnectInterval(5e8),
@@ -58,51 +59,41 @@ func TestTCPServer(t *testing.T) {
assert.True(t, server.IsClosed())
}

func TestUDPServer(t *testing.T) {
func testUDPServer(t *testing.T, address string) {
var (
server *server
serverMsgHandler MessageHandler
)
addr := "127.0.0.1:0"
func() {
server = newServer(
UDP_ENDPOINT,
WithLocalAddress(addr),
WithLocalAddress(address),
)
newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler)
}
server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == UDP_ENDPOINT)
assert.NotNil(t, server.pktListener)
}()
time.Sleep(500e6)

//addr = server.streamListener.Addr().String()
//t.Logf("server addr: %v", addr)
//clt := newClient(TCP_CLIENT,
// WithServerAddress(addr),
// WithReconnectInterval(5e8),
// WithConnectionNumber(1),
//)
//assert.NotNil(t, clt)
//assert.True(t, clt.ID() > 0)
//assert.Equal(t, clt.endPointType, TCP_CLIENT)
//
//var (
// msgHandler MessageHandler
//)
//cb := func(session Session) error {
// return newSessionCallback(session, &msgHandler)
//}
//
//clt.RunEventLoop(cb)
//time.Sleep(1e9)
//
//assert.Equal(t, 1, msgHandler.SessionNumber())
//clt.Close()
//assert.True(t, clt.IsClosed())
//
//server.Close()
//assert.True(t, server.IsClosed())
addr := server.pktListener.LocalAddr().String()
t.Logf("@address:%s, udp server addr: %v", address, addr)
}

func TestServer(t *testing.T) {
var addr string

testTCPServer(t, addr)
testUDPServer(t, addr)

addr = "127.0.0.1:0"
testTCPServer(t, addr)
testUDPServer(t, addr)

addr = "127.0.0.1"
testTCPServer(t, addr)
testUDPServer(t, addr)
}
@@ -703,12 +703,12 @@ func (s *session) handleTCPPackage() error {
break
}
if perrors.Cause(err) == io.EOF {
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
err = nil
exit = true
break
}
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
exit = true
}
break
@@ -784,7 +784,7 @@ func (s *session) handleUDPPackage() error {
}

bufLen, addr, err = conn.recv(buf)
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err)
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, perrors.WithStack(err))
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue
}
@@ -796,7 +796,7 @@ func (s *session) handleUDPPackage() error {
}

if bufLen == 0 {
log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, err)
log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err))
continue
}

@@ -806,17 +806,17 @@ func (s *session) handleUDPPackage() error {
}

pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, err)
log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
}
if err != nil {
log.Warnf("%s, [session.handleUDPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, err)
s.sessionToken(), pkgLen, perrors.WithStack(err))
continue
}
if pkgLen == 0 {
log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, err)
log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
continue
}

@@ -861,7 +861,7 @@ func (s *session) handleWSPackage() error {
}
if err != nil {
log.Warnf("%s, [session.handleWSPackage] = len{%d}, error:%+v",
s.sessionToken(), length, err)
s.sessionToken(), length, perrors.WithStack(err))
continue
}

0 comments on commit ab472d9

Please sign in to comment.