Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: move task pool to endpoint layer #879

Merged
merged 8 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
import (
"github.com/Workiva/go-datastructures/slice/skip"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
Expand Down
4 changes: 2 additions & 2 deletions filter/filter_impl/sentinel_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func TestSentinelFilter_QPS(t *testing.T) {

_, err = flow.LoadRules([]*flow.Rule{
{
Resource: interfaceResourceName,
Resource: interfaceResourceName,
//MetricType: flow.QPS,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Threshold: 100,
Threshold: 100,
RelationStrategy: flow.CurrentResource,
},
})
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ require (
github.com/Workiva/go-datastructures v1.0.52
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alibaba/sentinel-golang v1.0.1
github.com/apache/dubbo-getty v1.3.10
github.com/apache/dubbo-getty v1.4.1
github.com/apache/dubbo-go-hessian2 v1.8.0
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.2
github.com/dubbogo/gost v1.9.5
github.com/dubbogo/gost v1.10.1
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.4.0
github.com/frankban/quicktest v1.4.1 // indirect
Expand Down
14 changes: 10 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
Expand All @@ -80,8 +82,8 @@ github.com/alibaba/sentinel-golang v1.0.1 h1:WlhN0XUxRyfkiDc8TO6CcRrnakwFP9zFtvJ
github.com/alibaba/sentinel-golang v1.0.1/go.mod h1:QsB99f/z35D2AiMrAWwgWE85kDTkBUIkcmPrRt+61NI=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/apache/dubbo-getty v1.3.10 h1:ys5mwjPdxG/KwkPjS6EI0RzQtU6p6FCPoKpaFEzpAL0=
github.com/apache/dubbo-getty v1.3.10/go.mod h1:x6rraK01BL5C7jUM2fPl5KMkAxLVIx54ZB8/XEOik9Y=
github.com/apache/dubbo-getty v1.4.1 h1:M9yaFhemThQSWtRwmJNrxNuv7FzydlFx5EY8oq1v+lw=
github.com/apache/dubbo-getty v1.4.1/go.mod h1:ansXgKxxyhCOiQL29nO5ce1MDcEKmCyZuNR9oMs3hek=
github.com/apache/dubbo-go-hessian2 v1.8.0 h1:+GJQHxWd/WUw2p4hbfCal/zjKvGVb8yJZzOke8IEazc=
github.com/apache/dubbo-go-hessian2 v1.8.0/go.mod h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
Expand Down Expand Up @@ -173,8 +175,9 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/dubbogo/go-zookeeper v1.0.2 h1:xmEnPL8SlCe3/+J5ZR9e8qE35LmFVYe8VVpDakjNM4A=
github.com/dubbogo/go-zookeeper v1.0.2/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.5 h1:UeG4y0O55lR3dzgdmCm/7bMWvpKrlpR7fsfKjrcXq/g=
github.com/dubbogo/gost v1.9.5/go.mod h1:QNM5RaeRdNWehUu8S0hUP5Qa8QUfGf6KH1JhqOVFvEI=
github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down Expand Up @@ -233,6 +236,8 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-ole/go-ole v1.2.1 h1:2lOsA72HgjxAuMlKpFiCbHTvu44PIVkZ5hqm3RSdI/E=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
Expand Down Expand Up @@ -689,6 +694,7 @@ github.com/sean-/pager v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
Expand Down
6 changes: 3 additions & 3 deletions registry/consul/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
perrors "github.com/pkg/errors"
Expand Down Expand Up @@ -339,7 +339,7 @@ func (csd *consulServiceDiscovery) GetInstancesByPage(serviceName string, offset
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
Expand All @@ -358,7 +358,7 @@ func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string,
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
Expand Down
6 changes: 3 additions & 3 deletions registry/etcdv3/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
perrors "github.com/pkg/errors"
)
Expand Down Expand Up @@ -177,7 +177,7 @@ func (e *etcdV3ServiceDiscovery) GetInstancesByPage(serviceName string, offset i
res = append(res, all[i])
}

return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
Expand All @@ -199,7 +199,7 @@ func (e *etcdV3ServiceDiscovery) GetHealthyInstancesByPage(serviceName string, o
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// Batch get all instances by the specified service names
Expand Down
2 changes: 1 addition & 1 deletion registry/event/event_publishing_service_deiscovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
Expand Down
2 changes: 1 addition & 1 deletion registry/event/event_publishing_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package event

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
)

import (
Expand Down
2 changes: 1 addition & 1 deletion registry/file/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
perrors "github.com/pkg/errors"
)

Expand Down
6 changes: 3 additions & 3 deletions registry/nacos/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
Expand Down Expand Up @@ -175,7 +175,7 @@ func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset in
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetHealthyInstancesByPage will return the instance
Expand All @@ -198,7 +198,7 @@ func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, of
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetRequestInstances will return the instances
Expand Down
2 changes: 1 addition & 1 deletion registry/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
gxpage "github.com/dubbogo/gost/hash/page"
)

const DefaultPageSize = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
"github.com/stretchr/testify/assert"
)
import (
Expand Down
6 changes: 3 additions & 3 deletions registry/zookeeper/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/dubbogo/gost/hash/page"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -231,7 +231,7 @@ func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, of
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetHealthyInstancesByPage will return the instance
Expand All @@ -254,7 +254,7 @@ func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName str
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
return gxpage.NewPage(offset, pageSize, res, len(all))
}

// GetRequestInstances will return the instances
Expand Down
7 changes: 2 additions & 5 deletions remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
errClientReadTimeout = perrors.New("client read timeout")

clientConf *ClientConfig
clientGrpool *gxsync.TaskPool
clientGrpool gxsync.GenericTaskPool
)

// it is init client for single protocol.
Expand Down Expand Up @@ -101,10 +101,7 @@ func SetClientConf(c ClientConfig) {
}

func setClientGrpool() {
if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
}
clientGrpool = gxsync.NewTaskPoolSimple(clientConf.GrPoolSize)
}

// Options : param config
Expand Down
36 changes: 8 additions & 28 deletions remoting/getty/getty_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ import (
)

var (
srvConf *ServerConfig
srvGrpool *gxsync.TaskPool
srvConf *ServerConfig
)

func initServer(protocol string) {
Expand Down Expand Up @@ -76,7 +75,6 @@ func initServer(protocol string) {
if err := srvConf.CheckValidity(); err != nil {
panic(err)
}
SetServerGrpool()
}

// SetServerConfig set dubbo server config.
Expand All @@ -87,25 +85,13 @@ func SetServerConfig(s ServerConfig) {
logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
return
}
SetServerGrpool()
}

// GetServerConfig get getty server config.
func GetServerConfig() ServerConfig {
return *srvConf
}

// SetServerGrpool set getty server GrPool
func SetServerGrpool() {
if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize),
gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber),
)
}
}

// Server define getty server
type Server struct {
conf ServerConfig
Expand Down Expand Up @@ -151,13 +137,11 @@ func (s *Server) newSession(session getty.Session) error {
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s))
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}
if _, ok = session.Conn().(*net.TCPConn); !ok {
Expand Down Expand Up @@ -192,13 +176,11 @@ func (s *Server) newSession(session getty.Session) error {
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s))
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("server accepts new session: %s", session.Stat())
session.SetTaskPool(srvGrpool)
return nil
}

Expand All @@ -210,17 +192,15 @@ func (s *Server) Start() {
)

addr = s.addr
serverOpts := []getty.ServerOption{getty.WithLocalAddress(addr)}
if s.conf.SSLEnabled {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
getty.WithServerSslEnabled(s.conf.SSLEnabled),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()),
)
} else {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
serverOpts = append(serverOpts, getty.WithServerSslEnabled(s.conf.SSLEnabled),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()))
}

serverOpts = append(serverOpts, getty.WithServerTaskPool(gxsync.NewTaskPoolSimple(s.conf.GrPoolSize)))

tcpServer = getty.NewTCPServer(serverOpts...)
tcpServer.RunEventLoop(s.newSession)
logger.Debugf("s bind addr{%s} ok!", s.addr)
s.tcpServer = tcpServer
Expand Down
Loading