Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acls and tls to endpoints controller #470

Merged
merged 8 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type EndpointsController struct {
client.Client
// ConsulClient points at the agent local to the connect-inject deployment pod.
ConsulClient *api.Client
// ConsulClientCfg is the client config used by the ConsulClient when calling NewClient().
ConsulClientCfg *api.Config
// ConsulScheme is the scheme to use when making API calls to Consul,
// i.e. "http" or "https".
ConsulScheme string
Expand Down Expand Up @@ -413,7 +415,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked Luke's suggestion from here to name this remoteConsulClient or externalConsulClient

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remoteConsulClient looks great, switching to that, thanks!

newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
localConfig := api.DefaultConfig()
localConfig := r.ConsulClientCfg
localConfig.Address = newAddr

return consul.NewClient(localConfig)
Expand Down
242 changes: 169 additions & 73 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"strings"
"testing"

mapset "github.com/deckarep/golang-set"
"github.com/deckarep/golang-set"
logrtest "github.com/go-logr/logr/testing"
"github.com/hashicorp/consul-k8s/subcommand/common"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -106,6 +107,74 @@ func TestHasBeenInjected(t *testing.T) {
}
}

// TestProcessUpstreamsTLSandACLs enables TLS and ACLS and tests processUpstreams through
// the only path which sets up and uses a consul client: when proxy defaults need to be read.
// This test was plucked from the table test TestProcessUpstreams as the rest do not use the client.
func TestProcessUpstreamsTLSandACLs(t *testing.T) {
t.Parallel()
nodeName := "test-node"

masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server with ACLs and TLS
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForSerfCheck(t)
cfg := &api.Config{
Address: consul.HTTPSAddr,
Scheme: "https",
TLSConfig: api.TLSConfig{
CAFile: caFile,
},
Token: masterToken,
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPSAddr, ":")
consulPort := addr[1]

ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "global")
pd := ce.(*api.ProxyConfigEntry)
pd.MeshGateway.Mode = api.MeshGatewayModeRemote
_, _, err = consulClient.ConfigEntries().Set(pd, &api.WriteOptions{})
require.NoError(t, err)

ep := &EndpointsController{
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "https",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
}

pod := createPod("pod1", "1.2.3.4", true)
pod.Annotations[annotationUpstreams] = "upstream1:1234:dc1"

upstreams, err := ep.processUpstreams(*pod)
require.NoError(t, err)

expected := []api.Upstream{
{
DestinationType: api.UpstreamDestTypeService,
DestinationName: "upstream1",
Datacenter: "dc1",
LocalBindPort: 1234,
},
}
require.Equal(t, expected, upstreams)
}

func TestProcessUpstreams(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -617,11 +686,12 @@ func TestReconcileCreateEndpoint(t *testing.T) {
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{

cfg := &api.Config{
Address: consul.HTTPAddr,
})
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
Expand All @@ -643,6 +713,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}
namespacedName := types.NamespacedName{
Namespace: "default",
Expand Down Expand Up @@ -704,7 +775,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
// For the register and deregister codepath, this also tests that they work when the Consul service name is different
// from the K8s service name.
// This test covers EndpointsController.deregisterServiceOnAllAgents when services should be selectively deregistered
// since the map will not be nil.
// since the map will not be nil. This test also runs each test with ACLs+TLS enabled and disabled, since it covers all the cases where a Consul client is created.
func TestReconcileUpdateEndpoint(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -1201,79 +1272,102 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
expectedProxySvcInstances: []*api.CatalogService{},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}
for _, secure := range []bool{true, false} {
for _, tt := range cases {
t.Run(fmt.Sprintf("%s - secure: %v", tt.name, secure), func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}

// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()
// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test consul server
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{
Address: consul.HTTPAddr,
})
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server, with ACLs+TLS if necessary
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
if secure {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
}
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()
consul.WaitForSerfCheck(t)
kschoche marked this conversation as resolved.
Show resolved Hide resolved

// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
cfg := &api.Config{
Scheme: "http",
Address: consul.HTTPAddr,
}
if secure {
cfg.Address = consul.HTTPSAddr
cfg.Scheme = "https"
cfg.TLSConfig = api.TLSConfig{
CAFile: caFile,
}
cfg.Token = masterToken
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
}
addr := strings.Split(cfg.Address, ":")
consulPort := addr[1]

// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "http",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}
// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
require.NoError(t, err)
}

resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)
// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: cfg.Scheme,
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
}
}
}

Expand Down Expand Up @@ -1358,9 +1452,10 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{
cfg := &api.Config{
Address: consul.HTTPAddr,
})
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
Expand All @@ -1382,6 +1477,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}

// Set up the Endpoint that will be reconciled, and reconcile
Expand Down
1 change: 1 addition & 0 deletions subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func (c *Command) Run(args []string) int {
ReleaseName: c.flagReleaseName,
ReleaseNamespace: c.flagReleaseNamespace,
Context: ctx,
ConsulClientCfg: api.DefaultConfig(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may need to use cfg here because there's a bunch of code above that is doing stuff to it after it gets it out of api.DefaultConfig():

	// create Consul API config object
	cfg := api.DefaultConfig()
	c.http.MergeOntoConfig(cfg)
	if cfg.TLSConfig.CAFile == "" && c.flagConsulCACert != "" {
		cfg.TLSConfig.CAFile = c.flagConsulCACert
	}
	consulURLRaw := cfg.Address
	// cfg.Address may or may not be prefixed with scheme.
	if !strings.Contains(cfg.Address, "://") {
		consulURLRaw = fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address)
	}
	consulURL, err := url.Parse(consulURLRaw)
	if err != nil {
		c.UI.Error(fmt.Sprintf("error parsing consul address %q: %s", consulURLRaw, err))
		return 1
	}

	// load CA file contents
	var consulCACert []byte
	if cfg.TLSConfig.CAFile != "" {
		var err error
		consulCACert, err = ioutil.ReadFile(cfg.TLSConfig.CAFile)
		if err != nil {
			c.UI.Error(fmt.Sprintf("error reading Consul's CA cert file %q: %s", cfg.TLSConfig.CAFile, err))
			return 1
		}
	}

I haven't tested this out in an acceptance test but I'm guessing we need to use that config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch. At runtime it worked for me deployed on a cluster as-is, but we definitely can use cfg instead here.
I believe api.DefaultConfig() wouldn't have picked up flag overrides on command line, thx!

}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{})
return 1
Expand Down