-
Notifications
You must be signed in to change notification settings - Fork 17
/
system.go
335 lines (295 loc) · 11.2 KB
/
system.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
/*
* Copyright 2022 Aspect Build Systems, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package system
import (
"context"
"errors"
"fmt"
"math"
"os"
"reflect"
"strings"
"sync"
"time"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"sigs.k8s.io/yaml"
"aspect.build/cli/pkg/aspect/root/config"
"aspect.build/cli/pkg/aspect/root/flags"
rootFlags "aspect.build/cli/pkg/aspect/root/flags"
"aspect.build/cli/pkg/aspecterrors"
"aspect.build/cli/pkg/interceptors"
"aspect.build/cli/pkg/ioutils"
"aspect.build/cli/pkg/plugin/client"
"aspect.build/cli/pkg/plugin/sdk/v1alpha4/plugin"
"aspect.build/cli/pkg/plugin/system/bep"
"aspect.build/cli/pkg/plugin/system/besproxy"
)
// PluginSystem is the contract between the aspect CLI Core and its plugin
// system. It covers the plugin lifecycle, the registration of commands that
// plugins contribute, and the interceptors that tie plugins into the execution
// of Core commands.
type PluginSystem interface {
	// Lifecycle of the plugin system.
	Configure(streams ioutils.Streams, pluginsConfig interface{}) error
	TearDown()

	// Commands contributed by plugins.
	RegisterCustomCommands(cmd *cobra.Command, bazelStartupArgs []string) error

	// Interceptors providing a BES backend to the wrapped command.
	BESBackendInterceptor() interceptors.Interceptor
	BESBackendSubscriberInterceptor() interceptors.Interceptor

	// Interceptors running the plugins' hooks around build, test and run.
	BuildHooksInterceptor(streams ioutils.Streams) interceptors.Interceptor
	TestHooksInterceptor(streams ioutils.Streams) interceptors.Interceptor
	RunHooksInterceptor(streams ioutils.Streams) interceptors.Interceptor
}
// pluginSystem is the implementation of PluginSystem returned by
// NewPluginSystem.
type pluginSystem struct {
	// clientFactory creates a plugin instance for each configured plugin.
	clientFactory client.Factory
	// plugins lists the instances whose setup succeeded.
	plugins *PluginList
	// promptRunner is passed to the plugins' post-command hooks.
	promptRunner ioutils.PromptRunner
}
// NewPluginSystem returns the default PluginSystem implementation: a fresh
// client factory, an empty plugin list and a new prompt runner.
func NewPluginSystem() PluginSystem {
	ps := new(pluginSystem)
	ps.clientFactory = client.NewFactory()
	ps.plugins = new(PluginList)
	ps.promptRunner = ioutils.NewPromptRunner()
	return ps
}
// Configure decodes pluginsConfig and brings up every plugin it describes.
//
// Each plugin is created through the client factory and set up with its
// YAML-encoded properties in its own goroutine. An instance is appended to the
// plugin list only after its setup succeeded, so the list follows completion
// order rather than configuration order. When the factory returns neither an
// instance nor an error, that plugin is skipped.
//
// The first error reported by any plugin (or by decoding the configuration) is
// returned wrapped; instances that were already appended stay in the list.
func (ps *pluginSystem) Configure(streams ioutils.Streams, pluginsConfig interface{}) error {
	pluginConfigs, err := config.UnmarshalPluginConfig(pluginsConfig)
	if err != nil {
		return fmt.Errorf("failed to configure plugin system: %w", err)
	}

	var (
		group  errgroup.Group
		listMu sync.Mutex // serializes appends to ps.plugins
	)

	for _, pluginConfig := range pluginConfigs {
		// Per-iteration copy so the goroutine does not observe later iterations.
		pluginConfig := pluginConfig

		group.Go(func() error {
			instance, err := ps.clientFactory.New(pluginConfig, streams)
			switch {
			case err != nil:
				return err
			case instance == nil:
				return nil
			}

			rawProperties, err := yaml.Marshal(pluginConfig.Properties)
			if err != nil {
				return err
			}

			if err := instance.Setup(plugin.NewSetupConfig(rawProperties)); err != nil {
				return err
			}

			listMu.Lock()
			defer listMu.Unlock()
			ps.plugins.insert(instance)
			return nil
		})
	}

	if err := group.Wait(); err != nil {
		return fmt.Errorf("failed to configure plugin system: %w", err)
	}
	return nil
}
// RegisterCustomCommands adds the custom commands offered by the configured
// plugins to cmd, in the "plugin" command group. Running such a command passes
// its name, the context, the arguments and bazelStartupArgs to the
// CustomCommandExecutor of the plugin that offered it.
//
// An error is returned when a plugin fails to list its commands, or when the
// name of a plugin command (the first word of its Use line) equals the name of
// a command that was attached to cmd before registration started. Commands
// added before the failure remain registered.
func (ps *pluginSystem) RegisterCustomCommands(cmd *cobra.Command, bazelStartupArgs []string) error {
	// firstWord extracts the command name from a cobra Use line.
	firstWord := func(use string) string {
		return strings.SplitN(use, " ", 2)[0]
	}

	// Names owned by the core; plugins are not allowed to reuse them.
	reserved := make(map[string]struct{})
	for _, existing := range cmd.Commands() {
		reserved[firstWord(existing.Use)] = struct{}{}
	}

	for node := ps.plugins.head; node != nil; node = node.next {
		customCommands, err := node.payload.Plugin.CustomCommands()
		if err != nil {
			return fmt.Errorf("failed to register custom commands: %w", err)
		}

		for _, custom := range customCommands {
			name := firstWord(custom.Use)
			if _, taken := reserved[name]; taken {
				return fmt.Errorf("failed to register custom commands: plugin implements a command with a protected name: %s", custom.Use)
			}

			executor := node.payload.CustomCommandExecutor
			run := func(ctx context.Context, _ *cobra.Command, args []string) error {
				return executor.ExecuteCustomCommand(name, ctx, args, bazelStartupArgs)
			}

			cmd.AddCommand(&cobra.Command{
				Use:     custom.Use,
				Short:   custom.ShortDesc,
				Long:    custom.LongDesc,
				GroupID: "plugin",
				RunE:    interceptors.Run([]interceptors.Interceptor{}, run),
			})
		}
	}

	return nil
}
// TearDown shuts the plugin system down by calling Kill on every plugin
// instance in the list, in list order.
func (ps *pluginSystem) TearDown() {
	node := ps.plugins.head
	for node != nil {
		node.payload.Kill()
		node = node.next
	}
}
// BESBackendSubscriberInterceptor returns an interceptor that unconditionally
// starts a BES backend and injects it into the context of the wrapped command.
// Prefer BESBackendInterceptor when the grpc service should only be created if
// a subscriber is known to exist.
func (ps *pluginSystem) BESBackendSubscriberInterceptor() interceptors.Interceptor {
	// createBesBackend already has the interceptor signature, so the bound
	// method is returned directly.
	return ps.createBesBackend
}
// BESBackendInterceptor returns an interceptor that starts a BES backend and
// injects it into the context only when one is needed: at least one plugin is
// configured, or --aspect:force_bes_backend is set. Otherwise the wrapped
// command runs directly. Use BESBackendSubscriberInterceptor when a subscriber
// is always present. When a backend is started, it is gracefully stopped after
// the wrapped command has run.
func (ps *pluginSystem) BESBackendInterceptor() interceptors.Interceptor {
	return func(ctx context.Context, cmd *cobra.Command, args []string, next interceptors.RunEContextFn) error {
		// --aspect:force_bes_backend is primarily used for testing.
		forced, err := cmd.Root().Flags().GetBool(rootFlags.AspectForceBesBackendFlagName)
		if err != nil {
			return fmt.Errorf("failed to get value of --aspect:force_bes_backend: %w", err)
		}

		switch {
		case forced:
			fmt.Fprintf(os.Stderr, "Forcing creation of BES backend\n")
		case ps.plugins.head == nil:
			// No plugins and nothing forced: nobody would consume the build
			// event stream, so no grpc server is needed.
			return next(ctx, cmd, args)
		}

		return ps.createBesBackend(ctx, cmd, args, next)
	}
}
// createBesBackend creates a BES backend, registers the BEP event callback of
// every configured plugin with it, starts its grpc server and then runs next
// with the backend injected into the context. The server is gracefully stopped
// once next returns.
//
// When the user passed --bes_backend, a BES proxy to that backend is registered
// as well, carrying any --remote_header values. Failing to connect the proxy is
// reported on stderr but does not abort the command.
//
// An error is returned when --remote_header is not registered as a MultiString,
// when one of its values is not of the form 'name=value', or when the backend
// cannot be set up or started.
func (ps *pluginSystem) createBesBackend(ctx context.Context, cmd *cobra.Command, args []string, next interceptors.RunEContextFn) error {
	// Create the BES backend
	besBackend := bep.NewBESBackend(ctx)
	for node := ps.plugins.head; node != nil; node = node.next {
		besBackend.RegisterSubscriber(node.payload.BEPEventCallback)
	}

	opts := []grpc.ServerOption{
		// Bazel doesn't seem to set a maximum send message size, therefore
		// we match the default send message for Go, which should be enough
		// for all messages sent by Bazel (roughly 2.14GB).
		grpc.MaxRecvMsgSize(math.MaxInt32),
		// Here we are just being explicit with the default value since we
		// also set the receive message size.
		grpc.MaxSendMsgSize(math.MaxInt32),
	}

	// Check if user has specified --bes_backend
	// https://bazel.build/reference/command-line-reference#flag--bes_backend
	// A lookup error is deliberately ignored: it is treated the same as the
	// flag not being set.
	userBesBackend, _ := cmd.Flags().GetString("bes_backend")

	// Configure a BES proxy if `--bes_backend` is set by the user
	if userBesBackend != "" {
		// Check if user has specified any --remote_header values
		// https://bazel.build/reference/command-line-reference#flag--remote_header
		// cmd.Flag returns nil for an unknown flag, so guard the lookup before
		// inspecting the flag's value.
		var userRemoteHeader *flags.MultiString
		if remoteHeaderFlag := cmd.Flag("remote_header"); remoteHeaderFlag != nil {
			userRemoteHeader, _ = remoteHeaderFlag.Value.(*flags.MultiString)
		}
		if userRemoteHeader == nil {
			return fmt.Errorf("expected --remote_header flag to be registered with cobra as a MultiString")
		}

		userRemoteHeaders := make(map[string]string)
		for _, header := range userRemoteHeader.Get() {
			// Split on the first '=' only: header values may themselves
			// contain '=' (for example base64 padding).
			s := strings.SplitN(header, "=", 2)
			if len(s) != 2 {
				return fmt.Errorf("invalid --remote_header flag value '%v'; value must be in the form of a 'name=value' assignment", header)
			}
			userRemoteHeaders[s[0]] = s[1]
		}

		besProxy := besproxy.NewBesProxy(userBesBackend, userRemoteHeaders)
		if err := besProxy.Connect(); err != nil {
			fmt.Fprintf(os.Stderr, "Failed to connect to build event stream backend %s: %s\n", userBesBackend, err.Error())
		} else {
			fmt.Fprintf(os.Stderr, "Forwarding build event stream to %v\n", userBesBackend)
			besBackend.RegisterBesProxy(besProxy)
		}
	}

	// Setup the BES backend grpc server
	if err := besBackend.Setup(opts...); err != nil {
		return fmt.Errorf("failed to run BES backend: %w", err)
	}

	// Start the BES backend. The one second deadline applies to ServeWait only;
	// it lives in its own context so that the context handed to next does not
	// expire one second into the wrapped command.
	serveCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	if err := besBackend.ServeWait(serveCtx); err != nil {
		return fmt.Errorf("failed to run BES backend: %w", err)
	}
	defer besBackend.GracefulStop()

	ctx = bep.InjectBESBackend(ctx, besBackend)
	return next(ctx, cmd, args)
}
// BuildHooksInterceptor returns an interceptor that calls the PostBuildHook of
// every configured plugin once the wrapped command has finished. Hook errors
// are written to streams.Stderr.
func (ps *pluginSystem) BuildHooksInterceptor(streams ioutils.Streams) interceptors.Interceptor {
	const hookMethod = "PostBuildHook"
	return ps.commandHooksInterceptor(hookMethod, streams)
}
// TestHooksInterceptor returns an interceptor that calls the PostTestHook of
// every configured plugin once the wrapped command has finished. Hook errors
// are written to streams.Stderr.
func (ps *pluginSystem) TestHooksInterceptor(streams ioutils.Streams) interceptors.Interceptor {
	const hookMethod = "PostTestHook"
	return ps.commandHooksInterceptor(hookMethod, streams)
}
// RunHooksInterceptor returns an interceptor that calls the PostRunHook of
// every configured plugin once the wrapped command has finished. Hook errors
// are written to streams.Stderr.
func (ps *pluginSystem) RunHooksInterceptor(streams ioutils.Streams) interceptors.Interceptor {
	const hookMethod = "PostRunHook"
	return ps.commandHooksInterceptor(hookMethod, streams)
}
// commandHooksInterceptor returns an interceptor that runs the wrapped command
// and afterwards calls the method named methodName, looked up by reflection, on
// every configured plugin instance. Each hook receives the value of the
// interactive-mode flag and the prompt runner.
//
// A non-nil result from a hook is printed to streams.Stderr. If any hook
// failed and the error returned by the wrapped command is (or wraps) an
// *aspecterrors.ExitError, that error's ExitCode is set to 1; a nil or
// unrelated error is left as it is.
func (ps *pluginSystem) commandHooksInterceptor(methodName string, streams ioutils.Streams) interceptors.Interceptor {
	return func(ctx context.Context, cmd *cobra.Command, args []string, next interceptors.RunEContextFn) (exitErr error) {
		interactive, flagErr := cmd.Root().PersistentFlags().GetBool(rootFlags.AspectInteractiveFlagName)
		if flagErr != nil {
			return fmt.Errorf("failed to run 'aspect %s' command: %w", cmd.CalledAs(), flagErr)
		}

		// Deferred so the hooks run after next has returned and can see, and
		// adjust, the named result exitErr.
		defer func() {
			anyHookFailed := false
			for node := ps.plugins.head; node != nil; node = node.next {
				hookArgs := []reflect.Value{
					reflect.ValueOf(interactive),
					reflect.ValueOf(ps.promptRunner),
				}
				hook := reflect.ValueOf(node.payload).MethodByName(methodName)
				hookErr := hook.Call(hookArgs)[0].Interface()
				if hookErr == nil {
					continue
				}
				fmt.Fprintf(streams.Stderr, "Error: failed to run 'aspect %s' command: %v\n", cmd.CalledAs(), hookErr)
				anyHookFailed = true
			}

			if !anyHookFailed {
				return
			}
			var exit *aspecterrors.ExitError
			if errors.As(exitErr, &exit) {
				exit.ExitCode = 1
			}
		}()

		return next(ctx, cmd, args)
	}
}
// PluginList is a simple singly linked list of the plugin instances created
// from the plugins configuration.
type PluginList struct {
	// head is the first node, or nil while the list is empty.
	head *PluginNode
	// tail is the most recently inserted node.
	tail *PluginNode
}
// insert appends p to the end of the list. It performs no locking of its own.
func (l *PluginList) insert(p *client.PluginInstance) {
	appended := &PluginNode{payload: p}
	if l.head != nil {
		// Non-empty list: link the new node after the current tail.
		l.tail.next = appended
	} else {
		// Empty list: the new node is also the head.
		l.head = appended
	}
	l.tail = appended
}
// PluginNode is a single element of a PluginList.
type PluginNode struct {
	// next is the following node, or nil for the last node.
	next *PluginNode
	// payload is the plugin instance stored in this node.
	payload *client.PluginInstance
}