/
operations.go
235 lines (213 loc) · 6.32 KB
/
operations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package db
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
)
// findByID reads the value stored under the primary key that info derives from
// id and JSON-decodes it into model. The model's type is validated with
// info.checkModelType before anything is read. If the reader reports
// leveldb.ErrNotFound, a NotFoundError carrying id is returned; any other read
// error is returned unchanged.
func findByID(info *colInfo, reader dbReader, id []byte, model Model) error {
	if typeErr := info.checkModelType(model); typeErr != nil {
		return typeErr
	}
	encoded, getErr := reader.Get(info.primaryKeyForID(id), nil)
	switch {
	case getErr == leveldb.ErrNotFound:
		// Translate the storage-level sentinel into this package's error type.
		return NotFoundError{ID: id}
	case getErr != nil:
		return getErr
	}
	return json.Unmarshal(encoded, model)
}
// findAll iterates over every key that starts with "<prefix>:", where prefix
// is whatever info.prefix() returns, and hands the resulting iterator to
// findWithIterator together with models.
func findAll(info *colInfo, reader dbReader, models interface{}) error {
	keyPrefix := fmt.Sprintf("%s:", info.prefix())
	iter := reader.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
	return findWithIterator(info, iter, models)
}
// findWithIterator JSON-decodes every value produced by iter into a new value
// of info.modelType and appends it to the slice that models points to. The
// iterator is released before the function returns, on every path.
//
// It returns the error from info.checkModelsType if models has the wrong type,
// the first JSON decoding error encountered, or any error accumulated by the
// iterator itself. Models appended before a failure remain in the slice.
func findWithIterator(info *colInfo, iter iterator.Iterator, models interface{}) error {
	defer iter.Release()
	if err := info.checkModelsType(models); err != nil {
		return err
	}
	modelsVal := reflect.ValueOf(models).Elem()
	for iter.Next() && iter.Error() == nil {
		// We assume that each value in the iterator is the encoded data for some
		// model.
		data := iter.Value()
		model := reflect.New(info.modelType)
		if err := json.Unmarshal(data, model.Interface()); err != nil {
			return err
		}
		modelsVal.Set(reflect.Append(modelsVal, model.Elem()))
	}
	// The loop ends both when the iterator is exhausted and when it fails.
	// Report a failure instead of silently returning a truncated result; Error
	// is nil when the iterator was simply exhausted.
	return iter.Error()
}
// findExistingModelByPrimaryKeyWithTransaction loads whatever is currently
// stored under primaryKey and decodes it into a freshly allocated value of
// info.modelType. This is useful when a caller's copy of a model may be out of
// date with respect to the database. Errors from the read are returned
// unchanged. It *doesn't* discard the transaction if there is an error.
func findExistingModelByPrimaryKeyWithTransaction(info *colInfo, readWriter dbReadWriter, primaryKey []byte) (Model, error) {
	encoded, err := readWriter.Get(primaryKey, nil)
	if err != nil {
		return nil, err
	}
	// Allocate a pointer to a new value of the model type so that the JSON
	// decoder has something to write into.
	ptr := reflect.New(info.modelType)
	if err = json.Unmarshal(encoded, ptr.Interface()); err != nil {
		return nil, err
	}
	// Hand back the decoded value itself rather than the pointer to it.
	return ptr.Elem().Interface().(Model), nil
}
// insertWithTransaction stores a new model under its primary key and then
// writes its index entries. It rejects models with an empty ID or a type that
// fails info.checkModelType, and returns an AlreadyExistsError if the primary
// key is already present. Errors from readWriter are returned unchanged.
func insertWithTransaction(info *colInfo, readWriter dbReadWriter, model Model) error {
	if len(model.ID()) == 0 {
		return errors.New("can't insert model with empty ID")
	}
	if typeErr := info.checkModelType(model); typeErr != nil {
		return typeErr
	}
	encoded, err := json.Marshal(model)
	if err != nil {
		return err
	}
	key := info.primaryKeyForModel(model)
	alreadyStored, err := readWriter.Has(key, nil)
	if err != nil {
		return err
	}
	if alreadyStored {
		return AlreadyExistsError{ID: model.ID()}
	}
	if err := readWriter.Put(key, encoded, nil); err != nil {
		return err
	}
	return saveIndexesWithTransaction(info, readWriter, model)
}
// updateWithTransaction overwrites the stored data for an existing model and
// refreshes its index entries. It rejects models with an empty ID or a type
// that fails info.checkModelType, and returns a NotFoundError if the primary
// key is not present. Errors from readWriter are returned unchanged.
func updateWithTransaction(info *colInfo, readWriter dbReadWriter, model Model) error {
	if len(model.ID()) == 0 {
		return errors.New("can't update model with empty ID")
	}
	if typeErr := info.checkModelType(model); typeErr != nil {
		return typeErr
	}
	// Only a model that is already stored can be updated.
	key := info.primaryKeyForModel(model)
	found, err := readWriter.Has(key, nil)
	if err != nil {
		return err
	}
	if !found {
		return NotFoundError{ID: model.ID()}
	}
	// Index keys are computed from a model's data, so the stored version is
	// loaded to remove the entries it produced before the new ones are written.
	stale, err := findExistingModelByPrimaryKeyWithTransaction(info, readWriter, key)
	if err != nil {
		return err
	}
	if err := deleteIndexesWithTransaction(info, readWriter, stale); err != nil {
		return err
	}
	// Write the new data, then the index entries derived from it.
	encoded, err := json.Marshal(model)
	if err != nil {
		return err
	}
	if err := readWriter.Put(key, encoded, nil); err != nil {
		return err
	}
	return saveIndexesWithTransaction(info, readWriter, model)
}
// deleteWithTransaction removes the model with the given id together with its
// index entries. It rejects an empty id and returns a NotFoundError if the
// lookup of the stored model fails with leveldb.ErrNotFound. Other errors are
// returned unchanged.
func deleteWithTransaction(info *colInfo, readWriter dbReadWriter, id []byte) error {
	if len(id) == 0 {
		return errors.New("can't delete model with empty ID")
	}
	// Index keys are computed from a model's data, so the currently stored
	// version is loaded before anything is removed.
	key := info.primaryKeyForID(id)
	stored, err := findExistingModelByPrimaryKeyWithTransaction(info, readWriter, key)
	switch {
	case err == leveldb.ErrNotFound:
		return NotFoundError{ID: id}
	case err != nil:
		return err
	}
	// Remove the primary entry first, then the index entries derived from it.
	if err := readWriter.Delete(key, nil); err != nil {
		return err
	}
	return deleteIndexesWithTransaction(info, readWriter, stored)
}
// saveIndexesWithTransaction writes one entry, with a nil value, for every key
// that each index in info.indexes computes for model. It holds a read lock on
// info.indexMut while doing so and stops at the first write error.
func saveIndexesWithTransaction(info *colInfo, readWriter dbReadWriter, model Model) error {
	info.indexMut.RLock()
	defer info.indexMut.RUnlock()
	for _, idx := range info.indexes {
		for _, indexKey := range idx.keysForModel(model) {
			// Only the key is stored; the value is left nil.
			if err := readWriter.Put(indexKey, nil, nil); err != nil {
				return err
			}
		}
	}
	return nil
}
// deleteIndexesWithTransaction removes every index entry that the indexes in
// info.indexes compute for model. It holds a read lock on info.indexMut while
// doing so and stops at the first delete error. It *doesn't* discard the
// transaction if there is an error.
func deleteIndexesWithTransaction(info *colInfo, readWriter dbReadWriter, model Model) error {
	info.indexMut.RLock()
	defer info.indexMut.RUnlock()
	for _, idx := range info.indexes {
		for _, indexKey := range idx.keysForModel(model) {
			if err := readWriter.Delete(indexKey, nil); err != nil {
				return err
			}
		}
	}
	return nil
}
// count reads the value stored under info.countKey() and decodes it with
// decodeInt. A missing key is treated as a count of 0 rather than an error.
// Any other read error, or a decoding error, is returned with a count of 0.
func count(info *colInfo, reader dbReader) (int, error) {
	raw, err := reader.Get(info.countKey(), nil)
	if err == leveldb.ErrNotFound {
		// If countKey doesn't exist, assume no models have been inserted.
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	n, err := decodeInt(raw)
	if err != nil {
		return 0, err
	}
	return n, nil
}
// updateCountWithTransaction adds diff to the stored count. When the result is
// exactly 0 the count key is deleted; otherwise the new count is encoded with
// encodeInt and written under info.countKey().
func updateCountWithTransaction(info *colInfo, readWriter dbReadWriter, diff int) error {
	current, err := count(info, readWriter)
	if err != nil {
		return err
	}
	updated := current + diff
	if updated == 0 {
		// count treats a missing key as 0, so nothing needs to be stored.
		return readWriter.Delete(info.countKey(), nil)
	}
	return readWriter.Put(info.countKey(), encodeInt(updated), nil)
}
// encodeInt returns the base-10 ASCII representation of i as a byte slice.
func encodeInt(i int) []byte {
	// TODO(albrow): Could potentially be optimized.
	return strconv.AppendInt(nil, int64(i), 10)
}
// decodeInt parses b as a base-10 integer using strconv.Atoi and returns the
// result along with any parse error.
func decodeInt(b []byte) (int, error) {
	// TODO(albrow): Could potentially be optimized.
	text := string(b)
	return strconv.Atoi(text)
}