-
Notifications
You must be signed in to change notification settings - Fork 202
/
connection.go
347 lines (298 loc) · 10.4 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
package pluggable
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"get.porter.sh/porter/pkg/config"
"get.porter.sh/porter/pkg/plugins"
"get.porter.sh/porter/pkg/tracing"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
)
// PluginConnection represents a connection to a plugin.
// It wraps the hashicorp/go-plugin library.
//
// Besides the go-plugin client, a connection holds the command used to run
// the plugin, an optionally attached debugger command, and the pipe and
// go-routine bookkeeping used to collect the plugin's logs.
type PluginConnection struct {
// config is the porter configuration.
config *config.Config
// key is the fully-qualified plugin key.
// For example, porter.storage.mongodb
key plugins.PluginKey
// pluginType is the type of plugin we want to connect to.
pluginType PluginTypeConfig
// client is the plugin framework client used to manage the connection to the
// plugin.
client *plugin.Client
// pluginCmd is the command that manages the plugin process.
pluginCmd *exec.Cmd
// pluginProtocol is a connection that supports the plugin protocol, such as
// plugins.SecretsProtocol or plugins.StorageProtocol
pluginProtocol interface{}
// debugger is the optionally attached go debugger command.
debugger *exec.Cmd
// cancelLogCtx is the cancellation function for our go-routine that collects the plugin logs.
cancelLogCtx context.CancelFunc
// logsWaitGroup is used to ensure that any go-routines spawned by the plugin connection
// complete when Close is called. Otherwise we can get into a race between us and when the logger is closed.
logsWaitGroup sync.WaitGroup
// logsWriter is the write end of the log pipe. It is used as the output of
// the logger that is handed to the plugin client.
logsWriter *io.PipeWriter
// logsReader is the read end of the log pipe, consumed by the log collection go-routine.
logsReader *io.PipeReader
}
// NewPluginConnection creates a connection for the plugin identified by
// pluginKey, of the kind described by pluginType. Nothing is started here:
// the returned connection only records its configuration.
func NewPluginConnection(c *config.Config, pluginType PluginTypeConfig, pluginKey plugins.PluginKey) *PluginConnection {
	conn := new(PluginConnection)
	conn.config = c
	conn.key = pluginKey
	conn.pluginType = pluginType
	return conn
}
// String returns the string form of the plugin key, which lets a
// PluginConnection be used wherever a fmt.Stringer is expected.
func (c *PluginConnection) String() string {
	name := c.key.String()
	return name
}
// Start establishes a connection to the plugin.
//
// It builds the command that runs the plugin (the porter binary for internal
// plugins, otherwise the plugin's own binary), feeds pluginCfg to the plugin
// on stdin, sets up log collection, starts the plugin through go-plugin,
// optionally attaches a debugger, and finally dispenses the plugin protocol
// client that GetClient returns.
//
// pluginCfg is the resolved plugin configuration section from the Porter
// config file.
//
// Every error is passed through span.Error before it is returned. When a
// failure happens after the plugin client was created, the connection is
// closed before returning.
func (c *PluginConnection) Start(ctx context.Context, pluginCfg io.Reader) error {
	ctx, span := tracing.StartSpan(ctx,
		attribute.String("plugin-key", c.key.String()))
	defer span.EndSpan()

	// Create a command to run the plugin
	if c.key.IsInternal {
		porterPath, err := c.config.GetPorterPath()
		if err != nil {
			// Report through the span like every other failure in this function
			return span.Error(errors.Wrap(err, "could not determine the path to the porter pluginProtocol"))
		}

		c.pluginCmd = c.config.NewCommand(ctx, porterPath, "plugin", "run", c.key.String())
	} else {
		pluginPath, err := c.config.GetPluginPath(c.key.Binary)
		if err != nil {
			return span.Error(err)
		}
		span.SetAttributes(attribute.String("plugin-path", pluginPath))

		c.pluginCmd = c.config.NewCommand(ctx, pluginPath, "run", c.key.String())
	}
	span.SetAttributes(attribute.String("plugin-path", c.pluginCmd.Path))

	// Configure the command
	c.pluginCmd.Stdin = pluginCfg

	// The plugin doesn't read the config file, we pass in relevant plugin config to them directly
	// The remaining relevant config (e.g. logging, tracing) is set via env vars
	// Config files require using the plugins to resolve templated values, so we resolve once in Porter
	// and pass relevant resolved values to the plugins explicitly
	pluginConfigVars := c.config.ExportRemoteConfigAsEnvironmentVariables()
	c.pluginCmd.Env = append(c.pluginCmd.Env, pluginConfigVars...)

	// Pipe logs from the plugin and capture them
	c.setupLogCollector(ctx)

	// errbuf captures the plugin's stderr so it can be included in the error
	// when the plugin fails to start
	var errbuf bytes.Buffer
	logger := hclog.New(&hclog.LoggerOptions{
		Name:       "porter",
		Output:     c.logsWriter,
		Level:      hclog.Debug,
		JSONFormat: true,
	})
	c.client = plugin.NewClient(&plugin.ClientConfig{
		HandshakeConfig: plugin.HandshakeConfig{
			ProtocolVersion:  c.pluginType.ProtocolVersion,
			MagicCookieKey:   plugins.HandshakeConfig.MagicCookieKey,
			MagicCookieValue: plugins.HandshakeConfig.MagicCookieValue,
		},
		AllowedProtocols: []plugin.Protocol{
			// All v1 plugins use gRPC
			plugin.ProtocolGRPC,
			// Enable net/rpc so that we can talk to older plugins from before v1
			plugin.ProtocolNetRPC,
		},
		// Specify which plugin we want to connect to
		Plugins: map[string]plugin.Plugin{
			c.pluginType.Interface: c.pluginType.Plugin,
		},
		Cmd:          c.pluginCmd,
		Logger:       logger,
		Stderr:       &errbuf,
		StartTimeout: getPluginStartTimeout(),
		// Configure gRPC to propagate the span context so the plugin's traces
		// show up under the current span
		GRPCDialOptions: []grpc.DialOption{
			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
		},
	})

	// Start the plugin
	span.Debug("Connecting to plugin", attribute.String("plugin-command", strings.Join(c.pluginCmd.Args, " ")))
	rpcClient, err := c.client.Client(ctx)
	if err != nil {
		c.Close(ctx)
		// Surface whatever the plugin printed to stderr as the root cause
		if stderr := errbuf.String(); stderr != "" {
			err = errors.Wrap(errors.New(stderr), err.Error())
		}
		return span.Error(errors.Wrapf(err, "could not connect to the %s plugin", c.key))
	}

	err = c.setUpDebugger(ctx, c.client)
	if err != nil {
		c.Close(ctx)
		return span.Error(errors.Wrap(err, "could not set up debugger for plugin"))
	}

	// Get a connection to the plugin
	c.pluginProtocol, err = rpcClient.Dispense(c.key.Interface)
	if err != nil {
		c.Close(ctx)
		return span.Error(errors.Wrapf(err, "could not connect to the %s plugin", c.key))
	}

	span.SetAttributes(attribute.Int("negotiated-protocol-version", c.client.NegotiatedVersion()))

	return nil
}
// GetClient returns the raw connection to the pluginProtocol, as stored by
// Start. Callers are expected to cast the value to the plugin protocol
// interface they need, such as plugins.StorageProtocol or
// plugins.SecretsProtocol.
func (c *PluginConnection) GetClient() interface{} {
	protocol := c.pluginProtocol
	return protocol
}
// Close releases the resources held by the plugin connection. Blocks until the
// plugin process closes. Pass a context to control the graceful shutdown of the
// plugin.
//
// The plugin is first asked to stop; if that has not completed when the stop
// timeout (see getPluginStopTimeout) or ctx expires, the plugin process is
// hard-killed. Afterwards log collection is stopped and any attached debugger
// process is killed. The returned error aggregates failures from killing the
// debugger, and is nil when there were none.
func (c *PluginConnection) Close(ctx context.Context) error {
	ctx, span := tracing.StartSpan(ctx,
		attribute.String("plugin-key", c.key.String()))
	defer span.EndSpan()

	var bigErr *multierror.Error
	if c.client != nil {
		// Work on a local copy so the go-routine below never reads the field
		// while it is being reset to nil at the end of this block.
		client := c.client

		ctx, cancel := context.WithTimeout(ctx, getPluginStopTimeout())
		defer cancel()

		// Stop the plugin process. done is closed rather than sent on, so the
		// go-routine can always finish even when we stopped waiting for it
		// because the timeout fired first.
		done := make(chan struct{})
		go func() {
			defer close(done)
			// beware, this can block or deadlock
			client.Kill(ctx)
		}()
		select {
		case <-done:
			// plugin stopped as requested
		case <-ctx.Done():
			// Stop being nice, cleanup the plugin process without any waiting or blocking
			span.Debugf("killing the plugin process: %s", ctx.Err())
			client.HardKill()
		}

		// Stop processing logs from the plugin and wait for the log collection routine to complete
		// This avoids a race where the log collector picks up a message but doesn't print it until
		// after we close the logfile. This ensures that everything related to the plugins is released
		// when Close exits.
		c.cancelLogCtx()
		c.logsWriter.Close()
		c.logsReader.Close()
		c.logsWaitGroup.Wait()

		c.client = nil
	}

	if c.debugger != nil {
		if c.debugger.Process != nil {
			err := c.debugger.Process.Kill()
			bigErr = multierror.Append(bigErr, err)
		}
		c.debugger = nil
	}

	return bigErr.ErrorOrNil()
}
// getPluginStartTimeout returns the timeout used when starting a plugin. The
// value of the environment variable named by PluginStartTimeoutEnvVar is used
// when it parses as a duration; otherwise PluginStartTimeoutDefault applies.
func getPluginStartTimeout() time.Duration {
	timeout, err := time.ParseDuration(os.Getenv(PluginStartTimeoutEnvVar))
	if err != nil {
		return PluginStartTimeoutDefault
	}
	return timeout
}
// getPluginStopTimeout returns the timeout used when stopping a plugin. The
// value of the environment variable named by PluginStopTimeoutEnvVar is used
// when it parses as a duration; otherwise PluginStopTimeoutDefault applies.
func getPluginStopTimeout() time.Duration {
	timeout, err := time.ParseDuration(os.Getenv(PluginStopTimeoutEnvVar))
	if err != nil {
		return PluginStopTimeoutDefault
	}
	return timeout
}
// setUpDebugger attaches a headless dlv debugger to the running plugin process
// when the debug context in the porter configuration names this plugin.
//
// It does nothing and returns nil when no plugin is selected for debugging or
// when the selected plugin is not this one (compared case-insensitively). It
// returns an error when Delve is not installed, when no plugin working
// directory was configured, or when dlv fails to start.
func (c *PluginConnection) setUpDebugger(ctx context.Context, client *plugin.Client) error {
	log := tracing.LoggerFromContext(ctx)
	dbg := c.config.PlugInDebugContext

	// Only the plugin explicitly selected for debugging gets a debugger
	if len(dbg.RunPlugInInDebugger) == 0 {
		return nil
	}
	if strings.ToLower(c.key.String()) != strings.TrimSpace(strings.ToLower(dbg.RunPlugInInDebugger)) {
		return nil
	}

	if !isDelveInstalled() {
		return log.Error(errors.New("Delve needs to be installed to debug plugins"))
	}
	if len(dbg.PlugInWorkingDirectory) == 0 {
		return log.Error(errors.New("Plugin Working Directory is required for debugging"))
	}

	// Attach to the already running plugin process by pid
	args := []string{
		"attach",
		strconv.Itoa(client.ReattachConfig().Pid),
		"--headless=true",
		"--api-version=2",
		"--log",
		fmt.Sprintf("--listen=127.0.0.1:%s", dbg.DebuggerPort),
		"--accept-multiclient",
		fmt.Sprintf("--wd=%s", dbg.PlugInWorkingDirectory),
	}
	c.debugger = exec.CommandContext(ctx, "dlv", args...)
	c.debugger.Stderr = os.Stderr
	c.debugger.Stdout = os.Stdout

	if err := c.debugger.Start(); err != nil {
		return log.Error(fmt.Errorf("Error starting dlv: %w", err))
	}
	return nil
}
// setupLogCollector creates the pipe that carries the plugin's logs and starts
// the go-routine that reads from it. The go-routine is registered with
// logsWaitGroup and runs under a cancellable context whose cancel function is
// stored in cancelLogCtx.
func (c *PluginConnection) setupLogCollector(ctx context.Context) {
	reader, writer := io.Pipe()
	c.logsReader = reader
	c.logsWriter = writer

	logCtx, cancel := context.WithCancel(ctx)
	c.cancelLogCtx = cancel

	c.logsWaitGroup.Add(1)
	go c.collectPluginLogs(logCtx)
}
// collectPluginLogs watches the pipe between porter and the plugin for
// messages, and logs them in a span.
// We don't have a good way to associate them with the actual action in porter that triggered the plugin response
// The best way to get that information is to instrument the plugin itself. This is mainly a fallback mechanism to
// collect logs from an uninstrumented plugin.
//
// Each line read from the pipe is expected to be a JSON object with a string
// "@message" and a string "@level" (for example "error", "warn" or "info").
// Lines that are not valid JSON, or that have no string "@message", are
// skipped. Messages with a missing or unrecognized level are logged at debug
// level. The routine exits when ctx is cancelled or reading from the pipe
// fails, and marks logsWaitGroup as done on the way out.
func (c *PluginConnection) collectPluginLogs(ctx context.Context) {
	defer c.logsWaitGroup.Done()

	ctx, span := tracing.StartSpan(ctx, attribute.String("plugin-key", c.key.String()))
	defer span.EndSpan()

	r := bufio.NewReader(c.logsReader)
	for {
		// Stop before reading the next line once we have been cancelled
		select {
		case <-ctx.Done():
			return
		default:
		}

		// A read error (including EOF once the pipe is closed) ends collection
		line, err := r.ReadString('\n')
		if err != nil {
			return
		}

		var pluginLog map[string]interface{}
		if err := json.Unmarshal([]byte(line), &pluginLog); err != nil {
			continue
		}

		msg, ok := pluginLog["@message"].(string)
		if !ok {
			continue
		}

		// The level arrives as a JSON string, so it must be parsed into an
		// hclog.Level before it can be compared with the hclog constants.
		level, _ := pluginLog["@level"].(string)
		switch hclog.LevelFromString(level) {
		case hclog.Error:
			// msg is plugin output, never use it as a format string
			_ = span.Error(fmt.Errorf("%s", msg))
		case hclog.Warn:
			span.Warn(msg)
		case hclog.Info:
			span.Infof("%s", msg)
		default:
			span.Debug(msg)
		}
	}
}