Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(cluster): refactor cluster package #1507

Merged
merged 10 commits into from Oct 11, 2021
Expand Up @@ -15,30 +15,30 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type availableCluster struct{}

const available = "available"

func init() {
extension.SetCluster(available, NewAvailableCluster)
extension.SetCluster(constant.ClusterKeyAvailable, NewAvailableCluster)
}

type cluster struct{}

// NewAvailableCluster returns a cluster instance
//
// Obtain available service providers
func NewAvailableCluster() cluster.Cluster {
return &availableCluster{}
func NewAvailableCluster() clusterpkg.Cluster {
return &cluster{}
}

// Join returns a baseClusterInvoker instance
func (cluster *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return buildInterceptorChain(NewAvailableClusterInvoker(directory))
func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker {
return clusterpkg.BuildInterceptorChain(NewClusterInvoker(directory))
}
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"context"
Expand All @@ -27,29 +27,30 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type availableClusterInvoker struct {
baseClusterInvoker
type clusterInvoker struct {
base.ClusterInvoker
}

// NewAvailableClusterInvoker returns a cluster invoker instance
func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &availableClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
// NewClusterInvoker returns a cluster invoker instance
func NewClusterInvoker(directory directory.Directory) protocol.Invoker {
return &clusterInvoker{
ClusterInvoker: base.NewClusterInvoker(directory),
}
}

func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.Directory.List(invocation)
err := invoker.CheckInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}

err = invoker.checkWhetherDestroyed()
err = invoker.CheckWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"context"
Expand All @@ -31,8 +31,9 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory/static"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
Expand All @@ -45,14 +46,14 @@ var availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
extension.SetLoadbalance("random", random.NewLoadBalance)
availableCluster := NewAvailableCluster()

invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().GetUrl().Return(availableUrl)

staticDir := directory.NewStaticDirectory(invokers)
staticDir := static.NewDirectory(invokers)
clusterInvoker := availableCluster.Join(staticDir)
return clusterInvoker
}
Expand All @@ -64,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package base

import (
perrors "github.com/pkg/errors"
Expand All @@ -24,69 +24,70 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type baseClusterInvoker struct {
directory cluster.Directory
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
type ClusterInvoker struct {
Directory directory.Directory
AvailableCheck bool
Destroyed *atomic.Bool
StickyInvoker protocol.Invoker
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
directory: directory,
availablecheck: true,
destroyed: atomic.NewBool(false),
func NewClusterInvoker(directory directory.Directory) ClusterInvoker {
return ClusterInvoker{
Directory: directory,
AvailableCheck: true,
Destroyed: atomic.NewBool(false),
}
}

func (invoker *baseClusterInvoker) GetURL() *common.URL {
return invoker.directory.GetURL()
func (invoker *ClusterInvoker) GetURL() *common.URL {
return invoker.Directory.GetURL()
}

func (invoker *baseClusterInvoker) Destroy() {
func (invoker *ClusterInvoker) Destroy() {
// this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
if invoker.Destroyed.CAS(false, true) {
invoker.Directory.Destroy()
}
}

func (invoker *baseClusterInvoker) IsAvailable() bool {
if invoker.stickyInvoker != nil {
return invoker.stickyInvoker.IsAvailable()
func (invoker *ClusterInvoker) IsAvailable() bool {
if invoker.StickyInvoker != nil {
return invoker.StickyInvoker.IsAvailable()
}
return invoker.directory.IsAvailable()
return invoker.Directory.IsAvailable()
}

// check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
// CheckInvokers checks invokers' status if is available or not
func (invoker *ClusterInvoker) CheckInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip := common.GetLocalIp()
return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetURL().SubURL.Key(), invoker.directory.GetURL().String(), ip, constant.Version)
invocation.MethodName(), invoker.Directory.GetURL().SubURL.Key(), invoker.Directory.GetURL().String(), ip, constant.Version)
}
return nil
}

// check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
// CheckWhetherDestroyed checks if cluster invoker was destroyed or not
func (invoker *ClusterInvoker) CheckWhetherDestroyed() error {
if invoker.Destroyed.Load() {
ip := common.GetLocalIp()
return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetURL().Service(), ip, constant.Version)
invoker.Directory.GetURL().Service(), ip, constant.Version)
}
return nil
}

func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
func (invoker *ClusterInvoker) DoSelect(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
Expand All @@ -97,24 +98,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
// Get the service method sticky config if have
sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)

if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {
invoker.stickyInvoker = nil
if invoker.StickyInvoker != nil && !isInvoked(invoker.StickyInvoker, invokers) {
invoker.StickyInvoker = nil
}

if sticky && invoker.availablecheck &&
invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
return invoker.stickyInvoker
if sticky && invoker.AvailableCheck &&
invoker.StickyInvoker != nil && invoker.StickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.StickyInvoker, invoked)) {
return invoker.StickyInvoker
}

selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky {
invoker.stickyInvoker = selectedInvoker
invoker.StickyInvoker = selectedInvoker
}
return selectedInvoker
}

func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
func (invoker *ClusterInvoker) doSelectInvoker(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 0 {
return nil
}
Expand All @@ -131,7 +132,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
selectedInvoker := lb.Select(invokers, invocation)

// judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
if (!selectedInvoker.IsAvailable() && invoker.AvailableCheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
Expand Down Expand Up @@ -170,7 +171,7 @@ func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) boo
return false
}

func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) loadbalance.LoadBalance {
url := invoker.GetURL()

methodName := invocation.MethodName()
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package base

import (
"fmt"
Expand All @@ -27,7 +27,8 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
Expand All @@ -39,20 +40,20 @@ const (
)

func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
var invokers []protocol.Invoker
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
invokers = append(invokers, clusterpkg.NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true
invoked := []protocol.Invoker{}
base := &ClusterInvoker{}
base.AvailableCheck = true
var invoked []protocol.Invoker

tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpRandomBalance := random.NewLoadBalance()
tmpInvocation := invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result := base.DoSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.DoSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1)
}

Expand All @@ -61,14 +62,14 @@ func TestStickyNormalWhenError(t *testing.T) {
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
invokers = append(invokers, clusterpkg.NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true
base := &ClusterInvoker{}
base.AvailableCheck = true

invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
var invoked []protocol.Invoker
result := base.DoSelect(random.NewLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
invoked = append(invoked, result)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
result1 := base.DoSelect(random.NewLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
assert.NotEqual(t, result, result1)
}