Skip to content

Commit

Permalink
SCB-1053 Add instances request struct (#508)
Browse files Browse the repository at this point in the history
* SCB-1053 Add instances request struct

* SCB-1053 Implement batch find instances

* SCB-1053 Add UTs

* SCB-1053 Implement Query instances by provider id

* SCB-1053 Fix IT failure

* SCB-1053 Bug fix
  • Loading branch information
little-cui committed Dec 11, 2018
1 parent fad2b58 commit 61f0575
Show file tree
Hide file tree
Showing 15 changed files with 562 additions and 166 deletions.
35 changes: 33 additions & 2 deletions integration/instances_test.go
Expand Up @@ -364,11 +364,27 @@ var _ = Describe("MicroService Api Test", func() {
"version": serviceVersion,
},
}
notExistsInstance := map[string]interface{}{
"instance": map[string]interface{}{
"serviceId": serviceId,
"instanceId": "notexisted",
},
}
providerInstance := map[string]interface{}{
"instance": map[string]interface{}{
"serviceId": serviceId,
"instanceId": serviceInstanceID,
},
}
findRequest := map[string]interface{}{
"services": []map[string]interface{}{
provider,
notExistsService,
},
"instances": []map[string]interface{}{
providerInstance,
notExistsInstance,
},
}
body, _ := json.Marshal(findRequest)
bodyBuf := bytes.NewReader(body)
Expand All @@ -378,8 +394,10 @@ var _ = Describe("MicroService Api Test", func() {
resp, _ := scclient.Do(req)
respbody, _ := ioutil.ReadAll(resp.Body)
Expect(resp.StatusCode).To(Equal(http.StatusOK))
servicesStruct := map[string][]map[string]interface{}{}
json.Unmarshal(respbody, &servicesStruct)
respStruct := map[string]map[string][]map[string]interface{}{}
json.Unmarshal(respbody, &respStruct)
servicesStruct := respStruct["services"]
instancesStruct := respStruct["instances"]
failed := false
for _, services := range servicesStruct["failed"] {
a := services["indexes"].([]interface{})[0] == 1.0
Expand All @@ -393,6 +411,19 @@ var _ = Describe("MicroService Api Test", func() {
Expect(servicesStruct["updated"][0]["index"]).To(Equal(0.0))
Expect(len(servicesStruct["updated"][0]["instances"].([]interface{}))).
ToNot(Equal(0))
failed = false
for _, instances := range instancesStruct["failed"] {
a := instances["indexes"].([]interface{})[0] == 1.0
b := instances["error"].(map[string]interface{})["errorCode"] == "400017"
if a && b {
failed = true
break
}
}
Expect(failed).To(Equal(true))
Expect(instancesStruct["updated"][0]["index"]).To(Equal(0.0))
Expect(len(instancesStruct["updated"][0]["instances"].([]interface{}))).
ToNot(Equal(0))
})
})

Expand Down
23 changes: 17 additions & 6 deletions server/core/proto/batch_find.go
Expand Up @@ -24,6 +24,11 @@ type FindService struct {
Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev,omitempty"`
}

type FindInstance struct {
Instance *HeartbeatSetElement `protobuf:"bytes,1,opt,name=instance" json:"instance"`
Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev,omitempty"`
}

type FindResult struct {
Index int64 `protobuf:"varint,1,opt,name=index" json:"index"`
Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev"`
Expand All @@ -35,14 +40,20 @@ type FindFailedResult struct {
Error *scerr.Error `protobuf:"bytes,2,opt,name=error" json:"error"`
}

type BatchFindResult struct {
Failed []*FindFailedResult `protobuf:"bytes,1,rep,name=failed" json:"failed,omitempty"`
NotModified []int64 `protobuf:"varint,2,rep,packed,name=notModified" json:"notModified,omitempty"`
Updated []*FindResult `protobuf:"bytes,3,rep,name=updated" json:"updated,omitempty"`
}

type BatchFindInstancesRequest struct {
ConsumerServiceId string `protobuf:"bytes,1,opt,name=consumerServiceId" json:"consumerServiceId,omitempty"`
Services []*FindService `protobuf:"bytes,2,rep,name=services" json:"services"`
ConsumerServiceId string `protobuf:"bytes,1,opt,name=consumerServiceId" json:"consumerServiceId,omitempty"`
Services []*FindService `protobuf:"bytes,2,rep,name=services" json:"services,omitempty"`
Instances []*FindInstance `protobuf:"bytes,3,rep,name=instances" json:"instances,omitempty"`
}

type BatchFindInstancesResponse struct {
Response *Response `protobuf:"bytes,1,opt,name=response" json:"response,omitempty"`
Failed []*FindFailedResult `protobuf:"bytes,2,rep,name=failed" json:"failed,omitempty"`
NotModified []int64 `protobuf:"varint,3,rep,packed,name=notModified" json:"notModified,omitempty"`
Updated []*FindResult `protobuf:"bytes,4,rep,name=updated" json:"updated,omitempty"`
Response *Response `protobuf:"bytes,1,opt,name=response" json:"response,omitempty"`
Services *BatchFindResult `protobuf:"bytes,2,rep,name=services" json:"services,omitempty"`
Instances *BatchFindResult `protobuf:"bytes,3,rep,name=instances" json:"instances,omitempty"`
}
21 changes: 20 additions & 1 deletion server/core/swagger/v4.yaml
Expand Up @@ -1964,13 +1964,25 @@ definitions:
rev:
type: string
description: 客户端缓存的版本号。
FindInstance:
type: object
properties:
instance:
$ref: '#/definitions/HeartbeatSetElement'
rev:
type: string
description: 客户端缓存的版本号。
BatchFindRequest:
type: object
properties:
services:
type: array
items:
$ref: '#/definitions/FindService'
instances:
type: array
items:
$ref: '#/definitions/FindInstance'
FindResult:
type: object
properties:
Expand All @@ -1994,7 +2006,7 @@ definitions:
description: 与请求数组对应的索引集合。
error:
$ref: '#/definitions/Error'
BatchFindResponse:
BatchFindResult:
type: object
properties:
failed:
Expand All @@ -2010,6 +2022,13 @@ definitions:
type: array
items:
$ref: '#/definitions/FindResult'
BatchFindResponse:
type: object
properties:
services:
$ref: '#/definitions/BatchFindResult'
instances:
$ref: '#/definitions/BatchFindResult'
CreateDependenciesRequest:
type: object
properties:
Expand Down
17 changes: 17 additions & 0 deletions server/rest/controller/v4/instance_controller.go
Expand Up @@ -191,9 +191,18 @@ func (this *MicroServiceInstanceService) GetOneInstance(w http.ResponseWriter, r
ProviderInstanceId: query.Get(":instanceId"),
Tags: ids,
}

resp, _ := core.InstanceAPI.GetOneInstance(r.Context(), request)
respInternal := resp.Response
resp.Response = nil

iv, _ := r.Context().Value(serviceUtil.CTX_REQUEST_REVISION).(string)
ov, _ := r.Context().Value(serviceUtil.CTX_RESPONSE_REVISION).(string)
w.Header().Set(serviceUtil.HEADER_REV, ov)
if len(iv) > 0 && iv == ov {
w.WriteHeader(http.StatusNotModified)
return
}
controller.WriteResponse(w, respInternal, resp)
}

Expand All @@ -212,6 +221,14 @@ func (this *MicroServiceInstanceService) GetInstances(w http.ResponseWriter, r *
resp, _ := core.InstanceAPI.GetInstances(r.Context(), request)
respInternal := resp.Response
resp.Response = nil

iv, _ := r.Context().Value(serviceUtil.CTX_REQUEST_REVISION).(string)
ov, _ := r.Context().Value(serviceUtil.CTX_RESPONSE_REVISION).(string)
w.Header().Set(serviceUtil.HEADER_REV, ov)
if len(iv) > 0 && iv == ov {
w.WriteHeader(http.StatusNotModified)
return
}
controller.WriteResponse(w, respInternal, resp)
}

Expand Down
9 changes: 5 additions & 4 deletions server/service/cache/common.go
Expand Up @@ -17,10 +17,11 @@
package cache

const (
CTX_FIND_CONSUMER = "_consumer"
CTX_FIND_PROVIDER = "_provider"
CTX_FIND_TAGS = "_tags"
CTX_FIND_REQUEST_REV = "_rev"
CTX_FIND_CONSUMER = "_consumer"
CTX_FIND_PROVIDER = "_provider"
CTX_FIND_PROVIDER_INSTANCE = "_provider_instance"
CTX_FIND_TAGS = "_tags"
CTX_FIND_REQUEST_REV = "_rev"

CACHE_FIND = "_find"
CACHE_DEP = "_dep"
Expand Down
87 changes: 67 additions & 20 deletions server/service/cache/filter_instances.go
Expand Up @@ -46,12 +46,17 @@ type InstancesFilter struct {
}

func (f *InstancesFilter) Name(ctx context.Context, _ *cache.Node) string {
instanceKey, ok := ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement)
if ok {
return instanceKey.ServiceId + apt.SPLIT + instanceKey.InstanceId
}
return ""
}

func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
pCopy := *parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
pCopy.Instances, pCopy.Rev, err = f.FindInstances(ctx, pCopy.ServiceIds)

pCopy.Instances, pCopy.Rev, err = f.Find(ctx, parent)
if err != nil {
return
}
Expand All @@ -62,34 +67,76 @@ func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node *c
return
}

func (f *InstancesFilter) FindInstances(ctx context.Context, serviceIds []string) (instances []*pb.MicroServiceInstance, rev string, err error) {
func (f *InstancesFilter) Find(ctx context.Context, parent *cache.Node) (
instances []*pb.MicroServiceInstance, rev string, err error) {
pCache := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)

instanceKey, ok := ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement)
if ok {
if len(pCache.ServiceIds) == 0 {
// can not find by instanceKey.ServiceId after pre-filters init
return
}
instances, rev, err = f.FindInstances(ctx, provider.Tenant, instanceKey)
} else {
instances, rev, err = f.BatchFindInstances(ctx, provider.Tenant, pCache.ServiceIds)
}
if err != nil {
consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService)
findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId,
provider.AppId, provider.ServiceName, provider.Version)
log.Errorf(err, "Find failed, %s", findFlag)
}
return
}

func (f *InstancesFilter) findInstances(ctx context.Context, domainProject, serviceId, instanceId string, maxRevs []int64, counts []int64) (instances []*pb.MicroServiceInstance, err error) {
key := apt.GenerateInstanceKey(domainProject, serviceId, instanceId)
opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix())
resp, err := backend.Store().Instance().Search(ctx, opts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return
}

for _, kv := range resp.Kvs {
if i, ok := clustersIndex[kv.ClusterName]; ok {
if kv.ModRevision > maxRevs[i] {
maxRevs[i] = kv.ModRevision
}
counts[i]++
}
instances = append(instances, kv.Value.(*pb.MicroServiceInstance))
}
return
}

func (f *InstancesFilter) FindInstances(ctx context.Context, domainProject string, instanceKey *pb.HeartbeatSetElement) (instances []*pb.MicroServiceInstance, rev string, err error) {
var (
maxRevs = make([]int64, len(clustersIndex))
counts = make([]int64, len(clustersIndex))
)
instances, err = f.findInstances(ctx, domainProject, instanceKey.ServiceId, instanceKey.InstanceId, maxRevs, counts)
if err != nil {
return
}
return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
}

func (f *InstancesFilter) BatchFindInstances(ctx context.Context, domainProject string, serviceIds []string) (instances []*pb.MicroServiceInstance, rev string, err error) {
var (
maxRevs = make([]int64, len(clustersIndex))
counts = make([]int64, len(clustersIndex))
)
for _, providerServiceId := range serviceIds {
key := apt.GenerateInstanceKey(provider.Tenant, providerServiceId, "")
opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix())
resp, err := backend.Store().Instance().Search(ctx, opts...)
insts, err := f.findInstances(ctx, domainProject, providerServiceId, "", maxRevs, counts)
if err != nil {
consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService)
findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId,
provider.AppId, provider.ServiceName, provider.Version)
log.Errorf(err, "Instance().Search failed, %s", findFlag)
return nil, "", err
}

for _, kv := range resp.Kvs {
if i, ok := clustersIndex[kv.ClusterName]; ok {
if kv.ModRevision > maxRevs[i] {
maxRevs[i] = kv.ModRevision
}
counts[i]++
}
instances = append(instances, kv.Value.(*pb.MicroServiceInstance))
}

instances = append(instances, insts...)
}

return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
Expand Down
25 changes: 12 additions & 13 deletions server/service/cache/filter_rev.go
Expand Up @@ -38,35 +38,34 @@ func (f *RevisionFilter) Name(ctx context.Context, parent *cache.Node) string {
}

func (f *RevisionFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
item := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
pCache := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
requestRev := ctx.Value(CTX_FIND_REQUEST_REV).(string)
if len(requestRev) == 0 || requestRev == item.Rev {
if len(requestRev) == 0 || requestRev == pCache.Rev {
node = cache.NewNode()
node.Cache.Set(CACHE_FIND, item)
node.Cache.Set(CACHE_FIND, pCache)
return
}

if item.BrokenWait() {
if pCache.BrokenWait() {
node = cache.NewNode()
node.Cache.Set(CACHE_FIND, item)
node.Cache.Set(CACHE_FIND, pCache)
return
}

cloneCtx := util.CloneContext(ctx)
cloneCtx = util.SetContext(cloneCtx, serviceUtil.CTX_NOCACHE, "1")

insts, _, err := f.FindInstances(cloneCtx, item.ServiceIds)
insts, _, err := f.Find(cloneCtx, parent)
if err != nil {
item.InitBrokenQueue()
pCache.InitBrokenQueue()
return nil, err
}

log.Warnf("the cache of finding instances api is broken, req[%s]!=cache[%s]",
requestRev, item.Rev)
item.Instances = insts
item.Broken()
log.Warnf("the cache of finding instances api is broken, req[%s]!=cache[%s][%s]",
requestRev, pCache.Rev, parent.Name)
pCache.Instances = insts
pCache.Broken()

node = cache.NewNode()
node.Cache.Set(CACHE_FIND, item)
node.Cache.Set(CACHE_FIND, pCache)
return
}
14 changes: 11 additions & 3 deletions server/service/cache/filter_version.go
Expand Up @@ -34,14 +34,23 @@ func (f *VersionRuleFilter) Name(ctx context.Context, _ *cache.Node) string {
}

func (f *VersionRuleFilter) Init(ctx context.Context, parent *cache.Node) (node *cache.Node, err error) {
instance, ok := ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement)
if ok {
node = cache.NewNode()
node.Cache.Set(CACHE_FIND, &VersionRuleCacheItem{
ServiceIds: []string{instance.ServiceId},
})
return
}

provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)
// 版本规则
ids, exist, err := serviceUtil.FindServiceIds(ctx, provider.Version, provider)
if err != nil {
consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService)
findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumer.ServiceId,
provider.AppId, provider.ServiceName, provider.Version)
log.Errorf(err, "VersionRuleFilter failed, %s", findFlag)
log.Errorf(err, "FindServiceIds failed, %s", findFlag)
return
}
if !exist {
Expand All @@ -50,8 +59,7 @@ func (f *VersionRuleFilter) Init(ctx context.Context, parent *cache.Node) (node

node = cache.NewNode()
node.Cache.Set(CACHE_FIND, &VersionRuleCacheItem{
VersionRule: provider.Version,
ServiceIds: ids,
ServiceIds: ids,
})
return
}

0 comments on commit 61f0575

Please sign in to comment.