forked from git-lfs/git-lfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
keyvaluestore.go
230 lines (198 loc) · 6.18 KB
/
keyvaluestore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package kv
import (
"encoding/gob"
"fmt"
"io"
"os"
"sync"
)
// Store provides an in-memory key/value store which is persisted to
// a file. The file handle itself is not kept open for the lifetime of the
// store; the file is only opened during load and save, to make it concurrency
// friendly. When saving, the store uses optimistic locking (a version number
// stored at the start of the file) to determine whether the db on disk
// has been modified by another process; in which case it loads the latest
// version and re-applies modifications made during this session. This means
// the Lost Update db concurrency issue is possible; so don't use this if you
// need more DB integrity than Read Committed isolation levels.
//
// NOTE(review): nothing in this file takes an OS-level lock on the data file;
// the only lock visible here is the in-process mutex below.
type Store struct {
// Guards every field below; write-locked by mutators and Save, read-locked
// by Get and Visit
mu sync.RWMutex
// Path of the file the store is loaded from and saved to
filename string
// Changes made in memory since the last successful Save, in the order they
// were made; replayed on top of newer on-disk data when a merge is needed
log []change
// This is the persistent data
// version for optimistic locking, this field is incremented with every Save()
// and compared against the version found on disk to detect outside changes
version int64
// Current key/value contents
db map[string]interface{}
}
// operation identifies the kind of modification recorded in the change log.
type operation int

const (
	// setOperation records that a key was given a value.
	setOperation operation = iota
	// removeOperation records that a key (and its value) was deleted.
	removeOperation
)
// change is a single entry in the Store's log of unsaved modifications.
// Entries are replayed in order by reapplyChanges when newer data has to be
// merged in from disk. The fields are filled positionally by logChange, so
// their order must not be changed without updating that literal.
type change struct {
// Kind of modification: setOperation or removeOperation
op operation
// Key the modification applies to
key string
// Value assigned by a set; removals are logged with nil
value interface{}
}
// NewStore returns a key/value store backed by the file at filepath. If that
// file exists and is non-empty its contents are loaded into the new store.
//
// Note that the store is returned even when loading fails; in that case it is
// empty and the returned error describes what went wrong.
func NewStore(filepath string) (*Store, error) {
	store := &Store{
		filename: filepath,
		db:       make(map[string]interface{}),
	}
	err := store.loadAndMergeIfNeeded()
	return store, err
}
// Set stores value under key in memory and records the change in the log.
// Nothing is written to disk until Save() is called.
func (k *Store) Set(key string, value interface{}) {
	k.mu.Lock()
	defer k.mu.Unlock()

	k.db[key] = value
	// Record the modification so it can be replayed if Save has to merge
	k.log = append(k.log, change{op: setOperation, key: key, value: value})
}
// Remove deletes key (and its value) from the in-memory store and records the
// removal in the log. A removal is logged even if the key was not present.
// Nothing is written to disk until Save() is called.
func (k *Store) Remove(key string) {
	k.mu.Lock()
	defer k.mu.Unlock()

	delete(k.db, key)
	// Record the modification so it can be replayed if Save has to merge
	k.log = append(k.log, change{op: removeOperation, key: key, value: nil})
}
// RemoveAll empties the in-memory store, logging one removal per key that was
// present so the deletions can be replayed on merge.
// Nothing is written to disk until Save() is called.
func (k *Store) RemoveAll() {
	k.mu.Lock()
	defer k.mu.Unlock()

	// One log entry per existing key
	for existing := range k.db {
		k.logChange(removeOperation, existing, nil)
	}
	// Start over with a fresh, empty map
	k.db = map[string]interface{}{}
}
// Visit calls cb once for each key/value pair in the store, in map iteration
// order, stopping as soon as cb returns false.
//
// The store's read lock is held for the whole walk, so cb must not call
// methods that take the write lock (Set, Remove, RemoveAll, Save).
func (k *Store) Visit(cb func(string, interface{}) bool) {
	k.mu.RLock()
	defer k.mu.RUnlock()

	for key, value := range k.db {
		if keepGoing := cb(key, value); !keepGoing {
			return
		}
	}
}
// logChange records one modification at the end of the change log.
// The caller must already hold the write lock.
func (k *Store) logChange(op operation, key string, value interface{}) {
	entry := change{op: op, key: key, value: value}
	k.log = append(k.log, entry)
}
// Get returns the value stored under key, or nil when the key is absent.
func (k *Store) Get(key string) interface{} {
	k.mu.RLock()
	defer k.mu.RUnlock()

	value, found := k.db[key]
	if !found {
		return nil
	}
	return value
}
// Save persists the changes made in memory to disk.
//
// If there are no unsaved changes it returns nil without touching the file.
// Otherwise the file is opened (and created if missing). When it already
// holds data whose version differs from ours, that data is loaded and the
// logged changes are replayed on top of it, so changes written by other code
// are merged. The file is then rewritten with an incremented version.
//
// The change log is cleared only after the file has been written and closed
// successfully. Any error from opening, inspecting, rewinding, truncating,
// encoding to or closing the file is returned.
func (k *Store) Save() error {
	k.mu.Lock()
	defer k.mu.Unlock()

	// Short-circuit if we have no changes
	if len(k.log) == 0 {
		return nil
	}

	// Open read/write so the version check and the rewrite share one handle
	f, err := os.OpenFile(k.filename, os.O_RDWR|os.O_CREATE, 0664)
	if err != nil {
		return err
	}

	if err := k.mergeAndWrite(f); err != nil {
		// Best effort; the write failure is the error worth reporting
		f.Close()
		return err
	}
	// A failed close can mean buffered data never reached the disk, so it
	// must be reported rather than swallowed by a defer
	if err := f.Close(); err != nil {
		return fmt.Errorf("Error while closing %v: %v", k.filename, err)
	}

	// Clear log now that it's saved
	k.log = nil
	return nil
}

// mergeAndWrite merges any newer on-disk data from f into the store and then
// overwrites f with the store's version and contents. The caller must hold
// the write lock and remains responsible for closing f.
func (k *Store) mergeAndWrite(f *os.File) error {
	stat, err := f.Stat()
	if err != nil {
		return fmt.Errorf("Error while inspecting %v: %v", k.filename, err)
	}

	// Only try to merge if > 0 bytes, ignore empty files (decoder will fail)
	if stat.Size() > 0 {
		// A failed merge is deliberately not fatal, preserving this method's
		// previous behaviour: the file is overwritten with the in-memory
		// state. NOTE(review): that can discard data written by another
		// process if the file cannot be decoded; confirm this is acceptable.
		_ = k.loadAndMergeReaderIfNeeded(f)

		// Now we overwrite the file. Both steps must succeed, otherwise the
		// new data would be written after (or mixed with) the old data.
		if _, err := f.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("Error while rewinding %v: %v", k.filename, err)
		}
		if err := f.Truncate(0); err != nil {
			return fmt.Errorf("Error while truncating %v: %v", k.filename, err)
		}
	}

	k.version++

	// The version is encoded first so readers can check it without decoding
	// the whole map
	enc := gob.NewEncoder(f)
	if err := enc.Encode(k.version); err != nil {
		return fmt.Errorf("Error while writing version data to %v: %v", k.filename, err)
	}
	if err := enc.Encode(k.db); err != nil {
		return fmt.Errorf("Error while writing new key/value data to %v: %v", k.filename, err)
	}
	return nil
}
// loadAndMergeIfNeeded loads the store's file, if there is one, and merges
// its contents with any outstanding changes. A missing or empty file is not
// an error and leaves the store untouched. Only as much of the file as is
// needed to compare versions is read when the contents are already current
// (see loadAndMergeReaderIfNeeded).
func (k *Store) loadAndMergeIfNeeded() error {
	info, err := os.Stat(k.filename)
	switch {
	case os.IsNotExist(err):
		// missing is OK
		return nil
	case err != nil:
		return err
	case info.Size() == 0:
		// Nothing to decode in an empty file
		return nil
	}

	f, err := os.OpenFile(k.filename, os.O_RDONLY, 0664)
	if err != nil {
		return err
	}
	defer f.Close()

	return k.loadAndMergeReaderIfNeeded(f)
}
// loadAndMergeReaderIfNeeded does the work of loadAndMergeIfNeeded on a
// reader supplied by the caller, who keeps control of the file handle.
//
// It decodes the leading version number from f and, only if that differs
// from the version held in memory, decodes the stored map, replays the
// outstanding changes on top of it and adopts the on-disk version.
func (k *Store) loadAndMergeReaderIfNeeded(f io.Reader) error {
	dec := gob.NewDecoder(f)

	// The version is always the first value written, so it can be checked
	// without decoding the rest of the stream
	var diskVersion int64
	if err := dec.Decode(&diskVersion); err != nil {
		return fmt.Errorf("Problem checking version of key/value data from %v: %v", k.filename, err)
	}

	// Totally uninitialised Version == 0, saved versions are always >=1
	if diskVersion == k.version {
		// Nobody else has written since we last loaded; nothing to merge
		return nil
	}

	// Reload data & merge
	var diskDb map[string]interface{}
	if err := dec.Decode(&diskDb); err != nil {
		return fmt.Errorf("Problem reading updated key/value data from %v: %v", k.filename, err)
	}
	k.reapplyChanges(diskDb)
	k.version = diskVersion
	return nil
}
// reapplyChanges replays every logged (unsaved) change, in order, onto baseDb
// and then adopts baseDb as the store's own data. baseDb is modified in place.
func (k *Store) reapplyChanges(baseDb map[string]interface{}) {
	for i := range k.log {
		entry := k.log[i]
		if entry.op == setOperation {
			baseDb[entry.key] = entry.value
		} else if entry.op == removeOperation {
			delete(baseDb, entry.key)
		}
	}

	// The log is intentionally left intact: it lists unsaved changes and is
	// only cleared by Save
	k.db = baseDb
}
// RegisterTypeForStorage registers a custom type (e.g. a struct) for
// use in the key value store. This is necessary if you intend to pass custom
// structs to Store.Set() rather than primitive types.
//
// val is a value of the type to register; it is passed straight to
// gob.Register, so the registration is global to the process rather than
// specific to one Store.
// NOTE(review): see the encoding/gob documentation for the conditions under
// which Register panics (conflicting registrations).
func RegisterTypeForStorage(val interface{}) {
gob.Register(val)
}