Skip to content

Commit

Permalink
Refactor client RPC timeouts (#14965)
Browse files Browse the repository at this point in the history
Fix an issue where rpc_hold_timeout was being used as the timeout for non-blocking queries. Users should be able to tune read timeouts without fiddling with rpc_hold_timeout. A new configuration `rpc_read_timeout` is created.

Refactor some implementation from the original PR 11500 to remove the misleading linkage between RPCInfo's timeout (used to retry in case of certain modes of failures) and the client RPC timeouts.

(cherry picked from commit 29a297d)
  • Loading branch information
kisunji committed Oct 18, 2022
1 parent 5297e38 commit 42df428
Show file tree
Hide file tree
Showing 25 changed files with 229 additions and 154 deletions.
3 changes: 3 additions & 0 deletions .changelog/14965.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
agent: Added a new config option `rpc_client_timeout` to tune timeouts for client RPC requests
```
26 changes: 14 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,18 +371,18 @@ type Agent struct {

// New process the desired options and creates a new Agent.
// This process will
// * parse the config given the config Flags
// * setup logging
// * using predefined logger given in an option
// OR
// * initialize a new logger from the configuration
// including setting up gRPC logging
// * initialize telemetry
// * create a TLS Configurator
// * build a shared connection pool
// * create the ServiceManager
// * setup the NodeID if one isn't provided in the configuration
// * create the AutoConfig object for future use in fully
// - parse the config given the config Flags
// - setup logging
// - using predefined logger given in an option
// OR
// - initialize a new logger from the configuration
// including setting up gRPC logging
// - initialize telemetry
// - create a TLS Configurator
// - build a shared connection pool
// - create the ServiceManager
// - setup the NodeID if one isn't provided in the configuration
// - create the AutoConfig object for future use in fully
// resolving the configuration
func New(bd BaseDeps) (*Agent, error) {
a := Agent{
Expand Down Expand Up @@ -1203,6 +1203,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
// RPC-related performance configs. We allow explicit zero value to disable so
// copy it whatever the value.
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
cfg.RPCClientTimeout = runtimeCfg.RPCClientTimeout

cfg.RPCConfig = runtimeCfg.RPCConfig

Expand Down Expand Up @@ -3795,6 +3796,7 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
}

cc := consul.ReloadableConfig{
RPCClientTimeout: newCfg.RPCClientTimeout,
RPCRateLimit: newCfg.RPCRateLimit,
RPCMaxBurst: newCfg.RPCMaxBurst,
RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,
Expand Down
30 changes: 30 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4103,6 +4103,36 @@ func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) {
require.True(t, a.consulConfig().AutoEncryptAllowTLS)
}

func TestAgent_ReloadConfigRPCClientConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

dataDir := testutil.TempDir(t, "agent") // we manage the data dir
hcl := `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
`
a := NewTestAgent(t, hcl)

defaultRPCTimeout := 60 * time.Second
require.Equal(t, defaultRPCTimeout, a.baseDeps.ConnPool.RPCClientTimeout())

hcl = `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
limits {
rpc_client_timeout = "2m"
}
`
c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a.reloadConfigInternal(c))

require.Equal(t, 2*time.Minute, a.baseDeps.ConnPool.RPCClientTimeout())
}

func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
9 changes: 5 additions & 4 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ type LoadResult struct {
//
// The sources are merged in the following order:
//
// * default configuration
// * config files in alphabetical order
// * command line arguments
// * overrides
// - default configuration
// - config files in alphabetical order
// - command line arguments
// - overrides
//
// The config sources are merged sequentially and later values overwrite
// previously set values. Slice values are merged by concatenating the two slices.
Expand Down Expand Up @@ -1037,6 +1037,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCClientTimeout: b.durationVal("limits.rpc_client_timeout", c.Limits.RPCClientTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ type UnixSocket struct {
type Limits struct {
HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"`
HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"`
RPCClientTimeout *string `mapstructure:"rpc_client_timeout"`
RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"`
RPCMaxBurst *int `mapstructure:"rpc_max_burst"`
RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"`
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func DefaultSource() Source {
http_max_conns_per_client = 200
https_handshake_timeout = "5s"
rpc_handshake_timeout = "5s"
rpc_client_timeout = "60s"
rpc_rate = -1
rpc_max_burst = 1000
rpc_max_conns_per_client = 100
Expand Down
14 changes: 13 additions & 1 deletion agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type RuntimeConfig struct {
// AutopilotMinQuorum sets the minimum number of servers required in a cluster
// before autopilot can prune dead servers.
//
//hcl: autopilot { min_quorum = int }
// hcl: autopilot { min_quorum = int }
AutopilotMinQuorum uint

// AutopilotRedundancyZoneTag is the Meta tag to use for separating servers
Expand Down Expand Up @@ -881,6 +881,18 @@ type RuntimeConfig struct {
// hcl: performance { rpc_hold_timeout = "duration" }
RPCHoldTimeout time.Duration

// RPCClientTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for requests to eventually
// terminate so that RPC connections are not held indefinitely.
// It may be set to 0 explicitly to disable the timeout but this should never
// be used in production. Default is 60 seconds.
//
// Note: Blocking queries use MaxQueryTime and DefaultQueryTime to calculate
// timeouts.
//
// hcl: limits { rpc_client_timeout = "duration" }
RPCClientTimeout time.Duration

// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
Expand Down
2 changes: 2 additions & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4413,6 +4413,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
// defaults are changed from these values forcing that change to be
// intentional.
rt.RPCHandshakeTimeout = 5 * time.Second
rt.RPCClientTimeout = 60 * time.Second
rt.HTTPSHandshakeTimeout = 5 * time.Second
rt.HTTPMaxConnsPerClient = 200
rt.RPCMaxConnsPerClient = 100
Expand Down Expand Up @@ -5615,6 +5616,7 @@ func TestLoad_FullConfig(t *testing.T) {
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
RPCHandshakeTimeout: 1932 * time.Millisecond,
RPCClientTimeout: 62 * time.Second,
RPCHoldTimeout: 15707 * time.Second,
RPCProtocol: 30793,
RPCRateLimit: 12029.43,
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@
"RPCMaxConnsPerClient": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,
"RPCClientTimeout": "0s",
"RaftBoltDBConfig": {
"NoFreelistSync": false
},
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ limits {
http_max_conns_per_client = 100
https_handshake_timeout = "2391ms"
rpc_handshake_timeout = "1932ms"
rpc_client_timeout = "62s"
rpc_rate = 12029.43
rpc_max_burst = 44848
rpc_max_conns_per_client = 2954
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@
"http_max_conns_per_client": 100,
"https_handshake_timeout": "2391ms",
"rpc_handshake_timeout": "1932ms",
"rpc_client_timeout": "62s",
"rpc_rate": 12029.43,
"rpc_max_burst": 44848,
"rpc_max_conns_per_client": 2954,
Expand Down
1 change: 0 additions & 1 deletion agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1642,7 +1642,6 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
c.PrimaryDatacenter = "dc1" // Enable ACLs!
c.ACLsEnabled = true
c.Bootstrap = false // Disable bootstrap
c.RPCHoldTimeout = 10 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
Expand Down
11 changes: 6 additions & 5 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,12 @@ func (c *Client) Stats() map[string]map[string]string {
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.
Expand All @@ -421,6 +421,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
// relevant configuration information
func (c *Client) ReloadConfig(config ReloadableConfig) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
c.connPool.SetRPCClientTimeout(config.RPCClientTimeout)
return nil
}

Expand Down
81 changes: 49 additions & 32 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func testClientConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.RPCHoldTimeout = 10 * time.Second
return dir, config
}

Expand Down Expand Up @@ -527,11 +526,10 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Timeout: c.RPCHoldTimeout,
DefaultQueryTime: c.DefaultQueryTime,
MaxQueryTime: c.MaxQueryTime,
}

connPool.SetRPCClientTimeout(c.RPCClientTimeout)
return Deps{
Logger: logger,
TLSConfigurator: tls,
Expand Down Expand Up @@ -874,7 +872,7 @@ func TestClient_RPC_Timeout(t *testing.T) {
_, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.RPCHoldTimeout = 10 * time.Millisecond
c.RPCClientTimeout = 10 * time.Millisecond
c.DefaultQueryTime = 100 * time.Millisecond
c.MaxQueryTime = 200 * time.Millisecond
})
Expand All @@ -887,34 +885,53 @@ func TestClient_RPC_Timeout(t *testing.T) {
}
})

// waiter will sleep for 101ms which is 1ms more than the DefaultQueryTime
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 101 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Long", &waiter{duration: 100 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))

// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")

// Blocking requests have a longer timeout (100ms) so this should pass since we
// add the maximum jitter which should be 16ms
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out)
require.NoError(t, err)
t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) {
// Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})

// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
t.Run("non-blocking query succeeds", func(t *testing.T) {
var out struct{}
require.NoError(t, c1.RPC("Short.Wait", &structs.NodeSpecificRequest{}, &out))
})

t.Run("check that deadline does not persist across calls", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out))
})

t.Run("blocking query succeeds", func(t *testing.T) {
var out struct{}
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out))
})

t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})
}
8 changes: 8 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration

// RPCClientTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for non-blocking queries to
// eventually terminate so that RPC connections are not held indefinitely.
// Blocking queries will use MaxQueryTime and DefaultQueryTime to calculate
// their own timeouts.
RPCClientTimeout time.Duration

// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
Expand Down Expand Up @@ -599,6 +606,7 @@ type RPCConfig struct {
// ReloadableConfig is the configuration that is passed to ReloadConfig when
// application config is reloaded.
type ReloadableConfig struct {
RPCClientTimeout time.Duration
RPCRateLimit rate.Limit
RPCMaxBurst int
RPCMaxConnsPerClient int
Expand Down
6 changes: 1 addition & 5 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,14 +1366,10 @@ func (r isReadRequest) IsRead() bool {
return true
}

func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
func (r isReadRequest) HasTimedOut(_ time.Time, _, _, _ time.Duration) bool {
return false
}

func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return time.Duration(-1)
}

func TestRPC_AuthorizeRaftRPC(t *testing.T) {
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
require.NoError(t, err)
Expand Down

0 comments on commit 42df428

Please sign in to comment.