/
provider_runner.go
233 lines (208 loc) · 6.63 KB
/
provider_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// SPDX-FileCopyrightText: 2023 The Crossplane Authors <https://crossplane.io>
//
// SPDX-License-Identifier: Apache-2.0
package terraform
import (
"bufio"
"fmt"
"os"
"regexp"
"sync"
"time"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/pkg/errors"
"k8s.io/utils/clock"
"k8s.io/utils/exec"
)
const (
// error messages
// errFmtTimeout is the format of the error returned by SharedProvider.Start
// when no reattach configuration is received within reattachTimeout; its
// single %v verb is filled with that timeout.
errFmtTimeout = "timed out after %v while waiting for the reattach configuration string"
// fmtReattachEnv is the JSON template of the reattach configuration returned
// by SharedProvider.Start. Its verbs are filled, in order, with the native
// provider name, the protocol version, a process ID and the unix socket
// address scraped from the native provider's stdout.
// an example value would be: '{"registry.terraform.io/hashicorp/aws": {"Protocol": "grpc", "ProtocolVersion":5, "Pid":... "Addr":{"Network": "unix","String": "..."}}}'
fmtReattachEnv = `{"%s":{"Protocol":"grpc","ProtocolVersion":%d,"Pid":%d,"Test": true,"Addr":{"Network": "unix","String": "%s"}}}`
// fmtSetEnv formats a single KEY=VALUE environment entry.
fmtSetEnv = "%s=%s"
// envMagicCookie is the name of the environment variable through which the
// magic cookie is handed to the native provider process.
envMagicCookie = "TF_PLUGIN_MAGIC_COOKIE"
// Terraform provider plugin expects this magic cookie in its environment
// (as the value of key TF_PLUGIN_MAGIC_COOKIE):
// https://github.com/hashicorp/terraform/blob/d35bc0531255b496beb5d932f185cbcdb2d61a99/internal/plugin/serve.go#L33
valMagicCookie = "d602bf8f470bc67ca7faa0386276bbdd4330efaf76d1a219cb4d6991ca9872b2"
// defaultProtocolVersion is the protocol version NewSharedProvider uses
// unless it is overridden with WithProtocolVersion.
defaultProtocolVersion = 5
// reattachTimeout bounds how long SharedProvider.Start waits for the
// reattach configuration (or an error) after launching the provider.
reattachTimeout = 1 * time.Minute
)
var (
// regexReattachLine matches a stdout line of the native provider that
// contains `unix|<address>|grpc`; its only capture group is the text
// between the two pipes, which SharedProvider.Start places into the
// "Addr" entry of the reattach configuration.
regexReattachLine = regexp.MustCompile(`.*unix\|(.*)\|grpc.*`)
)
// ProviderRunner is the interface for running
// Terraform native provider processes in the shared
// gRPC server mode.
type ProviderRunner interface {
// Start launches the provider if needed. The returned string is the
// reattach configuration for SharedProvider and is empty for
// NoOpProviderRunner.
Start() (string, error)
// Stop requests termination of a previously started provider.
Stop() error
}
// NoOpProviderRunner is a no-op ProviderRunner: its Start and Stop methods
// do nothing and always succeed.
type NoOpProviderRunner struct{}
// NewNoOpProviderRunner returns a ready-to-use NoOpProviderRunner value.
// The type carries no state, so its zero value is returned.
func NewNoOpProviderRunner() NoOpProviderRunner {
	var runner NoOpProviderRunner
	return runner
}
// Start does nothing: it launches no process and always reports an empty
// reattach configuration together with a nil error.
func (NoOpProviderRunner) Start() (string, error) {
	var reattachConfig string
	return reattachConfig, nil
}
// Stop does nothing and always returns a nil error, since a
// NoOpProviderRunner never starts anything that would need stopping.
func (NoOpProviderRunner) Stop() error {
	var err error
	return err
}
// SharedProvider runs the configured native provider plugin
// using the supplied command-line args
type SharedProvider struct {
// nativeProviderPath is the path of the provider executable to run.
nativeProviderPath string
// nativeProviderArgs are the command-line arguments passed to the executable.
nativeProviderArgs []string
// reattachConfig caches the reattach configuration produced by Start;
// an empty value means Start has to launch a new provider process.
reattachConfig string
// nativeProviderName is the key under which the provider appears in the
// reattach configuration.
nativeProviderName string
// protocolVersion is the protocol version written into the reattach
// configuration.
protocolVersion int
// logger receives the runner's debug and info messages.
logger logging.Logger
// executor creates the command used to run the provider.
executor exec.Interface
// clock supplies the timer that bounds the wait in Start.
clock clock.Clock
// mu is locked by Start and Stop for their whole duration and guards
// reattachConfig and stopCh.
mu *sync.Mutex
// stopCh carries the stop request from Stop to the goroutine supervising
// the provider process; it is nil when Start has not been called or after
// Stop has been called.
stopCh chan bool
}
// SharedProviderOption lets you configure the shared gRPC runner.
// Options are applied by NewSharedProvider, in the order given, after the
// defaults have been set.
type SharedProviderOption func(runner *SharedProvider)
// WithNativeProviderArgs returns an option that sets the command-line
// arguments handed to the native provider executable.
func WithNativeProviderArgs(args ...string) SharedProviderOption {
	setArgs := func(runner *SharedProvider) {
		runner.nativeProviderArgs = args
	}
	return setArgs
}
// WithNativeProviderExecutor returns an option that replaces the process
// executor the runner uses to create the native provider command.
func WithNativeProviderExecutor(e exec.Interface) SharedProviderOption {
	setExecutor := func(runner *SharedProvider) {
		runner.executor = e
	}
	return setExecutor
}
// WithProtocolVersion returns an option that sets the gRPC protocol version
// in use between the Terraform CLI and the native provider. The value ends
// up in the reattach configuration produced by Start.
func WithProtocolVersion(protocolVersion int) SharedProviderOption {
	setVersion := func(runner *SharedProvider) {
		runner.protocolVersion = protocolVersion
	}
	return setVersion
}
// WithNativeProviderPath returns an option that sets the path of the
// Terraform provider executable the runner launches.
func WithNativeProviderPath(p string) SharedProviderOption {
	setPath := func(runner *SharedProvider) {
		runner.nativeProviderPath = p
	}
	return setPath
}
// WithNativeProviderName returns an option that sets the Terraform provider
// name, which is used as the key of the reattach configuration.
func WithNativeProviderName(n string) SharedProviderOption {
	setName := func(runner *SharedProvider) {
		runner.nativeProviderName = n
	}
	return setName
}
// WithNativeProviderLogger returns an option that sets the logger the
// runner writes its messages to.
func WithNativeProviderLogger(logger logging.Logger) SharedProviderOption {
	setLogger := func(runner *SharedProvider) {
		runner.logger = logger
	}
	return setLogger
}
// NewSharedProvider builds a SharedProvider runner. It starts from the
// defaults (default protocol version, an OS process executor, the real
// clock and a fresh mutex) and then applies the supplied options in order,
// so an option may override any of those defaults.
func NewSharedProvider(opts ...SharedProviderOption) *SharedProvider {
	runner := new(SharedProvider)
	runner.protocolVersion = defaultProtocolVersion
	runner.executor = exec.New()
	runner.clock = clock.RealClock{}
	runner.mu = new(sync.Mutex)

	for i := range opts {
		opts[i](runner)
	}
	return runner
}
// Start starts a shared gRPC server if not already running.
// A logger, native provider's path and command-line arguments to be
// passed to it must have been properly configured.
//
// If a reattach configuration is already cached it is returned as is.
// Otherwise the native provider is launched in a background goroutine with
// the Terraform plugin magic cookie added to the current environment, and
// its stdout is scanned for the line carrying the unix socket address.
// Start then waits for whichever comes first: the reattach configuration,
// an error from the goroutine, or reattachTimeout.
//
// The background goroutine keeps supervising the process after Start has
// returned: it ends when the process exits or when Stop is called, and it
// clears the cached reattach configuration on its way out so that a later
// Start launches a new process.
//
// Returns any errors encountered and the reattachment configuration for
// the native provider.
func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
	sr.mu.Lock()
	defer sr.mu.Unlock()
	log := sr.logger.WithValues("nativeProviderPath", sr.nativeProviderPath, "nativeProviderArgs", sr.nativeProviderArgs)
	if sr.reattachConfig != "" {
		log.Debug("Shared gRPC server is running...", "reattachConfig", sr.reattachConfig)
		return sr.reattachConfig, nil
	}
	log.Debug("Provider runner not yet started. Will fork a new native provider.")
	// Both channels are buffered so that the goroutine never blocks on a
	// send, even if Start has already returned (e.g. after a timeout).
	errCh := make(chan error, 1)
	reattachCh := make(chan string, 1)
	// The goroutine must wait on this local copy and never on the sr.stopCh
	// field: Stop resets the field to nil and a later Start replaces it, so
	// reading the field from the goroutine would be a data race and could
	// make it wait on a nil channel (missing the stop request forever) or
	// consume a stop request meant for a newer provider process.
	stopCh := make(chan bool, 1)
	sr.stopCh = stopCh
	go func() {
		defer close(errCh)
		defer close(reattachCh)
		// Runs first among the deferred calls. It blocks until Start has
		// released the mutex, so the channels above are only closed after
		// Start has returned.
		defer func() {
			sr.mu.Lock()
			sr.reattachConfig = ""
			sr.mu.Unlock()
		}()
		//#nosec G204 no user input
		cmd := sr.executor.Command(sr.nativeProviderPath, sr.nativeProviderArgs...)
		cmd.SetEnv(append(os.Environ(), fmt.Sprintf(fmtSetEnv, envMagicCookie, valMagicCookie)))
		stdout, err := cmd.StdoutPipe()
		if err != nil {
			errCh <- err
			return
		}
		if err := cmd.Start(); err != nil {
			errCh <- err
			return
		}
		log.Debug("Forked new native provider.")
		// Read stdout line by line until the reattach line shows up or the
		// stream ends.
		scanner := bufio.NewScanner(stdout)
		for scanner.Scan() {
			t := scanner.Text()
			matches := regexReattachLine.FindStringSubmatch(t)
			if matches == nil {
				continue
			}
			reattachCh <- fmt.Sprintf(fmtReattachEnv, sr.nativeProviderName, sr.protocolVersion, os.Getpid(), matches[1])
			break
		}
		// A read failure would otherwise be indistinguishable from the
		// provider simply not printing the reattach line.
		if err := scanner.Err(); err != nil {
			log.Info("Cannot read the stdout of the native Terraform provider process", "error", err)
		}

		waitErrCh := make(chan error, 1)
		go func() {
			defer close(waitErrCh)
			waitErrCh <- cmd.Wait()
		}()
		select {
		case err := <-waitErrCh:
			// NOTE(review): err is nil when the process exits cleanly; if
			// that happens before the reattach line was seen, Start returns
			// an empty configuration together with a nil error.
			log.Info("Native Terraform provider process error", "error", err)
			errCh <- err
		case <-stopCh:
			cmd.Stop()
			log.Debug("Stopped the provider runner.")
		}
	}()

	select {
	case reattachConfig := <-reattachCh:
		sr.reattachConfig = reattachConfig
		return sr.reattachConfig, nil
	case err := <-errCh:
		return "", err
	case <-sr.clock.After(reattachTimeout):
		return "", errors.Errorf(errFmtTimeout, reattachTimeout)
	}
}
// Stop asks the goroutine supervising the native provider process to stop
// it. It returns an error when there is nothing to stop, i.e. when Start
// has not been called yet or Stop has already been called since. Otherwise
// it sends the stop request, closes the stop channel and forgets it, and
// returns nil without waiting for the process to terminate.
func (sr *SharedProvider) Stop() error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	sr.logger.Debug("Attempting to stop the provider runner.")

	ch := sr.stopCh
	if ch == nil {
		return errors.New("shared provider process not started yet")
	}

	// The channel has a buffer of one and receives at most this single
	// value, so the send does not block.
	ch <- true
	close(ch)
	sr.stopCh = nil
	return nil
}