Skip to content

Commit

Permalink
Merge pull request #74456 from miretskiy/backport21.2-74222
Browse files Browse the repository at this point in the history
release-21.2: kv,rpc: Introduce dedicated rangefeed connection class.
  • Loading branch information
miretskiy authored Jan 6, 2022
2 parents 0c8df44 + 5cf23b8 commit 3880f42
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
13 changes: 8 additions & 5 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func getSink(

switch {
case u.Scheme == changefeedbase.SinkSchemeNull:
return makeNullSink(sinkURL{URL: u})
return makeNullSink(sinkURL{URL: u}, m)
case u.Scheme == changefeedbase.SinkSchemeKafka:
return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, m)
Expand Down Expand Up @@ -359,12 +359,13 @@ func (s *bufferSink) Dial() error {
}

type nullSink struct {
ticker *time.Ticker
ticker *time.Ticker
metrics *sliMetrics
}

var _ Sink = (*nullSink)(nil)

func makeNullSink(u sinkURL) (Sink, error) {
func makeNullSink(u sinkURL, m *sliMetrics) (Sink, error) {
var pacer *time.Ticker
if delay := u.consumeParam(`delay`); delay != "" {
pace, err := time.ParseDuration(delay)
Expand All @@ -373,7 +374,7 @@ func makeNullSink(u sinkURL) (Sink, error) {
}
pacer = time.NewTicker(pace)
}
return &nullSink{ticker: pacer}, nil
return &nullSink{ticker: pacer, metrics: m}, nil
}

func (n *nullSink) pace(ctx context.Context) error {
Expand All @@ -397,7 +398,7 @@ func (n *nullSink) EmitRow(
r kvevent.Alloc,
) error {
defer r.Release(ctx)

defer n.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress)
if err := n.pace(ctx); err != nil {
return err
}
Expand All @@ -411,6 +412,7 @@ func (n *nullSink) EmitRow(
func (n *nullSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
defer n.metrics.recordResolvedCallback()()
if err := n.pace(ctx); err != nil {
return err
}
Expand All @@ -423,6 +425,7 @@ func (n *nullSink) EmitResolvedTimestamp(

// Flush implements Sink interface.
func (n *nullSink) Flush(ctx context.Context) error {
defer n.metrics.recordFlushRequestCallback()()
if log.V(2) {
log.Info(ctx, "flushing")
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -41,6 +43,13 @@ type singleRangeInfo struct {
token rangecache.EvictionToken
}

var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
"kv.rangefeed.use_dedicated_connection_class.enabled",
"uses dedicated connection when running rangefeeds",
util.ConstantWithMetamorphicTestBool(
"kv.rangefeed.use_dedicated_connection_class.enabled", false),
)

// RangeFeed divides a RangeFeed request on range boundaries and establishes a
// RangeFeed to each of the individual ranges. It streams back results on the
// provided channel.
Expand Down Expand Up @@ -368,7 +377,7 @@ func (ds *DistSender) singleRangeFeed(
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
// The RangeFeed is not used for system critical traffic so use a DefaultClass
// connection regardless of the range.
opts := SendOptions{class: rpc.DefaultClass}
opts := SendOptions{class: connectionClass(&ds.st.SV)}
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas)
if err != nil {
return args.Timestamp, err
Expand Down Expand Up @@ -424,3 +433,10 @@ func (ds *DistSender) singleRangeFeed(
}
}
}

func connectionClass(sv *settings.Values) rpc.ConnectionClass {
if useDedicatedRangefeedConnectionClass.Get(sv) {
return rpc.RangefeedClass
}
return rpc.DefaultClass
}
7 changes: 5 additions & 2 deletions pkg/rpc/connection_class.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ const (
DefaultClass ConnectionClass = iota
// SystemClass is the ConnectionClass used for system traffic.
SystemClass
// RangefeedClass is the ConnectionClass used for rangefeeds.
RangefeedClass

// NumConnectionClasses is the number of valid ConnectionClass values.
NumConnectionClasses int = iota
)

// connectionClassName maps classes to their name.
var connectionClassName = map[ConnectionClass]string{
DefaultClass: "default",
SystemClass: "system",
DefaultClass: "default",
SystemClass: "system",
RangefeedClass: "rangefeed",
}

// String implements the fmt.Stringer interface.
Expand Down
36 changes: 31 additions & 5 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,32 @@ const (
)

const (
defaultWindowSize = 65535
initialWindowSize = defaultWindowSize * 32 // for an RPC
defaultWindowSize = 65535
)

func getWindowSize(name string, c ConnectionClass, defaultSize int) int32 {
const maxWindowSize = defaultWindowSize * 32
s := envutil.EnvOrDefaultInt(name, defaultSize)
if s > maxWindowSize {
log.Warningf(context.Background(), "%s value too large; trimmed to %d", name, maxWindowSize)
s = maxWindowSize
}
if s <= defaultWindowSize {
log.Warningf(context.Background(),
"%s RPC will use dynamic window sizes due to %s value lower than %d", c, name, defaultSize)
}
return int32(s)
}

var (
// for an RPC
initialWindowSize = getWindowSize(
"COCKROACH_RPC_INITIAL_WINDOW_SIZE", DefaultClass, defaultWindowSize*32)
initialConnWindowSize = initialWindowSize * 16 // for a connection

// for RangeFeed RPC
rangefeedInitialWindowSize = getWindowSize(
"COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128K */)
)

// GRPC Dialer connection timeout. 20s matches default value that is
Expand Down Expand Up @@ -1016,9 +1039,12 @@ func (ctx *Context) grpcDialRaw(
Backoff: backoffConfig,
MinConnectTimeout: minConnectionTimeout}))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive))
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
grpc.WithInitialConnWindowSize(initialConnWindowSize))
dialOpts = append(dialOpts, grpc.WithInitialConnWindowSize(initialConnWindowSize))
if class == RangefeedClass {
dialOpts = append(dialOpts, grpc.WithInitialWindowSize(rangefeedInitialWindowSize))
} else {
dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize))
}

dialer := onlyOnceDialer{
redialChan: make(chan struct{}),
Expand Down

0 comments on commit 3880f42

Please sign in to comment.