/
cursor.go
113 lines (90 loc) · 2.87 KB
/
cursor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package mongodb
import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// CreateCursor is implemented by values that know how to open a fresh
// *mongo.Cursor on demand. Cursor calls create lazily, the first time a
// driver cursor is actually needed.
type CreateCursor interface {
	// create opens a new driver cursor using ctx and returns it, or the
	// error reported while opening it.
	create(ctx context.Context) (*mongo.Cursor, error)
}
// CreateFindCursor is the CreateCursor implementation backed by
// Collection.Find. It stores the arguments of the Find call so the call
// can be issued later by create.
//
// The field order matters: newFindCursor fills the struct positionally.
type CreateFindCursor struct {
	// collection is the collection Find is invoked on.
	collection *mongo.Collection
	// query is handed to Find unchanged as its filter argument.
	query interface{}
	// findOptions is handed to Find unchanged.
	findOptions *options.FindOptions
}
// newFindCursor packages the arguments of a Find call into a CreateCursor.
// Nothing is sent to the server here; the Find call is made when create is
// invoked on the returned value.
func newFindCursor(collection *mongo.Collection, query interface{}, findOptions *options.FindOptions) CreateCursor {
	return &CreateFindCursor{
		collection:  collection,
		query:       query,
		findOptions: findOptions,
	}
}
// create issues the stored Find call with ctx and returns the driver's
// cursor and error unmodified.
func (c *CreateFindCursor) create(ctx context.Context) (*mongo.Cursor, error) {
	coll := c.collection
	return coll.Find(ctx, c.query, c.findOptions)
}
// CreateAggregateCursor is the CreateCursor implementation backed by
// Collection.Aggregate. It stores the arguments of the Aggregate call so
// the call can be issued later by create.
type CreateAggregateCursor struct {
	// collection is the collection Aggregate is invoked on.
	collection *mongo.Collection
	// pipeline is handed to Aggregate unchanged.
	pipeline interface{}
	// options is handed to Aggregate unchanged.
	options *options.AggregateOptions
}

// newAggregateCursor packages the arguments of an Aggregate call into a
// CreateCursor. Nothing is sent to the server here; the Aggregate call is
// made when create is invoked on the returned value.
func newAggregateCursor(collection *mongo.Collection, pipeline interface{}, aggregateOptions *options.AggregateOptions) CreateCursor {
	// The parameter is deliberately not named "options" so that it does not
	// shadow the imported options package inside this function.
	return &CreateAggregateCursor{
		collection: collection,
		pipeline:   pipeline,
		options:    aggregateOptions,
	}
}

// create issues the stored Aggregate call with ctx and returns the driver's
// cursor and error unmodified.
func (createAggregateCursor *CreateAggregateCursor) create(ctx context.Context) (*mongo.Cursor, error) {
	return createAggregateCursor.collection.Aggregate(ctx, createAggregateCursor.pipeline, createAggregateCursor.options)
}
// Cursor wraps a driver cursor that is created lazily through a
// CreateCursor.
//
// The field order matters: newCursor fills the struct positionally.
type Cursor struct {
	// createCursor opens the underlying driver cursor when it is needed.
	createCursor CreateCursor
	// cursor is the underlying driver cursor. It stays nil until Next or
	// TryNext creates it; All uses a separate cursor and does not set it.
	cursor *mongo.Cursor
	// lastError records the error from a failed createCursor.create call.
	lastError error
}
// newCursor returns a Cursor that will use createCursor to open its driver
// cursor on first use. The driver cursor and the recorded error start out
// nil.
func newCursor(createCursor CreateCursor) *Cursor {
	return &Cursor{createCursor: createCursor}
}
// Close closes the underlying driver cursor, if one has been created, and
// returns the driver's error unmodified.
//
// The driver cursor is only created by Next or TryNext, so it is still nil
// when Close is called on a Cursor that was never iterated, was consumed
// through All, or failed to open. In that case there is nothing to release
// and Close returns nil instead of dereferencing a nil pointer.
func (cursor *Cursor) Close(ctx context.Context) error {
	if cursor.cursor == nil {
		return nil
	}
	return cursor.cursor.Close(ctx)
}
// All opens a new driver cursor and passes results to that cursor's All
// method, returning its error via wrapMongoError.
//
// The cursor opened here is local to this call; it is not stored on the
// receiver, so it is independent of any cursor used by Next or TryNext.
// If opening the cursor fails, the error is recorded in lastError and
// returned via wrapMongoError.
func (cursor *Cursor) All(ctx context.Context, results interface{}) error {
	driverCursor, createErr := cursor.createCursor.create(ctx)
	if createErr != nil {
		cursor.lastError = createErr
		return wrapMongoError(createErr)
	}
	return wrapMongoError(driverCursor.All(ctx, results))
}
// Next advances the underlying driver cursor and reports the result of the
// driver cursor's Next call.
//
// The driver cursor is opened on the first call. If opening it fails, the
// error is recorded in lastError and Next returns false.
func (cursor *Cursor) Next(ctx context.Context) bool {
	// Fast path: the driver cursor already exists.
	if cursor.cursor != nil {
		return cursor.cursor.Next(ctx)
	}

	created, err := cursor.createCursor.create(ctx)
	// Store whatever create returned, even on failure, exactly as a direct
	// multi-assignment to the field would.
	cursor.cursor = created
	if err != nil {
		cursor.lastError = err
		return false
	}
	return created.Next(ctx)
}
// TryNext tries to advance the underlying driver cursor and reports the
// result of the driver cursor's TryNext call.
//
// The driver cursor is opened on the first call. If opening it fails, the
// error is recorded in lastError and TryNext returns false.
func (cursor *Cursor) TryNext(ctx context.Context) bool {
	if cursor.cursor == nil {
		var err error
		cursor.cursor, err = cursor.createCursor.create(ctx)
		if err != nil {
			cursor.lastError = err
			return false
		}
	}
	// Delegate to the driver cursor. Calling cursor.TryNext here would
	// invoke this same method again and recurse without end.
	return cursor.cursor.TryNext(ctx)
}
// Err returns the last error that occurred for this cursor, passed through
// wrapMongoError.
//
// An error recorded while opening the driver cursor takes precedence. It is
// checked first because a failed open leaves no driver cursor to ask, and
// the failure would otherwise be reported as nil. If no such error was
// recorded and no driver cursor exists yet, Err returns nil. Otherwise it
// returns the driver cursor's own Err.
func (cursor *Cursor) Err() error {
	if cursor.lastError != nil {
		return wrapMongoError(cursor.lastError)
	}
	if cursor.cursor == nil {
		return nil
	}
	return wrapMongoError(cursor.cursor.Err())
}