Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: listen on random local port #37

Merged
merged 6 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/ISSUE_TEMPLATE/bug-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
name: Bug Report
about: Report a bug
labels: kind/bug

---

<!-- Please use this template while reporting a bug and provide as much info as possible. Not doing so may result in your bug not being addressed in a timely manner. Thanks!
-->


**What happened**:

**What you expected to happen**:

**How to reproduce it (as minimally and precisely as possible)**:

**Anything else we need to know?**:
11 changes: 11 additions & 0 deletions .github/ISSUE_TEMPLATE/enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
name: Enhancement Request
about: Suggest an enhancement
labels: kind/feature

---
<!-- Please only use this template for submitting enhancement requests -->

**What would you like to be added**:

**Why is this needed**:
24 changes: 24 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<!-- Thanks for sending a pull request!
-->

**What this PR does**:

**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
_If PR is about `failing-tests or flakes`, please post the related issues/tests in a comment and do not use `Fixes`_*
-->
Fixes #

**Special notes for your reviewer**:

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required:
Enter your extended release note in the block below. If the PR requires additional action from users switching to the new release, include the string "action required".
-->
```release-note
```
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
language: go

os:
- linux

go:
- "1.13"

env:
- GO111MODULE=on

install: true

script:
- go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic
Expand Down
22 changes: 10 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *client) dialTCP() Session {
return newTCPSession(conn, c)
}

log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *client) dialUDP() Session {
err = errSelfConnect
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
continue
}
Expand All @@ -194,7 +194,7 @@ func (c *client) dialUDP() Session {
conn.SetWriteDeadline(time.Now().Add(1e9))
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-wheel.After(connectInterval)
continue
}
Expand All @@ -204,7 +204,7 @@ func (c *client) dialUDP() Session {
err = nil
}
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close()
<-wheel.After(connectInterval)
continue
Expand All @@ -229,7 +229,7 @@ func (c *client) dialWS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
Expand All @@ -243,7 +243,7 @@ func (c *client) dialWS() Session {
return ss
}

log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
Expand All @@ -269,7 +269,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, err))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, perrors.WithStack(err)))
}

var cert tls.Certificate
Expand All @@ -291,7 +291,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", err))
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", perrors.WithStack(err)))
}
for _, root = range roots {
certPool.AddCert(root)
Expand Down Expand Up @@ -321,7 +321,7 @@ func (c *client) dialWSS() Session {
return ss
}

log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func (c *client) connect() {
}
}

// there are two methods to keep connection pool. the first approch is like
// there are two methods to keep connection pool. the first approach is like
// redigo's lazy connection pool(https://github.com/gomodule/redigo/blob/master/redis/pool.go:),
// in which you should apply testOnBorrow to check alive of the connection.
// the second way is as follows. @RunEventLoop detects the aliveness of the connection
Expand All @@ -405,13 +405,11 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
func (c *client) reConnect() {
var num, max, times, interval int

// c.Lock()
max = c.number
interval = c.reconnectInterval
if interval == 0 {
interval = reconnectInterval
}
// c.Unlock()
for {
if c.IsClosed() {
log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
Expand Down
2 changes: 2 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ func TestNewWSClient(t *testing.T) {
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing()
assert.Nil(t, err)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/dubbogo/getty

require (
github.com/dubbogo/gost v1.5.2
github.com/dubbogo/gost v1.9.0
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
Expand Down
58 changes: 34 additions & 24 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io/ioutil"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -67,10 +68,6 @@ func newServer(t EndPointType, opts ...ServerOption) *server {

s.init(opts...)

if s.addr == "" {
panic(fmt.Sprintf("@addr:%s", s.addr))
}

return s
}

Expand Down Expand Up @@ -163,9 +160,16 @@ func (s *server) listenTCP() error {
streamListener net.Listener
)

streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr)
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
streamListener, err = gxnet.ListenOnTCPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnTCPRandomPort(addr:%s)", s.addr)
}
} else {
streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s)", s.addr)
}
}

s.streamListener = streamListener
Expand All @@ -180,13 +184,20 @@ func (s *server) listenUDP() error {
pktListener *net.UDPConn
)

localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
pktListener, err = gxnet.ListenOnUDPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnUDPRandomPort(addr:%s)", s.addr)
}
} else {
localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
}
}

s.pktListener = pktListener
Expand All @@ -213,7 +224,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
}
if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warnf("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
return nil, perrors.WithStack(errSelfConnect)
}

ss := newTCPSession(conn, s)
Expand All @@ -237,7 +248,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
)
for {
if s.IsClosed() {
log.Warnf("server{%s} stop acceptting client connect request.", s.addr)
log.Warnf("server{%s} stop accepting client connect request.", s.addr)
return
}
if delay != 0 {
Expand All @@ -256,7 +267,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
}
continue
}
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, err)
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, perrors.WithStack(err))
continue
}
delay = 0
Expand Down Expand Up @@ -357,8 +368,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
// panic(err)
log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
}
}()
}
Expand All @@ -380,8 +390,8 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done()

if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}",
s.cert, s.privateKey, err))
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err:%+v",
s.cert, s.privateKey, perrors.WithStack(err)))
return
}
config = &tls.Config{
Expand All @@ -394,7 +404,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, err))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err:%+v", s.caCert, perrors.WithStack(err)))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
Expand All @@ -419,7 +429,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
panic(err)
}
}()
Expand All @@ -429,7 +439,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%+v", err))
panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err)))
}

switch s.endPointType {
Expand Down