Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: fix the black list bug and make ConnCheckRouter work well #1287

Merged
merged 6 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 16 additions & 5 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,27 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
// healthState
serviceHealthState *protocol.ServiceHealthState
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
directory: directory,
availablecheck: true,
destroyed: atomic.NewBool(false),
// init from directory
serviceHealthState: directory.ServiceHealthState(),
}
}

func (invoker *baseClusterInvoker) getServiceHealthState() *protocol.ServiceHealthState {
if invoker.serviceHealthState == nil {
invoker.serviceHealthState = protocol.NewServiceState(invoker.GetURL().ServiceKey())
}
return invoker.serviceHealthState
}

func (invoker *baseClusterInvoker) GetURL() *common.URL {
return invoker.directory.GetURL()
}
Expand Down Expand Up @@ -118,12 +129,12 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
if len(invokers) == 0 {
return nil
}
protocol.TryRefreshBlackList()
invoker.getServiceHealthState().TryRefreshBlackList()
if len(invokers) == 1 {
if invokers[0].IsAvailable() {
return invokers[0]
}
protocol.SetInvokerUnhealthyStatus(invokers[0])
invoker.getServiceHealthState().SetInvokerUnhealthyStatus(invokers[0])
logger.Errorf("the invokers of %s is nil. ", invokers[0].GetURL().ServiceKey())
return nil
}
Expand All @@ -132,10 +143,10 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc

//judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
invoker.getServiceHealthState().SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
for i := 0; i < 3; i++ {
for i := 0; i < 5; i++ {
if len(otherInvokers) == 0 {
// no other ivk to reselect, return to fallback
break
Expand All @@ -148,7 +159,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
if !reselectedInvoker.IsAvailable() {
logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.",
invoker.GetURL().Ip)
protocol.SetInvokerUnhealthyStatus(reselectedInvoker)
invoker.getServiceHealthState().SetInvokerUnhealthyStatus(reselectedInvoker)
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
Expand Down
3 changes: 3 additions & 0 deletions cluster/cluster_impl/base_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
Expand All @@ -47,6 +48,7 @@ func TestStickyNormal(t *testing.T) {
}
base := &baseClusterInvoker{}
base.availablecheck = true
base.directory = directory.NewStaticDirectory(invokers)
invoked := []protocol.Invoker{}

tmpRandomBalance := loadbalance.NewRandomLoadBalance()
Expand All @@ -65,6 +67,7 @@ func TestStickyNormalWhenError(t *testing.T) {
}
base := &baseClusterInvoker{}
base.availablecheck = true
base.directory = directory.NewStaticDirectory(invokers)

invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
invoker.taskList.Put(timerTask)

logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
methodName, url.Service(), result.Error().Error())
methodName, url.Service(), result.Error())
// ignore
return &protocol.RPCResult{}
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failsafe_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation pr
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
// ignore
logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error())
logger.Errorf("Failsafe ignore exception: %v.\n", result.Error())
return &protocol.RPCResult{}
}
return result
Expand Down
1 change: 1 addition & 0 deletions cluster/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ import (
type Directory interface {
common.Node
List(invocation protocol.Invocation) []protocol.Invoker
ServiceHealthState() *protocol.ServiceHealthState
}
9 changes: 9 additions & 0 deletions cluster/directory/static_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
type staticDirectory struct {
BaseDirectory
invokers []protocol.Invoker
// healthState
serviceHealthState *protocol.ServiceHealthState
}

// NewStaticDirectory Create a new staticDirectory with invokers
Expand All @@ -42,6 +44,8 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
dir := &staticDirectory{
BaseDirectory: NewBaseDirectory(url),
invokers: invokers,
// init serviceHealthState
serviceHealthState: protocol.NewServiceState(url.ServiceKey()),
}

return dir
Expand Down Expand Up @@ -74,6 +78,11 @@ func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invo
return routerChain.Route(dirUrl, invocation)
}

// Fetch ServiceHealthState
func (dir *staticDirectory) ServiceHealthState() *protocol.ServiceHealthState {
return dir.serviceHealthState
}

// Destroy Destroy
func (dir *staticDirectory) Destroy() {
dir.BaseDirectory.Destroy(func() {
Expand Down
3 changes: 2 additions & 1 deletion cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ func (c *RouterChain) Loop() {
for {
select {
case <-ticker.C:
if protocol.GetAndRefreshState() {
if protocol.GetAndRefreshState(c.url) {
logger.Infof("start to build route cache because the invokers in black list is changed [%s]", c.url.ServiceKey())
c.buildCache()
}
case <-c.notify:
Expand Down
43 changes: 29 additions & 14 deletions cluster/router/conncheck/conn_check_route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ func TestConnCheckRouterRoute(t *testing.T) {
invoker1 := NewMockInvoker(url1)
invoker2 := NewMockInvoker(url2)
invoker3 := NewMockInvoker(url3)
protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
state1 := protocol.NewServiceState(consumerURL.ServiceKey())

state1.SetInvokerUnhealthyStatus(invoker1)
state1.SetInvokerUnhealthyStatus(invoker2)

invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil)
Expand All @@ -75,7 +77,7 @@ func TestConnCheckRouterRoute(t *testing.T) {
assert.True(t, len(res.ToArray()) == 1)

// check blacklist remove
protocol.RemoveInvokerUnhealthyStatus(invoker1)
state1.RemoveInvokerUnhealthyStatus(invoker1)
res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv)
// now invoker3 invoker1 is healthy
assert.True(t, len(res.ToArray()) == 2)
Expand All @@ -96,12 +98,23 @@ func TestRecovery(t *testing.T) {
invoker1.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker2.EXPECT().IsAvailable().Return(true).AnyTimes()

protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 2)
protocol.TryRefreshBlackList()
time.Sleep(1 * time.Second)
assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 0)
state1 := protocol.NewServiceState(invoker1.GetURL().ServiceKey())
state2 := protocol.NewServiceState(invoker2.GetURL().ServiceKey())
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 0)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 0)
state1.SetInvokerUnhealthyStatus(invoker1)
state2.SetInvokerUnhealthyStatus(invoker2)
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 1)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 1)
state1.TryRefreshBlackList()
time.Sleep(300 * time.Millisecond)
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 0)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 1)
state2.TryRefreshBlackList()
time.Sleep(300 * time.Millisecond)
assert.Equal(t, len(state1.GetBlackListInvokers(16)), 0)
assert.Equal(t, len(state2.GetBlackListInvokers(16)), 0)

}

func TestPrintlnConnCheckRouterRoute(t *testing.T) {
Expand All @@ -125,9 +138,11 @@ func TestPrintlnConnCheckRouterRoute(t *testing.T) {
invoker1 := NewMockInvoker(url1)
invoker2 := NewMockInvoker(url2)
invoker3 := NewMockInvoker(url3)
protocol.SetInvokerUnhealthyStatus(invoker1)
protocol.SetInvokerUnhealthyStatus(invoker2)
protocol.SetInvokerUnhealthyStatus(invoker3)

srvState := protocol.NewServiceState(consumerURL.ServiceKey())
srvState.SetInvokerUnhealthyStatus(invoker1)
srvState.SetInvokerUnhealthyStatus(invoker2)
srvState.SetInvokerUnhealthyStatus(invoker3)

invokers = append(invokers, invoker1, invoker2, invoker3)
inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil)
Expand All @@ -145,8 +160,8 @@ func TestPrintlnConnCheckRouterRoute(t *testing.T) {
assert.Equal(t, router.RouteSnapshot(cache), "conn-check-router -> Count:0 {}")

// check blacklist remove
protocol.RemoveInvokerUnhealthyStatus(invoker1)
protocol.RemoveInvokerUnhealthyStatus(invoker3)
srvState.RemoveInvokerUnhealthyStatus(invoker1)
srvState.RemoveInvokerUnhealthyStatus(invoker3)
cache = setUpAddrCache(hcr.(*ConnCheckRouter), invokers)
res = hcr.Route(utils.ToBitmap(invokers), cache, consumerURL, inv)
// now invoker3 invoker1 is healthy
Expand Down
2 changes: 1 addition & 1 deletion cluster/router/conncheck/conn_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ func TestDefaultConnCheckerIsHealthy(t *testing.T) {
invoker = NewMockInvoker(url)
cc = NewDefaultConnChecker(url).(*DefaultConnChecker)
// add to black list
protocol.SetInvokerUnhealthyStatus(invoker)
protocol.NewServiceState(url.ServiceKey()).SetInvokerUnhealthyStatus(invoker)
assert.False(t, cc.IsConnHealthy(invoker))
}
3 changes: 2 additions & 1 deletion common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ const (
RETRY_PERIOD_KEY = "retry.period"
RETRY_TIMES_KEY = "retry.times"
CYCLE_REPORT_KEY = "cycle.report"
DEFAULT_BLACK_LIST_RECOVER_BLOCK = 16
DEFAULT_BLACK_LIST_RECOVER_BLOCK = 64
DEFAULT_BLACK_LIST_MAX_RETRY_TIMES = 512
)

const (
Expand Down