/
nosql.go
289 lines (259 loc) · 7.4 KB
/
nosql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
package nosql
import (
"context"
"errors"
"fmt"
"regexp"
"github.com/pborman/uuid"
)
var (
// ErrNotFound is the sentinel error for a missing document; the Database
// interface documents it as the result of FindByKey when no document exists.
ErrNotFound = errors.New("not found")
)
// Key is an ordered set of string values that describes the primary key of a document.
// A key may consist of a single field (see GenKey) or several (see KeyFrom).
type Key []string
// Value converts a Key to a Value that can be stored in the database.
// The result is produced by Strings(k).
func (k Key) Value() Value {
return Strings(k)
}
// GenKey generates a unique key consisting of a single field.
// The field is the string form of a freshly generated uuid.NewUUID value.
func GenKey() Key {
return Key{uuid.NewUUID().String()}
}
// KeyFrom builds a Key from the values of the given fields of doc, in the
// order the fields are listed.
//
// Only fields whose value is a String contribute to the key. A field that is
// absent from doc or holds a value of another type is skipped without error,
// so the returned key may have fewer elements than fields.
func KeyFrom(fields []string, doc Document) Key {
	out := make(Key, 0, len(fields))
	for i := range fields {
		str, isString := doc[fields[i]].(String)
		if !isString {
			// Missing or non-string field: leave it out of the key.
			continue
		}
		out = append(out, string(str))
	}
	return out
}
// Database is a minimal interface for NoSQL database implementations.
type Database interface {
// Insert creates a document with a given key in a given collection.
// Key can be nil, meaning that the implementation should generate a unique key for the item.
// It returns the key that was generated, or the same key that was passed to it.
Insert(ctx context.Context, col string, key Key, d Document) (Key, error)
// FindByKey finds a document by its Key. It returns ErrNotFound if the document does not exist.
FindByKey(ctx context.Context, col string, key Key) (Document, error)
// Query starts construction of a new query for a specified collection.
Query(col string) Query
// Update starts construction of a document update request for a specified document and collection.
Update(col string, key Key) Update
// Delete starts construction of a document delete request.
Delete(col string) Delete
// EnsureIndex creates or updates indexes on the collection to match its arguments.
// It should create the collection if it does not exist. The primary index is guaranteed to be of StringExact type.
EnsureIndex(ctx context.Context, col string, primary Index, secondary []Index) error
// Close closes the database connection.
Close() error
}
// FilterOp is a comparison operation type used for value filters.
type FilterOp int

// String returns the name of the operation as declared in this package
// ("Equal", "NotEqual", "GT", "GTE", "LT", "LTE", "Regexp").
// Any other value is rendered as "FilterOp(N)", where N is its integer value.
func (op FilterOp) String() string {
	switch op {
	case Equal:
		return "Equal"
	case NotEqual:
		return "NotEqual"
	case GT:
		return "GT"
	case GTE:
		return "GTE"
	case LT:
		return "LT"
	case LTE:
		return "LTE"
	case Regexp:
		// Regexp is a declared operation and must print by name like the others.
		return "Regexp"
	}
	return fmt.Sprintf("FilterOp(%d)", int(op))
}

// GoString returns the operation name qualified with the package name
// (for example "nosql.Equal"); fmt uses it for the %#v verb.
func (op FilterOp) GoString() string {
	return "nosql." + op.String()
}

// Supported filter operations. Numbering starts at 1, so the zero value of
// FilterOp does not correspond to any declared operation.
const (
	// Equal matches when the field value equals the filter value.
	Equal = FilterOp(iota + 1)
	// NotEqual matches when the field value differs from the filter value.
	NotEqual
	// GT matches when the field value is greater than the filter value.
	GT
	// GTE matches when the field value is greater than or equal to the filter value.
	GTE
	// LT matches when the field value is less than the filter value.
	LT
	// LTE matches when the field value is less than or equal to the filter value.
	LTE
	// Regexp matches when the field value matches the filter value used as a regular expression.
	Regexp
)
// FieldFilter represents a single field comparison operation.
// Matches reports whether a given document satisfies the filter.
type FieldFilter struct {
Path []string // path to a specific field in the document; each element descends into a nested document
Filter FilterOp // comparison operation
Value Value // value that will be compared with the field of the document
}
// Matches reports whether document d satisfies the filter.
//
// The field is located by walking f.Path through nested documents. If the
// walk cannot be completed (an intermediate value is not a Document, or a
// path element is absent), the result is true for NotEqual and false for
// every other operation.
//
// Once the field is found it is compared with f.Value according to f.Filter.
// Regexp requires both the field and f.Value to be of type String and
// returns false otherwise. It panics if f.Filter is not a supported
// operation and the field was found.
func (f FieldFilter) Matches(d Document) bool {
	// NotEqual is special: it allows the field or its parents to not exist.
	onMissing := f.Filter == NotEqual

	var cur Value = d
	for _, name := range f.Path {
		sub, isDoc := cur.(Document)
		if !isDoc {
			return onMissing
		}
		next, found := sub[name]
		if !found {
			return onMissing
		}
		cur = next
	}

	switch f.Filter {
	case NotEqual:
		return !ValuesEqual(cur, f.Value)
	case Equal:
		return ValuesEqual(cur, f.Value)
	case GT:
		return CompareValues(cur, f.Value) > 0
	case GTE:
		return CompareValues(cur, f.Value) >= 0
	case LT:
		return CompareValues(cur, f.Value) < 0
	case LTE:
		return CompareValues(cur, f.Value) <= 0
	case Regexp:
		expr, isString := f.Value.(String)
		if !isString {
			return false
		}
		text, isString := cur.(String)
		if !isString {
			return false
		}
		// The error from an invalid pattern is discarded on purpose: such a
		// pattern simply matches nothing.
		matched, _ := regexp.MatchString(string(expr), string(text))
		return matched
	}
	panic(fmt.Errorf("unsupported operation: %v", f.Filter))
}
// Query is a query builder object.
// WithFields and Limit return a Query so that calls can be chained;
// Count, One and Iterate execute the query.
type Query interface {
// WithFields adds the specified filters to the query.
WithFields(filters ...FieldFilter) Query
// Limit limits the maximal number of results returned.
Limit(n int) Query
// Count executes the query and returns the number of items that match it.
Count(ctx context.Context) (int64, error)
// One executes the query and returns the first document from it.
One(ctx context.Context) (Document, error)
// Iterate starts an iteration over the query results.
Iterate() DocIterator
}
// Update is an update request builder.
// Inc and Upsert return an Update so that calls can be chained; Do executes the request.
type Update interface {
// Inc increments a document field by the given amount. It will also increment the upserted document.
Inc(field string, dn int) Update
// Upsert sets a document that will be inserted in case the original object does not exist already.
// It should omit fields used by Inc - they will be added automatically.
Upsert(d Document) Update
// Do executes the update request.
Do(ctx context.Context) error
}
// Delete is a batch delete request builder.
// WithFields and Keys return a Delete so that calls can be chained; Do executes the request.
type Delete interface {
// WithFields adds the specified filters to select documents for deletion.
WithFields(filters ...FieldFilter) Delete
// Keys limits the set of documents to delete to the ones with the specified keys.
// Delete still uses the provided filters, thus it will not delete objects with these keys if they do not pass the filters.
Keys(keys ...Key) Delete
// Do executes the batch delete.
Do(ctx context.Context) error
}
// DocIterator is an iterator over a list of documents.
// NOTE(review): presumably Key and Doc are meaningful only after Next has
// returned true - confirm against implementations.
type DocIterator interface {
// Next advances the iterator to the next document.
Next(ctx context.Context) bool
// Err returns the last encountered error.
Err() error
// Close frees all resources associated with the iterator.
Close() error
// Key returns the key of the current document.
Key() Key
// Doc returns the current document.
Doc() Document
}
// BatchInsert returns a streaming document writer for collection col.
//
// When db implements BatchInserter, the writer comes from the database
// itself. Otherwise batching is emulated by a writer that inserts documents
// one at a time through db.Insert.
func BatchInsert(db Database, col string) DocWriter {
	native, ok := db.(BatchInserter)
	if !ok {
		// No native batch support: fall back to sequential inserts.
		return &seqInsert{col: col, db: db}
	}
	return native.BatchInsert(col)
}
// seqInsert is the DocWriter returned by BatchInsert for databases that do
// not implement BatchInserter. It inserts every document immediately.
type seqInsert struct {
db Database // database that receives the inserts
col string // target collection name
keys []Key // keys of the documents inserted so far, in write order
err error // last error returned by db.Insert, if any
}
// WriteDoc inserts d into the collection right away via the underlying
// database. On success the key returned by the database is appended to the
// list reported by Keys. On failure the error is remembered (and later
// returned by Flush and Close) as well as returned to the caller.
func (w *seqInsert) WriteDoc(ctx context.Context, key Key, d Document) error {
	inserted, err := w.db.Insert(ctx, w.col, key, d)
	if err == nil {
		w.keys = append(w.keys, inserted)
		return nil
	}
	w.err = err
	return err
}
// Flush returns the last error recorded by WriteDoc, or nil if there was none.
// There is nothing to wait for: WriteDoc performs each insert immediately.
// The ctx argument is not used.
func (w *seqInsert) Flush(ctx context.Context) error {
return w.err
}
// Keys returns the keys of all documents inserted so far, in write order.
// The returned slice is the writer's internal slice, not a copy.
func (w *seqInsert) Keys() []Key {
return w.keys
}
// Close returns the last error recorded by WriteDoc, or nil if there was none.
// It does not close the underlying database.
func (w *seqInsert) Close() error {
return w.err
}
// DocWriter is an interface for writing documents in a streaming manner.
type DocWriter interface {
// WriteDoc prepares a document to be written. The write becomes valid only after Flush.
WriteDoc(ctx context.Context, key Key, d Document) error
// Flush waits for all writes to complete.
Flush(ctx context.Context) error
// Keys returns a list of keys of already inserted documents.
// It might be shorter than the number of written documents until Flush is called.
Keys() []Key
// Close closes the writer and discards any unflushed documents.
Close() error
}
// BatchInserter is an optional interface for databases that can insert documents in batches.
// The package-level BatchInsert function uses it when a Database implements it.
type BatchInserter interface {
// BatchInsert returns a writer that inserts documents into collection col.
BatchInsert(col string) DocWriter
}
// IndexType is a type of index for a collection.
type IndexType int
// Index types. The commented-out entries below are not declared.
const (
IndexAny = IndexType(iota) // zero value; presumably "no specific index type requested" - TODO confirm
StringExact // exact match for string values (usually a hash index)
//StringFulltext
//IntIndex
//FloatIndex
//TimeIndex
)
// Index is an index for a collection of documents.
// It is passed to Database.EnsureIndex as the primary or a secondary index.
type Index struct {
Fields []string // an ordered set of fields used in the index
Type IndexType // type of the index
}