Skip to content

Commit

Permalink
vault: Initial wan fed support (tls and gossip only)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishustava committed Feb 9, 2022
1 parent 5027066 commit 1c03ab2
Show file tree
Hide file tree
Showing 12 changed files with 821 additions and 267 deletions.
6 changes: 3 additions & 3 deletions acceptance/framework/consul/cli_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func NewCLICluster(
require.NoError(t, err)

// Merge all helm values
MergeMaps(values, valuesFromConfig)
MergeMaps(values, helmValues)
helpers.MergeMaps(values, valuesFromConfig)
helpers.MergeMaps(values, helmValues)

logger := terratestLogger.New(logger.TestLogger{})

Expand Down Expand Up @@ -179,7 +179,7 @@ func (h *CLICluster) Destroy(t *testing.T) {
func (h *CLICluster) Upgrade(t *testing.T, helmValues map[string]string) {
t.Helper()

MergeMaps(h.helmOptions.SetValues, helmValues)
helpers.MergeMaps(h.helmOptions.SetValues, helmValues)
helm.Upgrade(t, h.helmOptions, config.HelmChartPath, h.releaseName)
helpers.WaitForAllPodsToBeReady(t, h.kubernetesClient, h.helmOptions.KubectlOptions.Namespace, fmt.Sprintf("release=%s", h.releaseName))
}
Expand Down
14 changes: 3 additions & 11 deletions acceptance/framework/consul/consul_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewHelmCluster(
require.NoError(t, err)

// Merge all helm values
MergeMaps(values, valuesFromConfig)
MergeMaps(values, helmValues)
helpers.MergeMaps(values, valuesFromConfig)
helpers.MergeMaps(values, helmValues)

logger := terratestLogger.New(logger.TestLogger{})

Expand Down Expand Up @@ -213,7 +213,7 @@ func (h *HelmCluster) Destroy(t *testing.T) {
func (h *HelmCluster) Upgrade(t *testing.T, helmValues map[string]string) {
t.Helper()

MergeMaps(h.helmOptions.SetValues, helmValues)
helpers.MergeMaps(h.helmOptions.SetValues, helmValues)
helm.Upgrade(t, h.helmOptions, config.HelmChartPath, h.releaseName)
helpers.WaitForAllPodsToBeReady(t, h.kubernetesClient, h.helmOptions.KubectlOptions.Namespace, fmt.Sprintf("release=%s", h.releaseName))
}
Expand Down Expand Up @@ -472,11 +472,3 @@ func defaultValues() map[string]string {
}
return values
}

// MergeMaps will merge the values in b with values in a and save in a.
// If there are conflicts, the values in b will overwrite the values in a.
func MergeMaps(a, b map[string]string) {
for k, v := range b {
a[k] = v
}
}
44 changes: 44 additions & 0 deletions acceptance/framework/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/gruntwork-io/terratest/modules/helm"
"github.com/hashicorp/consul/api"

terratestk8s "github.com/gruntwork-io/terratest/modules/k8s"
"github.com/gruntwork-io/terratest/modules/random"
Expand Down Expand Up @@ -204,3 +205,46 @@ func IsReady(pod corev1.Pod) bool {

return false
}

// VerifyFederation checks that the WAN federation between servers is successful
// by first checking members are alive from the perspective of both servers.
// If secure is true, it will also check that the ACL replication is running on the secondary server.
func VerifyFederation(t *testing.T, primaryClient, secondaryClient *api.Client, releaseName string, secure bool) {
	retrier := &retry.Timer{Timeout: 5 * time.Minute, Wait: 1 * time.Second}
	start := time.Now()

	// Check that server in dc1 is healthy from the perspective of the server in dc2, and vice versa.
	// We're calling the Consul health API, as opposed to checking serf membership status,
	// because we need to make sure that the federated servers can make API calls and forward requests
	// from one server to another. From running tests in CI for a while and using serf membership status before,
	// we've noticed that the status could be "alive" as soon as the server in the secondary cluster joins the primary
	// and then switch to "failed". This would require us to check that the "alive" status is showing consistently for
	// some amount of time, which could be quite flaky. Calling the API in another datacenter allows us to check that
	// each server can forward calls to the other, which is what we need for connect.
	retry.RunWith(retrier, t, func(r *retry.R) {
		secondaryServerHealth, _, err := primaryClient.Health().Node(fmt.Sprintf("%s-consul-server-0", releaseName), &api.QueryOptions{Datacenter: "dc2"})
		require.NoError(r, err)
		// require.Equal takes (expected, actual); the constant goes first so
		// failure messages read correctly.
		require.Equal(r, api.HealthPassing, secondaryServerHealth.AggregatedStatus())

		primaryServerHealth, _, err := secondaryClient.Health().Node(fmt.Sprintf("%s-consul-server-0", releaseName), &api.QueryOptions{Datacenter: "dc1"})
		require.NoError(r, err)
		require.Equal(r, api.HealthPassing, primaryServerHealth.AggregatedStatus())

		if secure {
			// In secure installations ACL replication must be enabled and running
			// in the secondary datacenter for federation to be functional.
			replicationStatus, _, err := secondaryClient.ACL().Replication(nil)
			require.NoError(r, err)
			require.True(r, replicationStatus.Enabled)
			require.True(r, replicationStatus.Running)
		}
	})

	logger.Logf(t, "Took %s to verify federation", time.Since(start))
}

// MergeMaps copies every key/value pair from src into dst, mutating dst
// in place. Keys present in both maps take their value from src.
func MergeMaps(dst, src map[string]string) {
	for key, val := range src {
		dst[key] = val
	}
}
60 changes: 60 additions & 0 deletions acceptance/framework/k8s/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package k8s

import (
"context"
"fmt"
"testing"
"time"

"github.com/hashicorp/consul-k8s/acceptance/framework/config"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// KubernetesAPIServerHost returns the Kubernetes API server URL depending on test configuration.
//
// When running on kind, the kube API address in kubeconfig will have a localhost address
// which will not work from inside the container. That's why we need to use the endpoints
// address instead, which will point to the node IP.
func KubernetesAPIServerHost(t *testing.T, cfg *config.TestConfig, ctx environment.TestContext) string {
	if cfg.UseKind {
		// The Kubernetes AuthMethod host is read from the endpoints for the Kubernetes service.
		kubernetesEndpoint, err := ctx.KubernetesClient(t).CoreV1().Endpoints("default").Get(context.Background(), "kubernetes", metav1.GetOptions{})
		require.NoError(t, err)
		// Guard the indexing below: fail with a clear assertion instead of
		// panicking if the endpoints object has no subsets/addresses/ports yet.
		require.NotEmpty(t, kubernetesEndpoint.Subsets)
		require.NotEmpty(t, kubernetesEndpoint.Subsets[0].Addresses)
		require.NotEmpty(t, kubernetesEndpoint.Subsets[0].Ports)
		return fmt.Sprintf("https://%s:%d", kubernetesEndpoint.Subsets[0].Addresses[0].IP, kubernetesEndpoint.Subsets[0].Ports[0].Port)
	}
	return helpers.KubernetesAPIServerHostFromOptions(t, ctx.KubectlOptions(t))
}

// ServiceHost returns a host for a Kubernetes service depending on test configuration.
//
// On kind there is no load balancer implementation, so the address of the
// (only) kind node is returned instead. Otherwise this waits for the service's
// load balancer to be provisioned and returns its hostname or IP.
func ServiceHost(t *testing.T, cfg *config.TestConfig, ctx environment.TestContext, serviceName string) string {
	if cfg.UseKind {
		nodeList, err := ctx.KubernetesClient(t).CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
		require.NoError(t, err)
		// Guard the indexing below so an empty node list fails with a clear
		// assertion instead of a panic.
		require.NotEmpty(t, nodeList.Items)
		// Get the address of the (only) node from the Kind cluster.
		return nodeList.Items[0].Status.Addresses[0].Address
	}

	var host string
	// It can take some time for the load balancers to be ready and have an IP/Hostname.
	// Wait for 60 seconds before failing.
	retry.RunWith(&retry.Counter{Wait: 1 * time.Second, Count: 60}, t, func(r *retry.R) {
		svc, err := ctx.KubernetesClient(t).CoreV1().Services(ctx.KubectlOptions(t).Namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
		// Assert against r (not t) so a transient API error is retried
		// instead of failing the test immediately.
		require.NoError(r, err)
		require.NotEmpty(r, svc.Status.LoadBalancer.Ingress)
		// On AWS, load balancers have a hostname for ingress, while on Azure and GCP
		// load balancers have IPs.
		if svc.Status.LoadBalancer.Ingress[0].Hostname != "" {
			host = svc.Status.LoadBalancer.Ingress[0].Hostname
		} else {
			host = svc.Status.LoadBalancer.Ingress[0].IP
		}
	})
	return host
}
85 changes: 64 additions & 21 deletions acceptance/framework/vault/vault_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,24 @@ type VaultCluster struct {
}

// NewVaultCluster creates a VaultCluster which will be used to install Vault using Helm.
func NewVaultCluster(t *testing.T, ctx environment.TestContext, cfg *config.TestConfig, releaseName string) *VaultCluster {
func NewVaultCluster(t *testing.T, ctx environment.TestContext, cfg *config.TestConfig, releaseName string, helmValues map[string]string) *VaultCluster {

logger := terratestLogger.New(logger.TestLogger{})

kopts := ctx.KubectlOptions(t)

values := defaultHelmValues(releaseName)
if cfg.EnablePodSecurityPolicies {
values["global.psp.enable"] = "true"
}

helpers.MergeMaps(values, helmValues)
vaultHelmOpts := &helm.Options{
SetValues: defaultHelmValues(releaseName),
SetValues: values,
KubectlOptions: kopts,
Logger: logger,
}
if cfg.EnablePodSecurityPolicies {
vaultHelmOpts.SetValues["global.psp.enable"] = "true"
}

helm.AddRepo(t, vaultHelmOpts, "hashicorp", "https://helm.releases.hashicorp.com")
// Ignoring the error from `helm repo update` as it could fail due to stale cache or unreachable servers and we're
// asserting a chart version on Install which would fail in an obvious way should this not succeed.
Expand All @@ -80,7 +84,7 @@ func NewVaultCluster(t *testing.T, ctx environment.TestContext, cfg *config.Test
}

// VaultClient returns the vault client.
func (v *VaultCluster) VaultClient(t *testing.T) *vapi.Client { return v.vaultClient }
func (v *VaultCluster) VaultClient(*testing.T) *vapi.Client { return v.vaultClient }

// SetupVaultClient sets up and returns a Vault Client.
func (v *VaultCluster) SetupVaultClient(t *testing.T) *vapi.Client {
Expand Down Expand Up @@ -125,8 +129,11 @@ func (v *VaultCluster) SetupVaultClient(t *testing.T) *vapi.Client {
}

// bootstrap sets up Kubernetes auth method and enables secrets engines.
func (v *VaultCluster) bootstrap(t *testing.T, ctx environment.TestContext) {

func (v *VaultCluster) bootstrap(t *testing.T) {
if !v.serverEnabled() {
v.logger.Logf(t, "skipping bootstrapping Vault because Vault server is not enabled")
return
}
v.vaultClient = v.SetupVaultClient(t)

// Enable the KV-V2 Secrets engine.
Expand All @@ -149,20 +156,39 @@ func (v *VaultCluster) bootstrap(t *testing.T, ctx environment.TestContext) {
})
require.NoError(t, err)

v.logger.Logf(t, "updating vault kube auth config")

// To configure the auth method, we need to read the token and the ca cert from the Vault's server
// service account token.
namespace := v.helmOptions.KubectlOptions.Namespace
sa, err := v.kubernetesClient.CoreV1().ServiceAccounts(namespace).Get(context.Background(), fmt.Sprintf("%s-vault", v.releaseName), metav1.GetOptions{})
vaultServerServiceAccountName := fmt.Sprintf("%s-vault", v.releaseName)
v.ConfigureAuthMethod(t, v.vaultClient, "kubernetes", "https://kubernetes.default.svc", vaultServerServiceAccountName, namespace)
}

// ConfigureAuthMethod configures the auth method in Vault from the provided service account name and namespace,
// kubernetes host and auth path.
// We need to take vaultClient here in case this Vault cluster does not have a server to run API commands against.
func (v *VaultCluster) ConfigureAuthMethod(t *testing.T, vaultClient *vapi.Client, authPath, k8sHost, saName, saNS string) {
v.logger.Logf(t, "enabling kubernetes auth method on %s path", authPath)
err := vaultClient.Sys().EnableAuthWithOptions(authPath, &vapi.EnableAuthOptions{
Type: "kubernetes",
})
require.NoError(t, err)
require.Len(t, sa.Secrets, 1)
tokenSecret, err := v.kubernetesClient.CoreV1().Secrets(namespace).Get(context.Background(), sa.Secrets[0].Name, metav1.GetOptions{})

// To configure the auth method, we need to read the token and the CA cert from the auth method's
// service account token.
// The JWT token and CA cert are what the Vault server will use to validate service account tokens
// with the Kubernetes API.
var sa *corev1.ServiceAccount
retry.Run(t, func(r *retry.R) {
sa, err = v.kubernetesClient.CoreV1().ServiceAccounts(saNS).Get(context.Background(), saName, metav1.GetOptions{})
require.NoError(t, err)
require.Len(t, sa.Secrets, 1)
})

v.logger.Logf(t, "updating vault kubernetes auth config for %s auth path", authPath)
tokenSecret, err := v.kubernetesClient.CoreV1().Secrets(saNS).Get(context.Background(), sa.Secrets[0].Name, metav1.GetOptions{})
require.NoError(t, err)
_, err = v.vaultClient.Logical().Write("auth/kubernetes/config", map[string]interface{}{
"token_reviewer_jwt": tokenSecret.StringData["token"],
"kubernetes_ca_cert": tokenSecret.StringData["ca.crt"],
"kubernetes_host": "https://kubernetes.default.svc",
_, err = vaultClient.Logical().Write(fmt.Sprintf("auth/%s/config", authPath), map[string]interface{}{
"token_reviewer_jwt": string(tokenSecret.Data["token"]),
"kubernetes_ca_cert": string(tokenSecret.Data["ca.crt"]),
"kubernetes_host": k8sHost,
})
require.NoError(t, err)
}
Expand Down Expand Up @@ -191,7 +217,7 @@ func (v *VaultCluster) Create(t *testing.T, ctx environment.TestContext) {
helpers.WaitForAllPodsToBeReady(t, v.kubernetesClient, v.helmOptions.KubectlOptions.Namespace, v.releaseLabelSelector())

// Now call bootstrap().
v.bootstrap(t, ctx)
v.bootstrap(t)
}

// Destroy issues a helm delete and deletes the PVC + any helm secrets related to the release that are leftover.
Expand Down Expand Up @@ -261,10 +287,23 @@ func (v *VaultCluster) releaseLabelSelector() string {
return fmt.Sprintf("%s=%s", releaseLabel, v.releaseName)
}

// serverEnabled reports whether this Vault cluster runs a server.
// The Vault Helm chart enables the server by default, so the server is
// considered enabled unless the "server.enabled" Helm value is explicitly
// set to "false".
func (v *VaultCluster) serverEnabled() bool {
	if val, ok := v.helmOptions.SetValues["server.enabled"]; ok {
		return val != "false"
	}
	return true
}

// createTLSCerts generates a self-signed CA and uses it to generate
// certificate and key for the Vault server. It then saves those as
// Kubernetes secrets.
func (v *VaultCluster) createTLSCerts(t *testing.T) {
if !v.serverEnabled() {
v.logger.Logf(t, "skipping generating Vault TLS certificates because Vault server is not enabled")
return
}

v.logger.Logf(t, "generating Vault TLS certificates")

namespace := v.helmOptions.KubectlOptions.Namespace
Expand Down Expand Up @@ -318,8 +357,12 @@ func (v *VaultCluster) createTLSCerts(t *testing.T) {
// initAndUnseal initializes and unseals Vault.
// Once initialized, it saves the Vault root token into a Kubernetes secret.
func (v *VaultCluster) initAndUnseal(t *testing.T) {
v.logger.Logf(t, "initializing and unsealing Vault")
if !v.serverEnabled() {
v.logger.Logf(t, "skipping initializing and unsealing Vault because Vault server is not enabled")
return
}

v.logger.Logf(t, "initializing and unsealing Vault")
namespace := v.helmOptions.KubectlOptions.Namespace
retrier := &retry.Timer{Timeout: 2 * time.Minute, Wait: 1 * time.Second}
retry.RunWith(retrier, t, func(r *retry.R) {
Expand Down
41 changes: 2 additions & 39 deletions acceptance/tests/mesh-gateway/mesh_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/hashicorp/consul-k8s/acceptance/framework/consul"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -108,7 +106,7 @@ func TestMeshGatewayDefault(t *testing.T) {

// Verify federation between servers
logger.Log(t, "verifying federation was successful")
verifyFederation(t, primaryClient, secondaryClient, releaseName, false)
helpers.VerifyFederation(t, primaryClient, secondaryClient, releaseName, false)

// Create a ProxyDefaults resource to configure services to use the mesh
// gateways.
Expand Down Expand Up @@ -244,7 +242,7 @@ func TestMeshGatewaySecure(t *testing.T) {

// Verify federation between servers
logger.Log(t, "verifying federation was successful")
verifyFederation(t, primaryClient, secondaryClient, releaseName, true)
helpers.VerifyFederation(t, primaryClient, secondaryClient, releaseName, true)

// Create a ProxyDefaults resource to configure services to use the mesh
// gateways.
Expand Down Expand Up @@ -280,38 +278,3 @@ func TestMeshGatewaySecure(t *testing.T) {
})
}
}

// verifyFederation checks that the WAN federation between servers is successful
// by first checking members are alive from the perspective of both servers.
// If secure is true, it will also check that the ACL replication is running on the secondary server.
func verifyFederation(t *testing.T, primaryClient, secondaryClient *api.Client, releaseName string, secure bool) {
	retrier := &retry.Timer{Timeout: 5 * time.Minute, Wait: 1 * time.Second}
	start := time.Now()

	// Check that server in dc1 is healthy from the perspective of the server in dc2, and vice versa.
	// We're calling the Consul health API, as opposed to checking serf membership status,
	// because we need to make sure that the federated servers can make API calls and forward requests
	// from one server to another. From running tests in CI for a while and using serf membership status before,
	// we've noticed that the status could be "alive" as soon as the server in the secondary cluster joins the primary
	// and then switch to "failed". This would require us to check that the "alive" status is showing consistently for
	// some amount of time, which could be quite flaky. Calling the API in another datacenter allows us to check that
	// each server can forward calls to the other, which is what we need for connect.
	retry.RunWith(retrier, t, func(r *retry.R) {
		secondaryServerHealth, _, err := primaryClient.Health().Node(fmt.Sprintf("%s-consul-server-0", releaseName), &api.QueryOptions{Datacenter: "dc2"})
		require.NoError(r, err)
		// require.Equal takes (expected, actual); the constant goes first so
		// failure messages read correctly.
		require.Equal(r, api.HealthPassing, secondaryServerHealth.AggregatedStatus())

		primaryServerHealth, _, err := secondaryClient.Health().Node(fmt.Sprintf("%s-consul-server-0", releaseName), &api.QueryOptions{Datacenter: "dc1"})
		require.NoError(r, err)
		require.Equal(r, api.HealthPassing, primaryServerHealth.AggregatedStatus())

		if secure {
			// In secure installations ACL replication must be enabled and running
			// in the secondary datacenter for federation to be functional.
			replicationStatus, _, err := secondaryClient.ACL().Replication(nil)
			require.NoError(r, err)
			require.True(r, replicationStatus.Enabled)
			require.True(r, replicationStatus.Running)
		}
	})

	logger.Logf(t, "Took %s to verify federation", time.Since(start))
}
Loading

0 comments on commit 1c03ab2

Please sign in to comment.