Commit 4014ae2

Merge pull request #98803 from irfansharif/backport22.2-98422

release-22.2: kvserver: disable {split,replicate,mvccGC} queues until...

irfansharif committed Mar 17, 2023 · 2 parents 0307883 + d169ac2 · commit 4014ae2
Showing 17 changed files with 151 additions and 50 deletions.
18 changes: 13 additions & 5 deletions pkg/kv/kvserver/client_spanconfigs_test.go
@@ -13,6 +13,7 @@ package kvserver_test
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -44,7 +45,8 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
cluster.MakeTestingClusterSettings(),
nil,
)
-mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)
+var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
+mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore)

ctx := context.Background()

@@ -119,14 +121,20 @@ func TestFallbackSpanConfigNumReplicasOverride(t *testing.T) {
}

type mockSpanConfigSubscriber struct {
-callback func(ctx context.Context, config roachpb.Span)
+callback    func(ctx context.Context, config roachpb.Span)
+lastUpdated time.Time
spanconfig.Store
}

var _ spanconfig.KVSubscriber = &mockSpanConfigSubscriber{}

-func newMockSpanConfigSubscriber(store spanconfig.Store) *mockSpanConfigSubscriber {
-return &mockSpanConfigSubscriber{Store: store}
+func newMockSpanConfigSubscriber(
+lastUpdated time.Time, store spanconfig.Store,
+) *mockSpanConfigSubscriber {
+return &mockSpanConfigSubscriber{
+lastUpdated: lastUpdated,
+Store: store,
+}
}

func (m *mockSpanConfigSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool {
@@ -152,7 +160,7 @@ func (m *mockSpanConfigSubscriber) GetProtectionTimestamps(
}

func (m *mockSpanConfigSubscriber) LastUpdated() hlc.Timestamp {
-panic("unimplemented")
+return hlc.Timestamp{WallTime: m.lastUpdated.UnixNano()}
}

func (m *mockSpanConfigSubscriber) Subscribe(callback func(context.Context, roachpb.Span)) {
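A note on the test double: LastUpdated now reports a fixed, non-empty timestamp instead of panicking, so callers that ask whether the subscriber has ever seen an update (as mergeQueue.enabled did via LastUpdated().IsEmpty() before this commit; see merge_queue.go below) treat the mock as caught up. A minimal sketch of the wall-clock-to-HLC conversion used above, with Timestamp as a stand-in for the richer hlc.Timestamp:

package main

import (
	"fmt"
	"time"
)

// Timestamp stands in for hlc.Timestamp: WallTime is nanoseconds since
// the Unix epoch, and only the zero value counts as "empty".
type Timestamp struct {
	WallTime int64
	Logical  int32
}

func (t Timestamp) IsEmpty() bool { return t == Timestamp{} }

func main() {
	t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
	ts := Timestamp{WallTime: t0.UnixNano()}
	// Any fixed, non-zero time keeps IsEmpty false, which is all the
	// availability checks care about.
	fmt.Println(ts.WallTime, ts.IsEmpty()) // 946684800000000000 false
}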
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/consistency_queue.go
@@ -113,7 +113,7 @@ func newConsistencyQueue(store *Store) *consistencyQueue {
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
-needsSystemConfig: false,
+needsSpanConfigs: false,
acceptsUnsplitRanges: true,
successes: store.metrics.ConsistencyQueueSuccesses,
failures: store.metrics.ConsistencyQueueFailures,
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
@@ -34,6 +34,33 @@ var MergeQueueEnabled = settings.RegisterBoolSetting(
true,
)

// ReplicateQueueEnabled is a setting that controls whether the replicate queue
// is enabled.
var ReplicateQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.replicate_queue.enabled",
"whether the replicate queue is enabled",
true,
)

// SplitQueueEnabled is a setting that controls whether the split queue is
// enabled.
var SplitQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.split_queue.enabled",
"whether the split queue is enabled",
true,
)

// MVCCGCQueueEnabled is a setting that controls whether the MVCC GC queue is
// enabled.
var MVCCGCQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.mvcc_gc_queue.enabled",
"whether the MVCC GC queue is enabled",
true,
)

// CmdIDKey is a Raft command id. This will be logged unredacted - keep it random.
type CmdIDKey string
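The three settings registered above are plain boolean cluster settings, so the queues can be toggled from SQL; being SystemOnly, they are set from the system tenant. A sketch of flipping them from Go, where the lib/pq driver and the connection string are assumptions for illustration, not part of this change:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed driver; any Postgres-wire driver works
)

func main() {
	// Assumed: an insecure local single-node cluster on the default port.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Disable the three queues guarded by the new settings; setting them
	// back to true re-enables the queues.
	for _, stmt := range []string{
		`SET CLUSTER SETTING kv.replicate_queue.enabled = false`,
		`SET CLUSTER SETTING kv.split_queue.enabled = false`,
		`SET CLUSTER SETTING kv.mvcc_gc_queue.enabled = false`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}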

11 changes: 1 addition & 10 deletions pkg/kv/kvserver/merge_queue.go
@@ -114,7 +114,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
// factor.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
needsLease: true,
-needsSystemConfig: true,
+needsSpanConfigs: true,
acceptsUnsplitRanges: false,
successes: store.metrics.MergeQueueSuccesses,
failures: store.metrics.MergeQueueFailures,
@@ -127,15 +127,6 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
}

func (mq *mergeQueue) enabled() bool {
-if !mq.store.cfg.SpanConfigsDisabled {
-if mq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
-// If we don't have any span configs available, enabling range merges would
-// be extremely dangerous -- we could collapse everything into a single
-// range.
-return false
-}
-}
-
st := mq.store.ClusterSettings()
return kvserverbase.MergeQueueEnabled.Get(&st.SV)
}
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/mvcc_gc_queue.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -175,7 +176,7 @@ func newMVCCGCQueue(store *Store) *mvccGCQueue {
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
-needsSystemConfig: true,
+needsSpanConfigs: true,
acceptsUnsplitRanges: false,
processTimeoutFunc: func(st *cluster.Settings, _ replicaInQueue) time.Duration {
timeout := mvccGCQueueTimeout
@@ -229,13 +230,22 @@ func (r mvccGCQueueScore) String() string {
humanizeutil.IBytes(r.GCByteAge), humanizeutil.IBytes(r.ExpMinGCByteAgeReduction))
}

func (mgcq *mvccGCQueue) enabled() bool {
st := mgcq.store.ClusterSettings()
return kvserverbase.MVCCGCQueueEnabled.Get(&st.SV)
}

// shouldQueue determines whether a replica should be queued for garbage
// collection, and if so, at what priority. Returns true for shouldQ
// in the event that the cumulative ages of GC'able bytes or extant
// intents exceed thresholds.
func (mgcq *mvccGCQueue) shouldQueue(
ctx context.Context, _ hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (bool, float64) {
if !mgcq.enabled() {
return false, 0
}

// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score.
conf := repl.SpanConfig()
@@ -670,6 +680,11 @@ func (r *replicaGCer) GC(
func (mgcq *mvccGCQueue) process(
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
) (processed bool, err error) {
if !mgcq.enabled() {
log.VEventf(ctx, 2, "skipping mvcc gc: queue has been disabled")
return false, nil
}

// Lookup the descriptor and GC policy for the zone containing this key range.
desc, conf := repl.DescAndSpanConfig()

28 changes: 14 additions & 14 deletions pkg/kv/kvserver/queue.go
@@ -321,13 +321,13 @@ type queueConfig struct {
// (if not already initialized) when deciding whether to process this
// replica.
needsRaftInitialized bool
-// needsSystemConfig controls whether this queue requires a valid copy of the
-// system config to operate on a replica. Not all queues require it, and it's
+// needsSpanConfigs controls whether this queue requires a valid copy of the
+// span configs to operate on a replica. Not all queues require it, and it's
// unsafe for certain queues to wait on it. For example, a raft snapshot may
-// be needed in order to make it possible for the system config to become
-// available (as observed in #16268), so the raft snapshot queue can't
-// require the system config to already be available.
-needsSystemConfig bool
+// be needed in order to make it possible for the span config range to
+// become available (as observed in #16268), so the raft snapshot queue
+// can't require the span configs to already be available.
+needsSpanConfigs bool
// acceptsUnsplitRanges controls whether this queue can process ranges that
// need to be split due to zone config settings. Ranges are checked before
// calling queueImpl.shouldQueue and queueImpl.process.
@@ -377,7 +377,7 @@ type queueConfig struct {
//
// Replicas are added asynchronously through `MaybeAddAsync` or `AddAsync`.
// MaybeAddAsync checks the various requirements selected by the queue config
-// (needsSystemConfig, needsLease, acceptsUnsplitRanges) as well as the
+// (needsSpanConfigs, needsLease, acceptsUnsplitRanges) as well as the
// queueImpl's `shouldQueue`. AddAsync does not check any of this and accepts a
// priority directly instead of getting it from `shouldQueue`. These methods run
// with a shared maximum concurrency of `addOrMaybeAddSemSize`. If the maximum
@@ -472,9 +472,9 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue {
ambient := store.cfg.AmbientCtx
ambient.AddLogTag(name, nil)

-if !cfg.acceptsUnsplitRanges && !cfg.needsSystemConfig {
+if !cfg.acceptsUnsplitRanges && !cfg.needsSpanConfigs {
log.Fatalf(ambient.AnnotateCtx(context.Background()),
-"misconfigured queue: acceptsUnsplitRanges=false requires needsSystemConfig=true; got %+v", cfg)
+"misconfigured queue: acceptsUnsplitRanges=false requires needsSpanConfigs=true; got %+v", cfg)
}

bq := baseQueue{
@@ -638,12 +638,12 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
ctx = repl.AnnotateCtx(ctx)
// Load the system config if it's needed.
var confReader spanconfig.StoreReader
-if bq.needsSystemConfig {
+if bq.needsSpanConfigs {
var err error
confReader, err = bq.store.GetConfReader(ctx)
if err != nil {
-if errors.Is(err, errSysCfgUnavailable) && log.V(1) {
-log.Warningf(ctx, "unable to retrieve system config, skipping: %v", err)
+if errors.Is(err, errSpanConfigsUnavailable) && log.V(1) {
+log.Warningf(ctx, "unable to retrieve span configs, skipping: %v", err)
}
return
}
@@ -930,10 +930,10 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duration) {
func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
// Load the system config if it's needed.
var confReader spanconfig.StoreReader
-if bq.needsSystemConfig {
+if bq.needsSpanConfigs {
var err error
confReader, err = bq.store.GetConfReader(ctx)
-if errors.Is(err, errSysCfgUnavailable) {
+if errors.Is(err, errSpanConfigsUnavailable) {
if log.V(1) {
log.Warningf(ctx, "unable to retrieve conf reader, skipping: %v", err)
}
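The invariant enforced in newBaseQueue above survives the rename: a queue that cannot accept unsplit ranges must require span configs, since those configs are what tell it where the splits belong. A self-contained sketch of that validation, with queueConfig pared down to the fields involved (a stand-in, not the repository's struct):

package main

import "fmt"

// queueConfig here is a stand-in holding only the validated fields.
type queueConfig struct {
	needsLease           bool
	needsSpanConfigs     bool
	acceptsUnsplitRanges bool
}

// validate mirrors the newBaseQueue check above.
func validate(cfg queueConfig) error {
	if !cfg.acceptsUnsplitRanges && !cfg.needsSpanConfigs {
		return fmt.Errorf(
			"misconfigured queue: acceptsUnsplitRanges=false requires needsSpanConfigs=true; got %+v", cfg)
	}
	return nil
}

func main() {
	fmt.Println(validate(queueConfig{needsSpanConfigs: true}))     // <nil>: can locate split points
	fmt.Println(validate(queueConfig{acceptsUnsplitRanges: true})) // <nil>: tolerates unsplit ranges
	fmt.Println(validate(queueConfig{}))                           // error: would stall on unsplit ranges
}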
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/queue_test.go
@@ -98,7 +98,7 @@ func (tq *testQueueImpl) updateChan() <-chan time.Time {
func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue {
if !cfg.acceptsUnsplitRanges {
// Needed in order to pass the validation in newBaseQueue.
-cfg.needsSystemConfig = true
+cfg.needsSpanConfigs = true
}
cfg.successes = metric.NewCounter(metric.Metadata{Name: "processed"})
cfg.failures = metric.NewCounter(metric.Metadata{Name: "failures"})
@@ -579,7 +579,7 @@ func TestNeedsSystemConfig(t *testing.T) {
{
confReader, err := tc.store.GetConfReader(ctx)
require.Nil(t, confReader)
-require.True(t, errors.Is(err, errSysCfgUnavailable))
+require.True(t, errors.Is(err, errSpanConfigsUnavailable))
}

r, err := tc.store.GetReplica(1)
@@ -597,7 +597,7 @@

// bqNeedsSysCfg will not add the replica or process it without a system config.
bqNeedsSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{
-needsSystemConfig: true,
+needsSpanConfigs: true,
acceptsUnsplitRanges: true,
maxSize: 1,
})
@@ -623,7 +623,7 @@
// Now check that a queue which doesn't require the system config can
// successfully add and process a replica.
bqNoSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{
-needsSystemConfig: false,
+needsSpanConfigs: false,
acceptsUnsplitRanges: true,
maxSize: 1,
})
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_queue.go
@@ -171,7 +171,7 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue {
maxSize: defaultQueueMaxSize,
maxConcurrency: raftLogQueueConcurrency,
needsLease: false,
-needsSystemConfig: false,
+needsSpanConfigs: false,
acceptsUnsplitRanges: true,
successes: store.metrics.RaftLogQueueSuccesses,
failures: store.metrics.RaftLogQueueFailures,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
@@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
// leaseholder. Operating on a replica without holding the lease is the
// reason Raft snapshots cannot be performed by the replicateQueue.
needsLease: false,
-needsSystemConfig: false,
+needsSpanConfigs: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_gc_queue.go
@@ -98,7 +98,7 @@ func newReplicaGCQueue(store *Store, db *kv.DB) *replicaGCQueue {
maxSize: defaultQueueMaxSize,
needsLease: false,
needsRaftInitialized: true,
-needsSystemConfig: false,
+needsSpanConfigs: false,
acceptsUnsplitRanges: true,
processDestroyedReplicas: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -725,9 +725,9 @@ func (r *Replica) updateRangeInfo(ctx context.Context, desc *roachpb.RangeDescriptor) error {
// to different zones.
// Load the system config.
confReader, err := r.store.GetConfReader(ctx)
-if errors.Is(err, errSysCfgUnavailable) {
-// This could be before the system config was ever gossiped, or it
-// expired. Let the gossip callback set the info.
+if errors.Is(err, errSpanConfigsUnavailable) {
+// This could be before the span config subscription was ever
+// established.
log.Warningf(ctx, "unable to retrieve conf reader, cannot determine range MaxBytes")
return nil
}
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/replicate_queue.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
@@ -543,7 +544,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replicateQueue {
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
-needsSystemConfig: true,
+needsSpanConfigs: true,
acceptsUnsplitRanges: store.TestingKnobs().ReplicateQueueAcceptsUnsplit,
// The processing of the replicate queue often needs to send snapshots
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
@@ -589,9 +590,18 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replicateQueue {
return rq
}

func (rq *replicateQueue) enabled() bool {
st := rq.store.ClusterSettings()
return kvserverbase.ReplicateQueueEnabled.Get(&st.SV)
}

func (rq *replicateQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (shouldQueue bool, priority float64) {
if !rq.enabled() {
return false, 0
}

desc, conf := repl.DescAndSpanConfig()
action, priority := rq.allocator.ComputeAction(ctx, conf, desc)

@@ -668,6 +678,11 @@ func (rq *replicateQueue) shouldQueue(
func (rq *replicateQueue) process(
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
if !rq.enabled() {
log.VEventf(ctx, 2, "skipping replication: queue has been disabled")
return false, nil
}

retryOpts := retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 1 * time.Second,
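Taken together with the mvccGCQueue hunks above, each switchable queue now has the same shape: an enabled() accessor backed by a cluster setting, checked once in shouldQueue and again in process, presumably because the setting can flip while a replica is already sitting in the queue. A compilable sketch of that shape with illustrative names (mockQueue is not a repository type):

package main

import "fmt"

// mockQueue models the gating pattern: enabled is a stand-in for reading
// a boolean cluster setting from the store's settings.
type mockQueue struct{ enabled func() bool }

func (q *mockQueue) shouldQueue() (bool, float64) {
	if !q.enabled() {
		return false, 0 // never score or enqueue while disabled
	}
	return true, 1.0
}

func (q *mockQueue) process() (processed bool, err error) {
	if !q.enabled() { // re-check: the setting may have flipped since enqueue
		return false, nil
	}
	return true, nil
}

func main() {
	on := true
	q := &mockQueue{enabled: func() bool { return on }}
	fmt.Println(q.shouldQueue()) // true 1
	on = false                   // disabled after the replica was queued
	fmt.Println(q.process())     // false <nil>
}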