Skip to content

Commit

Permalink
Backport of consul: set partition for gateway config entries into rel…
Browse files Browse the repository at this point in the history
…ease/1.8.x (#22408)

Co-authored-by: Tim Gross <tgross@hashicorp.com>
  • Loading branch information
hc-github-team-nomad-core and tgross committed May 29, 2024
1 parent 6bcd941 commit 4f40612
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 52 deletions.
7 changes: 7 additions & 0 deletions .changelog/22228.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
consul: Fixed a bug where gateway config entries were written to the Nomad server agent's Consul partition and not the client's partition
```

```release-note:bug
consul: (Enterprise) Fixed a bug where gateway config entries were written before Sentinel policies were enforced
```
17 changes: 13 additions & 4 deletions command/agent/consul/config_entries_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/lib/lang"
)

var _ ConfigAPI = (*MockConfigsAPI)(nil)
Expand All @@ -18,7 +19,7 @@ type MockConfigsAPI struct {
lock sync.Mutex
state struct {
error error
entries map[string]api.ConfigEntry
entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions]
}
}

Expand All @@ -27,8 +28,8 @@ func NewMockConfigsAPI(l hclog.Logger) *MockConfigsAPI {
logger: l.Named("mock_consul"),
state: struct {
error error
entries map[string]api.ConfigEntry
}{entries: make(map[string]api.ConfigEntry)},
entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions]
}{entries: make(map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions])},
}
}

Expand All @@ -41,7 +42,7 @@ func (m *MockConfigsAPI) Set(entry api.ConfigEntry, w *api.WriteOptions) (bool,
return false, nil, m.state.error
}

m.state.entries[entry.GetName()] = entry
m.state.entries[entry.GetName()] = lang.Pair[api.ConfigEntry, *api.WriteOptions]{First: entry, Second: w}

return true, &api.WriteMeta{
RequestTime: 1,
Expand All @@ -56,3 +57,11 @@ func (m *MockConfigsAPI) SetError(err error) {

m.state.error = err
}

// GetEntry is a helper method so that test can verify what's been written
func (m *MockConfigsAPI) GetEntry(kind string) (api.ConfigEntry, *api.WriteOptions) {
m.lock.Lock()
defer m.lock.Unlock()
entry := m.state.entries[kind]
return entry.First, entry.Second
}
19 changes: 11 additions & 8 deletions nomad/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,11 @@ func (s *Server) purgeSITokenAccessors(accessors []*structs.SITokenAccessor) err
type ConsulConfigsAPI interface {
// SetIngressCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error
SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error

// SetTerminatingCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error
SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error

// Stop is used to stop additional creations of Configuration Entries. Intended to
// be used on Nomad Server shutdown.
Expand Down Expand Up @@ -552,16 +552,16 @@ func (c *consulConfigsAPI) Stop() {
c.stopped = true
}

func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster)
func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster, partition)
}

func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster)
func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster, partition)
}

// setCE will set the Configuration Entry of any type Consul supports.
func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster string) error {
func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster, partition string) error {
defer metrics.MeasureSince([]string{"nomad", "consul", "create_config_entry"}, time.Now())

// make sure the background deletion goroutine has not been stopped
Expand All @@ -579,7 +579,10 @@ func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, clu
}

client := c.configsClientFunc(cluster)
_, _, err := client.Set(entry, &api.WriteOptions{Namespace: entry.GetNamespace()})
_, _, err := client.Set(entry, &api.WriteOptions{
Namespace: entry.GetNamespace(),
Partition: partition,
})
return err
}

Expand Down
65 changes: 44 additions & 21 deletions nomad/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
Expand All @@ -27,56 +29,77 @@ var _ ConsulConfigsAPI = (*consulConfigsAPI)(nil)
func TestConsulConfigsAPI_SetCE(t *testing.T) {
ci.Parallel(t)

try := func(t *testing.T, expect error, f func(ConsulConfigsAPI) error) {
try := func(t *testing.T,
expectErr error,
expectKey string,
expectConfig api.ConfigEntry,
expectWriteOpts *api.WriteOptions,
f func(ConsulConfigsAPI) error) {

logger := testlog.HCLogger(t)
configsAPI := consul.NewMockConfigsAPI(logger)
configsAPI.SetError(expect)
configsAPI.SetError(expectErr)
configsAPIFunc := func(_ string) consul.ConfigAPI { return configsAPI }

c := NewConsulConfigsAPI(configsAPIFunc, logger)
err := f(c) // set the config entry

switch expect {
entry, wo := configsAPI.GetEntry(expectKey)
must.Eq(t, expectConfig, entry)
must.Eq(t, expectWriteOpts, wo)

switch expectErr {
case nil:
require.NoError(t, err)
must.NoError(t, err)
default:
require.Equal(t, expect, err)
must.EqError(t, err, expectErr.Error())
}
}

ctx := context.Background()

// existing behavior is no set namespace
consulNamespace := ""
partition := "foo"

ingressCE := new(structs.ConsulIngressConfigEntry)
t.Run("ingress ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE)
})
try(t, nil, "ig",
&api.IngressGatewayConfigEntry{Kind: "ingress-gateway", Name: "ig"},
&api.WriteOptions{Partition: partition},
func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE)
})
})

t.Run("ingress fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE)
})
try(t, errors.New("consul broke"),
"ig", nil, nil,
func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE)
})
})

terminatingCE := new(structs.ConsulTerminatingConfigEntry)
t.Run("terminating ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE)
})
try(t, nil, "tg",
&api.TerminatingGatewayConfigEntry{Kind: "terminating-gateway", Name: "tg"},
&api.WriteOptions{Partition: partition},
func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE)
})
})

t.Run("terminating fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE)
})
try(t, errors.New("consul broke"),
"tg", nil, nil,
func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE)
})
})

// also mesh
Expand Down
40 changes: 21 additions & 19 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,23 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

// Enforce Sentinel policies. Pass a copy of the job to prevent
// sentinel from altering it.
ns, err := snap.NamespaceByName(nil, args.RequestNamespace())
if err != nil {
return err
}

policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(),
existingJob, args.GetIdentity().GetACLToken(), ns)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = helper.MergeMultierrorWarnings(warnings...)
}

// Create or Update Consul Configuration Entries defined in the job. For now
// Nomad only supports Configuration Entries types
// - "ingress-gateway" for managing Ingress Gateways
Expand All @@ -282,34 +299,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

for ns, entries := range args.Job.ConfigEntries() {
for service, entry := range entries.Ingress {
if errCE := j.srv.consulConfigEntries.SetIngressCE(ctx, ns, service, entries.Cluster, entry); errCE != nil {
if errCE := j.srv.consulConfigEntries.SetIngressCE(
ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil {
return errCE
}
}
for service, entry := range entries.Terminating {
if errCE := j.srv.consulConfigEntries.SetTerminatingCE(ctx, ns, service, entries.Cluster, entry); errCE != nil {
if errCE := j.srv.consulConfigEntries.SetTerminatingCE(
ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil {
return errCE
}
}
}

// Enforce Sentinel policies. Pass a copy of the job to prevent
// sentinel from altering it.
ns, err := snap.NamespaceByName(nil, args.RequestNamespace())
if err != nil {
return err
}

policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(),
existingJob, args.GetIdentity().GetACLToken(), ns)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = helper.MergeMultierrorWarnings(warnings...)
}

// Clear the Vault token
args.Job.VaultToken = ""

Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// a single Consul namespace.
type ConsulConfigEntries struct {
Cluster string
Partition string
Ingress map[string]*ConsulIngressConfigEntry
Terminating map[string]*ConsulTerminatingConfigEntry
}
Expand All @@ -43,9 +44,15 @@ func (j *Job) ConfigEntries() map[string]*ConsulConfigEntries {
if ig := gateway.Ingress; ig != nil {
collection[ns].Ingress[service.Name] = ig
collection[ns].Cluster = service.Cluster
if tg.Consul != nil {
collection[ns].Partition = tg.Consul.Partition
}
} else if term := gateway.Terminating; term != nil {
collection[ns].Terminating[service.Name] = term
collection[ns].Cluster = service.Cluster
if tg.Consul != nil {
collection[ns].Partition = tg.Consul.Partition
}
}
}
}
Expand Down

0 comments on commit 4f40612

Please sign in to comment.