Skip to content

Commit

Permalink
Merge pull request #37 from divebomb/master
Browse files Browse the repository at this point in the history
Add: listen on random local port
  • Loading branch information
wongoo committed Apr 24, 2020
2 parents 45d3d7d + f3924fe commit 076a806
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 92 deletions.
19 changes: 19 additions & 0 deletions .github/ISSUE_TEMPLATE/bug-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
name: Bug Report
about: Report a bug
labels: kind/bug

---

<!-- Please use this template while reporting a bug and provide as much info as possible. Not doing so may result in your bug not being addressed in a timely manner. Thanks!
-->


**What happened**:

**What you expected to happen**:

**How to reproduce it (as minimally and precisely as possible)**:

**Anything else we need to know?**:
11 changes: 11 additions & 0 deletions .github/ISSUE_TEMPLATE/enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
name: Enhancement Request
about: Suggest an enhancement
labels: kind/feature

---
<!-- Please only use this template for submitting enhancement requests -->

**What would you like to be added**:

**Why is this needed**:
24 changes: 24 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<!-- Thanks for sending a pull request!
-->

**What this PR does**:

**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
_If PR is about `failing-tests or flakes`, please post the related issues/tests in a comment and do not use `Fixes`_*
-->
Fixes #

**Special notes for your reviewer**:

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required:
Enter your extended release note in the block below. If the PR requires additional action from users switching to the new release, include the string "action required".
-->
```release-note
```
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
language: go

os:
- linux

go:
- "1.13"

env:
- GO111MODULE=on

install: true

script:
- go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic
Expand Down
22 changes: 10 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *client) dialTCP() Session {
return newTCPSession(conn, c)
}

log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *client) dialUDP() Session {
err = errSelfConnect
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
continue
}
Expand All @@ -194,7 +194,7 @@ func (c *client) dialUDP() Session {
conn.SetWriteDeadline(time.Now().Add(1e9))
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-wheel.After(connectInterval)
continue
}
Expand All @@ -204,7 +204,7 @@ func (c *client) dialUDP() Session {
err = nil
}
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close()
<-wheel.After(connectInterval)
continue
Expand All @@ -229,7 +229,7 @@ func (c *client) dialWS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
Expand All @@ -243,7 +243,7 @@ func (c *client) dialWS() Session {
return ss
}

log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
Expand All @@ -269,7 +269,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, err))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, perrors.WithStack(err)))
}

var cert tls.Certificate
Expand All @@ -291,7 +291,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", err))
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", perrors.WithStack(err)))
}
for _, root = range roots {
certPool.AddCert(root)
Expand Down Expand Up @@ -321,7 +321,7 @@ func (c *client) dialWSS() Session {
return ss
}

log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func (c *client) connect() {
}
}

// there are two methods to keep connection pool. the first approch is like
// there are two methods to keep connection pool. the first approach is like
// redigo's lazy connection pool(https://github.com/gomodule/redigo/blob/master/redis/pool.go:),
// in which you should apply testOnBorrow to check alive of the connection.
// the second way is as follows. @RunEventLoop detects the aliveness of the connection
Expand All @@ -405,13 +405,11 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
func (c *client) reConnect() {
var num, max, times, interval int

// c.Lock()
max = c.number
interval = c.reconnectInterval
if interval == 0 {
interval = reconnectInterval
}
// c.Unlock()
for {
if c.IsClosed() {
log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
Expand Down
2 changes: 2 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ func TestNewWSClient(t *testing.T) {
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing()
assert.Nil(t, err)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/dubbogo/getty

require (
github.com/dubbogo/gost v1.5.2
github.com/dubbogo/gost v1.9.0
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
Expand Down
58 changes: 34 additions & 24 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io/ioutil"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -67,10 +68,6 @@ func newServer(t EndPointType, opts ...ServerOption) *server {

s.init(opts...)

if s.addr == "" {
panic(fmt.Sprintf("@addr:%s", s.addr))
}

return s
}

Expand Down Expand Up @@ -163,9 +160,16 @@ func (s *server) listenTCP() error {
streamListener net.Listener
)

streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr)
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
streamListener, err = gxnet.ListenOnTCPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnTCPRandomPort(addr:%s)", s.addr)
}
} else {
streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s)", s.addr)
}
}

s.streamListener = streamListener
Expand All @@ -180,13 +184,20 @@ func (s *server) listenUDP() error {
pktListener *net.UDPConn
)

localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
pktListener, err = gxnet.ListenOnUDPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnUDPRandomPort(addr:%s)", s.addr)
}
} else {
localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
}
}

s.pktListener = pktListener
Expand All @@ -213,7 +224,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
}
if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warnf("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
return nil, perrors.WithStack(errSelfConnect)
}

ss := newTCPSession(conn, s)
Expand All @@ -237,7 +248,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
)
for {
if s.IsClosed() {
log.Warnf("server{%s} stop acceptting client connect request.", s.addr)
log.Warnf("server{%s} stop accepting client connect request.", s.addr)
return
}
if delay != 0 {
Expand All @@ -256,7 +267,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
}
continue
}
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, err)
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, perrors.WithStack(err))
continue
}
delay = 0
Expand Down Expand Up @@ -357,8 +368,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
// panic(err)
log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
}
}()
}
Expand All @@ -380,8 +390,8 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done()

if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}",
s.cert, s.privateKey, err))
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err:%+v",
s.cert, s.privateKey, perrors.WithStack(err)))
return
}
config = &tls.Config{
Expand All @@ -394,7 +404,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, err))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err:%+v", s.caCert, perrors.WithStack(err)))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
Expand All @@ -419,7 +429,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
panic(err)
}
}()
Expand All @@ -429,7 +439,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%+v", err))
panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err)))
}

switch s.endPointType {
Expand Down

0 comments on commit 076a806

Please sign in to comment.