From 8a9a841b4bc16fcd4dd1329ce4ce5feaf20fab11 Mon Sep 17 00:00:00 2001 From: Kyle Schochenmaier Date: Tue, 6 Apr 2021 09:37:03 -0500 Subject: [PATCH] Add acls and tls to endpoints controller (#470) Add support for TLS and ACLs to the endpoints controller. --- connect-inject/endpoints_controller.go | 12 +- connect-inject/endpoints_controller_test.go | 242 ++++++++++++++------ subcommand/inject-connect/command.go | 1 + 3 files changed, 177 insertions(+), 78 deletions(-) diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index a5d4600741..cc01ce7c86 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -34,6 +34,8 @@ type EndpointsController struct { client.Client // ConsulClient points at the agent local to the connect-inject deployment pod. ConsulClient *api.Client + // ConsulClientCfg is the client config used by the ConsulClient when calling NewClient(). + ConsulClientCfg *api.Config // ConsulScheme is the scheme to use when making API calls to Consul, // i.e. "http" or "https". ConsulScheme string @@ -100,7 +102,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( if hasBeenInjected(pod) { // Create client for Consul agent local to the pod. - client, err := r.getConsulClient(pod.Status.HostIP) + client, err := r.remoteConsulClient(pod.Status.HostIP) if err != nil { r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP) return ctrl.Result{}, err @@ -300,7 +302,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, // On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata. for _, pod := range list.Items { // Create client for this agent. - client, err := r.getConsulClient(pod.Status.PodIP) + client, err := r.remoteConsulClient(pod.Status.PodIP) if err != nil { r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP) return err @@ -410,10 +412,10 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, return upstreams, nil } -// getConsulClient returns an *api.Client that points at the consul agent local to the pod. -func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) { +// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod. +func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) { newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort) - localConfig := api.DefaultConfig() + localConfig := r.ConsulClientCfg localConfig.Address = newAddr return consul.NewClient(localConfig) diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index b27c153659..4aca6b7fb3 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -6,8 +6,9 @@ import ( "strings" "testing" - mapset "github.com/deckarep/golang-set" + "github.com/deckarep/golang-set" logrtest "github.com/go-logr/logr/testing" + "github.com/hashicorp/consul-k8s/subcommand/common" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/require" @@ -106,6 +107,74 @@ func TestHasBeenInjected(t *testing.T) { } } +// TestProcessUpstreamsTLSandACLs enables TLS and ACLS and tests processUpstreams through +// the only path which sets up and uses a consul client: when proxy defaults need to be read. +// This test was plucked from the table test TestProcessUpstreams as the rest do not use the client. +func TestProcessUpstreamsTLSandACLs(t *testing.T) { + t.Parallel() + nodeName := "test-node" + + masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586" + caFile, certFile, keyFile := common.GenerateServerCerts(t) + // Create test consul server with ACLs and TLS + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.ACL.Enabled = true + c.ACL.DefaultPolicy = "deny" + c.ACL.Tokens.Master = masterToken + c.CAFile = caFile + c.CertFile = certFile + c.KeyFile = keyFile + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + + consul.WaitForSerfCheck(t) + cfg := &api.Config{ + Address: consul.HTTPSAddr, + Scheme: "https", + TLSConfig: api.TLSConfig{ + CAFile: caFile, + }, + Token: masterToken, + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(consul.HTTPSAddr, ":") + consulPort := addr[1] + + ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "global") + pd := ce.(*api.ProxyConfigEntry) + pd.MeshGateway.Mode = api.MeshGatewayModeRemote + _, _, err = consulClient.ConfigEntries().Set(pd, &api.WriteOptions{}) + require.NoError(t, err) + + ep := &EndpointsController{ + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "https", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + } + + pod := createPod("pod1", "1.2.3.4", true) + pod.Annotations[annotationUpstreams] = "upstream1:1234:dc1" + + upstreams, err := ep.processUpstreams(*pod) + require.NoError(t, err) + + expected := []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + Datacenter: "dc1", + LocalBindPort: 1234, + }, + } + require.Equal(t, expected, upstreams) +} + func TestProcessUpstreams(t *testing.T) { t.Parallel() nodeName := "test-node" @@ -617,11 +686,12 @@ func TestReconcileCreateEndpoint(t *testing.T) { }) require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) - consulClient, err := api.NewClient(&api.Config{ + + cfg := &api.Config{ Address: consul.HTTPAddr, - }) + } + consulClient, err := api.NewClient(cfg) require.NoError(t, err) addr := strings.Split(consul.HTTPAddr, ":") consulPort := addr[1] @@ -643,6 +713,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { DenyK8sNamespacesSet: mapset.NewSetWith(), ReleaseName: "consul", ReleaseNamespace: "default", + ConsulClientCfg: cfg, } namespacedName := types.NamespacedName{ Namespace: "default", @@ -704,7 +775,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { // For the register and deregister codepath, this also tests that they work when the Consul service name is different // from the K8s service name. // This test covers EndpointsController.deregisterServiceOnAllAgents when services should be selectively deregistered -// since the map will not be nil. +// since the map will not be nil. This test also runs each test with ACLs+TLS enabled and disabled, since it covers all the cases where a Consul client is created. func TestReconcileUpdateEndpoint(t *testing.T) { t.Parallel() nodeName := "test-node" @@ -1201,79 +1272,102 @@ func TestReconcileUpdateEndpoint(t *testing.T) { expectedProxySvcInstances: []*api.CatalogService{}, }, } - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - // The agent pod needs to have the address 127.0.0.1 so when the - // code gets the agent pods via the label component=client, and - // makes requests against the agent API, it will actually hit the - // test server we have on localhost. - fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) - fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + for _, secure := range []bool{true, false} { + for _, tt := range cases { + t.Run(fmt.Sprintf("%s - secure: %v", tt.name, secure), func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} - // Create fake k8s client - k8sObjects := append(tt.k8sObjects(), fakeClientPod) - fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + // Create fake k8s client + k8sObjects := append(tt.k8sObjects(), fakeClientPod) + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() - // Create test consul server - consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { - c.NodeName = nodeName - }) - require.NoError(t, err) - defer consul.Stop() - - consul.WaitForLeader(t) - consulClient, err := api.NewClient(&api.Config{ - Address: consul.HTTPAddr, - }) - require.NoError(t, err) - addr := strings.Split(consul.HTTPAddr, ":") - consulPort := addr[1] + masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586" + caFile, certFile, keyFile := common.GenerateServerCerts(t) + // Create test consul server, with ACLs+TLS if necessary + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + if secure { + c.ACL.Enabled = true + c.ACL.DefaultPolicy = "deny" + c.ACL.Tokens.Master = masterToken + c.CAFile = caFile + c.CertFile = certFile + c.KeyFile = keyFile + } + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForSerfCheck(t) - // Register service and proxy in consul - for _, svc := range tt.initialConsulSvcs { - err = consulClient.Agent().ServiceRegister(svc) + cfg := &api.Config{ + Scheme: "http", + Address: consul.HTTPAddr, + } + if secure { + cfg.Address = consul.HTTPSAddr + cfg.Scheme = "https" + cfg.TLSConfig = api.TLSConfig{ + CAFile: caFile, + } + cfg.Token = masterToken + } + consulClient, err := api.NewClient(cfg) require.NoError(t, err) - } + addr := strings.Split(cfg.Address, ":") + consulPort := addr[1] - // Create the endpoints controller - ep := &EndpointsController{ - Client: fakeClient, - Log: logrtest.TestLogger{T: t}, - ConsulClient: consulClient, - ConsulPort: consulPort, - ConsulScheme: "http", - AllowK8sNamespacesSet: mapset.NewSetWith("*"), - DenyK8sNamespacesSet: mapset.NewSetWith(), - ReleaseName: "consul", - ReleaseNamespace: "default", - } - namespacedName := types.NamespacedName{ - Namespace: "default", - Name: "service-updated", - } + // Register service and proxy in consul + for _, svc := range tt.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } - resp, err := ep.Reconcile(context.Background(), ctrl.Request{ - NamespacedName: namespacedName, - }) - require.NoError(t, err) - require.False(t, resp.Requeue) + // Create the endpoints controller + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: cfg.Scheme, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + } + namespacedName := types.NamespacedName{ + Namespace: "default", + Name: "service-updated", + } - // After reconciliation, Consul should have service-updated with the correct number of instances - serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil) - require.NoError(t, err) - require.Len(t, serviceInstances, tt.expectedNumSvcInstances) - for i, instance := range serviceInstances { - require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID) - require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress) - } - proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil) - require.NoError(t, err) - require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances) - for i, instance := range proxyServiceInstances { - require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) - require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) - } - }) + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should have service-updated with the correct number of instances + serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil) + require.NoError(t, err) + require.Len(t, serviceInstances, tt.expectedNumSvcInstances) + for i, instance := range serviceInstances { + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress) + } + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances) + for i, instance := range proxyServiceInstances { + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) + } + }) + } } } @@ -1358,9 +1452,10 @@ func TestReconcileDeleteEndpoint(t *testing.T) { defer consul.Stop() consul.WaitForLeader(t) - consulClient, err := api.NewClient(&api.Config{ + cfg := &api.Config{ Address: consul.HTTPAddr, - }) + } + consulClient, err := api.NewClient(cfg) require.NoError(t, err) addr := strings.Split(consul.HTTPAddr, ":") consulPort := addr[1] @@ -1382,6 +1477,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { DenyK8sNamespacesSet: mapset.NewSetWith(), ReleaseName: "consul", ReleaseNamespace: "default", + ConsulClientCfg: cfg, } // Set up the Endpoint that will be reconciled, and reconcile diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 97e24b6fd7..67a7e41fd7 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -418,6 +418,7 @@ func (c *Command) Run(args []string) int { ReleaseName: c.flagReleaseName, ReleaseNamespace: c.flagReleaseNamespace, Context: ctx, + ConsulClientCfg: cfg, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{}) return 1