-
Notifications
You must be signed in to change notification settings - Fork 5
/
model.go
393 lines (334 loc) · 11.4 KB
/
model.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
package mapper
// The mapper package provides a simplified, high-level interface for
// interacting with database objects.
import (
"fmt"
"reflect"
"github.com/ghetzel/go-stockutil/sliceutil"
"github.com/ghetzel/pivot/backends"
"github.com/ghetzel/pivot/dal"
"github.com/ghetzel/pivot/filter"
)
type ResultFunc func(ptrToInstance interface{}, err error) // {}
type Mapper interface {
NewInstance(inits ...dal.InitializerFunc) interface{}
GetBackend() backends.Backend
GetCollection() *dal.Collection
Migrate() error
Drop() error
Exists(id interface{}) bool
Create(from interface{}) error
Get(id interface{}, into interface{}) error
Update(from interface{}) error
CreateOrUpdate(id interface{}, from interface{}) error
Delete(ids ...interface{}) error
Find(flt interface{}, into interface{}) error
FindFunc(flt interface{}, destZeroValue interface{}, resultFn ResultFunc) error
All(into interface{}) error
Each(destZeroValue interface{}, resultFn ResultFunc) error
List(fields []string) (map[string][]interface{}, error)
ListWithFilter(fields []string, flt interface{}) (map[string][]interface{}, error)
Sum(field string, flt interface{}) (float64, error)
Count(flt interface{}) (uint64, error)
Minimum(field string, flt interface{}) (float64, error)
Maximum(field string, flt interface{}) (float64, error)
Average(field string, flt interface{}) (float64, error)
GroupBy(fields []string, aggregates []filter.Aggregate, flt interface{}) (*dal.RecordSet, error)
}
type Model struct {
Mapper
db backends.Backend
collection *dal.Collection
}
func NewModel(db backends.Backend, collection *dal.Collection) *Model {
model := new(Model)
model.db = db
model.collection = collection
if model.collection.Fields == nil {
model.collection.Fields = make([]dal.Field, 0)
}
if v := collection.IdentityField; v == `` {
model.collection.IdentityField = dal.DefaultIdentityField
} else {
model.collection.IdentityField = v
}
if v := collection.IdentityFieldType; v == `` {
model.collection.IdentityFieldType = dal.DefaultIdentityFieldType
} else {
model.collection.IdentityFieldType = v
}
db.RegisterCollection(collection)
return model
}
func (self *Model) NewInstance(inits ...dal.InitializerFunc) interface{} {
if self.collection == nil {
panic("Collection-aware instance creation is not supported on anonymous Models")
}
return self.collection.NewInstance(inits...)
}
func (self *Model) GetBackend() backends.Backend {
return self.db
}
func (self *Model) GetCollection() *dal.Collection {
return self.collection
}
func (self *Model) Migrate() error {
var actualCollection *dal.Collection
// create the collection if it doesn't exist
if c, err := self.db.GetCollection(self.collection.Name); dal.IsCollectionNotFoundErr(err) {
if err := self.db.CreateCollection(self.collection); err == nil {
if c, err := self.db.GetCollection(self.collection.Name); err == nil {
actualCollection = c
} else {
return err
}
} else {
return err
}
} else if err != nil {
return err
} else {
actualCollection = c
}
if diffs := self.collection.Diff(actualCollection); diffs != nil {
msg := fmt.Sprintf("Actual schema for collection '%s' differs from desired schema:\n", self.collection.Name)
for _, err := range diffs {
msg += fmt.Sprintf(" %v\n", err)
}
return fmt.Errorf(msg)
}
// overlay the definition onto whatever the backend came back with
actualCollection.ApplyDefinition(self.collection)
return nil
}
func (self *Model) Drop() error {
return self.db.DeleteCollection(self.collection.Name)
}
// Creates and saves a new instance of the model from the given struct or dal.Record.
//
func (self *Model) Create(from interface{}) error {
if record, err := self.collection.MakeRecord(from); err == nil {
return self.db.Insert(self.collection.Name, dal.NewRecordSet(record))
} else {
return err
}
}
// Retrieves an instance of the model identified by the given ID and populates the value pointed to
// by the into parameter. Structs and dal.Record instances can be populated.
//
func (self *Model) Get(id interface{}, into interface{}) error {
if record, err := self.db.Retrieve(self.collection.Name, id); err == nil {
return record.Populate(into, self.collection)
} else {
return err
}
}
// Tests whether a record exists for the given ID.
//
func (self *Model) Exists(id interface{}) bool {
return self.db.Exists(self.collection.Name, id)
}
// Updates and saves an existing instance of the model from the given struct or dal.Record.
//
func (self *Model) Update(from interface{}) error {
if record, err := self.collection.MakeRecord(from); err == nil {
return self.db.Update(self.collection.Name, dal.NewRecordSet(record))
} else {
return err
}
}
// Creates or updates an instance of the model depending on whether it exists or not.
//
func (self *Model) CreateOrUpdate(id interface{}, from interface{}) error {
if id == nil || !self.Exists(id) {
return self.Create(from)
} else {
return self.Update(from)
}
}
// Delete instances of the model identified by the given IDs
//
func (self *Model) Delete(ids ...interface{}) error {
return self.db.Delete(self.collection.Name, ids...)
}
// Perform a query for instances of the model that match the given filter.Filter.
// Results will be returned in the slice or array pointed to by the into parameter, or
// if into points to a dal.RecordSet, the RecordSet resulting from the query will be returned
// as-is.
//
func (self *Model) Find(flt interface{}, into interface{}) error {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if search := self.db.WithSearch(self.collection, f); search != nil {
// perform query
if recordset, err := search.Query(self.collection, f); err == nil {
return self.populateOutputParameter(f, recordset, into)
} else {
return err
}
} else {
return fmt.Errorf("backend %T does not support searching", self.db)
}
} else {
return err
}
}
// Perform a query for instances of the model that match the given filter.Filter.
// The given callback function will be called once per result.
//
func (self *Model) FindFunc(flt interface{}, destZeroValue interface{}, resultFn ResultFunc) error {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if search := self.db.WithSearch(self.collection, f); search != nil {
_, err := search.Query(self.collection, f, func(record *dal.Record, err error, _ backends.IndexPage) error {
if err == nil {
if _, ok := destZeroValue.(*dal.Record); ok {
resultFn(record, nil)
} else if _, ok := destZeroValue.(dal.Record); ok {
resultFn(*record, nil)
} else {
into := reflect.New(reflect.TypeOf(destZeroValue)).Interface()
// populate that type with data from this record
if err := record.Populate(into, self.collection); err == nil {
resultFn(into, nil)
} else {
return err
}
}
} else {
resultFn(nil, err)
}
return nil
})
return err
} else {
return fmt.Errorf("backend %T does not support searching", self.db)
}
} else {
return err
}
}
func (self *Model) All(into interface{}) error {
return self.Find(filter.All(), into)
}
func (self *Model) Each(destZeroValue interface{}, resultFn ResultFunc) error {
return self.FindFunc(filter.All(), destZeroValue, resultFn)
}
func (self *Model) List(fields []string) (map[string][]interface{}, error) {
return self.ListWithFilter(fields, filter.All())
}
func (self *Model) ListWithFilter(fields []string, flt interface{}) (map[string][]interface{}, error) {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if search := self.db.WithSearch(self.collection, f); search != nil {
return search.ListValues(self.collection, fields, f)
} else {
return nil, fmt.Errorf("backend %T does not support searching", self.db)
}
} else {
return nil, err
}
}
func (self *Model) Sum(field string, flt interface{}) (float64, error) {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if agg := self.db.WithAggregator(self.collection); agg != nil {
return agg.Sum(self.collection, field, f)
} else {
return 0, fmt.Errorf("backend %T does not support aggregation", self.db)
}
} else {
return 0, err
}
}
func (self *Model) Count(flt interface{}) (uint64, error) {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if agg := self.db.WithAggregator(self.collection); agg != nil {
return agg.Count(self.collection, f)
} else {
return 0, fmt.Errorf("backend %T does not support aggregation", self.db)
}
} else {
return 0, err
}
}
func (self *Model) Minimum(field string, flt interface{}) (float64, error) {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if agg := self.db.WithAggregator(self.collection); agg != nil {
return agg.Minimum(self.collection, field, f)
} else {
return 0, fmt.Errorf("backend %T does not support aggregation", self.db)
}
} else {
return 0, err
}
}
func (self *Model) Maximum(field string, flt interface{}) (float64, error) {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if agg := self.db.WithAggregator(self.collection); agg != nil {
return agg.Maximum(self.collection, field, f)
} else {
return 0, fmt.Errorf("backend %T does not support aggregation", self.db)
}
} else {
return 0, err
}
}
func (self *Model) Average(field string, flt interface{}) (float64, error) {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
if agg := self.db.WithAggregator(self.collection); agg != nil {
return agg.Average(self.collection, field, f)
} else {
return 0, fmt.Errorf("backend %T does not support aggregation", self.db)
}
} else {
return 0, err
}
}
func (self *Model) GroupBy(fields []string, aggregates []filter.Aggregate, flt interface{}) (*dal.RecordSet, error) {
if f, err := self.filterFromInterface(flt); err == nil {
f.IdentityField = self.collection.IdentityField
for i, agg := range aggregates {
if agg.Field == `` {
aggregates[i].Field = f.IdentityField
}
}
if agg := self.db.WithAggregator(self.collection); agg != nil {
return agg.GroupBy(self.collection, fields, aggregates, f)
} else {
return nil, fmt.Errorf("backend %T does not support aggregation", self.db)
}
} else {
return nil, err
}
}
func (self *Model) filterFromInterface(in interface{}) (*filter.Filter, error) {
if f, ok := in.(filter.Filter); ok {
return &f, nil
} else if f, ok := in.(*filter.Filter); ok {
return f, nil
} else if fMap, ok := in.(map[string]interface{}); ok {
return filter.FromMap(fMap)
} else if fStr, ok := in.(string); ok {
return filter.Parse(fStr)
} else {
return filter.Null(), fmt.Errorf("Expected filter.Filter, map[string]interface{}, or string; got: %T", in)
}
}
func (self *Model) populateOutputParameter(f *filter.Filter, recordset *dal.RecordSet, into interface{}) error {
// for each resulting record...
for _, record := range recordset.Records {
if len(f.Fields) > 0 {
for k, _ := range record.Fields {
if !sliceutil.ContainsString(f.Fields, k) {
delete(record.Fields, k)
}
}
}
}
return recordset.PopulateFromRecords(into, self.collection)
}