Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobspec: add support for destination partition to upstream block #20167

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20167.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
consul/connect: Added support for destination partition in `upstream` block
```
19 changes: 6 additions & 13 deletions api/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ type ConsulUpstream struct {
DestinationName string `mapstructure:"destination_name" hcl:"destination_name,optional"`
DestinationNamespace string `mapstructure:"destination_namespace" hcl:"destination_namespace,optional"`
DestinationPeer string `mapstructure:"destination_peer" hcl:"destination_peer,optional"`
DestinationPartition string `mapstructure:"destination_partition" hcl:"destination_partition,optional"`
DestinationType string `mapstructure:"destination_type" hcl:"destination_type,optional"`
LocalBindPort int `mapstructure:"local_bind_port" hcl:"local_bind_port,optional"`
Datacenter string `mapstructure:"datacenter" hcl:"datacenter,optional"`
Expand All @@ -238,19 +239,11 @@ func (cu *ConsulUpstream) Copy() *ConsulUpstream {
if cu == nil {
return nil
}
return &ConsulUpstream{
DestinationName: cu.DestinationName,
DestinationNamespace: cu.DestinationNamespace,
DestinationPeer: cu.DestinationPeer,
DestinationType: cu.DestinationType,
LocalBindPort: cu.LocalBindPort,
Datacenter: cu.Datacenter,
LocalBindAddress: cu.LocalBindAddress,
LocalBindSocketPath: cu.LocalBindSocketPath,
LocalBindSocketMode: cu.LocalBindSocketMode,
MeshGateway: cu.MeshGateway.Copy(),
Config: maps.Clone(cu.Config),
}
up := new(ConsulUpstream)
*up = *cu
up.MeshGateway = cu.MeshGateway.Copy()
up.Config = maps.Clone(cu.Config)
return up
}

func (cu *ConsulUpstream) Canonicalize() {
Expand Down
1 change: 1 addition & 0 deletions api/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func TestConsulUpstream_Copy(t *testing.T) {
DestinationName: "dest1",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
Datacenter: "dc2",
LocalBindPort: 2000,
Expand Down
1 change: 1 addition & 0 deletions command/agent/consul/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func connectUpstreams(in []structs.ConsulUpstream) []api.Upstream {
DestinationName: upstream.DestinationName,
DestinationNamespace: upstream.DestinationNamespace,
DestinationType: api.UpstreamDestType(upstream.DestinationType),
DestinationPartition: upstream.DestinationPartition,
DestinationPeer: upstream.DestinationPeer,
LocalBindPort: upstream.LocalBindPort,
LocalBindSocketPath: upstream.LocalBindSocketPath,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/consul/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func TestConnect_connectUpstreams(t *testing.T) {
}, {
DestinationName: "bar",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
DestinationNamespace: "ns2",
LocalBindPort: 9000,
Expand All @@ -391,6 +392,7 @@ func TestConnect_connectUpstreams(t *testing.T) {
DestinationName: "bar",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 9000,
LocalBindSocketPath: "/var/run/testsocket.sock",
Expand Down
2 changes: 2 additions & 0 deletions command/agent/consul/service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ func proxyUpstreamsDifferent(wanted *api.AgentServiceConnect, sidecar *api.Agent
return true
case A.DestinationPeer != B.DestinationPeer:
return true
case A.DestinationPartition != B.DestinationPartition:
return true
case A.DestinationType != B.DestinationType:
return true
case A.LocalBindSocketPath != B.LocalBindSocketPath:
Expand Down
9 changes: 9 additions & 0 deletions command/agent/consul/service_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,15 @@ func TestSyncLogic_proxyUpstreamsDifferent(t *testing.T) {
}
})

try(t, "different destination partition", func(p proxy) {
diff := upstream1()
diff.DestinationPartition = "foo"
p.Upstreams = []api.Upstream{
diff,
upstream2(),
}
})

try(t, "different destination type", func(p proxy) {
diff := upstream1()
diff.DestinationType = "service"
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1881,6 +1881,7 @@ func apiUpstreamsToStructs(in []*api.ConsulUpstream) []structs.ConsulUpstream {
DestinationName: upstream.DestinationName,
DestinationNamespace: upstream.DestinationNamespace,
DestinationPeer: upstream.DestinationPeer,
DestinationPartition: upstream.DestinationPartition,
DestinationType: upstream.DestinationType,
LocalBindPort: upstream.LocalBindPort,
LocalBindSocketPath: upstream.LocalBindSocketPath,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4027,6 +4027,7 @@ func TestConversion_apiUpstreamsToStructs(t *testing.T) {
DestinationName: "upstream",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 8000,
LocalBindSocketPath: "/var/run/testsocket.sock",
Expand All @@ -4038,6 +4039,7 @@ func TestConversion_apiUpstreamsToStructs(t *testing.T) {
DestinationName: "upstream",
DestinationNamespace: "ns2",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 8000,
LocalBindSocketPath: "/var/run/testsocket.sock",
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) {
valid := []string{
"destination_name",
"destination_peer",
"destination_partition",
"destination_type",
"local_bind_port",
"local_bind_address",
Expand Down
17 changes: 9 additions & 8 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,14 +1209,15 @@ func TestParse(t *testing.T) {
LocalServicePort: 8080,
Upstreams: []*api.ConsulUpstream{
{
DestinationName: "other-service",
DestinationPeer: "10.0.0.1:6379",
DestinationType: "tcp",
LocalBindPort: 4567,
LocalBindAddress: "0.0.0.0",
LocalBindSocketPath: "/var/run/testsocket.sock",
LocalBindSocketMode: "0666",
Datacenter: "dc1",
DestinationName: "other-service",
DestinationPeer: "10.0.0.1:6379",
DestinationPartition: "infra",
DestinationType: "tcp",
LocalBindPort: 4567,
LocalBindAddress: "0.0.0.0",
LocalBindSocketPath: "/var/run/testsocket.sock",
LocalBindSocketMode: "0666",
Datacenter: "dc1",

MeshGateway: &api.ConsulMeshGateway{
Mode: "local",
Expand Down
1 change: 1 addition & 0 deletions jobspec/test-fixtures/tg-network.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ job "foo" {
upstreams {
destination_name = "other-service"
destination_peer = "10.0.0.1:6379"
destination_partition = "infra"
destination_type = "tcp"
local_bind_port = 4567
local_bind_address = "0.0.0.0"
Expand Down
30 changes: 22 additions & 8 deletions nomad/structs/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3830,6 +3830,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "ns2",
},
{
Type: DiffTypeNone,
Name: "DestinationPartition",
Old: "",
New: "",
},
{
Type: DiffTypeNone,
Name: "DestinationPeer",
Expand Down Expand Up @@ -9940,14 +9946,15 @@ func TestServicesDiff(t *testing.T) {
LocalServicePort: 8080,
Upstreams: []ConsulUpstream{
{
DestinationName: "count-api",
LocalBindPort: 8080,
Datacenter: "dc2",
LocalBindAddress: "127.0.0.1",
LocalBindSocketMode: "0700",
LocalBindSocketPath: "/tmp/redis_5678.sock",
DestinationPeer: "cloud-services",
DestinationType: "service",
DestinationName: "count-api",
LocalBindPort: 8080,
Datacenter: "dc2",
LocalBindAddress: "127.0.0.1",
LocalBindSocketMode: "0700",
LocalBindSocketPath: "/tmp/redis_5678.sock",
DestinationPeer: "cloud-services",
DestinationPartition: "infra",
DestinationType: "service",
MeshGateway: ConsulMeshGateway{
Mode: "remote",
},
Expand Down Expand Up @@ -9979,6 +9986,13 @@ func TestServicesDiff(t *testing.T) {
Type: DiffTypeEdited,
Name: "ConsulUpstreams",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "DestinationPartition",
Old: "",
New: "infra",
Annotations: nil,
},
{
Type: DiffTypeAdded,
Name: "DestinationPeer",
Expand Down
6 changes: 6 additions & 0 deletions nomad/structs/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,7 @@ func hashConnect(h hash.Hash, connect *ConsulConnect) {
hashStringIfNonEmpty(h, upstream.Datacenter)
hashStringIfNonEmpty(h, upstream.LocalBindAddress)
hashString(h, upstream.DestinationPeer)
hashString(h, upstream.DestinationPartition)
hashString(h, upstream.DestinationType)
hashString(h, upstream.LocalBindSocketPath)
hashString(h, upstream.LocalBindSocketMode)
Expand Down Expand Up @@ -1608,6 +1609,9 @@ type ConsulUpstream struct {
// DestinationNamespace is the namespace of the upstream service.
DestinationNamespace string

// DestinationNamespace is the admin partition of the upstream service.
DestinationPartition string

// DestinationPeer the destination service address
DestinationPeer string

Expand Down Expand Up @@ -1654,6 +1658,8 @@ func (u *ConsulUpstream) Equal(o *ConsulUpstream) bool {
return false
case u.DestinationPeer != o.DestinationPeer:
return false
case u.DestinationPartition != o.DestinationPartition:
return false
case u.DestinationType != o.DestinationType:
return false
case u.LocalBindPort != o.LocalBindPort:
Expand Down
34 changes: 23 additions & 11 deletions nomad/structs/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ func TestServiceCheck_validate_FailingTypes(t *testing.T) {

t.Run("invalid", func(t *testing.T) {
err := (&ServiceCheck{
Name: "check",
Type: "script",
Command: "/nothing",
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
SuccessBeforePassing: 0,
FailuresBeforeWarning: 3,
Name: "check",
Type: "script",
Command: "/nothing",
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
SuccessBeforePassing: 0,
FailuresBeforeWarning: 3,
}).validateConsul()
require.EqualError(t, err, `failures_before_warning not supported for check of type "script"`)
})
Expand Down Expand Up @@ -298,10 +298,10 @@ func TestServiceCheck_validateNomad(t *testing.T) {
{
name: "failures_before_warning",
sc: &ServiceCheck{
Type: ServiceCheckTCP,
FailuresBeforeWarning: 3, // consul only
Interval: 3 * time.Second,
Timeout: 1 * time.Second,
Type: ServiceCheckTCP,
FailuresBeforeWarning: 3, // consul only
Interval: 3 * time.Second,
Timeout: 1 * time.Second,
},
exp: `failures_before_warning may only be set for Consul service checks`,
},
Expand Down Expand Up @@ -786,6 +786,16 @@ func TestConsulUpstream_upstreamEqual(t *testing.T) {
must.False(t, upstreamsEquals(a, b))
})

t.Run("different dest partition", func(t *testing.T) {
a := []ConsulUpstream{up("foo", 8000)}
a[0].DestinationPeer = "infra"

b := []ConsulUpstream{up("foo", 8000)}
b[0].DestinationPeer = "dev"

must.False(t, upstreamsEquals(a, b))
})

t.Run("different dest type", func(t *testing.T) {
a := []ConsulUpstream{up("foo", 8000)}
a[0].DestinationType = "tcp"
Expand Down Expand Up @@ -832,10 +842,12 @@ func TestConsulUpstream_upstreamEqual(t *testing.T) {
a := []ConsulUpstream{up("foo", 8000), up("bar", 9000)}
b := []ConsulUpstream{up("foo", 8000), up("bar", 9000)}
a[0].DestinationPeer = "10.0.0.1:6379"
a[0].DestinationPartition = "infra"
a[0].DestinationType = "tcp"
a[0].LocalBindSocketPath = "/var/run/mysocket.sock"
a[0].LocalBindSocketMode = "0666"
b[0].DestinationPeer = "10.0.0.1:6379"
b[0].DestinationPartition = "infra"
b[0].DestinationType = "tcp"
b[0].LocalBindSocketPath = "/var/run/mysocket.sock"
b[0].LocalBindSocketMode = "0666"
Expand Down
1 change: 1 addition & 0 deletions website/content/docs/job-specification/upstreams.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ job "countdash" {
for details. Keys and values support [runtime variable interpolation][interpolation].
- `destination_name` `(string: <required>)` - Name of the upstream service.
- `destination_namespace` `(string: <required>)` - Name of the upstream Consul namespace.
- `destination_partition` `(string: "")` - Name of the Cluster admin partition containing the upstream service.
- `destination_peer` `(string: "")` - Name of the peer cluster containing the upstream service.
- `destination_type` - `(string: "service")` - The type of discovery query the proxy should use for finding service mesh instances.
- `local_bind_port` - `(int: <required>)` - The port the proxy will receive
Expand Down