Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]get sync value from context instead of directly from configuration file #257

Merged
merged 6 commits into from
Aug 20, 2022
45 changes: 22 additions & 23 deletions server/datasource/kv_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"github.com/apache/servicecomb-kie/server/service/sync"
"github.com/apache/servicecomb-kie/test"
emodel "github.com/apache/servicecomb-service-center/eventbase/model"
"github.com/apache/servicecomb-service-center/eventbase/service/task"
Expand Down Expand Up @@ -112,12 +112,12 @@ func TestWithSync(t *testing.T) {
if test.IsEmbeddedetcdMode() {
return
}
// set the sync enabled
config.Configurations.Sync.Enabled = true

t.Run("create kv with sync enabled", func(t *testing.T) {
t.Run("creating a kv will create a task should pass", func(t *testing.T) {
kv1, err := kvsvc.Create(context.Background(), &model.KVDoc{
// set the sync enabled
ctx := sync.NewContext(context.Background(), true)
kv1, err := kvsvc.Create(ctx, &model.KVDoc{
Key: "sync-create",
Value: "2s",
Status: common.StatusEnabled,
Expand All @@ -135,31 +135,33 @@ func TestWithSync(t *testing.T) {
Domain: "default",
Project: "sync-create",
}
_, tempErr := kvsvc.FindOneAndDelete(context.Background(), kv1.ID, "sync-create", "default")
_, tempErr := kvsvc.FindOneAndDelete(ctx, kv1.ID, "sync-create", "default")
assert.Nil(t, tempErr)
resp, tempErr := kvsvc.List(context.Background(), "sync-create", "default")
resp, tempErr := kvsvc.List(ctx, "sync-create", "default")
assert.Nil(t, tempErr)
assert.Equal(t, 0, resp.Total)
tasks, tempErr := task.List(context.Background(), &listReq)
tasks, tempErr := task.List(ctx, &listReq)
assert.Nil(t, tempErr)
assert.Equal(t, 2, len(tasks))
tempErr = task.Delete(context.Background(), tasks...)
tempErr = task.Delete(ctx, tasks...)
assert.Nil(t, tempErr)
tbListReq := emodel.ListTombstoneRequest{
Domain: "default",
Project: "sync-create",
ResourceType: datasource.ConfigResource,
}
tombstones, tempErr := tombstone.List(context.Background(), &tbListReq)
tombstones, tempErr := tombstone.List(ctx, &tbListReq)
assert.Equal(t, 1, len(tombstones))
tempErr = tombstone.Delete(context.Background(), tombstones...)
tempErr = tombstone.Delete(ctx, tombstones...)
assert.Nil(t, tempErr)
})
})

t.Run("update kv with sync enabled", func(t *testing.T) {
t.Run("creating two kvs and updating them will create four tasks should pass", func(t *testing.T) {
kv1, err := kvsvc.Create(context.Background(), &model.KVDoc{
// set the sync enabled
ctx := sync.NewContext(context.Background(), true)
kv1, err := kvsvc.Create(ctx, &model.KVDoc{
Key: "sync-update-one",
Value: "2s",
Status: common.StatusEnabled,
Expand All @@ -172,7 +174,7 @@ func TestWithSync(t *testing.T) {
})
assert.Nil(t, err)
assert.NotEmpty(t, kv1.ID)
kv2, err := kvsvc.Create(context.Background(), &model.KVDoc{
kv2, err := kvsvc.Create(ctx, &model.KVDoc{
Key: "sync-update-two",
Value: "2s",
Status: common.StatusEnabled,
Expand All @@ -185,49 +187,46 @@ func TestWithSync(t *testing.T) {
})
assert.Nil(t, err)
assert.NotEmpty(t, kv2.ID)
kv1, tmpErr := kvsvc.Update(context.Background(), &model.UpdateKVRequest{
kv1, tmpErr := kvsvc.Update(ctx, &model.UpdateKVRequest{
ID: kv1.ID,
Value: "3s",
Domain: "default",
Project: "sync-update",
})
assert.Nil(t, tmpErr)
assert.NotEmpty(t, kv1.ID)
kv2, tmpErr = kvsvc.Update(context.Background(), &model.UpdateKVRequest{
kv2, tmpErr = kvsvc.Update(ctx, &model.UpdateKVRequest{
ID: kv2.ID,
Value: "3s",
Domain: "default",
Project: "sync-update",
})
assert.Nil(t, tmpErr)
assert.NotEmpty(t, kv2.ID)
_, tempErr := kvsvc.FindManyAndDelete(context.Background(), []string{kv1.ID, kv2.ID}, "sync-update", "default")
_, tempErr := kvsvc.FindManyAndDelete(ctx, []string{kv1.ID, kv2.ID}, "sync-update", "default")
assert.Nil(t, tempErr)
resp, tempErr := kvsvc.List(context.Background(), "sync-update", "default")
resp, tempErr := kvsvc.List(ctx, "sync-update", "default")
assert.Nil(t, tempErr)
assert.Equal(t, 0, resp.Total)
listReq := emodel.ListTaskRequest{
Domain: "default",
Project: "sync-update",
}
tasks, tempErr := task.List(context.Background(), &listReq)
tasks, tempErr := task.List(ctx, &listReq)
assert.Nil(t, tempErr)
assert.Equal(t, 6, len(tasks))
tempErr = task.Delete(context.Background(), tasks...)
tempErr = task.Delete(ctx, tasks...)
assert.Nil(t, tempErr)
tbListReq := emodel.ListTombstoneRequest{
Domain: "default",
Project: "sync-update",
ResourceType: datasource.ConfigResource,
}
tombstones, tempErr := tombstone.List(context.Background(), &tbListReq)
tombstones, tempErr := tombstone.List(ctx, &tbListReq)
assert.Equal(t, 2, len(tombstones))
tempErr = tombstone.Delete(context.Background(), tombstones...)
tempErr = tombstone.Delete(ctx, tombstones...)
assert.Nil(t, tempErr)
})
})

// set the sync unable
config.Configurations.Sync.Enabled = false

}
11 changes: 6 additions & 5 deletions server/service/kv/kv_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
"github.com/apache/servicecomb-kie/pkg/concurrency"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
cfg "github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/service/sync"
)

var listSema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
Expand Down Expand Up @@ -113,7 +113,8 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
openlog.Error(err.Error())
return nil, config.NewError(config.ErrInternal, "create kv failed")
}
kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv, datasource.WithSync(cfg.GetSync().Enabled))

kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv, datasource.WithSync(sync.FromContext(ctx)))
if err != nil {
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
return nil, config.NewError(config.ErrInternal, "create kv failed")
Expand Down Expand Up @@ -230,7 +231,7 @@ func Update(ctx context.Context, kv *model.UpdateKVRequest) (*model.KVDoc, error
if err != nil {
return nil, err
}
err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV, datasource.WithSync(cfg.GetSync().Enabled))
err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV, datasource.WithSync(sync.FromContext(ctx)))
if err != nil {
return nil, err
}
Expand All @@ -252,7 +253,7 @@ func Update(ctx context.Context, kv *model.UpdateKVRequest) (*model.KVDoc, error
}

func FindOneAndDelete(ctx context.Context, kvID string, project, domain string) (*model.KVDoc, error) {
kv, err := datasource.GetBroker().GetKVDao().FindOneAndDelete(ctx, kvID, project, domain, datasource.WithSync(cfg.GetSync().Enabled))
kv, err := datasource.GetBroker().GetKVDao().FindOneAndDelete(ctx, kvID, project, domain, datasource.WithSync(sync.FromContext(ctx)))
if err != nil {
return nil, err
}
Expand All @@ -272,7 +273,7 @@ func FindManyAndDelete(ctx context.Context, kvIDs []string, project, domain stri
var kvs []*model.KVDoc
var deleted int64
var err error
kvs, deleted, err = datasource.GetBroker().GetKVDao().FindManyAndDelete(ctx, kvIDs, project, domain, datasource.WithSync(cfg.GetSync().Enabled))
kvs, deleted, err = datasource.GetBroker().GetKVDao().FindManyAndDelete(ctx, kvIDs, project, domain, datasource.WithSync(sync.FromContext(ctx)))
if err != nil {
return nil, err
}
Expand Down
45 changes: 45 additions & 0 deletions server/service/sync/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sync

import (
"context"
)

type ctxKey string

const CtxSyncEnabled ctxKey = "sync"

func NewContext(ctx context.Context, enabled bool) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, CtxSyncEnabled, enabled)
}

func FromContext(ctx context.Context) bool {
if ctx == nil {
return false
}
val := ctx.Value(CtxSyncEnabled)
enabled, ok := val.(bool)
if !ok {
enabled = false
}
return enabled
}
65 changes: 65 additions & 0 deletions server/service/sync/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sync_test

import (
"context"
"testing"

"github.com/apache/servicecomb-kie/server/service/sync"
"github.com/stretchr/testify/assert"
)

func TestNewContext(t *testing.T) {
t.Run("parent is nil should new one", func(t *testing.T) {
ctx := sync.NewContext(nil, true)
assert.NotNil(t, ctx)
assert.True(t, sync.FromContext(ctx))
})
t.Run("parent is not nil should be ok", func(t *testing.T) {
ctx := sync.NewContext(context.Background(), true)
assert.NotNil(t, ctx)
assert.True(t, sync.FromContext(ctx))

ctx = sync.NewContext(context.Background(), false)
assert.False(t, sync.FromContext(ctx))
})
}

func TestFromContext(t *testing.T) {
type args struct {
ctx context.Context
}
tests := []struct {
name string
args args
want bool
}{
{"no ctx should return false", args{nil}, false},
{"ctx without sync should return false", args{context.Background()}, false},
{"ctx with sync=true should return true", args{sync.NewContext(context.Background(), true)}, true},
{"ctx with sync=false should return true", args{sync.NewContext(context.Background(), false)}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := sync.FromContext(tt.args.ctx); got != tt.want {
t.Errorf("FromContext() = %v, want %v", got, tt.want)
}
})
}
}