/
plugin_adapter.go
229 lines (191 loc) · 6.76 KB
/
plugin_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package storage
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"

	"get.porter.sh/porter/pkg/storage/plugins"
	"go.mongodb.org/mongo-driver/bson"
)
// Compile-time assertion that a PluginAdapter value satisfies Store.
var _ Store = PluginAdapter{}
// PluginAdapter converts between the low-level plugin.StorageProtocol which
// operates on bson documents, and the document types stored by Porter which are
// marshaled using json.
//
// Specifically it handles converting from bson.Raw to the type specified by
// ResultType on plugin.ResultOptions so that you can just cast the result to
// the specified type safely.
type PluginAdapter struct {
// plugin is the wrapped storage plugin that every adapter method delegates to.
plugin plugins.StorageProtocol
}
// NewPluginAdapter returns a PluginAdapter that delegates to the given
// storage plugin.
func NewPluginAdapter(plugin plugins.StorageProtocol) PluginAdapter {
	var adapter PluginAdapter
	adapter.plugin = plugin
	return adapter
}
// Close closes the wrapped plugin when it implements io.Closer. Plugins that
// are not closers are left alone and nil is returned.
func (a PluginAdapter) Close() error {
	closer, isCloser := a.plugin.(io.Closer)
	if !isCloser {
		return nil
	}
	return closer.Close()
}
// Aggregate forwards the aggregation for the collection to the plugin and
// unpacks the returned bson documents onto out. Errors from the plugin are
// returned unchanged.
func (a PluginAdapter) Aggregate(ctx context.Context, collection string, opts AggregateOptions, out interface{}) error {
	pluginOpts := opts.ToPluginOptions(collection)

	docs, aggErr := a.plugin.Aggregate(ctx, pluginOpts)
	if aggErr != nil {
		return aggErr
	}

	return a.unmarshalSlice(docs, out)
}
// EnsureIndex converts the options to their plugin form and forwards the
// request to the plugin, returning its error unchanged.
func (a PluginAdapter) EnsureIndex(ctx context.Context, opts EnsureIndexOptions) error {
	pluginOpts := opts.ToPluginOptions()
	return a.plugin.EnsureIndex(ctx, pluginOpts)
}
// Count forwards the count request for the collection to the plugin and
// returns the plugin's count and error unchanged.
func (a PluginAdapter) Count(ctx context.Context, collection string, opts CountOptions) (int64, error) {
	pluginOpts := opts.ToPluginOptions(collection)
	total, countErr := a.plugin.Count(ctx, pluginOpts)
	return total, countErr
}
// Find queries the collection through the plugin and unpacks every returned
// document onto out. Plugin errors are passed through handleError before
// being returned.
func (a PluginAdapter) Find(ctx context.Context, collection string, opts FindOptions, out interface{}) error {
	docs, findErr := a.plugin.Find(ctx, opts.ToPluginOptions(collection))
	if findErr == nil {
		return a.unmarshalSlice(docs, out)
	}
	return a.handleError(findErr, collection)
}
// FindOne queries a collection and unpacks the first result onto out.
//
// When the query yields no documents an ErrNotFound for the collection is
// returned; if the filter has a "name" entry, its value is recorded as the
// missing item. Plugin errors are passed through handleError.
func (a PluginAdapter) FindOne(ctx context.Context, collection string, opts FindOptions, out interface{}) error {
	docs, findErr := a.plugin.Find(ctx, opts.ToPluginOptions(collection))
	switch {
	case findErr != nil:
		return a.handleError(findErr, collection)
	case len(docs) == 0:
		missing := ErrNotFound{Collection: collection}
		if name, hasName := opts.Filter["name"]; hasName {
			missing.Item = fmt.Sprint(name)
		}
		return missing
	}

	// Only the first document is used; any additional results are ignored.
	if err := a.unmarshal(docs[0], out); err != nil {
		return fmt.Errorf("could not unmarshal document of type %T: %w", out, err)
	}
	return nil
}
// unmarshalSlice decodes a slice of bson documents onto out, which is expected
// to be something json can unmarshal an array onto. The documents take a
// detour through json ([]bson.Raw -> []bson.M -> json -> out) so that the json
// marshaling rules declared on the destination type, such as field names set
// with json tags, are honored.
func (a PluginAdapter) unmarshalSlice(bsonResults []bson.Raw, out interface{}) error {
	// Decode each raw document into a generic map representation.
	docs := make([]bson.M, 0, len(bsonResults))
	for _, raw := range bsonResults {
		var doc bson.M
		if err := bson.Unmarshal(raw, &doc); err != nil {
			return err
		}
		docs = append(docs, doc)
	}

	// Encode all of the documents as a single json array.
	encoded, err := json.Marshal(docs)
	if err != nil {
		return fmt.Errorf("error marshaling results into a single result document: %w", err)
	}

	// Decode the json array onto the caller's destination.
	if err := json.Unmarshal(encoded, out); err != nil {
		return fmt.Errorf("could not unmarshal slice onto type %T: %w", out, err)
	}
	return nil
}
// unmarshal unpacks a single bson document onto the specified typed output
// by going through a temporary representation of the document as json so that we
// use the json marshal logic defined on the struct, e.g. if fields have different
// names defined with json tags.
//
// It returns the bson decoding error unchanged, or a wrapped error when the
// json encoding or decoding step fails.
func (a PluginAdapter) unmarshal(bsonResult bson.Raw, out interface{}) error {
	// We want to go from bson.Raw -> bson.M -> json -> out (typed struct)
	var tmpResult bson.M
	if err := bson.Unmarshal(bsonResult, &tmpResult); err != nil {
		return err
	}

	// Marshal the intermediate document to json
	data, err := json.Marshal(tmpResult)
	if err != nil {
		return fmt.Errorf("error marshaling results into a single result document: %w", err)
	}

	// Unmarshal the json onto our destination output. The message refers to a
	// document (not a slice) because this path handles exactly one result.
	if err := json.Unmarshal(data, out); err != nil {
		return fmt.Errorf("could not unmarshal document onto type %T: %w", out, err)
	}

	return nil
}
// Get converts the options into find options, looks up a single document with
// FindOne, and passes the resulting error through handleError.
func (a PluginAdapter) Get(ctx context.Context, collection string, opts GetOptions, out interface{}) error {
	lookupErr := a.FindOne(ctx, collection, opts.ToFindOptions(), out)
	return a.handleError(lookupErr, collection)
}
// Insert converts the options to their plugin form and forwards the insert to
// the plugin. A conversion failure is returned as-is; the plugin's error is
// passed through handleError.
func (a PluginAdapter) Insert(ctx context.Context, collection string, opts InsertOptions) error {
	request, convErr := opts.ToPluginOptions(collection)
	if convErr != nil {
		return convErr
	}

	insertErr := a.plugin.Insert(ctx, request)
	return a.handleError(insertErr, collection)
}
// Patch forwards the patch request for the collection to the plugin and passes
// the plugin's error through handleError.
func (a PluginAdapter) Patch(ctx context.Context, collection string, opts PatchOptions) error {
	request := opts.ToPluginOptions(collection)
	return a.handleError(a.plugin.Patch(ctx, request), collection)
}
// Remove forwards the remove request for the collection to the plugin and
// passes the plugin's error through handleError.
func (a PluginAdapter) Remove(ctx context.Context, collection string, opts RemoveOptions) error {
	request := opts.ToPluginOptions(collection)
	return a.handleError(a.plugin.Remove(ctx, request), collection)
}
// Update converts the options to their plugin form and forwards the update to
// the plugin. A conversion failure is returned as-is; the plugin's error is
// passed through handleError.
func (a PluginAdapter) Update(ctx context.Context, collection string, opts UpdateOptions) error {
	request, convErr := opts.ToPluginOptions(collection)
	if convErr != nil {
		return convErr
	}

	updateErr := a.plugin.Update(ctx, request)
	return a.handleError(updateErr, collection)
}
// handleError unwraps errors returned from a plugin (which, due to the round
// trip through the plugin framework, may no longer have their original type)
// and turns them back into a well known error such as ErrNotFound.
//
// A nil error is returned as nil. An error that already is, or wraps, an
// ErrNotFound is returned as that ErrNotFound value so that details such as
// Item are kept; its Collection is filled in when it was left empty. Any
// other error whose message contains "not found" (case-insensitive) becomes
// ErrNotFound for the collection. All remaining errors are returned unchanged.
func (a PluginAdapter) handleError(err error, collection string) error {
	if err == nil {
		return nil
	}

	// Keep the details of an error that is already typed instead of replacing
	// it with a new, less specific ErrNotFound.
	var notFound ErrNotFound
	if errors.As(err, &notFound) {
		if notFound.Collection == "" {
			notFound.Collection = collection
		}
		return notFound
	}

	// The type information is gone, so fall back to matching on the message.
	if strings.Contains(strings.ToLower(err.Error()), "not found") {
		return ErrNotFound{Collection: collection}
	}

	return err
}
// ErrNotFound indicates that the requested document was not found.
// You can test for this error using errors.Is(err, storage.ErrNotFound{})
type ErrNotFound struct {
	// Collection is the name of the collection that was searched.
	Collection string

	// Item optionally identifies the document that was requested. It is only
	// used in the error message for the credentials and parameters collections.
	Item string
}

// Error returns "<document type> not found", where the document type is
// derived from Collection. For the credentials and parameters collections the
// Item is used in place of a document type. When nothing is known about the
// document, the message is just "not found" with no leading whitespace.
func (e ErrNotFound) Error() string {
	var docType string
	switch e.Collection {
	case "installations":
		docType = "Installation"
	case "runs":
		docType = "Run"
	case "results":
		docType = "Result"
	case "output":
		// NOTE(review): singular, unlike the other collection names here —
		// confirm this matches the collection name used by callers.
		docType = "Output"
	case "credentials", "parameters":
		docType = e.Item
	}

	// Avoid producing " not found" when there is nothing to put in front.
	if docType == "" {
		return "not found"
	}
	return fmt.Sprintf("%s not found", docType)
}

// Is reports whether err is an ErrNotFound value, regardless of its field
// values, so that errors.Is(err, ErrNotFound{}) matches any ErrNotFound.
func (e ErrNotFound) Is(err error) bool {
	_, ok := err.(ErrNotFound)
	return ok
}