Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: move task pool to endpoint layer #879

Merged
merged 8 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ require (
github.com/Workiva/go-datastructures v1.0.52
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alibaba/sentinel-golang v0.6.2
github.com/apache/dubbo-getty v1.3.10
github.com/apache/dubbo-getty v1.4.1-rc1
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.2
github.com/dubbogo/gost v1.9.2
github.com/dubbogo/gost v1.9.8
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.0.0
github.com/frankban/quicktest v1.4.1 // indirect
Expand Down
13 changes: 6 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,16 @@ github.com/alangpierce/go-forceexport v0.0.0-20160317203124-8f1d6941cd75/go.mod
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alibaba/sentinel-golang v0.6.2 h1:1OjjpljJbNKWp9p5RJKxOqS1gHGZPUWPlCcokv5xYJs=
github.com/alibaba/sentinel-golang v0.6.1 h1:Pxyw2X7ryklvToF40KG9l4uuO90jRZA2MWb8Z3d1wPo=
github.com/alibaba/sentinel-golang v0.6.1/go.mod h1:5jemKdyCQCKVf+quEia53fo9a17OSe+wnl9HX2NbNpc=
github.com/alibaba/sentinel-golang v0.6.2/go.mod h1:5jemKdyCQCKVf+quEia53fo9a17OSe+wnl9HX2NbNpc=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190808125512-07798873deee/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/dubbo-getty v1.3.10 h1:ys5mwjPdxG/KwkPjS6EI0RzQtU6p6FCPoKpaFEzpAL0=
github.com/apache/dubbo-getty v1.3.10/go.mod h1:x6rraK01BL5C7jUM2fPl5KMkAxLVIx54ZB8/XEOik9Y=
github.com/apache/dubbo-getty v1.4.1-rc1 h1:sbvOa0Dr0Y95hF0UjhgVqit/VxsgW5sRaAig0PEYKAo=
github.com/apache/dubbo-getty v1.4.1-rc1/go.mod h1:MKtl4vaAPJm6PllIq5KO/6qqtybDER+lgPdgI/NOdC0=
github.com/apache/dubbo-go-hessian2 v1.7.0 h1:u2XxIuepu/zb6JcGZc7EbvKboXdKoJbf7rbmeq6SF1w=
github.com/apache/dubbo-go-hessian2 v1.7.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
Expand Down Expand Up @@ -213,8 +214,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/dubbogo/go-zookeeper v1.0.2 h1:xmEnPL8SlCe3/+J5ZR9e8qE35LmFVYe8VVpDakjNM4A=
github.com/dubbogo/go-zookeeper v1.0.2/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.2 h1:lTo5WETmqDKSW4d+Fr3Emiz1rKsVaQCPWRypJPAAfcw=
github.com/dubbogo/gost v1.9.2/go.mod h1:QNM5RaeRdNWehUu8S0hUP5Qa8QUfGf6KH1JhqOVFvEI=
github.com/dubbogo/gost v1.9.8 h1:ciAvb0M0rYh3j7+RZsf4QLyDjWVIW/fZbSBsDgHJA7M=
github.com/dubbogo/gost v1.9.8/go.mod h1:XfXynl+iquKdvsklRfl/JlqE+i0sOhHWepH9vPdhGOY=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
Expand Down Expand Up @@ -797,8 +798,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880 h1:1Ge4j/3uB2rxzPWD3TC+daeCw+w91z8UCUL/7WH5gn8=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v2.19.12+incompatible h1:WRstheAymn1WOPesh+24+bZKFkqrdCR8JOc77v4xV3Q=
github.com/shirou/gopsutil v2.19.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
Expand Down
5 changes: 2 additions & 3 deletions remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
errClientReadTimeout = perrors.New("client read timeout")

clientConf *ClientConfig
clientGrpool *gxsync.TaskPool
clientGrpool gxsync.GenericTaskPool
)

// it is init client for single protocol.
Expand Down Expand Up @@ -102,8 +102,7 @@ func SetClientConf(c ClientConfig) {

func setClientGrpool() {
if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
clientGrpool = gxsync.NewTaskPoolSimple(clientConf.GrPoolSize)
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
36 changes: 10 additions & 26 deletions remoting/getty/getty_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ import (
)

var (
srvConf *ServerConfig
srvGrpool *gxsync.TaskPool
srvConf *ServerConfig
)

func initServer(protocol string) {
Expand Down Expand Up @@ -76,7 +75,6 @@ func initServer(protocol string) {
if err := srvConf.CheckValidity(); err != nil {
panic(err)
}
SetServerGrpool()
}

// SetServerConfig set dubbo server config.
Expand All @@ -87,25 +85,13 @@ func SetServerConfig(s ServerConfig) {
logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
return
}
SetServerGrpool()
}

// GetServerConfig get getty server config.
func GetServerConfig() ServerConfig {
return *srvConf
}

// SetServerGrpool set getty server GrPool
func SetServerGrpool() {
if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber),
)
}
}

// Server define getty server
type Server struct {
conf ServerConfig
Expand Down Expand Up @@ -157,7 +143,6 @@ func (s *Server) newSession(session getty.Session) error {
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
Expand Down Expand Up @@ -198,7 +183,6 @@ func (s *Server) newSession(session getty.Session) error {
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}

Expand All @@ -210,17 +194,17 @@ func (s *Server) Start() {
)

addr = s.addr
serverOpts := []getty.ServerOption{getty.WithLocalAddress(addr)}
if s.conf.SSLEnabled {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
getty.WithServerSslEnabled(s.conf.SSLEnabled),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()),
)
} else {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
serverOpts = append(serverOpts, getty.WithServerSslEnabled(s.conf.SSLEnabled),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()))
}

if s.conf.GrPoolSize != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!= 0 ?what about < 0? Is that a right logic?

serverOpts = append(serverOpts, getty.WithServerTaskPool(gxsync.NewTaskPoolSimple(s.conf.GrPoolSize)))
}

tcpServer = getty.NewTCPServer(serverOpts...)
tcpServer.RunEventLoop(s.newSession)
logger.Debugf("s bind addr{%s} ok!", s.addr)
s.tcpServer = tcpServer
Expand Down
29 changes: 12 additions & 17 deletions remoting/getty/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,20 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string) (*gettyRPCClie
sslEnabled bool
)
sslEnabled = pool.sslEnabled
clientOpts := []getty.ClientOption{
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
getty.WithReconnectInterval(pool.rpcClient.conf.ReconnectInterval),
}
if sslEnabled {
gettyClient = getty.NewTCPClient(
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
getty.WithReconnectInterval(pool.rpcClient.conf.ReconnectInterval),
getty.WithClientSslEnabled(pool.sslEnabled),
getty.WithClientTlsConfigBuilder(config.GetClientTlsConfigBuilder()),
)
} else {
gettyClient = getty.NewTCPClient(
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
getty.WithReconnectInterval(pool.rpcClient.conf.ReconnectInterval),
)
clientOpts = append(clientOpts, getty.WithClientSslEnabled(pool.sslEnabled), getty.WithClientTlsConfigBuilder(config.GetClientTlsConfigBuilder()))
}

if clientGrpool != nil {
clientOpts = append(clientOpts, getty.WithClientTaskPool(clientGrpool))
}

gettyClient = getty.NewTCPClient(clientOpts...)
c := &gettyRPCClient{
addr: addr,
pool: pool,
Expand Down Expand Up @@ -142,7 +141,6 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("client new session:%s\n", session.Stat())
session.SetTaskPool(clientGrpool)
return nil
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
Expand All @@ -167,9 +165,6 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("client new session:%s\n", session.Stat())

session.SetTaskPool(clientGrpool)

return nil
}

Expand Down