Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vmstorageUserTimeout flags to configure TCP User Timeout on Linux #4423

Merged
merged 1 commit into from Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion app/vminsert/netstorage/netstorage.go
Expand Up @@ -32,6 +32,10 @@ var (
disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -dropSamplesOnOverload")
dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded. The drop of samples happens before the replication, so it's not recommended to use this flag with -replicationFactor enabled.")
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage")
vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 0, "TCP user timeout for RPC connections from vminsert to vmstorage (Linux only). "+
"When greater than 0, it specifies the maximum amount of time transmitted data may remain unacknowledged before the TCP connection is closed."+
"Setting a low TCP user timeout allows inserts to reroute around unresponsive storage nodes faster than the full insert timeout (at least 60 seconds)."+
"By default, this timeout is disabled.")
)

var errStorageReadOnly = errors.New("storage node is read only")
Expand Down Expand Up @@ -516,7 +520,7 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
addr += ":8400"
}
sn := &storageNode{
dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout),
dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout, *vmstorageUserTimeout),

stopCh: stopCh,

Expand Down
7 changes: 6 additions & 1 deletion app/vmselect/netstorage/netstorage.go
Expand Up @@ -42,6 +42,11 @@ var (
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery")
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vmselect to vmstorage")
vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 0, "TCP user timeout for RPC connections from vmselect to vmstorage (Linux only). "+
"When greater than 0, it specifies the maximum amount of time transmitted data may remain unacknowledged before the TCP connection is closed. "+
"Setting a low TCP user timeout allows queries to ignore unresponsive storage nodes faster than the max query duration. "+
"By default, this timeout is disabled. "+
"See also -search.maxQueryDuration")
)

// Result is a single timeseries result.
Expand Down Expand Up @@ -2753,7 +2758,7 @@ func newStorageNode(ms *metrics.Set, addr string) *storageNode {
addr += ":8401"
}
// There is no need in requests compression, since vmselect requests are usually very small.
connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout)
connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout, *vmstorageUserTimeout)

sn := &storageNode{
connPool: connPool,
Expand Down
4 changes: 2 additions & 2 deletions lib/netutil/conn_pool.go
Expand Up @@ -49,9 +49,9 @@ type connWithTimestamp struct {
// The compression is disabled if compressionLevel <= 0.
//
// Call ConnPool.MustStop when the returned ConnPool is no longer needed.
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration) *ConnPool {
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration, userTimeout time.Duration) *ConnPool {
cp := &ConnPool{
d: NewTCPDialer(ms, name, addr, dialTimeout),
d: NewTCPDialer(ms, name, addr, dialTimeout, userTimeout),
concurrentDialsCh: make(chan struct{}, 8),

name: name,
Expand Down
2 changes: 1 addition & 1 deletion lib/netutil/conn_pool_test.go
Expand Up @@ -44,7 +44,7 @@ func testConnPoolStartStop(t *testing.T, name string, ms *metrics.Set) {
var cps []*ConnPool
for i := 0; i < 5; i++ {
addr := fmt.Sprintf("host-%d", i)
cp := NewConnPool(ms, name, addr, handshake.VMSelectClient, compressLevel, dialTimeout)
cp := NewConnPool(ms, name, addr, handshake.VMSelectClient, compressLevel, dialTimeout, 0)
cps = append(cps, cp)
}
for _, cp := range cps {
Expand Down
14 changes: 13 additions & 1 deletion lib/netutil/tcpdialer.go
Expand Up @@ -3,6 +3,7 @@
import (
"fmt"
"net"
"syscall"
"time"

"github.com/VictoriaMetrics/metrics"
Expand All @@ -12,7 +13,7 @@
//
// The name is used in metric tags for the returned dialer.
// The name must be unique among dialers.
func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration) *TCPDialer {
func NewTCPDialer(ms *metrics.Set, name, addr string, dialTimeout time.Duration, userTimeout time.Duration) *TCPDialer {
d := &TCPDialer{
d: &net.Dialer{
Timeout: dialTimeout,
Expand All @@ -27,6 +28,17 @@
dialErrors: ms.NewCounter(fmt.Sprintf(`vm_tcpdialer_errors_total{name=%q, addr=%q, type="dial"}`, name, addr)),
}
d.connMetrics.init(ms, "vm_tcpdialer", name, addr)
if userTimeout > 0 {
d.d.Control = func(network, address string, c syscall.RawConn) (err error) {
controlErr := c.Control(func(fd uintptr) {
err = setTCPUserTimeout(fd, userTimeout)
})
if controlErr != nil {
return controlErr
}
return err

Check warning on line 39 in lib/netutil/tcpdialer.go

View check run for this annotation

Codecov / codecov/patch

lib/netutil/tcpdialer.go#L32-L39

Added lines #L32 - L39 were not covered by tests
}
}
return d
}

Expand Down
12 changes: 12 additions & 0 deletions lib/netutil/tcpdialer_default.go
@@ -0,0 +1,12 @@
//go:build !linux
// +build !linux

package netutil

import (
"time"
)

func setTCPUserTimeout(fd uintptr, timeout time.Duration) error {
return nil
}
12 changes: 12 additions & 0 deletions lib/netutil/tcpdialer_linux.go
@@ -0,0 +1,12 @@
package netutil

import (
"golang.org/x/sys/unix"
"syscall"
"time"
)

func setTCPUserTimeout(fd uintptr, timeout time.Duration) error {
return syscall.SetsockoptInt(
int(fd), syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, int(timeout.Milliseconds()))

Check warning on line 11 in lib/netutil/tcpdialer_linux.go

View check run for this annotation

Codecov / codecov/patch

lib/netutil/tcpdialer_linux.go#L9-L11

Added lines #L9 - L11 were not covered by tests
}