-
Notifications
You must be signed in to change notification settings - Fork 3
/
persistor.go
142 lines (121 loc) · 3.38 KB
/
persistor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package keelmongo
import (
"context"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo"
)
// Persistor bundles a MongoDB client with a database handle obtained from it.
// It is exported so that types in other packages can embed it.
type Persistor struct {
	client *mongo.Client
	db     *mongo.Database
}

// Options holds the settings that New works with when building a Persistor.
type Options struct {
	// OtelEnabled decides whether New installs an otelmongo monitor on the client.
	OtelEnabled bool
	// OtelServiceName is handed to otelmongo.NewMonitor as its service name argument.
	OtelServiceName string
	// OtelOptions are forwarded to otelmongo.NewMonitor.
	OtelOptions []otelmongo.Option
	// ClientOptions are the driver options passed to mongo.Connect.
	ClientOptions *options.ClientOptions
	// DatabaseOptions collects driver-level database options (see WithDatabaseOptions).
	DatabaseOptions *options.DatabaseOptions
}

// Option is a functional option that modifies an Options value in place.
type Option func(o *Options)
// WithOtelEnabled returns an Option that sets Options.OtelEnabled to v.
func WithOtelEnabled(v bool) Option {
	setEnabled := func(target *Options) {
		target.OtelEnabled = v
	}
	return setEnabled
}
// WithOtelServiceName returns an Option that sets Options.OtelServiceName to v.
func WithOtelServiceName(v string) Option {
	setName := func(target *Options) {
		target.OtelServiceName = v
	}
	return setName
}
// WithOtelOptions returns an Option that appends the given otelmongo options
// to whatever Options.OtelOptions already contains.
func WithOtelOptions(v ...otelmongo.Option) Option {
	return func(target *Options) {
		extended := append(target.OtelOptions, v...)
		target.OtelOptions = extended
	}
}
// WithClientOptions returns an Option that replaces Options.ClientOptions with
// the result of options.MergeClientOptions applied to the current value
// followed by v.
func WithClientOptions(v *options.ClientOptions) Option {
	return func(target *Options) {
		merged := options.MergeClientOptions(target.ClientOptions, v)
		target.ClientOptions = merged
	}
}
// WithDatabaseOptions returns an Option that replaces Options.DatabaseOptions
// with the result of options.MergeDatabaseOptions applied to the current value
// followed by v.
func WithDatabaseOptions(v *options.DatabaseOptions) Option {
	return func(target *Options) {
		merged := options.MergeDatabaseOptions(target.DatabaseOptions, v)
		target.DatabaseOptions = merged
	}
}
// DefaultOptions returns the baseline Options: OpenTelemetry enabled with the
// service name "mongo", and client options configured with majority read
// concern, secondary-preferred read preference and majority write concern.
// OtelOptions and DatabaseOptions are left at their zero value (nil).
func DefaultOptions() Options {
	// The setters mutate the receiver, so the chained form is not required.
	clientOpts := options.Client()
	clientOpts.SetReadConcern(readconcern.Majority())
	clientOpts.SetReadPreference(readpref.SecondaryPreferred())
	clientOpts.SetWriteConcern(writeconcern.New(writeconcern.WMajority()))

	var defaults Options
	defaults.OtelEnabled = true
	defaults.OtelServiceName = "mongo"
	defaults.ClientOptions = clientOpts
	return defaults
}
// New connects to the MongoDB deployment addressed by uri and returns a
// Persistor bound to the database named in that uri.
//
// The uri must parse and must contain a database name, otherwise an error is
// returned. Settings are applied in this order: DefaultOptions, then the uri,
// then opts. When OtelEnabled is set, an otelmongo monitor is installed on the
// client options. Any DatabaseOptions collected through opts are used when the
// database handle is created.
//
// The connection is verified with a ping before returning. If that ping
// fails, the freshly created client is disconnected again and the ping error
// is returned unchanged.
func New(ctx context.Context, uri string, opts ...Option) (*Persistor, error) {
	o := DefaultOptions()

	// TODO remove once Database attribute is being exposed
	cs, err := connstring.ParseAndValidate(uri)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse uri")
	}
	if cs.Database == "" {
		// The uri is deliberately kept out of the message: it may carry credentials.
		return nil, errors.New("missing database name in uri")
	}

	// apply uri
	o.ClientOptions.ApplyURI(uri)

	// apply options
	for _, opt := range opts {
		opt(&o)
	}

	// setup otel
	if o.OtelEnabled {
		o.ClientOptions.SetMonitor(otelmongo.NewMonitor(o.OtelServiceName, o.OtelOptions...))
	}

	// create connection
	client, err := mongo.Connect(ctx, o.ClientOptions)
	if err != nil {
		return nil, errors.Wrap(err, "failed to connect")
	}

	// test connection
	if err := client.Ping(ctx, nil); err != nil {
		// Best effort: release the client instead of leaking it; the ping
		// error is the one worth reporting, so a disconnect error is dropped.
		_ = client.Disconnect(ctx)
		return nil, err
	}

	// Only forward database options that were actually configured, so no nil
	// entry is handed to the driver.
	var dbOpts []*options.DatabaseOptions
	if o.DatabaseOptions != nil {
		dbOpts = append(dbOpts, o.DatabaseOptions)
	}

	return &Persistor{
		client: client,
		db:     client.Database(cs.Database, dbOpts...),
	}, nil
}
// Ping pings the deployment through the persistor's client, passing a nil
// read preference, and returns the client's result.
func (p Persistor) Ping(ctx context.Context) error {
	err := p.client.Ping(ctx, nil)
	return err
}
// Collection delegates to NewCollection with the persistor's database, the
// given name and opts, and returns its result as is.
func (p Persistor) Collection(name string, opts ...CollectionOption) (*Collection, error) {
	collection, err := NewCollection(p.db, name, opts...)
	return collection, err
}
// HasCollection reports whether a collection called name exists in the
// persistor's database.
//
// The name is passed as a filter to ListCollectionNames so the listing does
// not have to return every collection of the database; the returned names are
// still compared exactly on the client side. An error from the listing is
// returned unchanged together with false.
func (p Persistor) HasCollection(ctx context.Context, name string) (bool, error) {
	filter := bson.D{{Key: "name", Value: name}}
	names, err := p.db.ListCollectionNames(ctx, filter)
	if err != nil {
		return false, err
	}
	for _, found := range names {
		if found == name {
			return true, nil
		}
	}
	return false, nil
}
// Close disconnects the persistor's client and returns the error reported by
// the disconnect, if any.
func (p Persistor) Close(ctx context.Context) error {
	if err := p.client.Disconnect(ctx); err != nil {
		return err
	}
	return nil
}