/
model.go
297 lines (263 loc) · 8.19 KB
/
model.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package orm
import (
"context"
"encoding/json"
"reflect"
"sync"
"go.bryk.io/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Model instances serve as a "wrapper" to a MongoDB collection and
// provide an easy-to-use API on top of it to greatly simplify common
// tasks.
type Model struct {
// MongoDB's collection backing the model.
Collection *mongo.Collection
// Name of the model. Used also as collection name.
name string
// Transaction in-progress
tx *Transaction
// State lock
mu sync.Mutex
}
// WithTransaction sets the active transaction for the model. All CRUD
// operations are bound to the active transaction when executed. The
// active transaction will be removed automatically once it has been
// aborted or committed. This method will return an error if another
// transaction is currently active.
func (m *Model) WithTransaction(tx *Transaction) error {
m.mu.Lock()
if m.tx != nil {
m.mu.Unlock()
return errors.New("transaction already in progress")
}
m.tx = tx
m.mu.Unlock()
go func() {
<-m.tx.Done()
m.mu.Lock()
m.tx = nil
m.mu.Unlock()
}()
return nil
}
// Estimate executes a count command and returns an estimate of
// the number of documents in the collection using available metadata.
// This operation trades-off accuracy for speed.
// For more information:
// https://docs.mongodb.com/manual/reference/method/db.collection.estimatedDocumentCount/
func (m *Model) Estimate() (int64, error) {
return m.Collection.EstimatedDocumentCount(context.Background())
}
// Count returns the number of documents in the collection that satisfy
// the provided filter. An empty filter will count all the documents in
// the collection by performing a full scan. This operation trades-off
// speed for accuracy.
// For more information:
// https://docs.mongodb.com/manual/reference/method/db.collection.countDocuments/
func (m *Model) Count(filter map[string]interface{}) (int64, error) {
f, err := doc(filter)
if err != nil {
return 0, err
}
return m.Collection.CountDocuments(context.Background(), f)
}
// Distinct allows to find the unique values for a specified field in the
// collection. If no 'filter' is specified a full scan of the collection
// is performed. [Command documentation].
//
// var list []string
// err := mod.Distinct("user_type", Filter(), &list)
//
// [Command documentation]: https://docs.mongodb.com/manual/reference/command/distinct/
func (m *Model) Distinct(field string, filter map[string]interface{}, result interface{}) error {
f, err := doc(filter)
if err != nil {
return err
}
val, err := m.Collection.Distinct(context.Background(), field, f)
if err != nil {
return err
}
data, _ := json.Marshal(val)
return json.Unmarshal(data, result)
}
// Insert the item in the model's underlying collection.
func (m *Model) Insert(item interface{}) (string, error) {
res, err := m.Collection.InsertOne(m.ctx(), item)
if err != nil {
return "", err
}
id, ok := res.InsertedID.(primitive.ObjectID)
if !ok {
return "", errors.New("invalid id")
}
return id.Hex(), nil
}
// Batch executes an insert command to save multiple documents into the
// collection and return the number of successfully stored items. The
// provided item must be a slice.
func (m *Model) Batch(item interface{}, opts ...*options.InsertManyOptions) (int64, error) {
// Verify input type
if err := checkType(item, reflect.Slice, "slice"); err != nil {
return 0, err
}
// Get source slice of items
src := reflect.ValueOf(item)
documents := make([]interface{}, src.Len())
for i := 0; i < len(documents); i++ {
documents[i] = src.Index(i).Interface()
}
// Insert in batch
res, err := m.Collection.InsertMany(m.ctx(), documents, opts...)
if err != nil {
return 0, err
}
return int64(len(res.InsertedIDs)), nil
}
// Update will look for the first document that satisfies the 'filter' value
// and apply the 'patch' to it. If no such document currently exists, it will
// be automatically generated if 'upsert' is set to true.
func (m *Model) Update(filter map[string]interface{}, patch interface{}, upsert bool) error {
// Get filter
f, err := doc(filter)
if err != nil {
return err
}
// Run update operation
_, err = m.Collection.UpdateOne(m.ctx(), f, bson.M{"$set": patch}, &options.UpdateOptions{
Upsert: &upsert,
})
return err
}
// UpdateAll will try to apply the provided 'patch' to all the documents
// satisfying the specified 'filter' and return the number of documents modified
// by the operation.
func (m *Model) UpdateAll(filter map[string]interface{}, patch interface{}) (int64, error) {
// Get filter
f, err := doc(filter)
if err != nil {
return 0, err
}
// Run update operation
res, err := m.Collection.UpdateMany(m.ctx(), f, bson.M{"$set": patch})
if err != nil {
return 0, err
}
return res.ModifiedCount, err
}
// Delete will look for the first document that satisfies the provided 'filter'
// value and permanently remove it. The operation does not fail if no document
// satisfy the filter value.
func (m *Model) Delete(filter map[string]interface{}) error {
f, err := doc(filter)
if err != nil {
return err
}
_, err = m.Collection.DeleteOne(m.ctx(), f)
return err
}
// DeleteAll will remove any document that satisfies the provided 'filter'
// value and return the number of documents deleted by the operation.
func (m *Model) DeleteAll(filter map[string]interface{}) (int64, error) {
f, err := doc(filter)
if err != nil {
return 0, err
}
res, err := m.Collection.DeleteMany(m.ctx(), f)
if err != nil {
return 0, err
}
return res.DeletedCount, err
}
// FindByID looks for a given document based on its '_id' field.
// The provided 'id' value must be a MongoDB objectID hex string.
// The returned document is automatically decoded into 'result',
// that must be a pointer to a given struct.
func (m *Model) FindByID(id string, result interface{}) error {
// Verify target type
if err := checkType(result, reflect.Ptr, "pointer"); err != nil {
return err
}
// Parse id
oid, err := ParseID(id)
if err != nil {
return err
}
// Run query
sr := m.Collection.FindOne(m.ctx(), bson.M{"_id": oid})
if err := sr.Err(); err != nil {
return err
}
// Decode result
return sr.Decode(result)
}
// First looks for the first document in the collection that satisfies the
// specified 'filter'. The returned document is automatically decoded into
// 'result', that must be a pointer to a given struct.
func (m *Model) First(filter map[string]interface{}, result interface{}, opts ...*options.FindOneOptions) error {
// Verify target type
if err := checkType(result, reflect.Ptr, "pointer"); err != nil {
return err
}
// Get filter
f, err := doc(filter)
if err != nil {
return err
}
// Run query
sr := m.Collection.FindOne(m.ctx(), f, opts...)
if err := sr.Err(); err != nil {
return err
}
// Decode result
return sr.Decode(result)
}
// Find all documents in the collection that satisfy the provided 'filter'.
// The returned documents will be automatically decoded into 'result', that
// must be a pointer to a slice.
func (m *Model) Find(filter map[string]interface{}, result interface{}, opts ...*options.FindOptions) error {
// Verify target type
if err := checkType(result, reflect.Ptr, "pointer to a slice"); err != nil {
return err
}
// Get filter
f, err := doc(filter)
if err != nil {
return err
}
// Get cursor
mc, err := m.Collection.Find(m.ctx(), f, opts...)
if err != nil {
return err
}
// Decode directly into the target slice
return mc.All(m.ctx(), result)
}
// Subscribe will set up and return a stream instance that can used to
// receive change events based on the parameters provided.
func (m *Model) Subscribe(pipeline mongo.Pipeline, opts *options.ChangeStreamOptions) (*Stream, error) {
cs, err := m.Collection.Watch(context.Background(), pipeline, opts)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
sub := &Stream{
cs: cs,
ctx: ctx,
halt: cancel,
done: make(chan struct{}),
event: make(chan ChangeEvent),
}
go sub.loop()
return sub, nil
}
func (m *Model) ctx() context.Context {
if m.tx != nil {
return m.tx.ctx
}
return context.Background()
}