Skip to content

Commit

Permalink
xds: don't attempt to load-balance sessions for local proxies (#15789)
Browse files Browse the repository at this point in the history
Previously, we'd begin a session with the xDS concurrency limiter
regardless of whether the proxy was registered in the catalog or in
the server's local agent state.

This caused problems for users who run `consul connect envoy` directly
against a server rather than a client agent, as the server's locally
registered proxies wouldn't be included in the limiter's capacity.

Now, the `ConfigSource` is responsible for beginning the session and we
only do so for services in the catalog.

Fixes: #15753
  • Loading branch information
boxofrad committed Jan 18, 2023
1 parent 02869ce commit 7a55de3
Show file tree
Hide file tree
Showing 63 changed files with 643 additions and 393 deletions.
3 changes: 3 additions & 0 deletions .changelog/15789.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: fix bug where sessions for locally-managed services could fail with "this server has too many xDS streams open"
```
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ SHELL = bash
# or the string @DEV to imply use what is currently installed locally.
###
GOLANGCI_LINT_VERSION='v1.50.1'
MOCKERY_VERSION='v2.12.2'
MOCKERY_VERSION='v2.15.0'
BUF_VERSION='v1.4.0'
PROTOC_GEN_GO_GRPC_VERSION="v1.2.0"
MOG_VERSION='v0.3.0'
Expand Down
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ func (a *Agent) listenAndServeGRPC() error {
Manager: a.proxyConfig,
GetStore: func() catalogproxycfg.Store { return server.FSM().State() },
Logger: a.proxyConfig.Logger.Named("server-catalog"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
go func() {
<-a.shutdownCh
Expand All @@ -870,7 +871,6 @@ func (a *Agent) listenAndServeGRPC() error {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
},
a,
a.baseDeps.XDSStreamLimiter,
)
a.xdsServer.Register(a.externalGRPCServer)

Expand Down
23 changes: 12 additions & 11 deletions agent/cache-types/catalog_datacenters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cachetype
import (
"testing"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

func TestCatalogDatacenters(t *testing.T) {
Expand All @@ -18,34 +19,34 @@ func TestCatalogDatacenters(t *testing.T) {
var resp *[]string
var resp2 *[]string
var resp3 *[]string
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
req := args.Get(2).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)

reply := args.Get(2).(*[]string)
reply := args.Get(3).(*[]string)
*reply = []string{
"primary", "secondary", "tertiary",
}
resp = reply
})
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
req := args.Get(2).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)

reply := args.Get(2).(*[]string)
reply := args.Get(3).(*[]string)
*reply = []string{
"primary", "tertiary", "secondary",
}
resp2 = reply
})
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
req := args.Get(2).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)

reply := args.Get(2).(*[]string)
reply := args.Get(3).(*[]string)
*reply = []string{
"primary", "secondary",
}
Expand Down
12 changes: 6 additions & 6 deletions agent/cache-types/catalog_list_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ func TestCatalogListServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServices
rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)

reply := args.Get(2).(*structs.IndexedServices)
reply := args.Get(3).(*structs.IndexedServices)
reply.Services = map[string][]string{
"foo": {"prod", "linux"},
"bar": {"qa", "windows"},
Expand Down Expand Up @@ -75,14 +75,14 @@ func TestCatalogListServices_IntegrationWithCache_NotModifiedResponse(t *testing
"foo": {"prod", "linux"},
"bar": {"qa", "windows"},
}
rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "Catalog.ListServices", mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.True(t, req.AllowStale)
require.True(t, req.AllowNotModifiedResponse)

reply := args.Get(2).(*structs.IndexedServices)
reply := args.Get(3).(*structs.IndexedServices)
reply.QueryMeta.Index = 44
reply.NotModified = true
})
Expand Down
11 changes: 6 additions & 5 deletions agent/cache-types/catalog_service_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

func TestCatalogServiceList(t *testing.T) {
Expand All @@ -17,14 +18,14 @@ func TestCatalogServiceList(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceList
rpc.On("RPC", "Catalog.ServiceList", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ServiceList", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)

reply := args.Get(2).(*structs.IndexedServiceList)
reply := args.Get(3).(*structs.IndexedServiceList)
reply.Services = structs.ServiceList{
structs.ServiceName{
Name: "foo",
Expand Down
11 changes: 6 additions & 5 deletions agent/cache-types/catalog_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

func TestCatalogServices(t *testing.T) {
Expand All @@ -18,15 +19,15 @@ func TestCatalogServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceNodes
rpc.On("RPC", "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
req := args.Get(2).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal(t, "web", req.ServiceName)
require.True(t, req.AllowStale)

reply := args.Get(2).(*structs.IndexedServiceNodes)
reply := args.Get(3).(*structs.IndexedServiceNodes)
reply.ServiceNodes = []*structs.ServiceNode{
{ServiceTags: req.ServiceTags},
}
Expand Down
17 changes: 9 additions & 8 deletions agent/cache-types/config_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

func TestConfigEntries(t *testing.T) {
Expand All @@ -17,16 +18,16 @@ func TestConfigEntries(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedConfigEntries
rpc.On("RPC", "ConfigEntry.List", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConfigEntry.List", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ConfigEntryQuery)
req := args.Get(2).(*structs.ConfigEntryQuery)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
require.Equal(t, structs.ServiceResolver, req.Kind)
require.Equal(t, "", req.Name)

reply := args.Get(2).(*structs.IndexedConfigEntries)
reply := args.Get(3).(*structs.IndexedConfigEntries)
reply.Kind = structs.ServiceResolver
reply.Entries = []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{Kind: structs.ServiceResolver, Name: "foo"},
Expand Down Expand Up @@ -60,9 +61,9 @@ func TestConfigEntry(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.ConfigEntryResponse
rpc.On("RPC", "ConfigEntry.Get", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConfigEntry.Get", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ConfigEntryQuery)
req := args.Get(2).(*structs.ConfigEntryQuery)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
Expand All @@ -73,7 +74,7 @@ func TestConfigEntry(t *testing.T) {
Name: "foo",
Kind: structs.ServiceResolver,
}
reply := args.Get(2).(*structs.ConfigEntryResponse)
reply := args.Get(3).(*structs.ConfigEntryResponse)
reply.Entry = entry
reply.QueryMeta.Index = 48
resp = reply
Expand Down
36 changes: 18 additions & 18 deletions agent/cache-types/connect_ca_leaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,15 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
var resp *structs.IssuedCert
var idx uint64

rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
ca := caRoot
cIdx := atomic.AddUint64(&idx, 1)
if cIdx > 1 {
// Second time round use the new CA
ca = caRoot2
}
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", ca)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
Expand Down Expand Up @@ -292,9 +292,9 @@ func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) {
// Instrument ConnectCA.Sign to return signed cert
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
Expand Down Expand Up @@ -433,9 +433,9 @@ func TestConnectCALeaf_changingRootsBetweenBlockingCalls(t *testing.T) {
// Instrument ConnectCA.Sign to return signed cert
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
Expand Down Expand Up @@ -548,7 +548,7 @@ func TestConnectCALeaf_CSRRateLimiting(t *testing.T) {
var idx, rateLimitedRPCs uint64

genCert := func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
Expand All @@ -565,16 +565,16 @@ func TestConnectCALeaf_CSRRateLimiting(t *testing.T) {
// First call return rate limit error. This is important as it checks
// behavior when cache is empty and we have to return a nil Value but need to
// save state to do the right thing for retry.
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(consul.ErrRateLimited).Once().Run(incRateLimit)
// Then succeed on second call
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(nil).Run(genCert).Once()
// Then be rate limited again on several further calls
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(consul.ErrRateLimited).Twice().Run(incRateLimit)
// Then fine after that
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(nil).Run(genCert)

opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute}
Expand Down Expand Up @@ -727,9 +727,9 @@ func TestConnectCALeaf_watchRootsDedupingMultipleCallers(t *testing.T) {

// Instrument ConnectCA.Sign to return signed cert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
// Note we will sign certs for same service name each time because
// otherwise we have to re-invent whole CSR endpoint here to be able to
// control things - parse PEM sign with right key etc. It doesn't matter -
Expand Down Expand Up @@ -924,9 +924,9 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
// Instrument ConnectCA.Sign to
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex

Expand Down Expand Up @@ -1017,13 +1017,13 @@ func TestConnectCALeaf_DNSSANForService(t *testing.T) {

// Instrument ConnectCA.Sign to
var caReq *structs.CASignRequest
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf

caReq = args.Get(1).(*structs.CASignRequest)
caReq = args.Get(2).(*structs.CASignRequest)
})

opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Second}
Expand Down
11 changes: 6 additions & 5 deletions agent/cache-types/connect_ca_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

func TestConnectCARoot(t *testing.T) {
Expand All @@ -18,13 +19,13 @@ func TestConnectCARoot(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedCARoots
rpc.On("RPC", "ConnectCA.Roots", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Roots", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)

reply := args.Get(2).(*structs.IndexedCARoots)
reply := args.Get(3).(*structs.IndexedCARoots)
reply.QueryMeta.Index = 48
resp = reply
})
Expand Down
Loading

0 comments on commit 7a55de3

Please sign in to comment.