Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kie dev merge and local storage #312

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: v1.51.2
version: v1.55.2
args: --enable gofmt,gocyclo,goimports,dupl,gosec --timeout 5m --skip-dirs=examples,test --skip-files=.*_test.go$
static-checks:
runs-on: ubuntu-latest
Expand Down
24 changes: 24 additions & 0 deletions .github/workflows/local_storage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Merge check for local
on: [push, pull_request]
jobs:
etcd-with-localstorage:
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.18
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v1
- name: UT for etcd with local storage
run: |
time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
while ! nc -z 127.0.0.1 2379; do
sleep 1
done
export TEST_DB_KIND=etcd_with_localstorage
export TEST_DB_URI=127.0.0.1:2379
export TEST_KVS_ROOT_PATH=/data/kvs
sudo rm -rf /data/kvs
sudo time go test $(go list ./... | grep -v mongo | grep -v third_party | grep -v examples)
1 change: 1 addition & 0 deletions cmd/kieserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

//storage
_ "github.com/apache/servicecomb-kie/server/datasource/etcd"
_ "github.com/apache/servicecomb-kie/server/datasource/local"
_ "github.com/apache/servicecomb-kie/server/datasource/mongo"

//quota management
Expand Down
5 changes: 4 additions & 1 deletion examples/dev/kie-conf.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
db:
# kind can be mongo, etcd, embedded_etcd
# kind can be mongo, etcd, embedded_etcd, embedded_etcd_with_localstorage, etcd_with_localstorage
kind: embedded_etcd

# localFilePath: is the root path to store local kv files
# uri: http://127.0.0.1:2379
# uri is the db endpoints list
# kind=mongo, then is the mongodb cluster's uri, e.g. mongodb://127.0.0.1:27017/kie
# kind=etcd, then is the remote etcd server's advertise-client-urls, e.g. http://127.0.0.1:2379
Expand Down
1 change: 1 addition & 0 deletions scripts/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ EOM
db:
kind: ${db_type}
uri: ${uri}
localFilePath: ${KVS_ROOT_PATH}
EOM
}

Expand Down
11 changes: 6 additions & 5 deletions server/config/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ type TLS struct {

// DB is yaml file struct to set persistent config
type DB struct {
TLS `yaml:",inline" json:",inline"`
URI string `yaml:"uri" json:"uri,omitempty"`
Kind string `yaml:"kind" json:"kind,omitempty"`
PoolSize int `yaml:"poolSize" json:"pool_size,omitempty"`
Timeout string `yaml:"timeout" json:"timeout,omitempty"`
TLS `yaml:",inline" json:",inline"`
URI string `yaml:"uri" json:"uri,omitempty"`
Kind string `yaml:"kind" json:"kind,omitempty"`
LocalFilePath string `yaml:"localFilePath" json:"local_file_path,omitempty"`
PoolSize int `yaml:"poolSize" json:"pool_size,omitempty"`
Timeout string `yaml:"timeout" json:"timeout,omitempty"`
}

// RBAC is rbac config
Expand Down
6 changes: 4 additions & 2 deletions server/datasource/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import (
"errors"
"fmt"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/datasource/rbac"
"github.com/go-chassis/openlog"

"github.com/apache/servicecomb-kie/pkg/model"
)

var (
Expand Down Expand Up @@ -120,9 +119,12 @@ type ViewDao interface {
func Init(kind string) error {
var err error
f, ok := plugins[kind]

if !ok {
openlog.Info(fmt.Sprintf("do not support '%s'", kind))
return fmt.Errorf("do not support '%s'", kind)
}

dbc := &Config{}
if b, err = f(dbc); err != nil {
return err
Expand Down
62 changes: 36 additions & 26 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"sync"
"time"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
goCache "github.com/patrickmn/go-cache"
"go.etcd.io/etcd/api/v3/mvccpb"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
)

func Init() {
Expand All @@ -35,8 +36,6 @@ const (
backOffMinInterval = 5 * time.Second
)

type IDSet map[string]struct{}

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
Expand Down Expand Up @@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
if !ok {
kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
z := &sync.Map{}
z.Store(kvDoc.ID, struct{}{})
kc.StoreKvIDSet(cacheKey, z)
openlog.Info("cacheKey " + cacheKey + "not exists")
continue
}
m[kvDoc.ID] = struct{}{}
m.Store(kvDoc.ID, struct{}{})
}
}

Expand All @@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
openlog.Error("cacheKey " + cacheKey + "not exists")
continue
}
delete(m, kvDoc.ID)
m.Delete(kvDoc.ID)
}
}

func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
val, ok := kc.kvIDCache.Load(cacheKey)
if !ok {
return nil, false
}
kvIds, ok := val.(IDSet)
kvIds, ok := val.(*sync.Map)
if !ok {
return nil, false
}
return kvIds, true
}

func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
kc.kvIDCache.Store(cacheKey, kvIds)
}

Expand All @@ -220,9 +221,9 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool) {
func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
return nil, false
return nil, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
Expand All @@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool)
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true
kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
kvIds.Range(func(kvID, value any) bool {
if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok {
docs = append(docs, doc)
continue
} else {
kvIdsLeft = append(kvIdsLeft, kvID.(string))
}
kvIdsLeft = append(kvIdsLeft, kvID)
return true
})
tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
if err != nil {
return nil, true, err
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
Expand All @@ -257,17 +261,18 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool)
}
}
result.Total = len(result.Data)
return result, true
return result, true, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) {
if len(kvIdsLeft) == 0 {
return nil
return nil, nil
}

openlog.Debug("get kv from etcd by kvId")
wg := sync.WaitGroup{}
docs := make([]*model.KVDoc, len(kvIdsLeft))
var getKvErr error
for i, kvID := range kvIdsLeft {
wg.Add(1)
go func(kvID string, cnt int) {
Expand All @@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
kv, err := etcdadpt.Get(ctx, docKey)
if err != nil {
openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err))
getKvErr = err
return
}

doc, err := kc.GetKvDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
getKvErr = err
return
}

Expand All @@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
}(kvID, i)
}
wg.Wait()
return docs
if getKvErr != nil {
return nil, getKvErr
}
return docs, nil
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
Expand Down
7 changes: 5 additions & 2 deletions server/datasource/etcd/kv/kv_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,15 +524,18 @@ func (s *Dao) listData(ctx context.Context, project, domain string, options ...d
}

if Enabled() {
result, useCache := Search(ctx, &CacheSearchReq{
result, useCache, err := Search(ctx, &CacheSearchReq{
Domain: domain,
Project: project,
Opts: &opts,
Regex: regex,
})
if useCache {
if useCache && err == nil {
return result, opts, nil
}
if useCache && err != nil {
openlog.Error("using cache to search kv failed: " + err.Error())
}
}

result, err := matchLabelsSearch(ctx, domain, project, regex, opts)
Expand Down
4 changes: 2 additions & 2 deletions server/datasource/kv_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestWithSync(t *testing.T) {
Project: "sync-create",
ResourceType: datasource.ConfigResource,
}
tombstones, tempErr := tombstone.List(ctx, &tbListReq)
tombstones, _ := tombstone.List(ctx, &tbListReq)
assert.Equal(t, 1, len(tombstones))
tempErr = tombstone.Delete(ctx, tombstones...)
assert.Nil(t, tempErr)
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestWithSync(t *testing.T) {
Project: "sync-update",
ResourceType: datasource.ConfigResource,
}
tombstones, tempErr := tombstone.List(ctx, &tbListReq)
tombstones, _ := tombstone.List(ctx, &tbListReq)
assert.Equal(t, 2, len(tombstones))
tempErr = tombstone.Delete(ctx, tombstones...)
assert.Nil(t, tempErr)
Expand Down
69 changes: 69 additions & 0 deletions server/datasource/local/counter/revision.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package counter

import (
"context"
"os"
"path"
"strconv"

"github.com/apache/servicecomb-kie/server/datasource/local/file"
"github.com/go-chassis/openlog"
)

// Dao is the implementation
type Dao struct {
}

// GetRevision return current revision number
func (s *Dao) GetRevision(ctx context.Context, domain string) (int64, error) {
revisionPath := path.Join(file.FileRootPath, domain, "revision")

revisionByte, err := file.ReadFile(revisionPath)

if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
openlog.Error("get error: " + err.Error())
return 0, err
}
if revisionByte == nil || string(revisionByte) == "" {
return 0, nil
}

revisionNum, err := strconv.Atoi(string(revisionByte))
if err != nil {
return 0, err
}
return int64(revisionNum), nil
}

// ApplyRevision increase revision number and return modified value
func (s *Dao) ApplyRevision(ctx context.Context, domain string) (int64, error) {
currentRevisionNum, err := s.GetRevision(ctx, domain)
if err != nil {
return 0, err
}
err = file.CreateOrUpdateFile(path.Join(file.FileRootPath, domain, "revision"), []byte(strconv.Itoa(int(currentRevisionNum+1))), &[]file.FileDoRecord{}, false)
if err != nil {
return 0, err
}
return currentRevisionNum + 1, nil
}
Loading
Loading