forked from documize/glick
/
plugin.go
299 lines (272 loc) · 7.63 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package glick
import (
"errors"
"fmt"
"os/exec"
"reflect"
"sort"
"sync"
"time"
"golang.org/x/net/context"
)
var (
// ErrNilLib means the library pointer is nil.
ErrNilLib = errors.New("nil library")
// ErrNilAPI means an API value is nil.
ErrNilAPI = errors.New("nil api")
)
// errDupAPI means that a duplicate name has been given for an API.
func errDupAPI(name string) error {
return errors.New("duplicate api: " + name)
}
// errNoAPI means that the name of the API was not found in the map.
func errNoAPI(name string) error {
return errors.New("unknown api: " + name)
}
// errNoPlug means that no handler function was found for the plugin.
func errNoPlug(name string) error {
return errors.New("no plugin found: " + name)
}
// Plugin type provides the type of the every plugin function,
// it has the same signature as Endpoint in "github.com/go-kit/kit".
type Plugin func(ctx context.Context, in interface{}) (out interface{}, err error)
// ProtoPlugOut provides a prototype for the output of a Plugger
type ProtoPlugOut func() interface{}
type plugOut struct {
out interface{}
err error
}
// Overloader allows the standard system settings for an API
// to be overloaded, depending on the context passed in.
type Overloader func(ctx context.Context, api, action string, handler Plugin) (context.Context, Plugin, error)
type plugkey struct {
api, action string // the strings to choose a plugin
}
type plugval struct {
plug Plugin
cfg *Config
}
type plugmap map[plugkey]plugval
type apidef struct {
ppi interface{} // a prototype of the input type
ppo ProtoPlugOut // a function returning a prototype of the output type
ppiT, ppoT reflect.Type // a cached version of reflect.TypeOf the input and output types
timeout time.Duration // how long before we abort
}
type apimap map[string]apidef
type cfgmap map[string]Configurator
// Library holds the registered API and plugin database.
type Library struct {
pim plugmap // a map of known plugins
apim apimap // a map of known APIs
cfgm cfgmap // a map of know configuration handlers
mtx sync.RWMutex // mutex to protect map access
ovfn Overloader // the function to call to overload which plugin to use at runtime
subprocs []*exec.Cmd // a slice of sub-processes created
}
// New returns an initialized Library.
func New(ov Overloader) (*Library, error) {
lib := &Library{
apim: make(apimap),
pim: make(plugmap),
cfgm: make(cfgmap),
ovfn: ov,
subprocs: make([]*exec.Cmd, 0),
}
if err := ConfigCmd(lib); err != nil {
return nil, err
}
if err := ConfigGetURL(lib); err != nil {
return nil, err
}
if err := ConfigRPC(lib); err != nil {
return nil, err
}
return lib, nil
}
// RegAPI allows registration of a named API.
// The in/out prototype defines the type that must be passed in and out.
// The timeout gives the maximum time that a Plugin using this API may take to execute.
func (l *Library) RegAPI(api string, inPrototype interface{}, outPlugProto ProtoPlugOut, timeout time.Duration) error {
if l == nil {
return ErrNilLib
}
l.mtx.Lock()
defer l.mtx.Unlock()
if inPrototype == nil || outPlugProto == nil || outPlugProto() == nil {
return ErrNilAPI
}
if _, found := l.apim[api]; found {
return errDupAPI(api)
}
l.apim[api] = apidef{inPrototype, outPlugProto,
reflect.TypeOf(inPrototype), reflect.TypeOf(outPlugProto()),
timeout}
return nil
}
// RegPlugin registers a Plugger to use for this action on an api.
// Duplicate actions simply overload what is there.
func (l *Library) RegPlugin(api, action string, handler Plugin, cfg *Config) error {
if l == nil {
return ErrNilLib
}
l.mtx.Lock()
defer l.mtx.Unlock()
if _, hasAPI := l.apim[api]; !hasAPI {
return errNoAPI(api)
}
if handler == nil {
return errNoPlug("nil handler for api " + api)
}
l.pim[plugkey{api, action}] = plugval{handler, cfg}
return nil
}
func (l *Library) def(ctx context.Context, api, action string, in interface{}) (apidef, error) {
if in == nil {
return apidef{}, ErrNilAPI
}
def, ok := l.apim[api]
if ok {
if !reflect.TypeOf(in).AssignableTo(def.ppiT) {
return apidef{}, fmt.Errorf("bad api types - in: got %T want %T",
in, def.ppi)
}
} else {
return apidef{}, errNoAPI(api)
}
return def, nil
}
// Run a plugin for a given action on an API, passing data in/out.
// The library overloader function may decide from the context that a non-standard
// action should be run.
func (l *Library) Run(ctx context.Context, api, action string, in interface{}) (out interface{}, err error) {
if l == nil {
return nil, ErrNilLib
}
l.mtx.RLock()
defer l.mtx.RUnlock()
def, err := l.def(ctx, api, action, in)
if err != nil {
return nil, err
}
if ctx == nil || ctx == context.TODO() {
ctx = context.Background()
}
var handler Plugin
pv, found := l.pim[plugkey{api, action}]
if found {
handler = pv.plug
}
// should this run call and overload function?
if l.ovfn != nil {
var ovHandler Plugin
var ovErr error
ctx, ovHandler, ovErr = l.ovfn(ctx, api, action, handler)
if ovErr != nil {
return nil, ovErr
}
if ovHandler != nil {
handler = ovHandler
found = true
}
}
return l.run(ctx, api, found, handler, def, in)
}
func (l *Library) run(ctx context.Context, api string, found bool, handler Plugin, def apidef, in interface{}) (out interface{}, err error) {
if !found || handler == nil {
return nil, errNoPlug("api " + api)
}
reply := make(chan plugOut)
ctxWT, cancel := context.WithTimeout(ctx, l.apim[api].timeout)
go func() {
defer cancel()
var plo plugOut
plo.out, plo.err = handler(ctxWT, in)
reply <- plo
}()
select {
case <-ctxWT.Done():
return nil, ctxWT.Err()
case plo := <-reply:
if plo.err == nil && (plo.out == nil ||
!def.ppoT.AssignableTo(reflect.TypeOf(plo.out))) {
return nil, fmt.Errorf("bad api type - out: got %T want %T",
plo.out, def.ppo())
}
return plo.out, plo.err
}
}
// ProtoPlugOut provides the way to return a function to create the output for a plugin.
func (l *Library) ProtoPlugOut(api string) (ppo ProtoPlugOut, err error) {
if l == nil {
return nil, ErrNilLib
}
l.mtx.RLock()
defer l.mtx.RUnlock()
if v, ok := l.apim[api]; !ok {
err = errors.New("could not find api: " + api)
} else {
ppo = v.ppo
}
return
}
// Actions provides the names of all registered plugin actions for an api.
func (l *Library) Actions(api string) ([]string, error) {
if l == nil {
return nil, ErrNilLib
}
l.mtx.RLock()
defer l.mtx.RUnlock()
if _, ok := l.apim[api]; !ok {
return nil, errors.New("could not find api: " + api)
}
var ret []string
for pv := range l.pim {
if pv.api == api {
ret = append(ret, pv.action)
}
}
sort.Strings(ret)
return ret, nil
}
// Config returns a pointer to the JSON Config struct for a given API and Action,
// or nil if no Config exists.
func (l *Library) Config(api, action string) *Config {
if l == nil {
return nil
}
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.pim[plugkey{api, action}].cfg
}
// Token is a convenience function that returns the Token string for a given API and Action,
// if one exists.
func (l *Library) Token(api, action string) string {
cfg := l.Config(api, action)
if cfg == nil {
return ""
}
return cfg.Token
}
// KillSubProcs created by StartLocalRPCservers() (or eventually maybe elsewhere).
func (l *Library) KillSubProcs() error {
if l == nil {
return errors.New("pointer to Library is nil")
}
l.mtx.RLock()
defer l.mtx.RUnlock()
errStr := ""
for _, s := range l.subprocs {
var err error
err = s.Process.Kill()
if err != nil {
errStr += " : " + err.Error()
} else {
time.Sleep(time.Second)
}
}
if errStr == "" {
return nil
}
return errors.New(errStr)
}