Skip to content
Permalink
Browse files
opt: move taskpool at endpoint layer
  • Loading branch information
watermelo committed Nov 15, 2020
1 parent bf36b5d commit e743d226cac5fb933f22df3517b2b08bc2a6d0ab
Showing 8 changed files with 47 additions and 11 deletions.
@@ -33,6 +33,7 @@ import (
import (
"github.com/dubbogo/gost/bytes"
"github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
)
@@ -356,6 +357,10 @@ func (c *client) dial() Session {
return nil
}

func (c *client) GetTaskPool() gxsync.GenericTaskPool {
return c.tPool
}

func (c *client) sessionNum() int {
var num int

@@ -40,7 +40,7 @@ var (
)

var (
taskPool *gxsync.TaskPool
taskPool gxsync.GenericTaskPool
)

func main() {
@@ -72,6 +72,5 @@ func NewHelloServerSession(session getty.Session) (err error) {
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}
@@ -168,6 +168,7 @@ type Session interface {

SetWQLen(int)
SetWaitTime(time.Duration)
// Deprecated: don't use SetTaskPool, move to endpoints layer.
SetTaskPool(*gxsync.TaskPool)

GetAttribute(interface{}) interface{}
@@ -197,6 +198,7 @@ type EndPoint interface {
IsClosed() bool
// close the endpoint and free its resource
Close()
GetTaskPool() gxsync.GenericTaskPool
}

type Client interface {
2 go.mod
@@ -7,6 +7,6 @@ require (
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
go.uber.org/zap v1.15.0
)
13 go.sum
@@ -4,19 +4,22 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -28,8 +31,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
@@ -49,6 +52,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
@@ -63,5 +68,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
@@ -17,6 +17,8 @@

package getty

import gxsync "github.com/dubbogo/gost/sync"

/////////////////////////////////////////
// Server Options
/////////////////////////////////////////
@@ -33,6 +35,7 @@ type ServerOptions struct {
cert string
privateKey string
caCert string
tPool gxsync.GenericTaskPool
}

// @addr server listen address.
@@ -70,6 +73,13 @@ func WithWebsocketServerRootCert(cert string) ServerOption {
}
}

// @pool server task pool.
func WithServerTaskPool(pool gxsync.GenericTaskPool) ServerOption {
return func(o *ServerOptions) {
o.tPool = pool
}
}

// @WithSslEnabled enable use tls
func WithServerSslEnabled(sslEnabled bool) ServerOption {
return func(o *ServerOptions) {
@@ -102,7 +112,8 @@ type ClientOptions struct {
// the certs file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key.
// wss client will use it.
cert string
cert string
tPool gxsync.GenericTaskPool
}

// @addr is server address.
@@ -121,6 +132,13 @@ func WithReconnectInterval(reconnectInterval int) ClientOption {
}
}

// @pool client task pool.
func WithClientTaskPool(pool gxsync.GenericTaskPool) ClientOption {
return func(o *ClientOptions) {
o.tPool = pool
}
}

// @num is connection number.
func WithConnectionNumber(num int) ClientOption {
return func(o *ClientOptions) {
@@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
gxsync "github.com/dubbogo/gost/sync"
"io/ioutil"
"net"
"net/http"
@@ -150,6 +151,10 @@ func (s *server) stop() {
}
}

func (s *server) GetTaskPool() gxsync.GenericTaskPool {
return s.tPool
}

func (s *server) IsClosed() bool {
select {
case <-s.done:
@@ -93,7 +93,7 @@ type session struct {

// handle logic
maxMsgLen int32
// task queue
// Deprecated: don't use tPool, move to endpoints layer.
tPool *gxsync.TaskPool

// heartbeat
@@ -325,7 +325,7 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
s.wait = waitTime
}

// set task pool
// Deprecated: set task pool
func (s *session) SetTaskPool(p *gxsync.TaskPool) {
s.lock.Lock()
defer s.lock.Unlock()
@@ -655,8 +655,8 @@ func (s *session) addTask(pkg interface{}) {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if s.tPool != nil {
s.tPool.AddTask(f)
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return
}
f()

0 comments on commit e743d22

Please sign in to comment.