diff --git a/core/registry/cache.go b/core/registry/cache.go index 55aec8b91..620ca7c8d 100755 --- a/core/registry/cache.go +++ b/core/registry/cache.go @@ -46,10 +46,11 @@ func enableRegistryCache() { // CacheIndex is a unified local instances cache manager type CacheIndex interface { - Get(k string, tags map[string]string) ([]*MicroServiceInstance, bool) - Set(k string, instances []*MicroServiceInstance) + Get(service string, tags map[string]string) ([]*MicroServiceInstance, bool) + //Set will overwrite all instances correspond to a service name + Set(service string, instances []*MicroServiceInstance) FullCache() *cache.Cache - Delete(k string) + Delete(service string) } //SetIPIndex save ip index diff --git a/core/registry/cache_index.go b/core/registry/cache_index.go index 3aeada42f..037c68738 100755 --- a/core/registry/cache_index.go +++ b/core/registry/cache_index.go @@ -86,7 +86,7 @@ func (ic *IndexCache) Get(k string, tags map[string]string) ([]*MicroServiceInst //if version is latest, then set it to real version ic.setTagsBeforeQuery(k, tags) //find from indexed cache first - indexKey := ic.getIndexedCacheKey(k, tags) + indexKey := getIndexedCacheKey(k, tags) savedResult, ok := ic.indexedCache.Get(indexKey) if !ok { //no result, then find it and save result @@ -117,8 +117,8 @@ func (ic *IndexCache) setTagsBeforeQuery(k string, tags map[string]string) { } } -//must combine in order -func (ic *IndexCache) getIndexedCacheKey(service string, tags map[string]string) (ss string) { +//must combine keys in order, use sets to return sorted list +func getIndexedCacheKey(service string, tags map[string]string) (ss string) { ss = "service:" + service keys := sets.NewString() for k := range tags { diff --git a/core/registry/cache_index_test.go b/core/registry/cache_index_test.go index b2bea4ab7..21e35c6fe 100755 --- a/core/registry/cache_index_test.go +++ b/core/registry/cache_index_test.go @@ -58,12 +58,11 @@ func TestIndexCache(t *testing.T) { cache.Delete("TestServer") } func TestIndexCache_Get(t *testing.T) { - cache := NewIndexCache() - k1 := cache.getIndexedCacheKey("service1", map[string]string{ + k1 := getIndexedCacheKey("service1", map[string]string{ "a": "b", "c": "d", }) - k2 := cache.getIndexedCacheKey("service1", map[string]string{ + k2 := getIndexedCacheKey("service1", map[string]string{ "c": "d", "a": "b", }) diff --git a/core/registry/index.go b/core/registry/index.go deleted file mode 100755 index 941ea7cb4..000000000 --- a/core/registry/index.go +++ /dev/null @@ -1,24 +0,0 @@ -package registry - -import ( - "github.com/go-chassis/go-chassis/core/common" -) - -func (m *MicroServiceInstance) appID() string { return m.Metadata[common.BuildinTagApp] } -func (m *MicroServiceInstance) version() string { return m.Metadata[common.BuildinTagVersion] } - -// Has return whether microservice has tags -func (m *MicroServiceInstance) Has(tags map[string]string) bool { - for k, v := range tags { - if mt, ok := m.Metadata[k]; !ok || mt != v { - return false - } - } - return true -} - -// WithAppID add app tag for microservice instance -func (m *MicroServiceInstance) WithAppID(v string) *MicroServiceInstance { - m.Metadata[common.BuildinTagApp] = v - return m -} diff --git a/core/registry/servicecenter/cache.go b/core/registry/servicecenter/cache.go index e1e6ee8bf..b895a9543 100755 --- a/core/registry/servicecenter/cache.go +++ b/core/registry/servicecenter/cache.go @@ -14,6 +14,7 @@ import ( "github.com/go-chassis/go-sc-client" "github.com/go-chassis/go-chassis/third_party/forked/k8s.io/apimachinery/pkg/util/sets" + "github.com/go-mesh/openlogging" ) // constant values for default expiration time, and refresh interval @@ -41,7 +42,7 @@ func (c *CacheManager) AutoSync() { if err != nil { lager.Logger.Errorf("Watch failed. Self Micro service Id:%s. %s", runtime.ServiceID, err) } - lager.Logger.Debugf("Watching Intances change events.") + lager.Logger.Debugf("Watching Instances change events.") } var ticker *time.Ticker refreshInterval := config.GetServiceDiscoveryRefreshInterval() @@ -217,31 +218,31 @@ func (c *CacheManager) pullMicroserviceInstance() error { AppID: service.AppID, }) } - serviceNameSet, serviceNameAppIDKeySet := c.getServiceSet(services) + serviceNameSet, serviceNameAppIDKeySet := getServiceSet(services) c.compareAndDeleteOutdatedProviders(serviceNameSet) - for key := range serviceNameAppIDKeySet { - service := strings.Split(key, ":") - if len(service) != 2 { - lager.Logger.Errorf("Invalid serviceStore %s for providers %s", key, runtime.ServiceID) - continue - } - - providerInstances, err := c.registryClient.FindMicroServiceInstances(runtime.ServiceID, service[1], - service[0], common.AllVersion) - if err != nil { - if err == client.ErrNotModified { - lager.Logger.Debug(err.Error()) + for service, apps := range serviceNameAppIDKeySet { + ups := make([]*registry.MicroServiceInstance, 0) //append instances from different app and same service name into one unified slice + downs := make(map[string]struct{}, 0) + for _, app := range apps.List() { + //fetch remote based on app and service + instances, err := c.registryClient.FindMicroServiceInstances(runtime.ServiceID, app, service, + common.AllVersion) + if err != nil { + if err == client.ErrNotModified { + openlogging.Debug(err.Error()) + continue + } + if err == client.ErrMicroServiceNotExists { + registry.ProvidersMicroServiceCache.Delete(strings.Join([]string{service, app}, "|")) + } + openlogging.Error("Refresh local instance cache failed: " + err.Error()) continue } - if err == client.ErrMicroServiceNotExists { - registry.ProvidersMicroServiceCache.Delete(strings.Join([]string{service[0], service[1]}, "|")) - } - lager.Logger.Error("Refresh local instance cache failed: " + err.Error()) - continue + u := filter(instances, app, downs) //set app into instance metadata, split instances into ups and downs + ups = append(ups, u...) } - - filterReIndex(providerInstances, service[0], service[1]) + registry.RefreshCache(service, ups, downs) //save cache after get all instances of a service name } return nil } @@ -256,28 +257,31 @@ func (c *CacheManager) compareAndDeleteOutdatedProviders(newProviders sets.Strin } // getServiceSet returns service sets -func (c *CacheManager) getServiceSet(exist []*client.MicroService) (sets.String, sets.String) { +func getServiceSet(exist []*client.MicroService) (sets.String, map[string]sets.String) { //get Provider's instances - serviceNameSet := sets.NewString() // key is serviceName - serviceNameAppIDKeySet := sets.NewString() // key is "serviceName:appId" + serviceNameSet := sets.NewString() // key is serviceName + serviceNameAppIDKeySet := make(map[string]sets.String, 0) // key is "serviceName" value is app sets if exist == nil || len(exist) == 0 { return serviceNameSet, serviceNameAppIDKeySet } - for _, microservice := range exist { - if microservice == nil { + for _, service := range exist { + if service == nil { continue } - serviceNameSet.Insert(microservice.ServiceName) - key := strings.Join([]string{microservice.ServiceName, microservice.AppID}, ":") - serviceNameAppIDKeySet.Insert(key) + serviceNameSet.Insert(service.ServiceName) + m, ok := serviceNameAppIDKeySet[service.ServiceName] + if ok { + m.Insert(service.AppID) + } else { + serviceNameAppIDKeySet[service.ServiceName] = sets.NewString() + serviceNameAppIDKeySet[service.ServiceName].Insert(service.AppID) + } } return serviceNameSet, serviceNameAppIDKeySet } - -func filterReIndex(providerInstances []*client.MicroServiceInstance, serviceName string, appID string) { +func filter(providerInstances []*client.MicroServiceInstance, app string, downs map[string]struct{}) []*registry.MicroServiceInstance { ups := make([]*registry.MicroServiceInstance, 0, len(providerInstances)) - downs := make(map[string]struct{}) for _, ins := range providerInstances { switch { case ins.Version == "": @@ -289,10 +293,11 @@ func filterReIndex(providerInstances []*client.MicroServiceInstance, serviceName ins.Status, ins.ServiceID, ins.InstanceID) continue default: - ups = append(ups, ToMicroServiceInstance(ins).WithAppID(appID)) + ups = append(ups, ToMicroServiceInstance(ins).WithAppID(app)) } } - registry.RefreshCache(serviceName, ups, downs) + return ups + } // watch watching micro-service instance status diff --git a/core/registry/servicecenter/servicecenter.go b/core/registry/servicecenter/servicecenter.go index e09f8339b..a1d29e113 100755 --- a/core/registry/servicecenter/servicecenter.go +++ b/core/registry/servicecenter/servicecenter.go @@ -321,14 +321,15 @@ func (r *ServiceDiscovery) FindMicroServiceInstances(consumerID, microServiceNam if appID == "" { appID = runtime.App } - openlogging.GetLogger().Warnf("%s Get instances from remote, key: %s %s", consumerID, appID, microServiceName) + openlogging.GetLogger().Warnf("%s Get instances from remote, key: %s:%s:%s", consumerID, appID, microServiceName, tags.Version()) providerInstances, err := r.registryClient.FindMicroServiceInstances(consumerID, appID, microServiceName, tags.Version(), client.WithoutRevision()) if err != nil { return nil, fmt.Errorf("FindMicroServiceInstances failed, ProviderID: %s, err: %s", microServiceName, err) } - - filterReIndex(providerInstances, microServiceName, appID) + downs := make(map[string]struct{}, 0) + ups := filter(providerInstances, appID, downs) + registry.RefreshCache(microServiceName, ups, downs) microServiceInstance, boo = registry.MicroserviceInstanceIndex.Get(microServiceName, tags.KV) if !boo || microServiceInstance == nil { openlogging.GetLogger().Debugf("Find no microservice instances for %s from cache", microServiceName) diff --git a/core/registry/struct.go b/core/registry/struct.go index 1fe449e5c..c636cfb9e 100755 --- a/core/registry/struct.go +++ b/core/registry/struct.go @@ -1,5 +1,7 @@ package registry +import "github.com/go-chassis/go-chassis/core/common" + // MicroService struct having full info about micro-service type MicroService struct { ServiceID string @@ -42,6 +44,36 @@ type MicroServiceInstance struct { DataCenterInfo *DataCenterInfo } +func (m *MicroServiceInstance) appID() string { return m.Metadata[common.BuildinTagApp] } +func (m *MicroServiceInstance) version() string { return m.Metadata[common.BuildinTagVersion] } + +// Has return whether microservice has tags +func (m *MicroServiceInstance) Has(tags map[string]string) bool { + for k, v := range tags { + if mt, ok := m.Metadata[k]; !ok || mt != v { + return false + } + } + return true +} + +// WithAppID add app tag for microservice instance +func (m *MicroServiceInstance) WithAppID(v string) *MicroServiceInstance { + m.Metadata[common.BuildinTagApp] = v + return m +} + +//Equal compares 2 instances is same or not +func (m *MicroServiceInstance) Equal(ins *MicroServiceInstance) bool { + if m.InstanceID != ins.InstanceID { + return false + } + if m.ServiceID != ins.ServiceID { + return false + } + return true +} + // MicroServiceDependency is for to represent dependencies of micro-service type MicroServiceDependency struct { Consumer *MicroService diff --git a/core/registry/struct_test.go b/core/registry/struct_test.go new file mode 100644 index 000000000..bce716180 --- /dev/null +++ b/core/registry/struct_test.go @@ -0,0 +1,48 @@ +package registry + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMicroServiceInstance_Equal(t *testing.T) { + ins1 := &MicroServiceInstance{ + InstanceID: "1", + ServiceID: "bill", + } + ins2 := &MicroServiceInstance{ + InstanceID: "1", + ServiceID: "bill", + } + assert.True(t, ins1.Equal(ins2)) + + ins3 := &MicroServiceInstance{ + InstanceID: "1", + ServiceID: "bill", + Metadata: map[string]string{ + "a": "b", + "c": "d", + }, + } + ins4 := &MicroServiceInstance{ + InstanceID: "1", + ServiceID: "bill", + Metadata: map[string]string{ + "a": "b", + "c": "d", + }, + } + assert.True(t, ins3.Equal(ins4)) + + ins5 := &MicroServiceInstance{ + InstanceID: "2", + ServiceID: "bill", + } + assert.False(t, ins5.Equal(ins4)) + + ins6 := &MicroServiceInstance{ + InstanceID: "2", + ServiceID: "text", + } + assert.False(t, ins5.Equal(ins6)) +}