Skip to content

Commit

Permalink
Add timeout to Client RPC calls (#11500)
Browse files Browse the repository at this point in the history
Adds a timeout (deadline) to client RPC calls, so that streams will no longer hang indefinitely in unstable network conditions.

Co-authored-by: kisunji <ckim@hashicorp.com>
  • Loading branch information
2 people authored and hc-github-team-consul-core committed Apr 25, 2022
1 parent 73d70fa commit f94f53f
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 39 deletions.
4 changes: 4 additions & 0 deletions .changelog/11500.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bugfix
rpc: Adds a deadline to client RPC calls, so that streams will no longer hang
indefinitely in unstable network conditions. [[GH-8504](https://github.com/hashicorp/consul/issues/8504)]
```
1 change: 1 addition & 0 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
c.PrimaryDatacenter = "dc1" // Enable ACLs!
c.ACLsEnabled = true
c.Bootstrap = false // Disable bootstrap
c.RPCHoldTimeout = 10 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
Expand Down
18 changes: 12 additions & 6 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,26 @@ TRY:
}

// Move off to another server, and see if we can retry.
c.logger.Error("RPC failed to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
manager.NotifyFailedServer(server)

// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
info, _ := args.(structs.RPCInfo)
if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry {
c.logger.Error("RPC failed to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
return rpcErr
}

c.logger.Warn("Retrying RPC to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)

// We can wait a bit and retry!
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {
Expand Down
84 changes: 76 additions & 8 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func testClientConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.RPCHoldTimeout = 10 * time.Second
return dir, config
}

Expand All @@ -72,7 +73,7 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
}

// Apply config to copied fields because many tests only set the old
//values.
// values.
config.ACLResolverSettings.ACLsEnabled = config.ACLsEnabled
config.ACLResolverSettings.NodeName = config.NodeName
config.ACLResolverSettings.Datacenter = config.Datacenter
Expand Down Expand Up @@ -521,13 +522,16 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
resolver.Register(builder)

connPool := &pool.ConnPool{
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Timeout: c.RPCHoldTimeout,
DefaultQueryTime: c.DefaultQueryTime,
MaxQueryTime: c.MaxQueryTime,
}

return Deps{
Expand Down Expand Up @@ -853,3 +857,67 @@ func TestClient_ShortReconnectTimeout(t *testing.T) {
50*time.Millisecond,
"The client node was not reaped within the alotted time")
}

type waiter struct {
duration time.Duration
}

func (w *waiter) Wait(struct{}, *struct{}) error {
time.Sleep(w.duration)
return nil
}

func TestClient_RPC_Timeout(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()

_, s1 := testServerWithConfig(t)

_, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.RPCHoldTimeout = 10 * time.Millisecond
c.DefaultQueryTime = 100 * time.Millisecond
c.MaxQueryTime = 200 * time.Millisecond
})
joinLAN(t, c1, s1)

retry.Run(t, func(r *retry.R) {
var out struct{}
if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil {
r.Fatalf("err: %v", err)
}
})

// waiter will sleep for 50ms
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 50 * time.Millisecond}))

// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")

// Blocking requests have a longer timeout (100ms) so this should pass
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out)
require.NoError(t, err)

// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
}
4 changes: 4 additions & 0 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,10 @@ func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime
return false, nil
}

func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return time.Duration(-1)
}

func TestRPC_AuthorizeRaftRPC(t *testing.T) {
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func testServerConfig(t *testing.T) (string, *Config) {

// TODO (slackpad) - We should be able to run all tests w/o this, but it
// looks like several depend on it.
config.RPCHoldTimeout = 5 * time.Second
config.RPCHoldTimeout = 10 * time.Second

config.ConnectEnabled = true
config.CAConfig = &structs.CAConfiguration{
Expand Down
54 changes: 49 additions & 5 deletions agent/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type muxSession interface {

// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream net.Conn
stream *TimeoutConn
codec rpc.ClientCodec
}

Expand All @@ -56,6 +56,36 @@ type Conn struct {
clientLock sync.Mutex
}

// TimeoutConn wraps net.Conn with a read timeout.
// When set, FirstReadTimeout only applies to the very next Read.
// DefaultTimeout is used for any other Read.
type TimeoutConn struct {
net.Conn
DefaultTimeout time.Duration
FirstReadTimeout time.Duration
}

func (c *TimeoutConn) Read(b []byte) (int, error) {
timeout := c.DefaultTimeout
// Apply timeout to first read then zero it out
if c.FirstReadTimeout > 0 {
timeout = c.FirstReadTimeout
c.FirstReadTimeout = 0
}
var deadline time.Time
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
if err := c.Conn.SetReadDeadline(deadline); err != nil {
return 0, err
}
return c.Conn.Read(b)
}

func (c *TimeoutConn) Write(b []byte) (int, error) {
return c.Conn.Write(b)
}

func (c *Conn) Close() error {
return c.session.Close()
}
Expand All @@ -79,12 +109,14 @@ func (c *Conn) getClient() (*StreamClient, error) {
return nil, err
}

timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout}

// Create the RPC client
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle)

// Return a new stream client
sc := &StreamClient{
stream: stream,
stream: timeoutStream,
codec: codec,
}
return sc, nil
Expand All @@ -101,7 +133,7 @@ func (c *Conn) returnClient(client *StreamClient) {

// If this is a Yamux stream, shrink the internal buffers so that
// we can GC the idle memory
if ys, ok := client.stream.(*yamux.Stream); ok {
if ys, ok := client.stream.Conn.(*yamux.Stream); ok {
ys.Shrink()
}
}
Expand Down Expand Up @@ -133,6 +165,13 @@ type ConnPool struct {
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
Logger *log.Logger

// The default timeout for stream reads/writes
Timeout time.Duration

// Used for calculating timeouts on RPC requests
MaxQueryTime time.Duration
DefaultQueryTime time.Duration

// The maximum time to keep a connection open
MaxTime time.Duration

Expand Down Expand Up @@ -325,7 +364,7 @@ func (p *ConnPool) dial(
tlsRPCType RPCType,
) (net.Conn, HalfCloser, error) {
// Try to dial the conn
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout}
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -590,6 +629,11 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
return fmt.Errorf("rpc error getting client: %w", err)
}

// Use the zero value if the request doesn't implement RPCInfo
if info, ok := args.(structs.RPCInfo); ok {
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime)
}

// Make the RPC call
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions agent/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,14 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
}

pool := &pool.ConnPool{
Server: config.ServerMode,
SrcAddr: rpcSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
TLSConfigurator: tls,
Datacenter: config.Datacenter,
Server: config.ServerMode,
SrcAddr: rpcSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
TLSConfigurator: tls,
Datacenter: config.Datacenter,
Timeout: config.RPCHoldTimeout,
MaxQueryTime: config.MaxQueryTime,
DefaultQueryTime: config.DefaultQueryTime,
}
if config.ServerMode {
pool.MaxTime = 2 * time.Minute
Expand Down
25 changes: 17 additions & 8 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"

"github.com/golang/protobuf/proto"
ptypes "github.com/golang/protobuf/ptypes"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/coordinate"
"github.com/mitchellh/hashstructure"

"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"

ptypes "github.com/golang/protobuf/ptypes"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -217,6 +215,7 @@ type RPCInfo interface {
TokenSecret() string
SetTokenSecret(string)
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error)
Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration
}

// QueryOptions is used to specify various flags for read queries
Expand Down Expand Up @@ -315,18 +314,24 @@ func (q *QueryOptions) SetTokenSecret(s string) {
q.Token = s
}

func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
// Match logic in Server.blockingQuery.
if q.MinQueryIndex > 0 {
if q.MaxQueryTime > maxQueryTime {
q.MaxQueryTime = maxQueryTime
} else if q.MaxQueryTime <= 0 {
q.MaxQueryTime = defaultQueryTime
}
// Timeout after maximum jitter has elapsed.
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction)

return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout), nil
return q.MaxQueryTime + rpcHoldTimeout
}
return time.Since(start) > rpcHoldTimeout, nil
return rpcHoldTimeout
}

func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}

type WriteRequest struct {
Expand All @@ -353,7 +358,11 @@ func (w *WriteRequest) SetTokenSecret(s string) {
}

func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}

func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return rpcHoldTimeout
}

type QueryBackend int
Expand Down
Loading

0 comments on commit f94f53f

Please sign in to comment.