-
Notifications
You must be signed in to change notification settings - Fork 18
/
store.go
138 lines (113 loc) · 3.57 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2023 The ClusterLink Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"github.com/sirupsen/logrus"
"github.com/clusterlink-net/clusterlink/pkg/store"
)
// ObjectStore represents a persistent store of objects, backed by a KV-store.
// Key format for an object is: <storeName>.<objectName>.
type ObjectStore struct {
store Store
keyPrefix string
objectType reflect.Type
logger *logrus.Entry
}
// kvKey encodes object keys to a single key identifying the object in the store.
func (s *ObjectStore) kvKey(name string) []byte {
return []byte(s.keyPrefix + name)
}
// Create an object.
func (s *ObjectStore) Create(name string, value any) error {
s.logger.Infof("Creating: '%s'.", name)
// serialize
encoded, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("unable to serialize object: %w", err)
}
// persist to store
if err := s.store.Create(s.kvKey(name), encoded); err != nil {
var keyExistsError *KeyExistsError
if errors.As(err, &keyExistsError) {
return &store.ObjectExistsError{}
}
return err
}
return nil
}
// Update an object.
func (s *ObjectStore) Update(name string, mutator func(any) any) error {
s.logger.Infof("Updating: '%s'.", name)
// persist to store
err := s.store.Update(s.kvKey(name), func(value []byte) ([]byte, error) {
// de-serialize old value
decoded := reflect.New(s.objectType).Interface()
if err := json.Unmarshal(value, decoded); err != nil {
return nil, fmt.Errorf("unable to decode value for object '%s': %w", name, err)
}
// serialize mutated value
encoded, err := json.Marshal(mutator(decoded))
if err != nil {
return nil, fmt.Errorf("unable to serialize mutated object '%s': %w", name, err)
}
return encoded, nil
})
if err != nil {
var keyNotFoundError *KeyNotFoundError
if errors.As(err, &keyNotFoundError) {
return &store.ObjectNotFoundError{}
}
return err
}
return nil
}
// Delete an object identified by the given name.
func (s *ObjectStore) Delete(name string) error {
s.logger.Infof("Deleting: '%s'.", name)
return s.store.Delete(s.kvKey(name))
}
// GetAll returns all of the objects in the store.
func (s *ObjectStore) GetAll() ([]any, error) {
s.logger.Info("Getting all objects.")
var objects []any
err := s.store.Range([]byte(s.keyPrefix), func(key, value []byte) error {
s.logger.Debugf("De-serializing: %v.", value)
decoded := reflect.New(s.objectType).Interface()
if err := json.Unmarshal(value, decoded); err != nil {
return fmt.Errorf("unable to decode object for key %v: %w", key, err)
}
objects = append(objects, decoded)
return nil
})
if err != nil {
return nil, err
}
return objects, nil
}
// NewObjectStore returns a new object store backed by a KV-store.
func NewObjectStore(name string, s Store, sampleObject any) *ObjectStore {
logger := logrus.WithFields(logrus.Fields{
"component": "store.kv.object-store",
"name": name,
})
return &ObjectStore{
store: s,
keyPrefix: name + ".",
objectType: reflect.TypeOf(sampleObject),
logger: logger,
}
}