Skip to content

Commit

Permalink
Add vmstorageUserTimeout flags to configure TCP user timeout (Linux)
Browse files Browse the repository at this point in the history
`TCP_USER_TIMEOUT` (since Linux 2.6.37) specifies the maximum amount of
time that transmitted data may remain unacknowledged before TCP will
forcibly close the connection and return `ETIMEDOUT` to the application.

Setting a low TCP user timeout allows RPC connections quickly reroute
around unavailable storage nodes during network interruptions.
  • Loading branch information
wjordan committed Aug 28, 2023
1 parent 13ed621 commit 9bdaf0a
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 6 deletions.
6 changes: 5 additions & 1 deletion app/vminsert/netstorage/netstorage.go
Expand Up @@ -32,6 +32,10 @@ var (
disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -dropSamplesOnOverload")
dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded. The drop of samples happens before the replication, so it's not recommended to use this flag with -replicationFactor enabled.")
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage")
vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 0, "TCP user timeout for RPC connections from vminsert to vmstorage (Linux only). "+
"When greater than 0, it specifies the maximum amount of time transmitted data may remain unacknowledged before the TCP connection is closed."+
"Setting a low TCP user timeout allows inserts to reroute around unresponsive storage nodes faster than the full insert timeout (at least 60 seconds)."+
"By default, this timeout is disabled.")
)

var errStorageReadOnly = errors.New("storage node is read only")
Expand Down Expand Up @@ -516,7 +520,7 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
addr += ":8400"
}
sn := &storageNode{
dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout),
dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout, *vmstorageUserTimeout),

stopCh: stopCh,

Expand Down
7 changes: 6 additions & 1 deletion app/vmselect/netstorage/netstorage.go
Expand Up @@ -42,6 +42,11 @@ var (
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery")
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vmselect to vmstorage")
vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 0, "TCP user timeout for RPC connections from vmselect to vmstorage (Linux only). "+
"When greater than 0, it specifies the maximum amount of time transmitted data may remain unacknowledged before the TCP connection is closed. "+
"Setting a low TCP user timeout allows queries to ignore unresponsive storage nodes faster than the max query duration. "+
"By default, this timeout is disabled. "+
"See also -search.maxQueryDuration")
)

// Result is a single timeseries result.
Expand Down Expand Up @@ -2753,7 +2758,7 @@ func newStorageNode(ms *metrics.Set, addr string) *storageNode {
addr += ":8401"
}
// There is no need in requests compression, since vmselect requests are usually very small.
connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout)
connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout, *vmstorageUserTimeout)

sn := &storageNode{
connPool: connPool,
Expand Down
4 changes: 2 additions & 2 deletions lib/netutil/conn_pool.go
Expand Up @@ -49,9 +49,9 @@ type connWithTimestamp struct {
// The compression is disabled if compressionLevel <= 0.
//
// Call ConnPool.MustStop when the returned ConnPool is no longer needed.
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration) *ConnPool {
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration, userTimeout time.Duration) *ConnPool {
cp := &ConnPool{
d: NewTCPDialer(ms, name, addr, dialTimeout),
d: NewTCPDialer(ms, name, addr, dialTimeout, userTimeout),
concurrentDialsCh: make(chan struct{}, 8),

name: name,
Expand Down
2 changes: 1 addition & 1 deletion lib/netutil/conn_pool_test.go
Expand Up @@ -44,7 +44,7 @@ func testConnPoolStartStop(t *testing.T, name string, ms *metrics.Set) {
var cps []*ConnPool
for i := 0; i < 5; i++ {
addr := fmt.Sprintf("host-%d", i)
cp := NewConnPool(ms, name, addr, handshake.VMSelectClient, compressLevel, dialTimeout)
cp := NewConnPool(ms, name, addr, handshake.VMSelectClient, compressLevel, dialTimeout, 0)
cps = append(cps, cp)
}
for _, cp := range cps {
Expand Down
14 changes: 13 additions & 1 deletion lib/netutil/tcpdialer.go
Expand Up @@ -3,6 +3,7 @@ package netutil
import (
"fmt"
"net"
"syscall"
"time"

"github.com/VictoriaMetrics/metrics"
Expand All @@ -12,7 +13,7 @@ import (
//
// The name is used in metric tags for the returned dialer.
// The name must be unique among dialers.
func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration) *TCPDialer {
func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration, userTimeout time.Duration) *TCPDialer {
d := &TCPDialer{
d: &net.Dialer{
Timeout: dialTimeout,
Expand All @@ -27,6 +28,17 @@ func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration)
dialErrors: ms.NewCounter(fmt.Sprintf(`vm_tcpdialer_errors_total{name=%q, addr=%q, type="dial"}`, name, addr)),
}
d.connMetrics.init(ms, "vm_tcpdialer", name, addr)
if userTimeout > 0 {
d.d.Control = func(network, address string, c syscall.RawConn) (err error) {
controlErr := c.Control(func(fd uintptr) {
err = setTCPUserTimeout(fd, userTimeout)
})
if controlErr != nil {
return controlErr
}
return err

Check warning on line 39 in lib/netutil/tcpdialer.go

View check run for this annotation

Codecov / codecov/patch

lib/netutil/tcpdialer.go#L32-L39

Added lines #L32 - L39 were not covered by tests
}
}
return d
}

Expand Down
12 changes: 12 additions & 0 deletions lib/netutil/tcpdialer_default.go
@@ -0,0 +1,12 @@
//go:build !linux
// +build !linux

package netutil

import (
"time"
)

func setTCPUserTimeout(fd uintptr, timeout time.Duration) error {
return nil
}
12 changes: 12 additions & 0 deletions lib/netutil/tcpdialer_linux.go
@@ -0,0 +1,12 @@
package netutil

import (
"golang.org/x/sys/unix"
"syscall"
"time"
)

func setTCPUserTimeout(fd uintptr, timeout time.Duration) error {
return syscall.SetsockoptInt(
int(fd), syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, int(timeout.Milliseconds()))

Check warning on line 11 in lib/netutil/tcpdialer_linux.go

View check run for this annotation

Codecov / codecov/patch

lib/netutil/tcpdialer_linux.go#L9-L11

Added lines #L9 - L11 were not covered by tests
}

0 comments on commit 9bdaf0a

Please sign in to comment.