diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go index 0f52b0442c..050f831f06 100644 --- a/cluster/cluster_impl/zone_aware_cluster_invoker.go +++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go @@ -43,7 +43,7 @@ func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { invoke := &zoneAwareClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } - // add self to interceptor + // add local to interceptor invoke.interceptor = invoke return invoke } diff --git a/cluster/router/local/factory.go b/cluster/router/local/factory.go new file mode 100644 index 0000000000..73864c8dac --- /dev/null +++ b/cluster/router/local/factory.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package local + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.LocalPriorityRouterName, newLocalPriorityRouteFactory) +} + +// LocalPriorityRouteFactory +type LocalPriorityRouteFactory struct { +} + +// newLocalPriorityRouteFactory construct a new LocalDiscRouteFactory +func newLocalPriorityRouteFactory() router.PriorityRouterFactory { + return &LocalPriorityRouteFactory{} +} + +// NewPriorityRouter construct a new NewLocalDiscRouter via url +func (f *LocalPriorityRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { + return NewLocalPriorityRouter(url) +} diff --git a/cluster/router/local/factory_test.go b/cluster/router/local/factory_test.go new file mode 100644 index 0000000000..a5a683b4bc --- /dev/null +++ b/cluster/router/local/factory_test.go @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package local + +import ( + "context" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +// nolint +type MockInvoker struct { + url *common.URL +} + +// nolint +func NewMockInvoker(url *common.URL) *MockInvoker { + return &MockInvoker{ + url: url, + } +} + +// nolint +func (bi *MockInvoker) GetUrl() *common.URL { + return bi.url +} + +// nolint +func (bi *MockInvoker) IsAvailable() bool { + return true +} + +// nolint +func (bi *MockInvoker) IsDestroyed() bool { + return true +} + +// nolint +func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { + return nil +} + +// nolint +func (bi *MockInvoker) Destroy() { +} + +// nolint +func TestLocalDiscRouteFactory(t *testing.T) { + factory := newLocalPriorityRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/local/self_priority_route.go b/cluster/router/local/self_priority_route.go new file mode 100644 index 0000000000..87eaaf7837 --- /dev/null +++ b/cluster/router/local/self_priority_route.go @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package local + +import ( + "github.com/RoaringBitmap/roaring" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + localPriority = "local-priority" + name = "local-priority-router" +) + +// LocalPriorityRouter provides a ip-same-first routing logic +// if there is not provider with same ip as consumer, it would not filter any invoker +// if exists same ip invoker, it would retains this invoker only +type LocalPriorityRouter struct { + url *common.URL + localIP string +} + +// NewLocalPriorityRouter construct an LocalPriorityRouter via url +func NewLocalPriorityRouter(url *common.URL) (router.PriorityRouter, error) { + r := &LocalPriorityRouter{ + url: url, + localIP: url.Ip, + } + return r, nil +} + +// Route gets a list of match-logic invoker +func (r *LocalPriorityRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { + addrPool := cache.FindAddrPool(r) + // Add localPriority invoker to the list + selectedInvokers := utils.JoinIfNotEqual(addrPool[localPriority], invokers) + // If all invokers are considered not match, downgrade to all invoker + if selectedInvokers.IsEmpty() { + logger.Warnf(" Now all invokers are not match, so downgraded to all! Service: [%s]", url.ServiceKey()) + return invokers + } + return selectedInvokers +} + +// Pool separates same ip invoker from others. +func (r *LocalPriorityRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { + rb := make(router.AddrPool, 8) + rb[localPriority] = roaring.NewBitmap() + localIpFound := false + for i, invoker := range invokers { + if invoker.GetUrl().Ip == r.localIP { + rb[localPriority].Add(uint32(i)) + localIpFound = true + } + } + if localIpFound { + // found local desc + logger.Debug("found local ip ") + return rb, nil + } + for i, _ := range invokers { + rb[localPriority].Add(uint32(i)) + } + return rb, nil +} + +// ShouldPool will always return true to make sure local call logic constantly. +func (r *LocalPriorityRouter) ShouldPool() bool { + return true +} + +func (r *LocalPriorityRouter) Name() string { + return name +} + +// Priority +func (r *LocalPriorityRouter) Priority() int64 { + return 0 +} + +// URL Return URL in router +func (r *LocalPriorityRouter) URL() *common.URL { + return r.url +} diff --git a/cluster/router/local/self_priority_route_test.go b/cluster/router/local/self_priority_route_test.go new file mode 100644 index 0000000000..88535d3dd2 --- /dev/null +++ b/cluster/router/local/self_priority_route_test.go @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package local + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +const ( + localDiscRoute1010IP = "192.168.10.10" + localDiscRoute1011IP = "192.168.10.11" + localDiscRoute1012IP = "192.168.10.12" + localDiscRouteMethodNameTest = "test" + localDiscRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) + +func TestLocalDiscRouterRoute(t *testing.T) { + defer protocol.CleanAllStatus() + consumerURL, _ := common.NewURL(fmt.Sprintf(localDiscRouteUrlFormat, localDiscRoute1010IP)) + url1, _ := common.NewURL(fmt.Sprintf(localDiscRouteUrlFormat, localDiscRoute1010IP)) + url2, _ := common.NewURL(fmt.Sprintf(localDiscRouteUrlFormat, localDiscRoute1011IP)) + url3, _ := common.NewURL(fmt.Sprintf(localDiscRouteUrlFormat, localDiscRoute1012IP)) + hcr, _ := NewLocalPriorityRouter(consumerURL) + + var invokers []protocol.Invoker + invoker1 := NewMockInvoker(url1) + invoker2 := NewMockInvoker(url2) + invoker3 := NewMockInvoker(url3) + invokers = append(invokers, invoker1, invoker2, invoker3) + inv := invocation.NewRPCInvocation(localDiscRouteMethodNameTest, nil, nil) + res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*LocalPriorityRouter), invokers), consumerURL, inv) + // now only same ip invoker is selected + assert.True(t, len(res.ToArray()) == 1) + + // now all invoker with ip that not match client are selected + invokers = invokers[1:] + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*LocalPriorityRouter), invokers), consumerURL, inv) + assert.True(t, len(res.ToArray()) == 2) +} + +func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { + pool, info := r.Pool(addrs) + cache := chain.BuildCache(addrs) + cache.SetAddrMeta(r.Name(), info) + cache.SetAddrPool(r.Name(), pool) + return cache +} diff --git a/common/constant/key.go b/common/constant/key.go index 0515094f28..59b95e4c19 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -214,6 +214,8 @@ const ( ListenableRouterName = "listenable" // HealthCheckRouterName Specify the name of HealthCheckRouter HealthCheckRouterName = "health_check" + // LocalPriorityRouterName Specify the name of LocalPriorityRouter + LocalPriorityRouterName = "self_priority" // TagRouterName Specify the name of TagRouter TagRouterName = "tag" // TagRouterRuleSuffix Specify tag router suffix diff --git a/go.sum b/go.sum index 6cc41e6b58..786d0457e6 100644 --- a/go.sum +++ b/go.sum @@ -174,7 +174,6 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ github.com/dubbogo/go-zookeeper v1.0.2 h1:xmEnPL8SlCe3/+J5ZR9e8qE35LmFVYe8VVpDakjNM4A= github.com/dubbogo/go-zookeeper v1.0.2/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= -github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY= github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI= github.com/dubbogo/gost v1.11.0 h1:9KtyWQz1gMlAfwzen5iyhMdoe08SPBBUVhco4rdgJ9I= github.com/dubbogo/gost v1.11.0/go.mod h1:w8Yw29eDWtRVo3tx9nPpHkNZnOi4SRx1fZf7eVlAAU4= @@ -749,7 +748,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index bb520d3df7..e2a7a64d53 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -107,7 +107,7 @@ func (mts *MetadataService) SubscribeURL(url *common.URL) (bool, error) { // UnsubscribeURL will be implemented by in memory service func (mts *MetadataService) UnsubscribeURL(url *common.URL) error { - // TODO remove call self. + // TODO remove call local. return nil //return mts.UnsubscribeURL(url) }