Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acls and tls to endpoints controller #470

Merged
merged 8 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type EndpointsController struct {
Log logr.Logger
Scheme *runtime.Scheme
context.Context

// GetClientFunc allows us to specify how to get a consul client handle.
// This is used so that we can provide our own function for testing that is
// not dependent on having then ENV set up to pick up tokens and ca certs.
GetClientFunc func(string, string, string) (*api.Client, error)
kschoche marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -100,7 +105,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

if hasBeenInjected(pod) {
// Create client for Consul agent local to the pod.
client, err := r.getConsulClient(pod.Status.HostIP)
client, err := r.GetClientFunc(r.ConsulScheme, r.ConsulPort, pod.Status.HostIP)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP)
return ctrl.Result{}, err
Expand Down Expand Up @@ -300,7 +305,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
// On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
for _, pod := range list.Items {
// Create client for this agent.
client, err := r.getConsulClient(pod.Status.PodIP)
client, err := r.GetClientFunc(r.ConsulScheme, r.ConsulPort, pod.Status.PodIP)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP)
return err
Expand Down Expand Up @@ -410,9 +415,9 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
return upstreams, nil
}

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
// GetConsulClient returns an *api.Client that points at the consul agent local to the pod.
func GetConsulClient(scheme, port, ip string) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", scheme, ip, port)
localConfig := api.DefaultConfig()
localConfig.Address = newAddr

Expand Down
249 changes: 181 additions & 68 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"strings"
"testing"

mapset "github.com/deckarep/golang-set"
"github.com/deckarep/golang-set"
logrtest "github.com/go-logr/logr/testing"
"github.com/hashicorp/consul-k8s/subcommand/common"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -106,6 +107,74 @@ func TestHasBeenInjected(t *testing.T) {
}
}

// TestProcessUpstreamsTLSandACLs enables TLS and ACLS and tests processUpstreams through
// the only path which sets up and uses a consul client: when proxy defaults need to be read.
// This test was plucked from the table test TestProcessUpstreams as the rest do not use the client.
func TestProcessUpstreamsTLSandACLs(t *testing.T) {
t.Parallel()
nodeName := "test-node"

masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server with ACLs and TLS
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForSerfCheck(t)
cfg := &api.Config{
Address: consul.HTTPSAddr,
Scheme: "https",
TLSConfig: api.TLSConfig{
CAFile: caFile,
},
Token: masterToken,
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPSAddr, ":")
consulPort := addr[1]

ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "pd")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "pd")
ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "global")

I don't know how this worked because inside processUpstreams we look for a proxy defaults with name global. I think somewhere when you call ConfigEntries().Set it's ignoring the pd and using global but may as well set it to global here to avoid confusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on this. Not sure how it worked either..

pd := ce.(*api.ProxyConfigEntry)
pd.MeshGateway.Mode = api.MeshGatewayModeRemote
_, _, err = consulClient.ConfigEntries().Set(pd, &api.WriteOptions{})
require.NoError(t, err)

ep := &EndpointsController{
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "https",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
}

pod := createPod("pod1", "1.2.3.4", true)
pod.Annotations[annotationUpstreams] = "upstream1:1234:dc1"

upstreams, err := ep.processUpstreams(*pod)
require.NoError(t, err)

expected := []api.Upstream{
{
DestinationType: api.UpstreamDestTypeService,
DestinationName: "upstream1",
Datacenter: "dc1",
LocalBindPort: 1234,
},
}
require.Equal(t, expected, upstreams)
}

func TestProcessUpstreams(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -632,6 +701,10 @@ func TestReconcileCreateEndpoint(t *testing.T) {
require.NoError(t, err)
}

getclientfunction := func(string, string, string) (*api.Client, error) {
kschoche marked this conversation as resolved.
Show resolved Hide resolved
return consulClient, nil
}

// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Expand All @@ -643,6 +716,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
GetClientFunc: getclientfunction,
kschoche marked this conversation as resolved.
Show resolved Hide resolved
}
namespacedName := types.NamespacedName{
Namespace: "default",
Expand Down Expand Up @@ -704,7 +778,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
// For the register and deregister codepath, this also tests that they work when the Consul service name is different
// from the K8s service name.
// This test covers EndpointsController.deregisterServiceOnAllAgents when services should be selectively deregistered
// since the map will not be nil.
// since the map will not be nil. This test also runs each test with ACLs+TLS enabled and disabled.
kschoche marked this conversation as resolved.
Show resolved Hide resolved
func TestReconcileUpdateEndpoint(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -1201,79 +1275,113 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
expectedProxySvcInstances: []*api.CatalogService{},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}
for _, secure := range []bool{true, false} {
for _, tt := range cases {
t.Run(fmt.Sprintf("%s - secure: %v", tt.name, secure), func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}

// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()
// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test consul server
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{
Address: consul.HTTPAddr,
})
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server, with ACLs+TLS if necessary
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
if secure {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
}
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()
consul.WaitForSerfCheck(t)
kschoche marked this conversation as resolved.
Show resolved Hide resolved

// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
cfg := &api.Config{
Scheme: "http",
Address: consul.HTTPAddr,
}
if secure {
cfg.Address = consul.HTTPSAddr
cfg.Scheme = "https"
cfg.TLSConfig = api.TLSConfig{
CAFile: caFile,
}
cfg.Token = masterToken
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
}
addr := strings.Split(cfg.Address, ":")
consulPort := addr[1]

// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "http",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}
// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
require.NoError(t, err)
}

resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)
// Override the internal implementation of getConsulClient() because
// there is not a straight-forward way of providing the ACL token and ca info
// in tests through the env.
// Normally this is provided through the Command via `-ca-file`, etc, httpFlags
// which are later re-read for each new consul client in api.DefaultConfig(), but this test does
// not run as part of a Command and so api.DefaultConfig() would set our TLS and ACL config to empty.
// We can re-use this client as the test defines the fakeClientPod to be 127.0.0.1.
kschoche marked this conversation as resolved.
Show resolved Hide resolved
getclientfunction := func(string, string, string) (*api.Client, error) {
kschoche marked this conversation as resolved.
Show resolved Hide resolved
return consulClient, nil
}

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: cfg.Scheme,
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
GetClientFunc: getclientfunction,
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}

resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
}
}
}

Expand Down Expand Up @@ -1371,6 +1479,10 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
require.NoError(t, err)
}

getclientfunction := func(string, string, string) (*api.Client, error) {
kschoche marked this conversation as resolved.
Show resolved Hide resolved
return consulClient, nil
}

// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Expand All @@ -1382,6 +1494,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
GetClientFunc: getclientfunction,
}

// Set up the Endpoint that will be reconciled, and reconcile
Expand Down
1 change: 1 addition & 0 deletions subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func (c *Command) Run(args []string) int {
ReleaseName: c.flagReleaseName,
ReleaseNamespace: c.flagReleaseNamespace,
Context: ctx,
GetClientFunc: connectinject.GetConsulClient,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{})
return 1
Expand Down