Skip to content

Commit

Permalink
Support Key generate Func in ServiceEvent (apache#1286)
Browse files Browse the repository at this point in the history
* support Key Func in ServiceEvent

* FIX review issue and add unit test

* fix : make all getCacheKey from ServiceEvent

* fix override url notify bug
  • Loading branch information
cvictory committed Jul 11, 2021
1 parent 52deca0 commit 103aa72
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 12 deletions.
24 changes: 13 additions & 11 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent
if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol {
dir.configRouters()
}
if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
if oldInvoker, _ := dir.doCacheInvoker(event.Service, event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) stri
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeURL(event.Service, referenceUrl)
event.Update(newUrl)
return newUrl.GetCacheInvokerMapKey()
return event.Key()
}

// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
Expand All @@ -240,17 +240,18 @@ func (dir *RegistryDirectory) setNewInvokers() {
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
// judge is override or others
if event != nil {
u := dir.convertUrl(event)

switch event.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
u := dir.convertUrl(event)
logger.Infof("selector add service url{%s}", event.Service)
if u != nil && constant.ROUTER_PROTOCOL == u.Protocol {
dir.configRouters()
}
return dir.cacheInvoker(u), nil
return dir.cacheInvoker(u, event), nil
case remoting.EventTypeDel:
logger.Infof("selector delete service url{%s}", event.Service)
return dir.uncacheInvoker(u), nil
return dir.uncacheInvoker(event), nil
default:
return nil, fmt.Errorf("illegal event type: %v", event.Action)
}
Expand Down Expand Up @@ -316,8 +317,8 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
}

// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
return dir.uncacheInvokerWithKey(url.GetCacheInvokerMapKey())
func (dir *RegistryDirectory) uncacheInvoker(event *registry.ServiceEvent) protocol.Invoker {
return dir.uncacheInvokerWithKey(event.Key())
}

func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
Expand All @@ -331,7 +332,7 @@ func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker
}

// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
func (dir *RegistryDirectory) cacheInvoker(url *common.URL, event *registry.ServiceEvent) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

Expand All @@ -348,15 +349,16 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeURL(url, referenceUrl)
dir.overrideUrl(newUrl)
if v, ok := dir.doCacheInvoker(newUrl); ok {
event.Update(newUrl)
if v, ok := dir.doCacheInvoker(newUrl, event); ok {
return v
}
}
return nil
}

func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
key := newUrl.GetCacheInvokerMapKey()
func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL, event *registry.ServiceEvent) (protocol.Invoker, bool) {
key := event.Key()
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
Expand Down
9 changes: 8 additions & 1 deletion registry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)

type KeyFunc func(*common.URL) string

func init() {
rand.Seed(time.Now().UnixNano())
}
Expand All @@ -45,6 +47,7 @@ type ServiceEvent struct {
key string
// If the url is updated, such as Merged.
updated bool
KeyFunc KeyFunc
}

// String return the description of event
Expand All @@ -69,7 +72,11 @@ func (e *ServiceEvent) Key() string {
if len(e.key) > 0 {
return e.key
}
e.key = e.Service.GetCacheInvokerMapKey()
if e.KeyFunc == nil {
e.key = e.Service.GetCacheInvokerMapKey()
} else {
e.key = e.KeyFunc(e.Service)
}
return e.key
}

Expand Down
47 changes: 47 additions & 0 deletions registry/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package registry

import (
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
)

func TestKey(t *testing.T) {
u1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.0")
se := ServiceEvent{
Service: u1,
}
assert.Equal(t, se.Key(), "dubbo://:@127.0.0.1:20000/?interface=com.ikurento.user.UserProvider&group=&version=2.0&timestamp=")

se2 := ServiceEvent{
Service: u1,
KeyFunc: defineKey,
}
assert.Equal(t, se2.Key(), "Hello Key")
}

func defineKey(url *common.URL) string {
return "Hello Key"
}

0 comments on commit 103aa72

Please sign in to comment.