Skip to content

Commit

Permalink
Merge pull request #15039 from hashicorp/backport/kisunji/NET-1092/le…
Browse files Browse the repository at this point in the history
…gally-bold-mite

Backport of Refactor client RPC timeouts into release/1.12.x
  • Loading branch information
Chris S. Kim committed Oct 18, 2022
2 parents 24bfe59 + eeb9660 commit 4668936
Show file tree
Hide file tree
Showing 26 changed files with 237 additions and 190 deletions.
3 changes: 3 additions & 0 deletions .changelog/14965.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
agent: Added a new config option `rpc_client_timeout` to tune timeouts for client RPC requests
```
26 changes: 14 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,18 +374,18 @@ type Agent struct {

// New process the desired options and creates a new Agent.
// This process will
// * parse the config given the config Flags
// * setup logging
// * using predefined logger given in an option
// OR
// * initialize a new logger from the configuration
// including setting up gRPC logging
// * initialize telemetry
// * create a TLS Configurator
// * build a shared connection pool
// * create the ServiceManager
// * setup the NodeID if one isn't provided in the configuration
// * create the AutoConfig object for future use in fully
// - parse the config given the config Flags
// - setup logging
// - using predefined logger given in an option
// OR
// - initialize a new logger from the configuration
// including setting up gRPC logging
// - initialize telemetry
// - create a TLS Configurator
// - build a shared connection pool
// - create the ServiceManager
// - setup the NodeID if one isn't provided in the configuration
// - create the AutoConfig object for future use in fully
// resolving the configuration
func New(bd BaseDeps) (*Agent, error) {
a := Agent{
Expand Down Expand Up @@ -1250,6 +1250,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
// RPC-related performance configs. We allow explicit zero value to disable so
// copy it whatever the value.
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
cfg.RPCClientTimeout = runtimeCfg.RPCClientTimeout

cfg.RPCConfig = runtimeCfg.RPCConfig

Expand Down Expand Up @@ -3901,6 +3902,7 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
}

cc := consul.ReloadableConfig{
RPCClientTimeout: newCfg.RPCClientTimeout,
RPCRateLimit: newCfg.RPCRateLimit,
RPCMaxBurst: newCfg.RPCMaxBurst,
RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,
Expand Down
30 changes: 30 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4111,6 +4111,36 @@ func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) {
require.True(t, a.consulConfig().AutoEncryptAllowTLS)
}

func TestAgent_ReloadConfigRPCClientConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

dataDir := testutil.TempDir(t, "agent") // we manage the data dir
hcl := `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
`
a := NewTestAgent(t, hcl)

defaultRPCTimeout := 60 * time.Second
require.Equal(t, defaultRPCTimeout, a.baseDeps.ConnPool.RPCClientTimeout())

hcl = `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
limits {
rpc_client_timeout = "2m"
}
`
c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a.reloadConfigInternal(c))

require.Equal(t, 2*time.Minute, a.baseDeps.ConnPool.RPCClientTimeout())
}

func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
9 changes: 5 additions & 4 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ type LoadResult struct {
//
// The sources are merged in the following order:
//
// * default configuration
// * config files in alphabetical order
// * command line arguments
// * overrides
// - default configuration
// - config files in alphabetical order
// - command line arguments
// - overrides
//
// The config sources are merged sequentially and later values overwrite
// previously set values. Slice values are merged by concatenating the two slices.
Expand Down Expand Up @@ -1021,6 +1021,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCClientTimeout: b.durationVal("limits.rpc_client_timeout", c.Limits.RPCClientTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ type UnixSocket struct {
type Limits struct {
HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"`
HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"`
RPCClientTimeout *string `mapstructure:"rpc_client_timeout"`
RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"`
RPCMaxBurst *int `mapstructure:"rpc_max_burst"`
RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"`
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func DefaultSource() Source {
http_max_conns_per_client = 200
https_handshake_timeout = "5s"
rpc_handshake_timeout = "5s"
rpc_client_timeout = "60s"
rpc_rate = -1
rpc_max_burst = 1000
rpc_max_conns_per_client = 100
Expand Down
16 changes: 14 additions & 2 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type RuntimeConfig struct {
// AutopilotMinQuorum sets the minimum number of servers required in a cluster
// before autopilot can prune dead servers.
//
//hcl: autopilot { min_quorum = int }
// hcl: autopilot { min_quorum = int }
AutopilotMinQuorum uint

// AutopilotRedundancyZoneTag is the Meta tag to use for separating servers
Expand Down Expand Up @@ -868,6 +868,18 @@ type RuntimeConfig struct {
// hcl: performance { rpc_hold_timeout = "duration" }
RPCHoldTimeout time.Duration

// RPCClientTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for requests to eventually
// terminate so that RPC connections are not held indefinitely.
// It may be set to 0 explicitly to disable the timeout but this should never
// be used in production. Default is 60 seconds.
//
// Note: Blocking queries use MaxQueryTime and DefaultQueryTime to calculate
// timeouts.
//
// hcl: limits { rpc_client_timeout = "duration" }
RPCClientTimeout time.Duration

// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
Expand Down Expand Up @@ -1305,7 +1317,7 @@ type RuntimeConfig struct {
SkipLeaveOnInt bool

// AutoReloadConfig indicate if the config will be
//auto reloaded bases on config file modification
// auto reloaded bases on config file modification
// hcl: auto_reload_config = (true|false)
AutoReloadConfig bool

Expand Down
2 changes: 2 additions & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4525,6 +4525,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
// defaults are changed from these values forcing that change to be
// intentional.
rt.RPCHandshakeTimeout = 5 * time.Second
rt.RPCClientTimeout = 60 * time.Second
rt.HTTPSHandshakeTimeout = 5 * time.Second
rt.HTTPMaxConnsPerClient = 200
rt.RPCMaxConnsPerClient = 100
Expand Down Expand Up @@ -5960,6 +5961,7 @@ func TestLoad_FullConfig(t *testing.T) {
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
RPCHandshakeTimeout: 1932 * time.Millisecond,
RPCClientTimeout: 62 * time.Second,
RPCHoldTimeout: 15707 * time.Second,
RPCProtocol: 30793,
RPCRateLimit: 12029.43,
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@
"RPCMaxConnsPerClient": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,
"RPCClientTimeout": "0s",
"RaftBoltDBConfig": {
"NoFreelistSync": false
},
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ limits {
http_max_conns_per_client = 100
https_handshake_timeout = "2391ms"
rpc_handshake_timeout = "1932ms"
rpc_client_timeout = "62s"
rpc_rate = 12029.43
rpc_max_burst = 44848
rpc_max_conns_per_client = 2954
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
"http_max_conns_per_client": 100,
"https_handshake_timeout": "2391ms",
"rpc_handshake_timeout": "1932ms",
"rpc_client_timeout": "62s",
"rpc_rate": 12029.43,
"rpc_max_burst": 44848,
"rpc_max_conns_per_client": 2954,
Expand Down
1 change: 0 additions & 1 deletion agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,6 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
c.PrimaryDatacenter = "dc1" // Enable ACLs!
c.ACLsEnabled = true
c.Bootstrap = false // Disable bootstrap
c.RPCHoldTimeout = 10 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
Expand Down
11 changes: 6 additions & 5 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,12 @@ func (c *Client) Stats() map[string]map[string]string {
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.
Expand All @@ -422,6 +422,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
// relevant configuration information
func (c *Client) ReloadConfig(config ReloadableConfig) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
c.connPool.SetRPCClientTimeout(config.RPCClientTimeout)
return nil
}

Expand Down
81 changes: 49 additions & 32 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func testClientConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.RPCHoldTimeout = 10 * time.Second
return dir, config
}

Expand Down Expand Up @@ -529,11 +528,10 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Timeout: c.RPCHoldTimeout,
DefaultQueryTime: c.DefaultQueryTime,
MaxQueryTime: c.MaxQueryTime,
}

connPool.SetRPCClientTimeout(c.RPCClientTimeout)
return Deps{
Logger: logger,
TLSConfigurator: tls,
Expand Down Expand Up @@ -878,7 +876,7 @@ func TestClient_RPC_Timeout(t *testing.T) {
_, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.RPCHoldTimeout = 10 * time.Millisecond
c.RPCClientTimeout = 10 * time.Millisecond
c.DefaultQueryTime = 100 * time.Millisecond
c.MaxQueryTime = 200 * time.Millisecond
})
Expand All @@ -891,34 +889,53 @@ func TestClient_RPC_Timeout(t *testing.T) {
}
})

// waiter will sleep for 101ms which is 1ms more than the DefaultQueryTime
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 101 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Long", &waiter{duration: 100 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))

// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")

// Blocking requests have a longer timeout (100ms) so this should pass since we
// add the maximum jitter which should be 16ms
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out)
require.NoError(t, err)
t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) {
// Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})

// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
t.Run("non-blocking query succeeds", func(t *testing.T) {
var out struct{}
require.NoError(t, c1.RPC("Short.Wait", &structs.NodeSpecificRequest{}, &out))
})

t.Run("check that deadline does not persist across calls", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out))
})

t.Run("blocking query succeeds", func(t *testing.T) {
var out struct{}
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out))
})

t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})
}
8 changes: 8 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration

// RPCClientTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for non-blocking queries to
// eventually terminate so that RPC connections are not held indefinitely.
// Blocking queries will use MaxQueryTime and DefaultQueryTime to calculate
// their own timeouts.
RPCClientTimeout time.Duration

// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
Expand Down Expand Up @@ -599,6 +606,7 @@ type RPCConfig struct {
// ReloadableConfig is the configuration that is passed to ReloadConfig when
// application config is reloaded.
type ReloadableConfig struct {
RPCClientTimeout time.Duration
RPCRateLimit rate.Limit
RPCMaxBurst int
RPCMaxConnsPerClient int
Expand Down
6 changes: 1 addition & 5 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,14 +1370,10 @@ func (r isReadRequest) IsRead() bool {
return true
}

func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
func (r isReadRequest) HasTimedOut(_ time.Time, _, _, _ time.Duration) (bool, error) {
return false, nil
}

func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return time.Duration(-1)
}

func TestRPC_AuthorizeRaftRPC(t *testing.T) {
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 4668936

Please sign in to comment.