From 2b7b3293c1119cc993143c34f0945504cdb35575 Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Tue, 29 Aug 2023 02:46:39 -0700 Subject: [PATCH] Add `vmstorageUserTimeout` flags to configure TCP user timeout (Linux) (#4423) `TCP_USER_TIMEOUT` (since Linux 2.6.37) specifies the maximum amount of time that transmitted data may remain unacknowledged before TCP will forcibly close the connection and return `ETIMEDOUT` to the application. Setting a low TCP user timeout allows RPC connections to quickly reroute around unavailable storage nodes during network interruptions. --- app/vminsert/netstorage/netstorage.go | 6 +++++- app/vmselect/netstorage/netstorage.go | 7 ++++++- lib/netutil/conn_pool.go | 4 ++-- lib/netutil/conn_pool_test.go | 2 +- lib/netutil/tcpdialer.go | 14 +++++++++++++- lib/netutil/tcpdialer_default.go | 12 ++++++++++++ lib/netutil/tcpdialer_linux.go | 12 ++++++++++++ 7 files changed, 51 insertions(+), 6 deletions(-) create mode 100644 lib/netutil/tcpdialer_default.go create mode 100644 lib/netutil/tcpdialer_linux.go diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 224c7cee9d75..4e5e0e0462c7 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -32,6 +32,10 @@ var ( disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -dropSamplesOnOverload") dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. 
the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded. The drop of samples happens before the replication, so it's not recommended to use this flag with -replicationFactor enabled.") vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage") + vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 0, "TCP user timeout for RPC connections from vminsert to vmstorage (Linux only). "+ + "When greater than 0, it specifies the maximum amount of time transmitted data may remain unacknowledged before the TCP connection is closed. "+ + "Setting a low TCP user timeout allows inserts to reroute around unresponsive storage nodes faster than the full insert timeout (at least 60 seconds). "+ + "By default, this timeout is disabled.") ) var errStorageReadOnly = errors.New("storage node is read only") @@ -516,7 +520,7 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket { addr += ":8400" } sn := &storageNode{ - dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout), + dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout, *vmstorageUserTimeout), stopCh: stopCh, diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 7e51d1661090..dfa8613831b6 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -42,6 +42,11 @@ var ( maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery") maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. 
See also -search.maxSamplesPerSeries") vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vmselect to vmstorage") + vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 0, "TCP user timeout for RPC connections from vmselect to vmstorage (Linux only). "+ + "When greater than 0, it specifies the maximum amount of time transmitted data may remain unacknowledged before the TCP connection is closed. "+ + "Setting a low TCP user timeout allows queries to ignore unresponsive storage nodes faster than the max query duration. "+ + "By default, this timeout is disabled. "+ + "See also -search.maxQueryDuration") ) // Result is a single timeseries result. @@ -2753,7 +2758,7 @@ func newStorageNode(ms *metrics.Set, addr string) *storageNode { addr += ":8401" } // There is no need in requests compression, since vmselect requests are usually very small. - connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout) + connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout, *vmstorageUserTimeout) sn := &storageNode{ connPool: connPool, diff --git a/lib/netutil/conn_pool.go b/lib/netutil/conn_pool.go index bbb379dfd0cc..68fe770a1cf5 100644 --- a/lib/netutil/conn_pool.go +++ b/lib/netutil/conn_pool.go @@ -49,9 +49,9 @@ type connWithTimestamp struct { // The compression is disabled if compressionLevel <= 0. // // Call ConnPool.MustStop when the returned ConnPool is no longer needed. 
-func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration) *ConnPool { +func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration, userTimeout time.Duration) *ConnPool { cp := &ConnPool{ - d: NewTCPDialer(ms, name, addr, dialTimeout), + d: NewTCPDialer(ms, name, addr, dialTimeout, userTimeout), concurrentDialsCh: make(chan struct{}, 8), name: name, diff --git a/lib/netutil/conn_pool_test.go b/lib/netutil/conn_pool_test.go index 822991ffba01..5e40f8604a9e 100644 --- a/lib/netutil/conn_pool_test.go +++ b/lib/netutil/conn_pool_test.go @@ -44,7 +44,7 @@ func testConnPoolStartStop(t *testing.T, name string, ms *metrics.Set) { var cps []*ConnPool for i := 0; i < 5; i++ { addr := fmt.Sprintf("host-%d", i) - cp := NewConnPool(ms, name, addr, handshake.VMSelectClient, compressLevel, dialTimeout) + cp := NewConnPool(ms, name, addr, handshake.VMSelectClient, compressLevel, dialTimeout, 0) cps = append(cps, cp) } for _, cp := range cps { diff --git a/lib/netutil/tcpdialer.go b/lib/netutil/tcpdialer.go index 7b3614db6b19..33852ca0d915 100644 --- a/lib/netutil/tcpdialer.go +++ b/lib/netutil/tcpdialer.go @@ -3,6 +3,7 @@ package netutil import ( "fmt" "net" + "syscall" "time" "github.com/VictoriaMetrics/metrics" @@ -12,7 +13,7 @@ import ( // // The name is used in metric tags for the returned dialer. // The name must be unique among dialers. 
-func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration) *TCPDialer { +func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration, userTimeout time.Duration) *TCPDialer { d := &TCPDialer{ d: &net.Dialer{ Timeout: dialTimeout, @@ -27,6 +28,17 @@ func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration) dialErrors: ms.NewCounter(fmt.Sprintf(`vm_tcpdialer_errors_total{name=%q, addr=%q, type="dial"}`, name, addr)), } d.connMetrics.init(ms, "vm_tcpdialer", name, addr) + if userTimeout > 0 { + d.d.Control = func(network, address string, c syscall.RawConn) (err error) { + controlErr := c.Control(func(fd uintptr) { + err = setTCPUserTimeout(fd, userTimeout) + }) + if controlErr != nil { + return controlErr + } + return err + } + } return d } diff --git a/lib/netutil/tcpdialer_default.go b/lib/netutil/tcpdialer_default.go new file mode 100644 index 000000000000..cc338cf7bff7 --- /dev/null +++ b/lib/netutil/tcpdialer_default.go @@ -0,0 +1,12 @@ +//go:build !linux +// +build !linux + +package netutil + +import ( + "time" +) + +func setTCPUserTimeout(fd uintptr, timeout time.Duration) error { + return nil +} diff --git a/lib/netutil/tcpdialer_linux.go b/lib/netutil/tcpdialer_linux.go new file mode 100644 index 000000000000..6d125823d177 --- /dev/null +++ b/lib/netutil/tcpdialer_linux.go @@ -0,0 +1,12 @@ +package netutil + +import ( + "golang.org/x/sys/unix" + "syscall" + "time" +) + +func setTCPUserTimeout(fd uintptr, timeout time.Duration) error { + return syscall.SetsockoptInt( + int(fd), syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, int(timeout.Milliseconds())) +}