Skip to content

Commit

Permalink
Refactor client RPC timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
kisunji committed Oct 13, 2022
1 parent 9ca8bb8 commit 03c53f9
Show file tree
Hide file tree
Showing 21 changed files with 141 additions and 131 deletions.
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
// RPC-related performance configs. We allow explicit zero value to disable so
// copy it whatever the value.
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
cfg.RPCReadTimeout = runtimeCfg.RPCReadTimeout

cfg.RPCConfig = runtimeCfg.RPCConfig

Expand Down
4 changes: 3 additions & 1 deletion agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import (
"time"

"github.com/armon/go-metrics/prometheus"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca"
Expand Down Expand Up @@ -1030,6 +1031,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCReadTimeout: b.durationValWithDefault("limits.rpc_read_timeout", c.Limits.RPCReadTimeout, 10*time.Minute),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ type UnixSocket struct {
type Limits struct {
HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"`
HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"`
RPCReadTimeout *string `mapstructure:"rpc_read_timeout"`
RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"`
RPCMaxBurst *int `mapstructure:"rpc_max_burst"`
RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"`
Expand Down
16 changes: 14 additions & 2 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type RuntimeConfig struct {
// AutopilotMinQuorum sets the minimum number of servers required in a cluster
// before autopilot can prune dead servers.
//
//hcl: autopilot { min_quorum = int }
// hcl: autopilot { min_quorum = int }
AutopilotMinQuorum uint

// AutopilotRedundancyZoneTag is the Meta tag to use for separating servers
Expand Down Expand Up @@ -907,6 +907,18 @@ type RuntimeConfig struct {
// hcl: performance { rpc_hold_timeout = "duration" }
RPCHoldTimeout time.Duration

// RPCReadTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for non-blocking queries to
// eventually terminate so that RPC connections are not held indefinitely.
// It may be set to 0 explicitly to disable the timeout but this should never
// be used in production. Default is 10 minutes.
//
// Note: Blocking queries use MaxQueryTime and DefaultQueryTime to calculate
// timeouts.
//
// hcl: limits { rpc_read_timeout = "duration" }
RPCReadTimeout time.Duration

// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
Expand Down Expand Up @@ -1344,7 +1356,7 @@ type RuntimeConfig struct {
SkipLeaveOnInt bool

// AutoReloadConfig indicate if the config will be
//auto reloaded bases on config file modification
// auto reloaded bases on config file modification
// hcl: auto_reload_config = (true|false)
AutoReloadConfig bool

Expand Down
4 changes: 3 additions & 1 deletion agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (

"github.com/armon/go-metrics/prometheus"
"github.com/google/go-cmp/cmp/cmpopts"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/stretchr/testify/require"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
Expand Down Expand Up @@ -6076,6 +6077,7 @@ func TestLoad_FullConfig(t *testing.T) {
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
RPCHandshakeTimeout: 1932 * time.Millisecond,
RPCReadTimeout: 62 * time.Minute,
RPCHoldTimeout: 15707 * time.Second,
RPCProtocol: 30793,
RPCRateLimit: 12029.43,
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@
"RPCMaxConnsPerClient": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,
"RPCReadTimeout": "0s",
"RaftBoltDBConfig": {
"NoFreelistSync": false
},
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ limits {
http_max_conns_per_client = 100
https_handshake_timeout = "2391ms"
rpc_handshake_timeout = "1932ms"
rpc_read_timeout = "62m"
rpc_rate = 12029.43
rpc_max_burst = 44848
rpc_max_conns_per_client = 2954
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@
"http_max_conns_per_client": 100,
"https_handshake_timeout": "2391ms",
"rpc_handshake_timeout": "1932ms",
"rpc_read_timeout": "62m",
"rpc_rate": 12029.43,
"rpc_max_burst": 44848,
"rpc_max_conns_per_client": 2954,
Expand Down
1 change: 0 additions & 1 deletion agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1690,7 +1690,6 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
c.PrimaryDatacenter = "dc1" // Enable ACLs!
c.ACLsEnabled = true
c.Bootstrap = false // Disable bootstrap
c.RPCHoldTimeout = 10 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
Expand Down
79 changes: 49 additions & 30 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func testClientConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.RPCHoldTimeout = 10 * time.Second
return dir, config
}

Expand Down Expand Up @@ -531,7 +530,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Timeout: c.RPCHoldTimeout,
ReadTimeout: c.RPCReadTimeout,
DefaultQueryTime: c.DefaultQueryTime,
MaxQueryTime: c.MaxQueryTime,
}
Expand Down Expand Up @@ -882,7 +881,7 @@ func TestClient_RPC_Timeout(t *testing.T) {
_, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.RPCHoldTimeout = 10 * time.Millisecond
c.RPCReadTimeout = 10 * time.Millisecond
c.DefaultQueryTime = 100 * time.Millisecond
c.MaxQueryTime = 200 * time.Millisecond
})
Expand All @@ -896,33 +895,53 @@ func TestClient_RPC_Timeout(t *testing.T) {
})

// waiter will sleep for 101ms which is 1ms more than the DefaultQueryTime
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 101 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Long", &waiter{duration: 101 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))

// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")

// Blocking requests have a longer timeout (100ms) so this should pass since we
// add the maximum jitter which should be 16ms
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out)
require.NoError(t, err)
t.Run("non-blocking query times out after RPCReadTimeout", func(t *testing.T) {
// Requests with QueryOptions have a default timeout of RPCReadTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})

// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
t.Run("non-blocking query succeeds", func(t *testing.T) {
var out struct{}
require.NoError(t, c1.RPC("Short.Wait", &structs.NodeSpecificRequest{}, &out))
})

t.Run("check that deadline does not persist across calls", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
// We use structs.KVSRequest, which does not implement pool.BlockableQuery
// and should have no timeouts defined.
require.NoError(t, c1.RPC("Long.Wait", &structs.KVSRequest{}, &out))
})

t.Run("blocking query succeeds", func(t *testing.T) {
// Blocking requests have a longer timeout (100ms) so this should pass since we
// add the maximum jitter which should be 16ms
var out struct{}
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out))
})

t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})
}
7 changes: 7 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration

// RPCReadTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for non-blocking queries to
// eventually terminate so that RPC connections are not held indefinitely.
// Blocking queries will use MaxQueryTime and DefaultQueryTime to calculate
// their own timeouts.
RPCReadTimeout time.Duration

// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
Expand Down
6 changes: 1 addition & 5 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,14 +1378,10 @@ func (r isReadRequest) IsRead() bool {
return true
}

func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
func (r isReadRequest) HasTimedOut(_ time.Time, _, _, _ time.Duration) (bool, error) {
return false, nil
}

func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return time.Duration(-1)
}

func TestRPC_AuthorizeRaftRPC(t *testing.T) {
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
require.NoError(t, err)
Expand Down
51 changes: 37 additions & 14 deletions agent/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/tlsutil"
)

Expand All @@ -31,7 +32,7 @@ type muxSession interface {

// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream *TimeoutConn
stream net.Conn
codec rpc.ClientCodec
}

Expand Down Expand Up @@ -100,6 +101,7 @@ func (c *Conn) getClient() (*StreamClient, error) {
}
c.clientLock.Unlock()
if front != nil {
fmt.Println("got cached client")
return front.Value.(*StreamClient), nil
}

Expand All @@ -109,14 +111,12 @@ func (c *Conn) getClient() (*StreamClient, error) {
return nil, err
}

timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout}

// Create the RPC client
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle)
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)

// Return a new stream client
sc := &StreamClient{
stream: timeoutStream,
stream: stream,
codec: codec,
}
return sc, nil
Expand All @@ -133,7 +133,7 @@ func (c *Conn) returnClient(client *StreamClient) {

// If this is a Yamux stream, shrink the internal buffers so that
// we can GC the idle memory
if ys, ok := client.stream.Conn.(*yamux.Stream); ok {
if ys, ok := client.stream.(*yamux.Stream); ok {
ys.Shrink()
}
}
Expand Down Expand Up @@ -165,11 +165,12 @@ type ConnPool struct {
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
Logger *log.Logger

// The default timeout for stream reads/writes
Timeout time.Duration
// The default timeout for non-blocking queries.
ReadTimeout time.Duration

// Used for calculating timeouts on RPC requests
MaxQueryTime time.Duration
// MaxQueryTime is used for calculating timeouts on blocking queries.
MaxQueryTime time.Duration
// DefaultQueryTime is used for calculating timeouts on blocking queries.
DefaultQueryTime time.Duration

// The maximum time to keep a connection open
Expand Down Expand Up @@ -364,7 +365,7 @@ func (p *ConnPool) dial(
tlsRPCType RPCType,
) (net.Conn, HalfCloser, error) {
// Try to dial the conn
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout}
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -620,6 +621,17 @@ func (p *ConnPool) rpcInsecure(dc string, addr net.Addr, method string, args int
return nil
}

// BlockableQuery represents a read query which can be blocking or non-blocking.
// This interface is used to set an appropriate read timeout for rpc connections.
type BlockableQuery interface {
// BlockingTimeout returns duration > 0 if the query is blocking.
// Otherwise returns 0 for non-blocking queries.
BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration
}

var _ BlockableQuery = (*structs.QueryOptions)(nil)
var _ BlockableQuery = (*pbcommon.QueryOptions)(nil)

func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
p.once.Do(p.init)

Expand All @@ -629,9 +641,20 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
return fmt.Errorf("rpc error getting client: %w", err)
}

// Use the zero value if the request doesn't implement RPCInfo
if info, ok := args.(structs.RPCInfo); ok {
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime)
var deadline time.Time
if info, ok := args.(BlockableQuery); ok {
// Timeout here is calculated differently based on blocking vs non-blocking query.
timeout := info.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime)
if timeout <= 0 {
// must be non-blocking
timeout = p.ReadTimeout
}
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
}
if err := sc.stream.SetReadDeadline(deadline); err != nil {
return fmt.Errorf("rpc error setting read deadline: %w", err)
}

// Make the RPC call
Expand Down

0 comments on commit 03c53f9

Please sign in to comment.