From 70a1c846470a4261550b8654ad097cf2dbf227f2 Mon Sep 17 00:00:00 2001 From: jm96441n Date: Fri, 28 Jun 2024 15:37:17 -0400 Subject: [PATCH] update external registration acl role/policy to include namespace/partition, make service address optional like it is for catalog registration --- .../consul/templates/crd-registrations.yaml | 1 - control-plane/api-gateway/binding/cleanup.go | 3 + .../api-gateway/binding/cleanup_test.go | 3 + .../api-gateway/gatekeeper/ownership.go | 3 + .../api-gateway/gatekeeper/secret.go | 3 + .../api/v1alpha1/registration_types.go | 2 +- control-plane/catalog/registration/cache.go | 195 +++++++++++++----- .../registration/registrations_controller.go | 2 +- .../registrations_controller_test.go | 4 +- .../consul.hashicorp.com_registrations.yaml | 1 - .../inject-connect/v1controllers.go | 2 +- 11 files changed, 163 insertions(+), 56 deletions(-) diff --git a/charts/consul/templates/crd-registrations.yaml b/charts/consul/templates/crd-registrations.yaml index e1e45d3574..32edd278ce 100644 --- a/charts/consul/templates/crd-registrations.yaml +++ b/charts/consul/templates/crd-registrations.yaml @@ -193,7 +193,6 @@ spec: - warning type: object required: - - address - name - port type: object diff --git a/control-plane/api-gateway/binding/cleanup.go b/control-plane/api-gateway/binding/cleanup.go index 4b517a1813..4fe11382d0 100644 --- a/control-plane/api-gateway/binding/cleanup.go +++ b/control-plane/api-gateway/binding/cleanup.go @@ -1,3 +1,6 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + package binding import ( diff --git a/control-plane/api-gateway/binding/cleanup_test.go b/control-plane/api-gateway/binding/cleanup_test.go index 78897fcb87..76fcd60ef9 100644 --- a/control-plane/api-gateway/binding/cleanup_test.go +++ b/control-plane/api-gateway/binding/cleanup_test.go @@ -1,3 +1,6 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + package binding import ( diff --git a/control-plane/api-gateway/gatekeeper/ownership.go b/control-plane/api-gateway/gatekeeper/ownership.go index babf5aa812..9822dc226a 100644 --- a/control-plane/api-gateway/gatekeeper/ownership.go +++ b/control-plane/api-gateway/gatekeeper/ownership.go @@ -1,3 +1,6 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + package gatekeeper import ( diff --git a/control-plane/api-gateway/gatekeeper/secret.go b/control-plane/api-gateway/gatekeeper/secret.go index dfef33c23d..65ee4c0a8b 100644 --- a/control-plane/api-gateway/gatekeeper/secret.go +++ b/control-plane/api-gateway/gatekeeper/secret.go @@ -1,3 +1,6 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + package gatekeeper import ( diff --git a/control-plane/api/v1alpha1/registration_types.go b/control-plane/api/v1alpha1/registration_types.go index f3190aef87..e3dd8ba0f0 100644 --- a/control-plane/api/v1alpha1/registration_types.go +++ b/control-plane/api/v1alpha1/registration_types.go @@ -74,7 +74,7 @@ type Service struct { Tags []string `json:"tags,omitempty"` Meta map[string]string `json:"meta,omitempty"` Port int `json:"port"` - Address string `json:"address"` + Address string `json:"address,omitempty"` SocketPath string `json:"socketPath,omitempty"` TaggedAddresses map[string]ServiceAddress `json:"taggedAddresses,omitempty"` Weights Weights `json:"weights,omitempty"` diff --git a/control-plane/catalog/registration/cache.go b/control-plane/catalog/registration/cache.go index e556ab7a77..4906bc29be 100644 --- a/control-plane/catalog/registration/cache.go +++ b/control-plane/catalog/registration/cache.go @@ -4,39 +4,94 @@ package registration import ( + "bytes" "context" "errors" "fmt" "slices" "strings" "sync" + "text/template" mapset "github.com/deckarep/golang-set/v2" "github.com/go-logr/logr" "github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1" "github.com/hashicorp/consul-k8s/control-plane/consul" capi "github.com/hashicorp/consul/api" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) const NotInServiceMeshFilter = "ServiceMeta[\"managed-by\"] != \"consul-k8s-endpoints-controller\"" +func init() { + gatewayTpl = template.Must(template.New("root").Parse(strings.TrimSpace(gatewayRulesTpl))) +} + +type templateArgs struct { + EnablePartitions bool + Partition string + EnableNamespaces bool + Namespace string + ServiceName string +} + +var ( + gatewayTpl *template.Template + gatewayRulesTpl = ` +{{ if .EnablePartitions }} +partition "{{.Partition}}" { +{{- end }} + {{- if .EnableNamespaces }} + namespace "{{.Namespace}}" { + {{- end }} + service "{{.ServiceName}}" { + policy = "write" + } + {{- if .EnableNamespaces }} + } + {{- end }} +{{- if .EnablePartitions }} +} +{{- end }} +` +) + type RegistrationCache struct { + // we include the context here so that we can use it for cancellation of `run` invocations that are scheduled after the cache is started + // this occurs when registering services in a new namespace as we have an invocation of `run` per namespace that is registered + ctx context.Context + ConsulClientConfig *consul.Config ConsulServerConnMgr consul.ServerConnectionManager - serviceMtx *sync.Mutex - Services map[string]*v1alpha1.Registration - synced chan struct{} - UpdateChan chan string + k8sClient client.Client + + serviceMtx *sync.Mutex + Services map[string]*v1alpha1.Registration + + namespaces mapset.Set[string] + + synced chan struct{} + UpdateChan chan string + + namespacesEnabled bool + partitionsEnabled bool } -func NewRegistrationCache(consulClientConfig *consul.Config, consulServerConnMgr consul.ServerConnectionManager) *RegistrationCache { +func NewRegistrationCache(ctx context.Context, consulClientConfig *consul.Config, consulServerConnMgr consul.ServerConnectionManager, k8sClient client.Client, namespacesEnabled, partitionsEnabled bool) *RegistrationCache { return &RegistrationCache{ + ctx: ctx, ConsulClientConfig: consulClientConfig, ConsulServerConnMgr: consulServerConnMgr, + k8sClient: k8sClient, serviceMtx: &sync.Mutex{}, Services: make(map[string]*v1alpha1.Registration), UpdateChan: make(chan string), synced: make(chan struct{}), + namespaces: mapset.NewSet[string](), + namespacesEnabled: namespacesEnabled, + partitionsEnabled: partitionsEnabled, } } @@ -50,13 +105,13 @@ func (c *RegistrationCache) waitSynced(ctx context.Context) { } } -func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) { +func (c *RegistrationCache) run(log logr.Logger, namespace string) { once := &sync.Once{} - opts := &capi.QueryOptions{Filter: NotInServiceMeshFilter} + opts := &capi.QueryOptions{Filter: NotInServiceMeshFilter, Namespace: namespace} for { select { - case <-ctx.Done(): + case <-c.ctx.Done(): return default: @@ -65,7 +120,7 @@ func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) { log.Error(err, "error initializing consul client") continue } - entries, meta, err := client.Catalog().Services(opts.WithContext(ctx)) + entries, meta, err := client.Catalog().Services(opts.WithContext(c.ctx)) if err != nil { // if we timeout we don't care about the error message because it's expected to happen on long polls // any other error we want to alert on @@ -77,20 +132,40 @@ func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) { continue } - diffs := mapset.NewSet[string]() + servicesToRemove := mapset.NewSet[string]() + servicesToAdd := mapset.NewSet[string]() c.serviceMtx.Lock() for svc := range c.Services { if _, ok := entries[svc]; !ok { - diffs.Add(svc) + servicesToRemove.Add(svc) + } + } + + for svc := range entries { + if _, ok := c.Services[svc]; !ok { + servicesToAdd.Add(svc) } } c.serviceMtx.Unlock() - for _, svc := range diffs.ToSlice() { + for _, svc := range servicesToRemove.ToSlice() { log.Info("consul deregistered service", "svcName", svc) c.UpdateChan <- svc } + for _, svc := range servicesToAdd.ToSlice() { + registration := &v1alpha1.Registration{} + + if err := c.k8sClient.Get(c.ctx, types.NamespacedName{Name: svc, Namespace: namespace}, registration); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "unable to get registration", "svcName", svc) + } + continue + } + + c.Services[svc] = registration + } + opts.WaitIndex = meta.LastIndex once.Do(func() { log.Info("Initial sync complete") @@ -122,21 +197,26 @@ func (c *RegistrationCache) registerService(log logr.Logger, reg *v1alpha1.Regis return err } - _, err = client.Catalog().Register(regReq, nil) + _, err = client.Catalog().Register(regReq, &capi.WriteOptions{Namespace: reg.Spec.Service.Namespace}) if err != nil { log.Error(err, "error registering service", "svcName", regReq.Service.Service) return err } - c.serviceMtx.Lock() - defer c.serviceMtx.Unlock() - c.Services[reg.Spec.Service.Name] = reg + if !c.namespaces.Contains(reg.Spec.Service.Namespace) && !emptyOrDefault(reg.Spec.Service.Namespace) { + c.namespaces.Add(reg.Spec.Service.Namespace) + go c.run(log, reg.Spec.Service.Namespace) + } log.Info("Successfully registered service", "svcName", regReq.Service.Service) return nil } +func emptyOrDefault(s string) bool { + return s == "" || s == "default" +} + func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v1alpha1.Registration, termGWsToUpdate []v1alpha1.TerminatingGateway) error { if len(termGWsToUpdate) == 0 { log.Info("terminating gateway not found") @@ -148,38 +228,51 @@ func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v return err } - roles, _, err := client.ACL().RoleList(nil) - if err != nil { - log.Error(err, "error reading role list") - return err + var data bytes.Buffer + if err := gatewayTpl.Execute(&data, templateArgs{ + EnablePartitions: c.partitionsEnabled, + Partition: registration.Spec.Service.Partition, + EnableNamespaces: c.namespacesEnabled, + Namespace: registration.Spec.Service.Namespace, + ServiceName: registration.Spec.Service.Name, + }); err != nil { + // just panic if we can't compile the simple template + // as it means something else is going severly wrong. + panic(err) } - policy := &capi.ACLPolicy{ - Name: servicePolicyName(registration.Spec.Service.Name), - Description: "Write policy for terminating gateways for external service", - Rules: fmt.Sprintf(`service %q { policy = "write" }`, registration.Spec.Service.Name), - Datacenters: []string{registration.Spec.Datacenter}, - Namespace: registration.Spec.Service.Namespace, - Partition: registration.Spec.Service.Partition, - } + var mErr error + for _, termGW := range termGWsToUpdate { + // the terminating gateway role is _always_ in the default namespace + roles, _, err := client.ACL().RoleList(&capi.QueryOptions{}) + if err != nil { + log.Error(err, "error reading role list") + return err + } - existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, nil) - if err != nil { - log.Error(err, "error reading policy") - return err - } + policy := &capi.ACLPolicy{ + Name: servicePolicyName(registration.Spec.Service.Name), + Description: "Write policy for terminating gateways for external service", + Rules: data.String(), + Datacenters: []string{registration.Spec.Datacenter}, + } - if existingPolicy == nil { - policy, _, err = client.ACL().PolicyCreate(policy, nil) + existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, &capi.QueryOptions{}) if err != nil { - return fmt.Errorf("error creating policy: %w", err) + log.Error(err, "error reading policy") + return err } - } else { - policy = existingPolicy - } - var mErr error - for _, termGW := range termGWsToUpdate { + writeOpts := &capi.WriteOptions{} + + if existingPolicy == nil { + policy, _, err = client.ACL().PolicyCreate(policy, writeOpts) + if err != nil { + return fmt.Errorf("error creating policy: %w", err) + } + } else { + policy = existingPolicy + } var role *capi.ACLRole for _, r := range roles { if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { @@ -196,7 +289,7 @@ func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v role.Policies = append(role.Policies, &capi.ACLRolePolicyLink{Name: policy.Name, ID: policy.ID}) - _, _, err = client.ACL().RoleUpdate(role, nil) + _, _, err = client.ACL().RoleUpdate(role, writeOpts) if err != nil { log.Error(err, "error updating role", "roleName", role.Name) mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name)) @@ -239,13 +332,17 @@ func (c *RegistrationCache) removeTermGWACLRole(log logr.Logger, registration *v return err } - roles, _, err := client.ACL().RoleList(nil) - if err != nil { - return err - } - var mErr error for _, termGW := range termGWsToUpdate { + + queryOpts := &capi.QueryOptions{} + + writeOpts := &capi.WriteOptions{} + + roles, _, err := client.ACL().RoleList(queryOpts) + if err != nil { + return err + } var role *capi.ACLRole for _, r := range roles { if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) { @@ -276,14 +373,14 @@ func (c *RegistrationCache) removeTermGWACLRole(log logr.Logger, registration *v continue } - _, _, err = client.ACL().RoleUpdate(role, nil) + _, _, err = client.ACL().RoleUpdate(role, writeOpts) if err != nil { log.Error(err, "error updating role", "roleName", role.Name) mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name)) continue } - _, err = client.ACL().PolicyDelete(policyID, nil) + _, err = client.ACL().PolicyDelete(policyID, writeOpts) if err != nil { log.Error(err, "error deleting service policy", "policyID", policyID, "policyName", expectedPolicyName) mErr = errors.Join(mErr, fmt.Errorf("error deleting service ACL policy %q", policyID)) diff --git a/control-plane/catalog/registration/registrations_controller.go b/control-plane/catalog/registration/registrations_controller.go index eabcaae979..0b6b4bc24e 100644 --- a/control-plane/catalog/registration/registrations_controller.go +++ b/control-plane/catalog/registration/registrations_controller.go @@ -250,7 +250,7 @@ func (r *RegistrationsController) Logger(name types.NamespacedName) logr.Logger func (r *RegistrationsController) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { // setup the cache - go r.Cache.run(ctx, r.Log) + go r.Cache.run(r.Log, "") r.Cache.waitSynced(ctx) go r.watchForDeregistrations(ctx) diff --git a/control-plane/catalog/registration/registrations_controller_test.go b/control-plane/catalog/registration/registrations_controller_test.go index ccbe053d06..ce3dab14f5 100644 --- a/control-plane/catalog/registration/registrations_controller_test.go +++ b/control-plane/catalog/registration/registrations_controller_test.go @@ -350,7 +350,7 @@ func TestReconcile_Success(tt *testing.T) { Client: fakeClient, Log: logrtest.NewTestLogger(t), Scheme: s, - Cache: registration.NewRegistrationCache(testClient.Cfg, testClient.Watcher), + Cache: registration.NewRegistrationCache(context.Background(), testClient.Cfg, testClient.Watcher, fakeClient, false, false), } _, err := controller.Reconcile(ctx, ctrl.Request{ @@ -870,7 +870,7 @@ func TestReconcile_Failure(tt *testing.T) { Client: fakeClient, Log: logrtest.NewTestLogger(t), Scheme: s, - Cache: registration.NewRegistrationCache(testClient.Cfg, testClient.Watcher), + Cache: registration.NewRegistrationCache(context.Background(), testClient.Cfg, testClient.Watcher, fakeClient, false, false), } _, err := controller.Reconcile(ctx, ctrl.Request{ diff --git a/control-plane/config/crd/bases/consul.hashicorp.com_registrations.yaml b/control-plane/config/crd/bases/consul.hashicorp.com_registrations.yaml index df8970512b..c36167b5a2 100644 --- a/control-plane/config/crd/bases/consul.hashicorp.com_registrations.yaml +++ b/control-plane/config/crd/bases/consul.hashicorp.com_registrations.yaml @@ -189,7 +189,6 @@ spec: - warning type: object required: - - address - name - port type: object diff --git a/control-plane/subcommand/inject-connect/v1controllers.go b/control-plane/subcommand/inject-connect/v1controllers.go index ce7f80dd53..52d6f66259 100644 --- a/control-plane/subcommand/inject-connect/v1controllers.go +++ b/control-plane/subcommand/inject-connect/v1controllers.go @@ -290,7 +290,7 @@ func (c *Command) configureControllers(ctx context.Context, mgr manager.Manager, if err := (®istration.RegistrationsController{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Cache: registration.NewRegistrationCache(consulConfig, watcher), + Cache: registration.NewRegistrationCache(ctx, consulConfig, watcher, mgr.GetClient(), c.flagEnableNamespaces, c.flagEnablePartitions), Log: ctrl.Log.WithName("controller").WithName(apicommon.Registration), }).SetupWithManager(ctx, mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", apicommon.Registration)