Skip to content

Commit

Permalink
fix bug: can not save service with different app value (#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
tianxiaoliang committed Dec 10, 2018
1 parent b1f50fe commit 5e647f1
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 70 deletions.
7 changes: 4 additions & 3 deletions core/registry/cache.go
Expand Up @@ -46,10 +46,11 @@ func enableRegistryCache() {

// CacheIndex is a unified local instances cache manager
type CacheIndex interface {
Get(k string, tags map[string]string) ([]*MicroServiceInstance, bool)
Set(k string, instances []*MicroServiceInstance)
Get(service string, tags map[string]string) ([]*MicroServiceInstance, bool)
//Set will overwrite all instances correspond to a service name
Set(service string, instances []*MicroServiceInstance)
FullCache() *cache.Cache
Delete(k string)
Delete(service string)
}

//SetIPIndex save ip index
Expand Down
6 changes: 3 additions & 3 deletions core/registry/cache_index.go
Expand Up @@ -86,7 +86,7 @@ func (ic *IndexCache) Get(k string, tags map[string]string) ([]*MicroServiceInst
//if version is latest, then set it to real version
ic.setTagsBeforeQuery(k, tags)
//find from indexed cache first
indexKey := ic.getIndexedCacheKey(k, tags)
indexKey := getIndexedCacheKey(k, tags)
savedResult, ok := ic.indexedCache.Get(indexKey)
if !ok {
//no result, then find it and save result
Expand Down Expand Up @@ -117,8 +117,8 @@ func (ic *IndexCache) setTagsBeforeQuery(k string, tags map[string]string) {
}
}

//must combine in order
func (ic *IndexCache) getIndexedCacheKey(service string, tags map[string]string) (ss string) {
//must combine keys in order, use sets to return sorted list
func getIndexedCacheKey(service string, tags map[string]string) (ss string) {
ss = "service:" + service
keys := sets.NewString()
for k := range tags {
Expand Down
5 changes: 2 additions & 3 deletions core/registry/cache_index_test.go
Expand Up @@ -58,12 +58,11 @@ func TestIndexCache(t *testing.T) {
cache.Delete("TestServer")
}
func TestIndexCache_Get(t *testing.T) {
cache := NewIndexCache()
k1 := cache.getIndexedCacheKey("service1", map[string]string{
k1 := getIndexedCacheKey("service1", map[string]string{
"a": "b",
"c": "d",
})
k2 := cache.getIndexedCacheKey("service1", map[string]string{
k2 := getIndexedCacheKey("service1", map[string]string{
"c": "d",
"a": "b",
})
Expand Down
24 changes: 0 additions & 24 deletions core/registry/index.go

This file was deleted.

73 changes: 39 additions & 34 deletions core/registry/servicecenter/cache.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-chassis/go-sc-client"

"github.com/go-chassis/go-chassis/third_party/forked/k8s.io/apimachinery/pkg/util/sets"
"github.com/go-mesh/openlogging"
)

// constant values for default expiration time, and refresh interval
Expand Down Expand Up @@ -41,7 +42,7 @@ func (c *CacheManager) AutoSync() {
if err != nil {
lager.Logger.Errorf("Watch failed. Self Micro service Id:%s. %s", runtime.ServiceID, err)
}
lager.Logger.Debugf("Watching Intances change events.")
lager.Logger.Debugf("Watching Instances change events.")
}
var ticker *time.Ticker
refreshInterval := config.GetServiceDiscoveryRefreshInterval()
Expand Down Expand Up @@ -217,31 +218,31 @@ func (c *CacheManager) pullMicroserviceInstance() error {
AppID: service.AppID,
})
}
serviceNameSet, serviceNameAppIDKeySet := c.getServiceSet(services)
serviceNameSet, serviceNameAppIDKeySet := getServiceSet(services)
c.compareAndDeleteOutdatedProviders(serviceNameSet)

for key := range serviceNameAppIDKeySet {
service := strings.Split(key, ":")
if len(service) != 2 {
lager.Logger.Errorf("Invalid serviceStore %s for providers %s", key, runtime.ServiceID)
continue
}

providerInstances, err := c.registryClient.FindMicroServiceInstances(runtime.ServiceID, service[1],
service[0], common.AllVersion)
if err != nil {
if err == client.ErrNotModified {
lager.Logger.Debug(err.Error())
for service, apps := range serviceNameAppIDKeySet {
ups := make([]*registry.MicroServiceInstance, 0) //append instances from different app and same service name into one unified slice
downs := make(map[string]struct{}, 0)
for _, app := range apps.List() {
//fetch remote based on app and service
instances, err := c.registryClient.FindMicroServiceInstances(runtime.ServiceID, app, service,
common.AllVersion)
if err != nil {
if err == client.ErrNotModified {
openlogging.Debug(err.Error())
continue
}
if err == client.ErrMicroServiceNotExists {
registry.ProvidersMicroServiceCache.Delete(strings.Join([]string{service, app}, "|"))
}
openlogging.Error("Refresh local instance cache failed: " + err.Error())
continue
}
if err == client.ErrMicroServiceNotExists {
registry.ProvidersMicroServiceCache.Delete(strings.Join([]string{service[0], service[1]}, "|"))
}
lager.Logger.Error("Refresh local instance cache failed: " + err.Error())
continue
u := filter(instances, app, downs) //set app into instance metadata, split instances into ups and downs
ups = append(ups, u...)
}

filterReIndex(providerInstances, service[0], service[1])
registry.RefreshCache(service, ups, downs) //save cache after get all instances of a service name
}
return nil
}
Expand All @@ -256,28 +257,31 @@ func (c *CacheManager) compareAndDeleteOutdatedProviders(newProviders sets.Strin
}

// getServiceSet returns service sets
func (c *CacheManager) getServiceSet(exist []*client.MicroService) (sets.String, sets.String) {
func getServiceSet(exist []*client.MicroService) (sets.String, map[string]sets.String) {
//get Provider's instances
serviceNameSet := sets.NewString() // key is serviceName
serviceNameAppIDKeySet := sets.NewString() // key is "serviceName:appId"
serviceNameSet := sets.NewString() // key is serviceName
serviceNameAppIDKeySet := make(map[string]sets.String, 0) // key is "serviceName" value is app sets
if exist == nil || len(exist) == 0 {
return serviceNameSet, serviceNameAppIDKeySet
}

for _, microservice := range exist {
if microservice == nil {
for _, service := range exist {
if service == nil {
continue
}
serviceNameSet.Insert(microservice.ServiceName)
key := strings.Join([]string{microservice.ServiceName, microservice.AppID}, ":")
serviceNameAppIDKeySet.Insert(key)
serviceNameSet.Insert(service.ServiceName)
m, ok := serviceNameAppIDKeySet[service.ServiceName]
if ok {
m.Insert(service.AppID)
} else {
serviceNameAppIDKeySet[service.ServiceName] = sets.NewString()
serviceNameAppIDKeySet[service.ServiceName].Insert(service.AppID)
}
}
return serviceNameSet, serviceNameAppIDKeySet
}

func filterReIndex(providerInstances []*client.MicroServiceInstance, serviceName string, appID string) {
func filter(providerInstances []*client.MicroServiceInstance, app string, downs map[string]struct{}) []*registry.MicroServiceInstance {
ups := make([]*registry.MicroServiceInstance, 0, len(providerInstances))
downs := make(map[string]struct{})
for _, ins := range providerInstances {
switch {
case ins.Version == "":
Expand All @@ -289,10 +293,11 @@ func filterReIndex(providerInstances []*client.MicroServiceInstance, serviceName
ins.Status, ins.ServiceID, ins.InstanceID)
continue
default:
ups = append(ups, ToMicroServiceInstance(ins).WithAppID(appID))
ups = append(ups, ToMicroServiceInstance(ins).WithAppID(app))
}
}
registry.RefreshCache(serviceName, ups, downs)
return ups

}

// watch watching micro-service instance status
Expand Down
7 changes: 4 additions & 3 deletions core/registry/servicecenter/servicecenter.go
Expand Up @@ -321,14 +321,15 @@ func (r *ServiceDiscovery) FindMicroServiceInstances(consumerID, microServiceNam
if appID == "" {
appID = runtime.App
}
openlogging.GetLogger().Warnf("%s Get instances from remote, key: %s %s", consumerID, appID, microServiceName)
openlogging.GetLogger().Warnf("%s Get instances from remote, key: %s:%s:%s", consumerID, appID, microServiceName, tags.Version())
providerInstances, err := r.registryClient.FindMicroServiceInstances(consumerID, appID, microServiceName,
tags.Version(), client.WithoutRevision())
if err != nil {
return nil, fmt.Errorf("FindMicroServiceInstances failed, ProviderID: %s, err: %s", microServiceName, err)
}

filterReIndex(providerInstances, microServiceName, appID)
downs := make(map[string]struct{}, 0)
ups := filter(providerInstances, appID, downs)
registry.RefreshCache(microServiceName, ups, downs)
microServiceInstance, boo = registry.MicroserviceInstanceIndex.Get(microServiceName, tags.KV)
if !boo || microServiceInstance == nil {
openlogging.GetLogger().Debugf("Find no microservice instances for %s from cache", microServiceName)
Expand Down
32 changes: 32 additions & 0 deletions core/registry/struct.go
@@ -1,5 +1,7 @@
package registry

import "github.com/go-chassis/go-chassis/core/common"

// MicroService struct having full info about micro-service
type MicroService struct {
ServiceID string
Expand Down Expand Up @@ -42,6 +44,36 @@ type MicroServiceInstance struct {
DataCenterInfo *DataCenterInfo
}

func (m *MicroServiceInstance) appID() string { return m.Metadata[common.BuildinTagApp] }
func (m *MicroServiceInstance) version() string { return m.Metadata[common.BuildinTagVersion] }

// Has return whether microservice has tags
func (m *MicroServiceInstance) Has(tags map[string]string) bool {
for k, v := range tags {
if mt, ok := m.Metadata[k]; !ok || mt != v {
return false
}
}
return true
}

// WithAppID add app tag for microservice instance
func (m *MicroServiceInstance) WithAppID(v string) *MicroServiceInstance {
m.Metadata[common.BuildinTagApp] = v
return m
}

//Equal compares 2 instances is same or not
func (m *MicroServiceInstance) Equal(ins *MicroServiceInstance) bool {
if m.InstanceID != ins.InstanceID {
return false
}
if m.ServiceID != ins.ServiceID {
return false
}
return true
}

// MicroServiceDependency is for to represent dependencies of micro-service
type MicroServiceDependency struct {
Consumer *MicroService
Expand Down
48 changes: 48 additions & 0 deletions core/registry/struct_test.go
@@ -0,0 +1,48 @@
package registry

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestMicroServiceInstance_Equal(t *testing.T) {
ins1 := &MicroServiceInstance{
InstanceID: "1",
ServiceID: "bill",
}
ins2 := &MicroServiceInstance{
InstanceID: "1",
ServiceID: "bill",
}
assert.True(t, ins1.Equal(ins2))

ins3 := &MicroServiceInstance{
InstanceID: "1",
ServiceID: "bill",
Metadata: map[string]string{
"a": "b",
"c": "d",
},
}
ins4 := &MicroServiceInstance{
InstanceID: "1",
ServiceID: "bill",
Metadata: map[string]string{
"a": "b",
"c": "d",
},
}
assert.True(t, ins3.Equal(ins4))

ins5 := &MicroServiceInstance{
InstanceID: "2",
ServiceID: "bill",
}
assert.False(t, ins5.Equal(ins4))

ins6 := &MicroServiceInstance{
InstanceID: "2",
ServiceID: "text",
}
assert.False(t, ins5.Equal(ins6))
}

0 comments on commit 5e647f1

Please sign in to comment.