Skip to content

Commit

Permalink
xDS server implementation for serving Envoy as a Connect proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
banks committed Oct 1, 2018
1 parent b5b2ba3 commit 5ed61df
Show file tree
Hide file tree
Showing 10 changed files with 1,927 additions and 7 deletions.
11 changes: 11 additions & 0 deletions agent/connect/uri.go
Expand Up @@ -33,6 +33,17 @@ var (
`^/ns/([^/]+)/dc/([^/]+)/svc/([^/]+)$`)
)

// ParseCertURIFromString attempts to parse a string representation of a
// certificate URI as a convenince helper around ParseCertURI.
func ParseCertURIFromString(input string) (CertURI, error) {
// Parse the certificate URI from the string
uriRaw, err := url.Parse(input)
if err != nil {
return nil, err
}
return ParseCertURI(uriRaw)
}

// ParseCertURI parses a the URI value from a TLS certificate.
func ParseCertURI(input *url.URL) (CertURI, error) {
if input.Scheme != "spiffe" {
Expand Down
9 changes: 2 additions & 7 deletions agent/connect/uri_test.go
@@ -1,7 +1,6 @@
package connect

import (
"net/url"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -57,17 +56,13 @@ var testCertURICases = []struct {
},
}

func TestParseCertURI(t *testing.T) {
func TestParseCertURIFromString(t *testing.T) {
for _, tc := range testCertURICases {
t.Run(tc.Name, func(t *testing.T) {
assert := assert.New(t)

// Parse the URI, should always be valid
uri, err := url.Parse(tc.URI)
assert.Nil(err)

// Parse the ID and check the error/return value
actual, err := ParseCertURI(uri)
actual, err := ParseCertURIFromString(tc.URI)
if err != nil {
t.Logf("parse error: %s", err.Error())
}
Expand Down
79 changes: 79 additions & 0 deletions agent/xds/clusters.go
@@ -0,0 +1,79 @@
package xds

import (
"errors"
"time"

"github.com/gogo/protobuf/proto"

"github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/hashicorp/consul/agent/proxycfg"
)

// clustersFromSnapshot returns the xDS API reprepsentation of the "clusters"
// (upstreams) in the snapshot.
func clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
// Inlude the "app" cluster for the public listener
clusters := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)

clusters[0] = makeAppCluster(cfgSnap)

for idx, upstream := range cfgSnap.Proxy.Upstreams {
clusters[idx+1] = makeUpstreamCluster(upstream.Identifier(), cfgSnap)
}

return clusters, nil
}

func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) *v2.Cluster {
addr := cfgSnap.Proxy.LocalServiceAddress
if addr == "" {
addr = "127.0.0.1"
}
return &v2.Cluster{
Name: LocalAppClusterName,
// TODO(banks): make this configurable from the proxy config
ConnectTimeout: 5 * time.Second,
Type: v2.Cluster_STATIC,
// API v2 docs say hosts is deprecated and should use LoadAssignment as
// below.. but it doesn't work for tcp_proxy target for some reason.
Hosts: []*core.Address{makeAddressPtr(addr, cfgSnap.Proxy.LocalServicePort)},
// LoadAssignment: &v2.ClusterLoadAssignment{
// ClusterName: LocalAppClusterName,
// Endpoints: []endpoint.LocalityLbEndpoints{
// {
// LbEndpoints: []endpoint.LbEndpoint{
// makeEndpoint(LocalAppClusterName,
// addr,
// cfgSnap.Proxy.LocalServicePort),
// },
// },
// },
// },
}
}

func makeUpstreamCluster(name string, cfgSnap *proxycfg.ConfigSnapshot) *v2.Cluster {
return &v2.Cluster{
Name: name,
// TODO(banks): make this configurable from the upstream config
ConnectTimeout: 5 * time.Second,
Type: v2.Cluster_EDS,
EdsClusterConfig: &v2.Cluster_EdsClusterConfig{
EdsConfig: &core.ConfigSource{
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
},
},
// Enable TLS upstream with the configured client certificate.
TlsContext: &auth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(cfgSnap),
},
}
}
58 changes: 58 additions & 0 deletions agent/xds/endpoints.go
@@ -0,0 +1,58 @@
package xds

import (
"errors"

"github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/gogo/protobuf/proto"

"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)

// endpointsFromSnapshot returns the xDS API reprepsentation of the "endpoints"
// (upstream instances) in the snapshot.
func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints))
for id, endpoints := range cfgSnap.UpstreamEndpoints {
if len(endpoints) < 1 {
continue
}
la := makeLoadAssignment(id, endpoints)
resources = append(resources, la)
}
return resources, nil
}

func makeEndpoint(clusterName, host string, port int) endpoint.LbEndpoint {
return endpoint.LbEndpoint{
Endpoint: &endpoint.Endpoint{
Address: makeAddressPtr(host, port),
},
}
}

func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes) *v2.ClusterLoadAssignment {
es := make([]endpoint.LbEndpoint, 0, len(endpoints))
for _, ep := range endpoints {
addr := ep.Service.Address
if addr == "" {
addr = ep.Node.Address
}
es = append(es, endpoint.LbEndpoint{
Endpoint: &endpoint.Endpoint{
Address: makeAddressPtr(addr, ep.Service.Port),
},
})
}
return &v2.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []endpoint.LocalityLbEndpoints{{
LbEndpoints: es,
}},
}
}

0 comments on commit 5ed61df

Please sign in to comment.