Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: disable {split,replicate,mvccGC} queues until... #98422

Merged
merged 1 commit into from Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 15 additions & 6 deletions pkg/kv/kvserver/client_spanconfigs_test.go
Expand Up @@ -13,6 +13,7 @@ package kvserver_test
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -42,7 +43,8 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
cluster.MakeTestingClusterSettings(),
nil,
)
mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)
var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore)

ctx := context.Background()

Expand Down Expand Up @@ -106,7 +108,8 @@ func TestFallbackSpanConfigOverride(t *testing.T) {

st := cluster.MakeTestingClusterSettings()
spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig(), st, nil)
mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore)
var t0 = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
mockSubscriber := newMockSpanConfigSubscriber(t0, spanConfigStore)

ctx := context.Background()
args := base.TestServerArgs{
Expand Down Expand Up @@ -152,14 +155,20 @@ func TestFallbackSpanConfigOverride(t *testing.T) {
}

type mockSpanConfigSubscriber struct {
callback func(ctx context.Context, config roachpb.Span)
callback func(ctx context.Context, config roachpb.Span)
lastUpdated time.Time
spanconfig.Store
}

var _ spanconfig.KVSubscriber = &mockSpanConfigSubscriber{}

func newMockSpanConfigSubscriber(store spanconfig.Store) *mockSpanConfigSubscriber {
return &mockSpanConfigSubscriber{Store: store}
func newMockSpanConfigSubscriber(
lastUpdated time.Time, store spanconfig.Store,
) *mockSpanConfigSubscriber {
return &mockSpanConfigSubscriber{
lastUpdated: lastUpdated,
Store: store,
}
}

func (m *mockSpanConfigSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool {
Expand All @@ -185,7 +194,7 @@ func (m *mockSpanConfigSubscriber) GetProtectionTimestamps(
}

func (m *mockSpanConfigSubscriber) LastUpdated() hlc.Timestamp {
panic("unimplemented")
return hlc.Timestamp{WallTime: m.lastUpdated.UnixNano()}
}

func (m *mockSpanConfigSubscriber) Subscribe(callback func(context.Context, roachpb.Span)) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/consistency_queue.go
Expand Up @@ -93,7 +93,7 @@ func newConsistencyQueue(store *Store) *consistencyQueue {
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
successes: store.metrics.ConsistencyQueueSuccesses,
failures: store.metrics.ConsistencyQueueFailures,
Expand Down
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
Expand Up @@ -36,6 +36,33 @@ var MergeQueueEnabled = settings.RegisterBoolSetting(
true,
)

// ReplicateQueueEnabled is a setting that controls whether the replicate queue
// is enabled.
var ReplicateQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.replicate_queue.enabled",
"whether the replicate queue is enabled",
true,
)

// SplitQueueEnabled is a setting that controls whether the split queue is
// enabled.
var SplitQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.split_queue.enabled",
"whether the split queue is enabled",
true,
)

// MVCCGCQueueEnabled is a setting that controls whether the MVCC GC queue is
// enabled.
var MVCCGCQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.mvcc_gc_queue.enabled",
"whether the MVCC GC queue is enabled",
true,
)

// CmdIDKey is a Raft command id. This will be logged unredacted - keep it random.
type CmdIDKey string

Expand Down
11 changes: 1 addition & 10 deletions pkg/kv/kvserver/merge_queue.go
Expand Up @@ -116,7 +116,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
// factor.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
needsLease: true,
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: false,
successes: store.metrics.MergeQueueSuccesses,
failures: store.metrics.MergeQueueFailures,
Expand All @@ -129,15 +129,6 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
}

func (mq *mergeQueue) enabled() bool {
if !mq.store.cfg.SpanConfigsDisabled {
if mq.store.cfg.SpanConfigSubscriber.LastUpdated().IsEmpty() {
// If we don't have any span configs available, enabling range merges would
// be extremely dangerous -- we could collapse everything into a single
// range.
return false
}
}

st := mq.store.ClusterSettings()
return kvserverbase.MergeQueueEnabled.Get(&st.SV)
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/mvcc_gc_queue.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -178,7 +179,7 @@ func newMVCCGCQueue(store *Store) *mvccGCQueue {
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: false,
processTimeoutFunc: func(st *cluster.Settings, _ replicaInQueue) time.Duration {
timeout := mvccGCQueueTimeout
Expand Down Expand Up @@ -232,13 +233,22 @@ func (r mvccGCQueueScore) String() string {
humanizeutil.IBytes(r.GCByteAge), humanizeutil.IBytes(r.ExpMinGCByteAgeReduction))
}

func (mgcq *mvccGCQueue) enabled() bool {
st := mgcq.store.ClusterSettings()
return kvserverbase.MVCCGCQueueEnabled.Get(&st.SV)
}

// shouldQueue determines whether a replica should be queued for garbage
// collection, and if so, at what priority. Returns true for shouldQ
// in the event that the cumulative ages of GC'able bytes or extant
// intents exceed thresholds.
func (mgcq *mvccGCQueue) shouldQueue(
ctx context.Context, _ hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (bool, float64) {
if !mgcq.enabled() {
return false, 0
}

// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score.
conf := repl.SpanConfig()
Expand Down Expand Up @@ -672,6 +682,11 @@ func (r *replicaGCer) GC(
func (mgcq *mvccGCQueue) process(
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
) (processed bool, err error) {
if !mgcq.enabled() {
log.VEventf(ctx, 2, "skipping mvcc gc: queue has been disabled")
return false, nil
}

// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
defer repl.MeasureReqCPUNanos(grunning.Time())
Expand Down
28 changes: 14 additions & 14 deletions pkg/kv/kvserver/queue.go
Expand Up @@ -322,13 +322,13 @@ type queueConfig struct {
// (if not already initialized) when deciding whether to process this
// replica.
needsRaftInitialized bool
// needsSystemConfig controls whether this queue requires a valid copy of the
// system config to operate on a replica. Not all queues require it, and it's
// needsSpanConfigs controls whether this queue requires a valid copy of the
// span configs to operate on a replica. Not all queues require it, and it's
// unsafe for certain queues to wait on it. For example, a raft snapshot may
// be needed in order to make it possible for the system config to become
// available (as observed in #16268), so the raft snapshot queue can't
// require the system config to already be available.
needsSystemConfig bool
// be needed in order to make it possible for the span config range to
// become available (as observed in #16268), so the raft snapshot queue
// can't require the span configs to already be available.
needsSpanConfigs bool
// acceptsUnsplitRanges controls whether this queue can process ranges that
// need to be split due to zone config settings. Ranges are checked before
// calling queueImpl.shouldQueue and queueImpl.process.
Expand Down Expand Up @@ -378,7 +378,7 @@ type queueConfig struct {
//
// Replicas are added asynchronously through `MaybeAddAsync` or `AddAsync`.
// MaybeAddAsync checks the various requirements selected by the queue config
// (needsSystemConfig, needsLease, acceptsUnsplitRanges) as well as the
// (needsSpanConfigs, needsLease, acceptsUnsplitRanges) as well as the
// queueImpl's `shouldQueue`. AddAsync does not check any of this and accept a
// priority directly instead of getting it from `shouldQueue`. These methods run
// with shared a maximum concurrency of `addOrMaybeAddSemSize`. If the maximum
Expand Down Expand Up @@ -473,9 +473,9 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
ambient := store.cfg.AmbientCtx
ambient.AddLogTag(name, nil)

if !cfg.acceptsUnsplitRanges && !cfg.needsSystemConfig {
if !cfg.acceptsUnsplitRanges && !cfg.needsSpanConfigs {
log.Fatalf(ambient.AnnotateCtx(context.Background()),
"misconfigured queue: acceptsUnsplitRanges=false requires needsSystemConfig=true; got %+v", cfg)
"misconfigured queue: acceptsUnsplitRanges=false requires needsSpanConfigs=true; got %+v", cfg)
}

bq := baseQueue{
Expand Down Expand Up @@ -639,12 +639,12 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
ctx = repl.AnnotateCtx(ctx)
// Load the system config if it's needed.
var confReader spanconfig.StoreReader
if bq.needsSystemConfig {
if bq.needsSpanConfigs {
var err error
confReader, err = bq.store.GetConfReader(ctx)
if err != nil {
if errors.Is(err, errSysCfgUnavailable) && log.V(1) {
log.Warningf(ctx, "unable to retrieve system config, skipping: %v", err)
if errors.Is(err, errSpanConfigsUnavailable) && log.V(1) {
log.Warningf(ctx, "unable to retrieve span configs, skipping: %v", err)
}
return
}
Expand Down Expand Up @@ -931,10 +931,10 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
// Load the system config if it's needed.
var confReader spanconfig.StoreReader
if bq.needsSystemConfig {
if bq.needsSpanConfigs {
var err error
confReader, err = bq.store.GetConfReader(ctx)
if errors.Is(err, errSysCfgUnavailable) {
if errors.Is(err, errSpanConfigsUnavailable) {
if log.V(1) {
log.Warningf(ctx, "unable to retrieve conf reader, skipping: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/queue_test.go
Expand Up @@ -98,7 +98,7 @@ func (tq *testQueueImpl) updateChan() <-chan time.Time {
func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue {
if !cfg.acceptsUnsplitRanges {
// Needed in order to pass the validation in newBaseQueue.
cfg.needsSystemConfig = true
cfg.needsSpanConfigs = true
}
cfg.successes = metric.NewCounter(metric.Metadata{Name: "processed"})
cfg.failures = metric.NewCounter(metric.Metadata{Name: "failures"})
Expand Down Expand Up @@ -579,7 +579,7 @@ func TestNeedsSystemConfig(t *testing.T) {
{
confReader, err := tc.store.GetConfReader(ctx)
require.Nil(t, confReader)
require.True(t, errors.Is(err, errSysCfgUnavailable))
require.True(t, errors.Is(err, errSpanConfigsUnavailable))
}

r, err := tc.store.GetReplica(1)
Expand All @@ -597,7 +597,7 @@ func TestNeedsSystemConfig(t *testing.T) {

// bqNeedsSysCfg will not add the replica or process it without a system config.
bqNeedsSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: true,
maxSize: 1,
})
Expand All @@ -623,7 +623,7 @@ func TestNeedsSystemConfig(t *testing.T) {
// Now check that a queue which doesn't require the system config can
// successfully add and process a replica.
bqNoSysCfg := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
maxSize: 1,
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_queue.go
Expand Up @@ -177,7 +177,7 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue {
maxSize: defaultQueueMaxSize,
maxConcurrency: raftLogQueueConcurrency,
needsLease: false,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
successes: store.metrics.RaftLogQueueSuccesses,
failures: store.metrics.RaftLogQueueFailures,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
Expand Up @@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
// leaseholder. Operating on a replica without holding the lease is the
// reason Raft snapshots cannot be performed by the replicateQueue.
needsLease: false,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_gc_queue.go
Expand Up @@ -99,7 +99,7 @@ func newReplicaGCQueue(store *Store, db *kv.DB) *replicaGCQueue {
maxSize: defaultQueueMaxSize,
needsLease: false,
needsRaftInitialized: true,
needsSystemConfig: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
processDestroyedReplicas: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_raftstorage.go
Expand Up @@ -358,9 +358,9 @@ func (r *Replica) updateRangeInfo(ctx context.Context, desc *roachpb.RangeDescri
// to different zones.
// Load the system config.
confReader, err := r.store.GetConfReader(ctx)
if errors.Is(err, errSysCfgUnavailable) {
// This could be before the system config was ever gossiped, or it
// expired. Let the gossip callback set the info.
if errors.Is(err, errSpanConfigsUnavailable) {
// This could be before the span config subscription was ever
// established.
log.Warningf(ctx, "unable to retrieve conf reader, cannot determine range MaxBytes")
return nil
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/replicate_queue.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -567,7 +568,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: true,
needsSystemConfig: true,
needsSpanConfigs: true,
acceptsUnsplitRanges: store.TestingKnobs().ReplicateQueueAcceptsUnsplit,
// The processing of the replicate queue often needs to send snapshots
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
Expand Down Expand Up @@ -613,9 +614,18 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
return rq
}

func (rq *replicateQueue) enabled() bool {
st := rq.store.ClusterSettings()
return kvserverbase.ReplicateQueueEnabled.Get(&st.SV)
}

func (rq *replicateQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (shouldQueue bool, priority float64) {
if !rq.enabled() {
return false, 0
}

desc, conf := repl.DescAndSpanConfig()
action, priority := rq.allocator.ComputeAction(ctx, rq.storePool, conf, desc)

Expand Down Expand Up @@ -695,6 +705,11 @@ func (rq *replicateQueue) shouldQueue(
func (rq *replicateQueue) process(
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
if !rq.enabled() {
log.VEventf(ctx, 2, "skipping replication: queue has been disabled")
return false, nil
}

retryOpts := retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 1 * time.Second,
Expand Down