Skip to content

Commit

Permalink
Add acls and tls to endpoints controller (#470)
Browse files Browse the repository at this point in the history
Add support for TLS and ACLs to the endpoints controller.
  • Loading branch information
kschoche committed Apr 6, 2021
1 parent 37a4384 commit 8a9a841
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 78 deletions.
12 changes: 7 additions & 5 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type EndpointsController struct {
client.Client
// ConsulClient points at the agent local to the connect-inject deployment pod.
ConsulClient *api.Client
// ConsulClientCfg is the client config used by the ConsulClient when calling NewClient().
ConsulClientCfg *api.Config
// ConsulScheme is the scheme to use when making API calls to Consul,
// i.e. "http" or "https".
ConsulScheme string
Expand Down Expand Up @@ -100,7 +102,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

if hasBeenInjected(pod) {
// Create client for Consul agent local to the pod.
client, err := r.getConsulClient(pod.Status.HostIP)
client, err := r.remoteConsulClient(pod.Status.HostIP)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP)
return ctrl.Result{}, err
Expand Down Expand Up @@ -300,7 +302,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
// On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
for _, pod := range list.Items {
// Create client for this agent.
client, err := r.getConsulClient(pod.Status.PodIP)
client, err := r.remoteConsulClient(pod.Status.PodIP)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP)
return err
Expand Down Expand Up @@ -410,10 +412,10 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
return upstreams, nil
}

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) {
// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
localConfig := api.DefaultConfig()
localConfig := r.ConsulClientCfg
localConfig.Address = newAddr

return consul.NewClient(localConfig)
Expand Down
242 changes: 169 additions & 73 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"strings"
"testing"

mapset "github.com/deckarep/golang-set"
"github.com/deckarep/golang-set"
logrtest "github.com/go-logr/logr/testing"
"github.com/hashicorp/consul-k8s/subcommand/common"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -106,6 +107,74 @@ func TestHasBeenInjected(t *testing.T) {
}
}

// TestProcessUpstreamsTLSandACLs enables TLS and ACLS and tests processUpstreams through
// the only path which sets up and uses a consul client: when proxy defaults need to be read.
// This test was plucked from the table test TestProcessUpstreams as the rest do not use the client.
func TestProcessUpstreamsTLSandACLs(t *testing.T) {
t.Parallel()
nodeName := "test-node"

masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server with ACLs and TLS
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForSerfCheck(t)
cfg := &api.Config{
Address: consul.HTTPSAddr,
Scheme: "https",
TLSConfig: api.TLSConfig{
CAFile: caFile,
},
Token: masterToken,
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPSAddr, ":")
consulPort := addr[1]

ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "global")
pd := ce.(*api.ProxyConfigEntry)
pd.MeshGateway.Mode = api.MeshGatewayModeRemote
_, _, err = consulClient.ConfigEntries().Set(pd, &api.WriteOptions{})
require.NoError(t, err)

ep := &EndpointsController{
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "https",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
}

pod := createPod("pod1", "1.2.3.4", true)
pod.Annotations[annotationUpstreams] = "upstream1:1234:dc1"

upstreams, err := ep.processUpstreams(*pod)
require.NoError(t, err)

expected := []api.Upstream{
{
DestinationType: api.UpstreamDestTypeService,
DestinationName: "upstream1",
Datacenter: "dc1",
LocalBindPort: 1234,
},
}
require.Equal(t, expected, upstreams)
}

func TestProcessUpstreams(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -617,11 +686,12 @@ func TestReconcileCreateEndpoint(t *testing.T) {
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{

cfg := &api.Config{
Address: consul.HTTPAddr,
})
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
Expand All @@ -643,6 +713,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}
namespacedName := types.NamespacedName{
Namespace: "default",
Expand Down Expand Up @@ -704,7 +775,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
// For the register and deregister codepath, this also tests that they work when the Consul service name is different
// from the K8s service name.
// This test covers EndpointsController.deregisterServiceOnAllAgents when services should be selectively deregistered
// since the map will not be nil.
// since the map will not be nil. This test also runs each test with ACLs+TLS enabled and disabled, since it covers all the cases where a Consul client is created.
func TestReconcileUpdateEndpoint(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -1201,79 +1272,102 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
expectedProxySvcInstances: []*api.CatalogService{},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}
for _, secure := range []bool{true, false} {
for _, tt := range cases {
t.Run(fmt.Sprintf("%s - secure: %v", tt.name, secure), func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}

// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()
// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test consul server
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{
Address: consul.HTTPAddr,
})
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server, with ACLs+TLS if necessary
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
if secure {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
}
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()
consul.WaitForSerfCheck(t)

// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
cfg := &api.Config{
Scheme: "http",
Address: consul.HTTPAddr,
}
if secure {
cfg.Address = consul.HTTPSAddr
cfg.Scheme = "https"
cfg.TLSConfig = api.TLSConfig{
CAFile: caFile,
}
cfg.Token = masterToken
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
}
addr := strings.Split(cfg.Address, ":")
consulPort := addr[1]

// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "http",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}
// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
require.NoError(t, err)
}

resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)
// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: cfg.Scheme,
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
}
}
}

Expand Down Expand Up @@ -1358,9 +1452,10 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{
cfg := &api.Config{
Address: consul.HTTPAddr,
})
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
Expand All @@ -1382,6 +1477,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}

// Set up the Endpoint that will be reconciled, and reconcile
Expand Down
1 change: 1 addition & 0 deletions subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func (c *Command) Run(args []string) int {
ReleaseName: c.flagReleaseName,
ReleaseNamespace: c.flagReleaseNamespace,
Context: ctx,
ConsulClientCfg: cfg,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{})
return 1
Expand Down

0 comments on commit 8a9a841

Please sign in to comment.