// structs.go — forked from hashicorp/consul (435 lines, 386 loc, 15.5 KB).
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package wasm
import (
"fmt"
"net/url"
"time"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_wasm_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
"github.com/hernad/consul/acl"
"github.com/hernad/consul/api"
"github.com/hernad/consul/envoyextensions/extensioncommon"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/mapstructure"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// wasmConfig defines the configuration for a Wasm Envoy extension.
// It is decoded from the extension arguments by newWasmConfig; validate must
// be called before the configuration is used.
type wasmConfig struct {
// Protocol is the type of Wasm filter to apply, "tcp" or "http".
// validate rejects any other value.
Protocol string
// ProxyType identifies the type of Envoy proxy that this extension applies to.
// The extension will only be configured for proxies that match this type and
// will be ignored for all other proxy types.
// normalize defaults an empty value to api.ServiceKindConnectProxy, which is
// currently the only value validate accepts.
ProxyType api.ServiceKind
// ListenerType identifies the listener type the filter will be applied to.
// validate accepts "inbound" or "outbound".
ListenerType string
// PluginConfig holds the configuration for the Wasm plugin.
PluginConfig pluginConfig
}
// pluginConfig defines a Wasm plugin configuration.
// envoyPluginConfig converts it into the Envoy PluginConfig message.
type pluginConfig struct {
// Name is the unique name for the filter in a VM. For use in identifying the
// filter if multiple filters are handled by the same VmID and RootID.
// Also used for logging/debugging.
Name string
// RootID is a unique ID for a set of filters in a VM which will share a
// RootContext and Contexts if applicable (e.g. a Wasm HttpFilter and a Wasm AccessLog).
// All filters with the same RootID and VmID will share Context(s).
RootID string
// VmConfig is the configuration for starting or finding the Wasm VM that the
// filter will run in.
VmConfig vmConfig
// Configuration holds the configuration that will be encoded as bytes and passed to
// the plugin on startup (proxy_on_configure).
// An empty value is omitted from the generated Envoy configuration.
Configuration string
// CapabilityRestrictionConfiguration controls the Wasm ABI capabilities available
// to the filter.
CapabilityRestrictionConfiguration capabilityRestrictionConfiguration
// failOpen controls the behavior when a runtime error occurs during filter
// processing.
//
// If set to false runtime errors will result in a failed request.
// For TCP filters, the connection will be closed. For HTTP filters a 503
// status is returned.
//
// If set to true, a runtime error will result in the filter being bypassed.
//
// NOTE(review): this field is unexported, so it is presumably not populated
// by mapstructure.Decode in newWasmConfig and stays false unless set in
// code — confirm that this is intentional.
failOpen bool
}
// vmConfig defines a Wasm VM configuration.
type vmConfig struct {
// VmID is an ID which will be used along with a hash of the Wasm code to
// determine which VM will be used for the plugin. All plugins which use
// the same VmID and code will use the same VM. May be left blank.
VmID string
// Runtime is the Wasm runtime type, one of: v8, wasmtime, wamr, or wavm.
// normalize defaults an empty value to the first entry of supportedRuntimes,
// and validate rejects values that are not in supportedRuntimes.
Runtime string
// Code references the Wasm code that will run in the filter.
Code dataSource
// Configuration holds the configuration that will be encoded as bytes and
// passed to the plugin during VM startup (proxy_on_vm_start).
// An empty value is omitted from the generated Envoy configuration.
Configuration string
// EnvironmentVariables specifies environment variables to be injected to
// this VM which will be available through WASI's environ_get and
// environ_get_sizes system calls.
EnvironmentVariables environmentVariables
}
// dataSource defines a local or remote location where Wasm code will be loaded from.
// validate requires exactly one of Local or Remote to be provided.
type dataSource struct {
// Local supports loading files from a local volume.
// It is considered set when its Filename is non-empty.
Local localDataSource
// Remote supports loading files from a remote server.
// It is considered set when its HttpURI has a service name or a URI.
Remote remoteDataSource
}
// environmentVariables defines the environment variables that will be made available
// to the Wasm filter.
type environmentVariables struct {
// HostEnvKeys holds the keys of Envoy's environment variables exposed to this VM.
// If a key exists in Envoy's environment variables, then that key-value pair will
// be injected into the Wasm VM. If a key does not exist, it will be ignored.
HostEnvKeys []string
// KeyValues maps environment variable names to the values that will be
// injected into this VM.
KeyValues map[string]string
}
// localDataSource defines a file from a local file system.
type localDataSource struct {
// Filename is the path to the Wasm file on the local file system.
// A non-empty value selects the local data source.
Filename string
}
// remoteDataSource defines a file from a remote file server.
type remoteDataSource struct {
// HttpURI identifies the upstream service and the URI on it that the Wasm
// file is fetched from.
HttpURI httpURI
// SHA256 of the remote file. Used to validate the remote file.
// validate requires it to be non-empty for remote data sources.
SHA256 string
// RetryPolicy determines how retries are handled.
RetryPolicy retryPolicy
}
// httpURI defines a remote file using an HTTP URI.
type httpURI struct {
// Service is the upstream service the Wasm plugin will be fetched from.
// normalize fills in the default namespace and partition when they are unset.
Service api.CompoundServiceName
// URI is the location of the Wasm file on the upstream service.
URI string
// Timeout sets the maximum duration that a response can take.
// It is a duration string that validate parses with time.ParseDuration.
Timeout string
// timeout is the parsed form of Timeout, set by validate. normalize defaults
// it to one second when it is not positive.
timeout time.Duration
}
// retryPolicy defines how to handle retries when fetching remote files.
type retryPolicy struct {
// RetryBackOff holds parameters that control retry backoff strategy.
RetryBackOff retryBackoff
// NumRetries specifies the allowed number of retries.
// validate rejects negative values.
NumRetries int
}
// retryBackoff holds parameters that control retry backoff strategy.
// Both intervals are duration strings that validate parses with
// time.ParseDuration.
type retryBackoff struct {
// BaseInterval is the base interval to be used for the next back off
// computation. It should be greater than zero and less than or equal
// to MaxInterval.
BaseInterval string
// MaxInterval is the maximum interval between retries.
MaxInterval string
// baseInterval is the parsed form of BaseInterval, set by validate.
baseInterval time.Duration
// maxInterval is the parsed form of MaxInterval, set by validate.
maxInterval time.Duration
}
// capabilityRestrictionConfiguration controls Wasm capabilities available to modules.
type capabilityRestrictionConfiguration struct {
// AllowedCapabilities specifies the Wasm capabilities which will be allowed.
// Capabilities are mapped by name. The value which each capability maps to is
// currently ignored and should be left empty.
// When the map is empty, no capability restriction is emitted at all.
AllowedCapabilities map[string]any
}
// newWasmConfig decodes the given extension arguments into a wasmConfig and
// then normalizes it so that unset fields receive their defaults.
//
// If decoding fails, the partially decoded config is returned together with
// the decode error and normalization is skipped.
func newWasmConfig(args map[string]any) (*wasmConfig, error) {
	decoded := new(wasmConfig)
	err := mapstructure.Decode(args, decoded)
	if err == nil {
		decoded.normalize()
	}
	return decoded, err
}
// asyncDataSource builds the Envoy AsyncDataSource that describes where the
// Wasm code is loaded from.
//
// A non-empty local filename takes precedence and produces a local data
// source. Otherwise a remote data source is built from VmConfig.Code.Remote,
// using rtCfg to find the upstream cluster of the remote service.
//
// An error is returned when no upstream SNI is found for the remote service,
// which includes the case where rtCfg is nil.
func (p *pluginConfig) asyncDataSource(rtCfg *extensioncommon.RuntimeConfig) (*envoy_core_v3.AsyncDataSource, error) {
	// Local data source
	if filename := p.VmConfig.Code.Local.Filename; filename != "" {
		return &envoy_core_v3.AsyncDataSource{
			Specifier: &envoy_core_v3.AsyncDataSource_Local{
				Local: &envoy_core_v3.DataSource{
					Specifier: &envoy_core_v3.DataSource_Filename{
						Filename: filename,
					},
				},
			},
		}, nil
	}

	// Remote data source
	// For a remote file, ensure there is an upstream cluster for the host specified in the URL.
	// Envoy requires an explicit cluster in order to perform the DNS lookup required to actually
	// fetch the data from the upstream source.
	remote := &p.VmConfig.Code.Remote
	clusterSNI := ""
	if rtCfg != nil {
		if upstream, ok := rtCfg.Upstreams[remote.HttpURI.Service]; ok {
			// Map iteration order is random, so pick the smallest non-empty SNI
			// to keep the generated configuration stable across runs.
			for sni := range upstream.SNIs {
				if sni != "" && (clusterSNI == "" || sni < clusterSNI) {
					clusterSNI = sni
				}
			}
		}
	}
	if clusterSNI == "" {
		return nil, fmt.Errorf("no upstream found for remote service %q", remote.HttpURI.Service.Name)
	}

	// Fall back to one second when no positive timeout was configured.
	d := time.Second
	if remote.HttpURI.timeout > 0 {
		d = remote.HttpURI.timeout
	}

	return &envoy_core_v3.AsyncDataSource{
		Specifier: &envoy_core_v3.AsyncDataSource_Remote{
			Remote: &envoy_core_v3.RemoteDataSource{
				Sha256: remote.SHA256,
				HttpUri: &envoy_core_v3.HttpUri{
					Uri: remote.HttpURI.URI,
					HttpUpstreamType: &envoy_core_v3.HttpUri_Cluster{
						Cluster: clusterSNI,
					},
					// durationpb.New keeps sub-second precision; building the
					// message from whole seconds would turn e.g. 500ms into 0s.
					Timeout: durationpb.New(d),
				},
				RetryPolicy: p.retryPolicy(),
			},
		},
	}, nil
}
// capConfig translates the allowed capabilities into Envoy's
// CapabilityRestrictionConfig. Every allowed capability name is mapped to an
// empty SanitizationConfig; the values stored in AllowedCapabilities are not
// used. It returns nil when no capabilities are listed.
func (p *pluginConfig) capConfig() *envoy_wasm_v3.CapabilityRestrictionConfig {
	allowed := p.CapabilityRestrictionConfiguration.AllowedCapabilities
	if len(allowed) == 0 {
		return nil
	}

	sanitized := make(map[string]*envoy_wasm_v3.SanitizationConfig)
	for name := range allowed {
		sanitized[name] = new(envoy_wasm_v3.SanitizationConfig)
	}

	restriction := new(envoy_wasm_v3.CapabilityRestrictionConfig)
	restriction.AllowedCapabilities = sanitized
	return restriction
}
// envoyPluginConfig converts the pluginConfig into the Envoy PluginConfig
// message.
//
// Non-empty plugin and VM configuration strings are wrapped in a StringValue
// and packed into an Any; empty ones are left unset. Environment variables
// are only emitted when at least one host key or key-value pair is present.
//
// An error is returned if either configuration cannot be encoded or if the
// code data source cannot be built from rtCfg.
func (p *pluginConfig) envoyPluginConfig(rtCfg *extensioncommon.RuntimeConfig) (*envoy_wasm_v3.PluginConfig, error) {
	var pluginCfgData *anypb.Any
	if raw := p.Configuration; raw != "" {
		encoded, err := anypb.New(wrapperspb.String(raw))
		if err != nil {
			return nil, fmt.Errorf("failed to encode Wasm plugin configuration: %w", err)
		}
		pluginCfgData = encoded
	}

	vm := &p.VmConfig

	var vmCfgData *anypb.Any
	if raw := vm.Configuration; raw != "" {
		encoded, err := anypb.New(wrapperspb.String(raw))
		if err != nil {
			return nil, fmt.Errorf("failed to encode Wasm VM configuration: %w", err)
		}
		vmCfgData = encoded
	}

	code, err := p.asyncDataSource(rtCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to encode async data source configuration: %w", err)
	}

	env := &vm.EnvironmentVariables
	var envVars *envoy_wasm_v3.EnvironmentVariables
	if len(env.HostEnvKeys) != 0 || len(env.KeyValues) != 0 {
		envVars = &envoy_wasm_v3.EnvironmentVariables{
			HostEnvKeys: env.HostEnvKeys,
			KeyValues:   env.KeyValues,
		}
	}

	envoyVM := &envoy_wasm_v3.VmConfig{
		VmId:                 vm.VmID,
		Runtime:              fmt.Sprintf("envoy.wasm.runtime.%s", vm.Runtime),
		Code:                 code,
		Configuration:        vmCfgData,
		EnvironmentVariables: envVars,
	}

	return &envoy_wasm_v3.PluginConfig{
		Name:                        p.Name,
		RootId:                      p.RootID,
		Vm:                          &envoy_wasm_v3.PluginConfig_VmConfig{VmConfig: envoyVM},
		Configuration:               pluginCfgData,
		CapabilityRestrictionConfig: p.capConfig(),
		FailOpen:                    p.failOpen,
	}, nil
}
// retryPolicy converts the remote data source's RetryPolicy into the Envoy
// RetryPolicy message.
//
// It returns nil when nothing is configured, i.e. NumRetries is not positive
// and both backoff interval strings are empty. The backoff values themselves
// come from the parsed baseInterval/maxInterval fields, which validate fills
// in; only positive intervals are emitted.
func (p *pluginConfig) retryPolicy() *envoy_core_v3.RetryPolicy {
	cfg := &p.VmConfig.Code.Remote.RetryPolicy
	if cfg.NumRetries <= 0 &&
		cfg.RetryBackOff.BaseInterval == "" &&
		cfg.RetryBackOff.MaxInterval == "" {
		return nil
	}

	policy := &envoy_core_v3.RetryPolicy{}
	if cfg.NumRetries > 0 {
		policy.NumRetries = wrapperspb.UInt32(uint32(cfg.NumRetries))
	}

	// durationpb.New keeps sub-second precision; building the message from
	// whole seconds would turn e.g. a 250ms base interval into 0s.
	var baseInterval, maxInterval *durationpb.Duration
	if d := cfg.RetryBackOff.baseInterval; d > 0 {
		baseInterval = durationpb.New(d)
	}
	if d := cfg.RetryBackOff.maxInterval; d > 0 {
		maxInterval = durationpb.New(d)
	}

	if baseInterval != nil || maxInterval != nil {
		policy.RetryBackOff = &envoy_core_v3.BackoffStrategy{
			BaseInterval: baseInterval,
			MaxInterval:  maxInterval,
		}
	}
	return policy
}
// normalize fills in defaults for fields that were left unset:
//   - an empty ProxyType becomes api.ServiceKindConnectProxy,
//   - an empty VM runtime becomes the first entry of supportedRuntimes,
//   - the remote service's namespace and partition are defaulted via the acl
//     helpers,
//   - a non-positive remote fetch timeout becomes one second.
func (w *wasmConfig) normalize() {
	if w.ProxyType == "" {
		w.ProxyType = api.ServiceKindConnectProxy
	}

	vm := &w.PluginConfig.VmConfig
	if vm.Runtime == "" {
		vm.Runtime = supportedRuntimes[0]
	}

	remoteURI := &vm.Code.Remote.HttpURI
	svc := &remoteURI.Service
	svc.Namespace = acl.NamespaceOrDefault(svc.Namespace)
	svc.Partition = acl.PartitionOrDefault(svc.Partition)

	if remoteURI.timeout > 0 {
		return
	}
	remoteURI.timeout = time.Second
}
// validate checks the wasmConfig and returns every problem it finds combined
// into a single error, or nil when the configuration is valid.
//
// Besides checking, it parses the remote Timeout, BaseInterval and MaxInterval
// strings and stores the results in the matching unexported duration fields.
// This method must be called before using the configuration.
func (w *wasmConfig) validate() error {
	var err, resultErr error
	if w.Protocol != "tcp" && w.Protocol != "http" {
		resultErr = multierror.Append(resultErr, fmt.Errorf(`unsupported Protocol %q, expected "tcp" or "http"`, w.Protocol))
	}
	if w.ProxyType != api.ServiceKindConnectProxy {
		resultErr = multierror.Append(resultErr, fmt.Errorf("unsupported ProxyType %q, only %q is supported", w.ProxyType, api.ServiceKindConnectProxy))
	}
	if w.ListenerType != "inbound" && w.ListenerType != "outbound" {
		resultErr = multierror.Append(resultErr, fmt.Errorf(`unsupported ListenerType %q, expected "inbound" or "outbound"`, w.ListenerType))
	}
	if err = validateRuntime(w.PluginConfig.VmConfig.Runtime); err != nil {
		resultErr = multierror.Append(resultErr, err)
	}

	httpURI := &w.PluginConfig.VmConfig.Code.Remote.HttpURI
	isLocal := w.PluginConfig.VmConfig.Code.Local.Filename != ""
	isRemote := httpURI.Service.Name != "" || httpURI.URI != ""
	if isLocal == isRemote {
		resultErr = multierror.Append(resultErr, fmt.Errorf("VmConfig.Code must provide exactly one of Local or Remote data source"))
	}

	// If the data source is Local then validation is complete.
	if isLocal {
		return resultErr
	}

	// Validate the remote data source fields.
	// Both Service and URI are required inputs for remote data sources.
	// We could catch this above in the isRemote check; however, we do an explicit check
	// here for UX to give the user extra feedback in case they only provide one or the other.
	if httpURI.Service.Name == "" || httpURI.URI == "" {
		resultErr = multierror.Append(resultErr, fmt.Errorf("both Service and URI are required for Remote data sources"))
	}
	if w.PluginConfig.VmConfig.Code.Remote.SHA256 == "" {
		resultErr = multierror.Append(resultErr, fmt.Errorf("SHA256 checksum is required for Remote data sources"))
	}
	if _, err := url.Parse(httpURI.URI); err != nil {
		resultErr = multierror.Append(resultErr, fmt.Errorf("invalid HttpURI.URI: %w", err))
	}
	if httpURI.Timeout != "" {
		httpURI.timeout, err = time.ParseDuration(httpURI.Timeout)
		if err != nil {
			resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse HttpURI.Timeout %q as a duration: %w", httpURI.Timeout, err))
		}
	}

	retry := &w.PluginConfig.VmConfig.Code.Remote.RetryPolicy
	if retry.NumRetries < 0 {
		resultErr = multierror.Append(resultErr, fmt.Errorf("RetryPolicy.NumRetries must be greater than or equal to 0"))
	}

	// Track which intervals parsed cleanly so that the range checks below only
	// run on real values. A failed parse leaves the duration at zero, which
	// would otherwise produce a second, misleading range error for the same
	// field.
	backoff := &retry.RetryBackOff
	var baseParsed, maxParsed bool
	if backoff.BaseInterval != "" {
		backoff.baseInterval, err = time.ParseDuration(backoff.BaseInterval)
		baseParsed = err == nil
		if !baseParsed {
			resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse RetryBackOff.BaseInterval %q: %w", backoff.BaseInterval, err))
		}
	}
	if backoff.MaxInterval != "" {
		backoff.maxInterval, err = time.ParseDuration(backoff.MaxInterval)
		maxParsed = err == nil
		if !maxParsed {
			resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse RetryBackOff.MaxInterval %q: %w", backoff.MaxInterval, err))
		}
	}

	if baseParsed && backoff.baseInterval <= 0 {
		resultErr = multierror.Append(resultErr, fmt.Errorf("RetryBackOff.BaseInterval %q must be greater than zero and less than or equal to RetryBackOff.MaxInterval %q",
			backoff.BaseInterval,
			backoff.MaxInterval),
		)
	}
	if maxParsed && backoff.maxInterval < backoff.baseInterval {
		resultErr = multierror.Append(resultErr, fmt.Errorf("RetryBackOff.MaxInterval %q must be greater than or equal to RetryBackOff.BaseInterval %q",
			backoff.MaxInterval,
			backoff.BaseInterval),
		)
	}
	return resultErr
}
// validateRuntime reports whether s is one of the entries in
// supportedRuntimes. It returns nil on a match and an "unsupported runtime"
// error otherwise.
func validateRuntime(s string) error {
	for i := range supportedRuntimes {
		if supportedRuntimes[i] != s {
			continue
		}
		return nil
	}
	return fmt.Errorf("unsupported runtime %q", s)
}