forked from hashicorp/consul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
clusters.go
171 lines (151 loc) · 5.21 KB
/
clusters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package xds
import (
"encoding/json"
"errors"
"fmt"
"time"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
// clustersFromSnapshot builds the xDS "clusters" resources for a proxy config
// snapshot: the local application cluster first, followed by one cluster per
// configured upstream, in the order the upstreams appear in the snapshot.
//
// It returns an error if cfgSnap is nil or if any individual cluster cannot
// be constructed. The token argument is not consulted here.
func clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
	if cfgSnap == nil {
		return nil, errors.New("nil config given")
	}

	upstreams := cfgSnap.Proxy.Upstreams

	// One slot per upstream plus one for the "app" cluster that backs the
	// public listener.
	resources := make([]proto.Message, 0, len(upstreams)+1)

	appCluster, err := makeAppCluster(cfgSnap)
	if err != nil {
		return nil, err
	}
	resources = append(resources, appCluster)

	for i := range upstreams {
		upstreamCluster, err := makeUpstreamCluster(upstreams[i], cfgSnap)
		if err != nil {
			return nil, err
		}
		resources = append(resources, upstreamCluster)
	}

	return resources, nil
}
// makeAppCluster returns the cluster that represents the local application
// instance behind the proxy.
//
// If the proxy config carries a string under "envoy_local_cluster_json" it is
// parsed with makeClusterFromUserConfig and used instead of the default; a
// parse failure is returned to the caller. Otherwise a STATIC cluster named
// LocalAppClusterName is built, pointing at the local service address
// (defaulting to 127.0.0.1) and port, with a 5 second connect timeout.
func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
	// A user-supplied override takes precedence. Non-string values are ignored
	// and fall through to the default cluster.
	if raw, found := cfgSnap.Proxy.Config["envoy_local_cluster_json"]; found {
		if rawJSON, isString := raw.(string); isString {
			override, err := makeClusterFromUserConfig(rawJSON)
			if err != nil || override != nil {
				return override, err
			}
		}
	}

	localAddr := cfgSnap.Proxy.LocalServiceAddress
	if localAddr == "" {
		localAddr = "127.0.0.1"
	}

	return &envoy.Cluster{
		Name:           LocalAppClusterName,
		ConnectTimeout: 5 * time.Second,
		Type:           envoy.Cluster_STATIC,
		// The API v2 docs mark Hosts as deprecated in favour of LoadAssignment
		// (sketched below), but LoadAssignment doesn't work for a tcp_proxy
		// target for some reason, so Hosts is used for now.
		Hosts: []*envoycore.Address{makeAddressPtr(localAddr, cfgSnap.Proxy.LocalServicePort)},
		// LoadAssignment: &envoy.ClusterLoadAssignment{
		// 	ClusterName: LocalAppClusterName,
		// 	Endpoints: []endpoint.LocalityLbEndpoints{
		// 		{
		// 			LbEndpoints: []endpoint.LbEndpoint{
		// 				makeEndpoint(LocalAppClusterName,
		// 					addr,
		// 					cfgSnap.Proxy.LocalServicePort),
		// 			},
		// 		},
		// 	},
		// },
	}, nil
}
// makeUpstreamCluster returns the cluster for a single upstream.
//
// If the upstream config carries a string under "envoy_cluster_json" it is
// parsed with makeClusterFromUserConfig and used as the base cluster; a parse
// failure is returned to the caller. Otherwise an EDS cluster named after the
// upstream identifier is built, sourcing its endpoints over ADS, with a
// 5 second connect timeout.
//
// In both cases the cluster's TLS context is then set from the snapshot,
// replacing any TLS context a user-supplied cluster may have carried.
func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
	var cluster *envoy.Cluster

	// A user-supplied override takes precedence. Non-string values are ignored
	// and fall through to the default cluster.
	if raw, found := upstream.Config["envoy_cluster_json"]; found {
		if rawJSON, isString := raw.(string); isString {
			parsed, err := makeClusterFromUserConfig(rawJSON)
			if err != nil {
				return parsed, err
			}
			cluster = parsed
		}
	}

	if cluster == nil {
		adsSource := &envoycore.ConfigSource{
			ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
				Ads: &envoycore.AggregatedConfigSource{},
			},
		}
		cluster = &envoy.Cluster{
			Name:             upstream.Identifier(),
			ConnectTimeout:   5 * time.Second,
			Type:             envoy.Cluster_EDS,
			EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{EdsConfig: adsSource},
		}
	}

	// Enable TLS to the upstream using the configured client certificate.
	cluster.TlsContext = &envoyauth.UpstreamTlsContext{
		CommonTlsContext: makeCommonTLSContext(cfgSnap),
	}

	return cluster, nil
}
// makeClusterFromUserConfig returns the cluster config decoded from an
// arbitrary proto3 JSON format string, or an error if it's invalid.
//
// Two encodings are accepted:
//   - a JSON object with a top-level "@type" field, which is decoded as a
//     protobuf Any whose value is then unmarshalled into a cluster;
//   - a JSON object without "@type", which is decoded directly as a cluster.
//
// On any failure the returned cluster is nil. Invalid user input never
// panics; it is always reported through the error return.
//
// For now we only support embedding in JSON strings because of the hcl parsing
// pain (see config.go comment above call to patchSliceOfMaps). Until we
// refactor the config parser a _lot_, a user's opaque config that contains
// arrays will be mangled. We could actually fix that up in mapstructure, which
// knows the type of the target and so could resolve the slices to singletons
// unambiguously, and it would work for us here... but we still have the problem
// that the config would render incorrectly in general in our HTTP API
// responses, so we really need to fix it "properly".
//
// When we do that we can support just nesting the config directly into the
// JSON/hcl naturally, but this is a stop-gap that gets us an escape hatch
// immediately. It's also probably not a bad thing to support long-term since
// any config generated by other systems will likely be in canonical protobuf
// form rather than our slight variant in JSON/hcl.
func makeClusterFromUserConfig(configJSON string) (*envoy.Cluster, error) {
	// Decode only the top-level keys first so we can tell whether the payload
	// is wrapped in a protobuf Any (signalled by an "@type" field).
	var jsonFields map[string]*json.RawMessage
	if err := json.Unmarshal([]byte(configJSON), &jsonFields); err != nil {
		// The raw config is deliberately left out of the error: it is user
		// supplied and may be large or sensitive.
		return nil, fmt.Errorf("parsing cluster config JSON: %v", err)
	}

	var c envoy.Cluster

	if _, ok := jsonFields["@type"]; ok {
		// Type field is present so decode it as a types.Any...
		var anyMsg types.Any
		if err := jsonpb.UnmarshalString(configJSON, &anyMsg); err != nil {
			return nil, err
		}
		// ...and then unmarshal the wrapped value as a cluster. A payload of
		// the wrong type is a user error, not a programmer bug, so report it
		// rather than panicking.
		if err := proto.Unmarshal(anyMsg.Value, &c); err != nil {
			return nil, fmt.Errorf("decoding cluster from Any value: %v", err)
		}
		return &c, nil
	}

	// No @type so try decoding as a straight cluster.
	if err := jsonpb.UnmarshalString(configJSON, &c); err != nil {
		return nil, err
	}
	return &c, nil
}