Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: append inner properties when heartbeat success #1259

Merged
merged 2 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion etc/conf/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ registry:
name:
region:
availableZone:
# properties params for instance
# inner properties params for instance, sc will always append these to instance properties
properties:

schema:
Expand Down
85 changes: 72 additions & 13 deletions server/service/disco/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ var (
propertiesMap map[string]string
)

func getInnerProperties() map[string]string {
once.Do(func() {
propertiesMap = config.GetStringMap("registry.instance.properties")
})
return propertiesMap
}

func RegisterInstance(ctx context.Context, in *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) {
remoteIP := util.GetIPFromContext(ctx)

Expand Down Expand Up @@ -104,27 +111,22 @@ func populateInstanceDefaultValue(ctx context.Context, instance *pb.MicroService
}
instance.Version = microservice.Version

setPropertiesToInstance(instance)
appendInnerPropertiesToInstance(instance)
return nil
}

func setPropertiesToInstance(instance *pb.MicroServiceInstance) {
func appendInnerPropertiesToInstance(instance *pb.MicroServiceInstance) {
if instance.Properties == nil {
instance.Properties = make(map[string]string)
}

once.Do(func() {
propertiesMap = config.GetStringMap("registry.instance.properties")
})

if len(propertiesMap) <= 0 {
innerProps := getInnerProperties()
if len(innerProps) <= 0 {
return
}

for k, v := range propertiesMap {
if _, ok := instance.Properties[k]; !ok {
instance.Properties[k] = v
}
for k, v := range innerProps {
instance.Properties[k] = v
}
}

Expand All @@ -141,13 +143,63 @@ func UnregisterInstance(ctx context.Context, in *pb.UnregisterInstanceRequest) e

func SendHeartbeat(ctx context.Context, in *pb.HeartbeatRequest) error {
remoteIP := util.GetIPFromContext(ctx)
instanceID := in.InstanceId
serviceID := in.ServiceId

if err := validator.ValidateHeartbeatRequest(in); err != nil {
log.Error(fmt.Sprintf("heartbeat failed, invalid parameters, operator %s", remoteIP), err)
log.Error(fmt.Sprintf("send heartbeat[%s/%s] failed, invalid parameters, operator %s",
serviceID, instanceID, remoteIP), err)
return pb.NewError(pb.ErrInvalidParams, err.Error())
}

return datasource.GetMetadataManager().SendHeartbeat(ctx, in)
err := datasource.GetMetadataManager().SendHeartbeat(ctx, in)
if err != nil {
log.Error(fmt.Sprintf("send heartbeat[%s/%s] failed, operator %s", serviceID, instanceID, remoteIP), err)
return err
}

// append the inner properties
err = appendInnerProperties(ctx, serviceID, instanceID)
if err != nil {
log.Error(fmt.Sprintf("append inner instance[%s/%s] properties failed, operator %s",
serviceID, instanceID, remoteIP), err)
return err
}
return nil
}

func appendInnerProperties(ctx context.Context, serviceID string, instanceID string) error {
resp, err := datasource.GetMetadataManager().GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceID, ProviderInstanceId: instanceID})
if err != nil {
return err
little-cui marked this conversation as resolved.
Show resolved Hide resolved
}
instance := resp.Instance
if !shouldAppendInnerProperties(instance) {
return nil
}
props := make(map[string]string, len(resp.Instance.Properties))
for k, v := range resp.Instance.Properties {
props[k] = v
}
return PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
ServiceId: serviceID,
InstanceId: instanceID,
Properties: props,
})
}

func shouldAppendInnerProperties(instance *pb.MicroServiceInstance) bool {
instProps := instance.Properties
innerProps := getInnerProperties()
if len(innerProps) == 0 {
return false
}
for k, v := range innerProps {
if prop, ok := instProps[k]; !ok || prop != v {
return true
}
}
return false
}

func SendManyHeartbeat(ctx context.Context, in *pb.HeartbeatSetRequest) (*pb.HeartbeatSetResponse, error) {
Expand Down Expand Up @@ -374,6 +426,13 @@ func PutInstanceProperties(ctx context.Context, in *pb.UpdateInstancePropsReques
return pb.NewError(pb.ErrInvalidParams, err.Error())
}

properties := getInnerProperties()
if in.Properties == nil {
in.Properties = make(map[string]string, len(properties))
}
for k, v := range properties {
in.Properties[k] = v
}
return datasource.GetMetadataManager().PutInstanceProperties(ctx, in)
}

Expand Down
43 changes: 34 additions & 9 deletions server/service/disco/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
"strings"
"testing"

_ "github.com/apache/servicecomb-service-center/test"

"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/apache/servicecomb-service-center/server/core"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
_ "github.com/apache/servicecomb-service-center/test"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -1862,13 +1863,26 @@ func TestSendHeartbeat(t *testing.T) {
})

t.Run("when update a lease, should be passed", func(t *testing.T) {
err := discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
resp, err := discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId1})
assert.NoError(t, err)
resp.Instance.Properties = nil

err = discosvc.PutInstance(ctx, &pb.RegisterInstanceRequest{Instance: resp.Instance})
assert.NoError(t, err)

err = discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
ServiceId: serviceId,
InstanceId: instanceId1,
})
assert.NoError(t, err)

err = discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
resp, err = discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId1})
assert.NoError(t, err)
assert.Equal(t, "test_engineID", resp.Instance.Properties["engineID"])
})

t.Run("when update lease with invalid request, should be failed", func(t *testing.T) {
err := discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
ServiceId: "",
InstanceId: instanceId1,
})
Expand Down Expand Up @@ -2118,6 +2132,10 @@ func TestUpdateInstance(t *testing.T) {
})
assert.NoError(t, err)

resp, err := discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId})
assert.NoError(t, err)
assert.Equal(t, "test", resp.Instance.Properties["test"])

size := 1000
properties := make(map[string]string, size)
for i := 0; i < size; i++ {
Expand All @@ -2131,6 +2149,19 @@ func TestUpdateInstance(t *testing.T) {
})
assert.NoError(t, err)

err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
ServiceId: serviceId,
InstanceId: instanceId,
})
assert.NoError(t, err)

resp, err = discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId})
assert.NoError(t, err)
_, ok := resp.Instance.Properties["test"]
assert.False(t, ok)
})

t.Run("when update instance properties with invalid request, should be failed", func(t *testing.T) {
err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
ServiceId: serviceId,
InstanceId: "notexistins",
Expand Down Expand Up @@ -2197,12 +2228,6 @@ func TestUpdateInstance(t *testing.T) {
assert.Error(t, testErr)
assert.Equal(t, pb.ErrInvalidParams, testErr.Code)

err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
ServiceId: serviceId,
InstanceId: instanceId,
})
assert.NoError(t, err)

err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
ServiceId: "notexistservice",
InstanceId: instanceId,
Expand Down