diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 3531a3300f28..95a4add9cde1 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -173,6 +173,7 @@ go_test( "//pkg/util", "//pkg/util/admission", "//pkg/util/encoding", + "//pkg/util/envutil", "//pkg/util/hlc", "//pkg/util/iterutil", "//pkg/util/leaktest", diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index a71e2c163276..d761f319a5d1 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -93,16 +93,47 @@ var MaxConflictsPerLockConflictError = settings.RegisterIntSetting( settings.WithName("storage.mvcc.max_conflicts_per_lock_conflict_error"), ) -var rocksdbConcurrency = envutil.EnvOrDefaultInt( - "COCKROACH_ROCKSDB_CONCURRENCY", func() int { - // Use up to min(numCPU, 4) threads for background RocksDB compactions per - // store. - const max = 4 - if n := runtime.GOMAXPROCS(0); n <= max { - return n - } - return max - }()) +// getMaxConcurrentCompactions wraps the maxConcurrentCompactions env var in a +// func that may be installed on Options.MaxConcurrentCompactions. It also +// imposes a floor on the max, so that an engine is always created with at least +// 1 slot for compactions. +// +// NB: This function inspects the environment every time it's called. This is +// okay, because Engine construction in NewPebble will invoke it and store the +// value on the Engine itself. +func getMaxConcurrentCompactions() int { + n := envutil.EnvOrDefaultInt( + "COCKROACH_CONCURRENT_COMPACTIONS", func() int { + // The old COCKROACH_ROCKSDB_CONCURRENCY environment variable was never + // documented, but customers were told about it and use it today in + // production. We don't want to break them, so if the new env var + // is unset but COCKROACH_ROCKSDB_CONCURRENCY is set, use the old env + // var's value. 
This old env var has a wart in that it's expressed as a + // number of concurrency slots to make available to both flushes and + // compactions (a vestige of the corresponding RocksDB option's + // mechanics). We need to adjust it to be in terms of just compaction + // concurrency by subtracting the flushing routine's dedicated slot. + // + // TODO(jackson): Should envutil expose its `getEnv` internal func for + // cases like this where we actually want to know whether it's present + // or not; not just fall back to a default? + if oldV := envutil.EnvOrDefaultInt("COCKROACH_ROCKSDB_CONCURRENCY", 0); oldV > 0 { + return oldV - 1 + } + + // By default use up to min(numCPU-1, 3) threads for background + // compactions per store (reserving the final process for flushes). + const max = 3 + if n := runtime.GOMAXPROCS(0); n-1 < max { + return n - 1 + } + return max + }()) + if n < 1 { + return 1 + } + return n +} // l0SubLevelCompactionConcurrency is the sub-level threshold at which to // allow an increase in compaction concurrency. The maximum is still diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index e4f9a6a0fde7..ee78084359dd 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -600,23 +600,18 @@ const MinimumSupportedFormatVersion = pebble.FormatPrePebblev1Marked // DefaultPebbleOptions returns the default pebble options. func DefaultPebbleOptions() *pebble.Options { - // In RocksDB, the concurrency setting corresponds to both flushes and - // compactions. In Pebble, there is always a slot for a flush, and - // compactions are counted separately. - maxConcurrentCompactions := rocksdbConcurrency - 1 - if maxConcurrentCompactions < 1 { - maxConcurrentCompactions = 1 - } - opts := &pebble.Options{ Comparer: EngineComparer, FS: vfs.Default, // A value of 2 triggers a compaction when there is 1 sub-level. 
- L0CompactionThreshold: 2, - L0StopWritesThreshold: 1000, - LBaseMaxBytes: 64 << 20, // 64 MB - Levels: make([]pebble.LevelOptions, 7), - MaxConcurrentCompactions: func() int { return maxConcurrentCompactions }, + L0CompactionThreshold: 2, + L0StopWritesThreshold: 1000, + LBaseMaxBytes: 64 << 20, // 64 MB + Levels: make([]pebble.LevelOptions, 7), + // NB: Options.MaxConcurrentCompactions may be overridden in NewPebble to + // allow overriding the max at runtime through + // Engine.SetCompactionConcurrency. + MaxConcurrentCompactions: getMaxConcurrentCompactions, MemTableSize: 64 << 20, // 64 MB MemTableStopWritesThreshold: 4, Merger: MVCCMerger, @@ -1339,12 +1334,12 @@ func (p *Pebble) writePreventStartupFile(ctx context.Context, corruptionError er preventStartupMsg := fmt.Sprintf(`ATTENTION: - this node is terminating because of sstable corruption. - Corruption may be a consequence of a hardware error. + this node is terminating because of sstable corruption. + Corruption may be a consequence of a hardware error. 
- Error: %s + Error: %s - A file preventing this node from restarting was placed at: + A file preventing this node from restarting was placed at: %s`, corruptionError.Error(), path) if err := fs.WriteFile(p.unencryptedFS, path, []byte(preventStartupMsg)); err != nil { diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 8b7ad8299652..e7f40e574c11 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -15,6 +15,7 @@ import ( "fmt" "math/rand" "path/filepath" + "runtime" "strings" "sync/atomic" "testing" @@ -28,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1520,3 +1522,74 @@ func TestConvertFilesToBatchAndCommit(t *testing.T) { } require.Equal(t, outputState(engs[ingestEngine]), outputState(engs[batchEngine])) } + +func TestCompactionConcurrencyEnvVars(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + maxProcsBefore := runtime.GOMAXPROCS(0) + defer runtime.GOMAXPROCS(maxProcsBefore) + + type testCase struct { + maxProcs int + rocksDBConcurrency string + cockroachConcurrency string + want int + } + cases := []testCase{ + // Defaults + {32, "", "", 3}, + {4, "", "", 3}, + {3, "", "", 2}, + {2, "", "", 1}, + {1, "", "", 1}, + // Old COCKROACH_ROCKSDB_CONCURRENCY env var is set. The user-provided + // value includes 1 slot for flushes, so resulting compaction + // concurrency is n-1. + {32, "4", "", 3}, + {4, "4", "", 3}, + {2, "4", "", 3}, + {1, "4", "", 3}, + {32, "8", "", 7}, + {4, "8", "", 7}, + {2, "8", "", 7}, + {1, "8", "", 7}, + // New COCKROACH_CONCURRENT_COMPACTIONS env var is set. 
+ {32, "", "4", 4}, + {4, "", "4", 4}, + {2, "", "4", 4}, + {1, "", "4", 4}, + {32, "", "8", 8}, + {4, "", "8", 8}, + {2, "", "8", 8}, + {1, "", "8", 8}, + // Both settings are set; COCKROACH_CONCURRENT_COMPACTIONS supersedes + // COCKROACH_ROCKSDB_CONCURRENCY. + {32, "8", "4", 4}, + {4, "1", "4", 4}, + {2, "2", "4", 4}, + {1, "5", "4", 4}, + {32, "1", "8", 8}, + {4, "2", "8", 8}, + {2, "4", "8", 8}, + {1, "1", "8", 8}, + } + for _, tc := range cases { + t.Run(fmt.Sprintf("GOMAXPROCS=%d,old=%q,new=%q", tc.maxProcs, tc.rocksDBConcurrency, tc.cockroachConcurrency), + func(t *testing.T) { + runtime.GOMAXPROCS(tc.maxProcs) + + if tc.rocksDBConcurrency == "" { + defer envutil.TestUnsetEnv(t, "COCKROACH_ROCKSDB_CONCURRENCY")() + } else { + defer envutil.TestSetEnv(t, "COCKROACH_ROCKSDB_CONCURRENCY", tc.rocksDBConcurrency)() + } + if tc.cockroachConcurrency == "" { + defer envutil.TestUnsetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS")() + } else { + defer envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS", tc.cockroachConcurrency)() + } + require.Equal(t, tc.want, getMaxConcurrentCompactions()) + }) + } +} diff --git a/pkg/util/envutil/env.go b/pkg/util/envutil/env.go index ddb681a0bd6f..14c555045845 100644 --- a/pkg/util/envutil/env.go +++ b/pkg/util/envutil/env.go @@ -452,3 +452,23 @@ func TestSetEnv(t TB, name string, value string) func() { ClearEnvCache() } } + +// TestUnsetEnv unsets an environment variable and the cleanup function +// resets it to the original value. +func TestUnsetEnv(t TB, name string) func() { + t.Helper() + ClearEnvCache() + before, exists := os.LookupEnv(name) + if !exists { + return func() {} + } + if err := os.Unsetenv(name); err != nil { + t.Fatal(err) + } + return func() { + if err := os.Setenv(name, before); err != nil { + t.Fatal(err) + } + ClearEnvCache() + } +}