Skip to content

Commit

Permalink
Merge pull request #879 from watermelo/opt_taskpool
Browse files Browse the repository at this point in the history
Ftr: move task pool to endpoint layer
  • Loading branch information
AlexStocks committed Jan 5, 2021
2 parents f11f1f9 + 8d0ccff commit ded3c49
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 76 deletions.
2 changes: 1 addition & 1 deletion config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
import (
"github.com/Workiva/go-datastructures/slice/skip"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ require (
github.com/Workiva/go-datastructures v1.0.52
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alibaba/sentinel-golang v1.0.1
github.com/apache/dubbo-getty v1.3.10
github.com/apache/dubbo-getty v1.4.1
github.com/apache/dubbo-go-hessian2 v1.8.0
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.2
github.com/dubbogo/gost v1.9.5
github.com/dubbogo/gost v1.10.1
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.4.0
github.com/frankban/quicktest v1.4.1 // indirect
Expand Down
14 changes: 10 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
Expand All @@ -80,8 +82,8 @@ github.com/alibaba/sentinel-golang v1.0.1 h1:WlhN0XUxRyfkiDc8TO6CcRrnakwFP9zFtvJ
github.com/alibaba/sentinel-golang v1.0.1/go.mod h1:QsB99f/z35D2AiMrAWwgWE85kDTkBUIkcmPrRt+61NI=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/apache/dubbo-getty v1.3.10 h1:ys5mwjPdxG/KwkPjS6EI0RzQtU6p6FCPoKpaFEzpAL0=
github.com/apache/dubbo-getty v1.3.10/go.mod h1:x6rraK01BL5C7jUM2fPl5KMkAxLVIx54ZB8/XEOik9Y=
github.com/apache/dubbo-getty v1.4.1 h1:M9yaFhemThQSWtRwmJNrxNuv7FzydlFx5EY8oq1v+lw=
github.com/apache/dubbo-getty v1.4.1/go.mod h1:ansXgKxxyhCOiQL29nO5ce1MDcEKmCyZuNR9oMs3hek=
github.com/apache/dubbo-go-hessian2 v1.8.0 h1:+GJQHxWd/WUw2p4hbfCal/zjKvGVb8yJZzOke8IEazc=
github.com/apache/dubbo-go-hessian2 v1.8.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
Expand Down Expand Up @@ -173,8 +175,9 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/dubbogo/go-zookeeper v1.0.2 h1:xmEnPL8SlCe3/+J5ZR9e8qE35LmFVYe8VVpDakjNM4A=
github.com/dubbogo/go-zookeeper v1.0.2/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.5 h1:UeG4y0O55lR3dzgdmCm/7bMWvpKrlpR7fsfKjrcXq/g=
github.com/dubbogo/gost v1.9.5/go.mod h1:QNM5RaeRdNWehUu8S0hUP5Qa8QUfGf6KH1JhqOVFvEI=
github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down Expand Up @@ -233,6 +236,8 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-ole/go-ole v1.2.1 h1:2lOsA72HgjxAuMlKpFiCbHTvu44PIVkZ5hqm3RSdI/E=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
Expand Down Expand Up @@ -689,6 +694,7 @@ github.com/sean-/pager v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
Expand Down
6 changes: 3 additions & 3 deletions registry/consul/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
perrors "github.com/pkg/errors"
Expand Down Expand Up @@ -339,7 +339,7 @@ func (csd *consulServiceDiscovery) GetInstancesByPage(serviceName string, offset
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
Expand All @@ -358,7 +358,7 @@ func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string,
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
Expand Down
6 changes: 3 additions & 3 deletions registry/etcdv3/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
perrors "github.com/pkg/errors"
)
Expand Down Expand Up @@ -180,7 +180,7 @@ func (e *etcdV3ServiceDiscovery) GetInstancesByPage(serviceName string, offset i
res = append(res, all[i])
}

return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
Expand All @@ -202,7 +202,7 @@ func (e *etcdV3ServiceDiscovery) GetHealthyInstancesByPage(serviceName string, o
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// Batch get all instances by the specified service names
Expand Down
2 changes: 1 addition & 1 deletion registry/event/event_publishing_service_deiscovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
Expand Down
2 changes: 1 addition & 1 deletion registry/event/event_publishing_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package event

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
)

import (
Expand Down
2 changes: 1 addition & 1 deletion registry/file/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
perrors "github.com/pkg/errors"
)

Expand Down
6 changes: 3 additions & 3 deletions registry/nacos/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
Expand Down Expand Up @@ -175,7 +175,7 @@ func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset in
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetHealthyInstancesByPage will return the instance
Expand All @@ -198,7 +198,7 @@ func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, of
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetRequestInstances will return the instances
Expand Down
2 changes: 1 addition & 1 deletion registry/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
)

const DefaultPageSize = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
"github.com/stretchr/testify/assert"
)
import (
Expand Down
6 changes: 3 additions & 3 deletions registry/zookeeper/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -231,7 +231,7 @@ func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, of
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetHealthyInstancesByPage will return the instance
Expand All @@ -254,7 +254,7 @@ func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName str
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetRequestInstances will return the instances
Expand Down
7 changes: 2 additions & 5 deletions remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
errClientReadTimeout = perrors.New("client read timeout")

clientConf *ClientConfig
clientGrpool *gxsync.TaskPool
clientGrpool gxsync.GenericTaskPool
)

// it is init client for single protocol.
Expand Down Expand Up @@ -99,10 +99,7 @@ func SetClientConf(c ClientConfig) {
}

func setClientGrpool() {
if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
}
clientGrpool = gxsync.NewTaskPoolSimple(clientConf.GrPoolSize)
}

// Options : param config
Expand Down
36 changes: 8 additions & 28 deletions remoting/getty/getty_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ import (
)

var (
srvConf *ServerConfig
srvGrpool *gxsync.TaskPool
srvConf *ServerConfig
)

func initServer(protocol string) {
Expand Down Expand Up @@ -76,7 +75,6 @@ func initServer(protocol string) {
if err := srvConf.CheckValidity(); err != nil {
panic(err)
}
SetServerGrpool()
}

// SetServerConfig set dubbo server config.
Expand All @@ -87,25 +85,13 @@ func SetServerConfig(s ServerConfig) {
logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
return
}
SetServerGrpool()
}

// GetServerConfig get getty server config.
func GetServerConfig() ServerConfig {
return *srvConf
}

// SetServerGrpool set getty server GrPool
func SetServerGrpool() {
if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber),
)
}
}

// Server define getty server
type Server struct {
conf ServerConfig
Expand Down Expand Up @@ -151,13 +137,11 @@ func (s *Server) newSession(session getty.Session) error {
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s))
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}
if _, ok = session.Conn().(*net.TCPConn); !ok {
Expand Down Expand Up @@ -192,13 +176,11 @@ func (s *Server) newSession(session getty.Session) error {
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s))
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}

Expand All @@ -210,17 +192,15 @@ func (s *Server) Start() {
)

addr = s.addr
serverOpts := []getty.ServerOption{getty.WithLocalAddress(addr)}
if s.conf.SSLEnabled {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
getty.WithServerSslEnabled(s.conf.SSLEnabled),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()),
)
} else {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
serverOpts = append(serverOpts, getty.WithServerSslEnabled(s.conf.SSLEnabled),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()))
}

serverOpts = append(serverOpts, getty.WithServerTaskPool(gxsync.NewTaskPoolSimple(s.conf.GrPoolSize)))

tcpServer = getty.NewTCPServer(serverOpts...)
tcpServer.RunEventLoop(s.newSession)
logger.Debugf("s bind addr{%s} ok!", s.addr)
s.tcpServer = tcpServer
Expand Down
Loading

0 comments on commit ded3c49

Please sign in to comment.