Skip to content

Commit

Permalink
update external registration acl role/policy to include namespace/par…
Browse files Browse the repository at this point in the history
…tition, make service address optional like it is for catalog registration
  • Loading branch information
jm96441n committed Jun 28, 2024
1 parent dad675c commit 70a1c84
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 56 deletions.
1 change: 0 additions & 1 deletion charts/consul/templates/crd-registrations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ spec:
- warning
type: object
required:
- address
- name
- port
type: object
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/binding/cleanup.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package binding

import (
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/binding/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package binding

import (
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/gatekeeper/ownership.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package gatekeeper

import (
Expand Down
3 changes: 3 additions & 0 deletions control-plane/api-gateway/gatekeeper/secret.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package gatekeeper

import (
Expand Down
2 changes: 1 addition & 1 deletion control-plane/api/v1alpha1/registration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Service struct {
Tags []string `json:"tags,omitempty"`
Meta map[string]string `json:"meta,omitempty"`
Port int `json:"port"`
Address string `json:"address"`
Address string `json:"address,omitempty"`
SocketPath string `json:"socketPath,omitempty"`
TaggedAddresses map[string]ServiceAddress `json:"taggedAddresses,omitempty"`
Weights Weights `json:"weights,omitempty"`
Expand Down
195 changes: 146 additions & 49 deletions control-plane/catalog/registration/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,94 @@
package registration

import (
"bytes"
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"text/template"

mapset "github.com/deckarep/golang-set/v2"
"github.com/go-logr/logr"
"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
"github.com/hashicorp/consul-k8s/control-plane/consul"
capi "github.com/hashicorp/consul/api"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const NotInServiceMeshFilter = "ServiceMeta[\"managed-by\"] != \"consul-k8s-endpoints-controller\""

func init() {
gatewayTpl = template.Must(template.New("root").Parse(strings.TrimSpace(gatewayRulesTpl)))
}

type templateArgs struct {
EnablePartitions bool
Partition string
EnableNamespaces bool
Namespace string
ServiceName string
}

var (
gatewayTpl *template.Template
gatewayRulesTpl = `
{{ if .EnablePartitions }}
partition "{{.Partition}}" {
{{- end }}
{{- if .EnableNamespaces }}
namespace "{{.Namespace}}" {
{{- end }}
service "{{.ServiceName}}" {
policy = "write"
}
{{- if .EnableNamespaces }}
}
{{- end }}
{{- if .EnablePartitions }}
}
{{- end }}
`
)

type RegistrationCache struct {
// we include the context here so that we can use it for cancellation of `run` invocations that are scheduled after the cache is started
// this occurs when registering services in a new namespace as we have an invocation of `run` per namespace that is registered
ctx context.Context

ConsulClientConfig *consul.Config
ConsulServerConnMgr consul.ServerConnectionManager
serviceMtx *sync.Mutex
Services map[string]*v1alpha1.Registration
synced chan struct{}
UpdateChan chan string
k8sClient client.Client

serviceMtx *sync.Mutex
Services map[string]*v1alpha1.Registration

namespaces mapset.Set[string]

synced chan struct{}
UpdateChan chan string

namespacesEnabled bool
partitionsEnabled bool
}

func NewRegistrationCache(consulClientConfig *consul.Config, consulServerConnMgr consul.ServerConnectionManager) *RegistrationCache {
func NewRegistrationCache(ctx context.Context, consulClientConfig *consul.Config, consulServerConnMgr consul.ServerConnectionManager, k8sClient client.Client, namespacesEnabled, partitionsEnabled bool) *RegistrationCache {
return &RegistrationCache{
ctx: ctx,
ConsulClientConfig: consulClientConfig,
ConsulServerConnMgr: consulServerConnMgr,
k8sClient: k8sClient,
serviceMtx: &sync.Mutex{},
Services: make(map[string]*v1alpha1.Registration),
UpdateChan: make(chan string),
synced: make(chan struct{}),
namespaces: mapset.NewSet[string](),
namespacesEnabled: namespacesEnabled,
partitionsEnabled: partitionsEnabled,
}
}

Expand All @@ -50,13 +105,13 @@ func (c *RegistrationCache) waitSynced(ctx context.Context) {
}
}

func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) {
func (c *RegistrationCache) run(log logr.Logger, namespace string) {
once := &sync.Once{}
opts := &capi.QueryOptions{Filter: NotInServiceMeshFilter}
opts := &capi.QueryOptions{Filter: NotInServiceMeshFilter, Namespace: namespace}

for {
select {
case <-ctx.Done():
case <-c.ctx.Done():
return
default:

Expand All @@ -65,7 +120,7 @@ func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) {
log.Error(err, "error initializing consul client")
continue
}
entries, meta, err := client.Catalog().Services(opts.WithContext(ctx))
entries, meta, err := client.Catalog().Services(opts.WithContext(c.ctx))
if err != nil {
// if we timeout we don't care about the error message because it's expected to happen on long polls
// any other error we want to alert on
Expand All @@ -77,20 +132,40 @@ func (c *RegistrationCache) run(ctx context.Context, log logr.Logger) {
continue
}

diffs := mapset.NewSet[string]()
servicesToRemove := mapset.NewSet[string]()
servicesToAdd := mapset.NewSet[string]()
c.serviceMtx.Lock()
for svc := range c.Services {
if _, ok := entries[svc]; !ok {
diffs.Add(svc)
servicesToRemove.Add(svc)
}
}

for svc := range entries {
if _, ok := c.Services[svc]; !ok {
servicesToAdd.Add(svc)
}
}
c.serviceMtx.Unlock()

for _, svc := range diffs.ToSlice() {
for _, svc := range servicesToRemove.ToSlice() {
log.Info("consul deregistered service", "svcName", svc)
c.UpdateChan <- svc
}

for _, svc := range servicesToAdd.ToSlice() {
registration := &v1alpha1.Registration{}

if err := c.k8sClient.Get(c.ctx, types.NamespacedName{Name: svc, Namespace: namespace}, registration); err != nil {
if !k8serrors.IsNotFound(err) {
log.Error(err, "unable to get registration", "svcName", svc)
}
continue
}

c.Services[svc] = registration
}

opts.WaitIndex = meta.LastIndex
once.Do(func() {
log.Info("Initial sync complete")
Expand Down Expand Up @@ -122,21 +197,26 @@ func (c *RegistrationCache) registerService(log logr.Logger, reg *v1alpha1.Regis
return err
}

_, err = client.Catalog().Register(regReq, nil)
_, err = client.Catalog().Register(regReq, &capi.WriteOptions{Namespace: reg.Spec.Service.Namespace})
if err != nil {
log.Error(err, "error registering service", "svcName", regReq.Service.Service)
return err
}

c.serviceMtx.Lock()
defer c.serviceMtx.Unlock()
c.Services[reg.Spec.Service.Name] = reg
if !c.namespaces.Contains(reg.Spec.Service.Namespace) && !emptyOrDefault(reg.Spec.Service.Namespace) {
c.namespaces.Add(reg.Spec.Service.Namespace)
go c.run(log, reg.Spec.Service.Namespace)
}

log.Info("Successfully registered service", "svcName", regReq.Service.Service)

return nil
}

func emptyOrDefault(s string) bool {
return s == "" || s == "default"
}

func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v1alpha1.Registration, termGWsToUpdate []v1alpha1.TerminatingGateway) error {
if len(termGWsToUpdate) == 0 {
log.Info("terminating gateway not found")
Expand All @@ -148,38 +228,51 @@ func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v
return err
}

roles, _, err := client.ACL().RoleList(nil)
if err != nil {
log.Error(err, "error reading role list")
return err
var data bytes.Buffer
if err := gatewayTpl.Execute(&data, templateArgs{
EnablePartitions: c.partitionsEnabled,
Partition: registration.Spec.Service.Partition,
EnableNamespaces: c.namespacesEnabled,
Namespace: registration.Spec.Service.Namespace,
ServiceName: registration.Spec.Service.Name,
}); err != nil {
// just panic if we can't compile the simple template
// as it means something else is going severly wrong.
panic(err)
}

policy := &capi.ACLPolicy{
Name: servicePolicyName(registration.Spec.Service.Name),
Description: "Write policy for terminating gateways for external service",
Rules: fmt.Sprintf(`service %q { policy = "write" }`, registration.Spec.Service.Name),
Datacenters: []string{registration.Spec.Datacenter},
Namespace: registration.Spec.Service.Namespace,
Partition: registration.Spec.Service.Partition,
}
var mErr error
for _, termGW := range termGWsToUpdate {
// the terminating gateway role is _always_ in the default namespace
roles, _, err := client.ACL().RoleList(&capi.QueryOptions{})
if err != nil {
log.Error(err, "error reading role list")
return err
}

existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, nil)
if err != nil {
log.Error(err, "error reading policy")
return err
}
policy := &capi.ACLPolicy{
Name: servicePolicyName(registration.Spec.Service.Name),
Description: "Write policy for terminating gateways for external service",
Rules: data.String(),
Datacenters: []string{registration.Spec.Datacenter},
}

if existingPolicy == nil {
policy, _, err = client.ACL().PolicyCreate(policy, nil)
existingPolicy, _, err := client.ACL().PolicyReadByName(policy.Name, &capi.QueryOptions{})
if err != nil {
return fmt.Errorf("error creating policy: %w", err)
log.Error(err, "error reading policy")
return err
}
} else {
policy = existingPolicy
}

var mErr error
for _, termGW := range termGWsToUpdate {
writeOpts := &capi.WriteOptions{}

if existingPolicy == nil {
policy, _, err = client.ACL().PolicyCreate(policy, writeOpts)
if err != nil {
return fmt.Errorf("error creating policy: %w", err)
}
} else {
policy = existingPolicy
}
var role *capi.ACLRole
for _, r := range roles {
if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) {
Expand All @@ -196,7 +289,7 @@ func (c *RegistrationCache) updateTermGWACLRole(log logr.Logger, registration *v

role.Policies = append(role.Policies, &capi.ACLRolePolicyLink{Name: policy.Name, ID: policy.ID})

_, _, err = client.ACL().RoleUpdate(role, nil)
_, _, err = client.ACL().RoleUpdate(role, writeOpts)
if err != nil {
log.Error(err, "error updating role", "roleName", role.Name)
mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name))
Expand Down Expand Up @@ -239,13 +332,17 @@ func (c *RegistrationCache) removeTermGWACLRole(log logr.Logger, registration *v
return err
}

roles, _, err := client.ACL().RoleList(nil)
if err != nil {
return err
}

var mErr error
for _, termGW := range termGWsToUpdate {

queryOpts := &capi.QueryOptions{}

writeOpts := &capi.WriteOptions{}

roles, _, err := client.ACL().RoleList(queryOpts)
if err != nil {
return err
}
var role *capi.ACLRole
for _, r := range roles {
if strings.HasSuffix(r.Name, fmt.Sprintf("-%s-acl-role", termGW.Name)) {
Expand Down Expand Up @@ -276,14 +373,14 @@ func (c *RegistrationCache) removeTermGWACLRole(log logr.Logger, registration *v
continue
}

_, _, err = client.ACL().RoleUpdate(role, nil)
_, _, err = client.ACL().RoleUpdate(role, writeOpts)
if err != nil {
log.Error(err, "error updating role", "roleName", role.Name)
mErr = errors.Join(mErr, fmt.Errorf("error updating role %q", role.Name))
continue
}

_, err = client.ACL().PolicyDelete(policyID, nil)
_, err = client.ACL().PolicyDelete(policyID, writeOpts)
if err != nil {
log.Error(err, "error deleting service policy", "policyID", policyID, "policyName", expectedPolicyName)
mErr = errors.Join(mErr, fmt.Errorf("error deleting service ACL policy %q", policyID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (r *RegistrationsController) Logger(name types.NamespacedName) logr.Logger

func (r *RegistrationsController) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// setup the cache
go r.Cache.run(ctx, r.Log)
go r.Cache.run(r.Log, "")
r.Cache.waitSynced(ctx)

go r.watchForDeregistrations(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func TestReconcile_Success(tt *testing.T) {
Client: fakeClient,
Log: logrtest.NewTestLogger(t),
Scheme: s,
Cache: registration.NewRegistrationCache(testClient.Cfg, testClient.Watcher),
Cache: registration.NewRegistrationCache(context.Background(), testClient.Cfg, testClient.Watcher, fakeClient, false, false),
}

_, err := controller.Reconcile(ctx, ctrl.Request{
Expand Down Expand Up @@ -870,7 +870,7 @@ func TestReconcile_Failure(tt *testing.T) {
Client: fakeClient,
Log: logrtest.NewTestLogger(t),
Scheme: s,
Cache: registration.NewRegistrationCache(testClient.Cfg, testClient.Watcher),
Cache: registration.NewRegistrationCache(context.Background(), testClient.Cfg, testClient.Watcher, fakeClient, false, false),
}

_, err := controller.Reconcile(ctx, ctrl.Request{
Expand Down
Loading

0 comments on commit 70a1c84

Please sign in to comment.