Skip to content

Commit

Permalink
Merge #113052
Browse files Browse the repository at this point in the history
113052: storage: expose new compaction concurrency env var r=itsbilal a=jbowens

Add a new COCKROACH_COMPACTION_CONCURRENCY environment variable to control the
maximum number of concurrent compactions that a single store will schedule.
This environment variable will supersede the old COCKROACH_ROCKSDB_CONCURRENCY
environment variable which was undocumented and was unfortunately named.

Epic: none
Release note (ops change): Introduced a new documented environment variable
that allows an operator to configure the compaction concurrency.

Co-authored-by: Jackson Owens <jackson@cockroachlabs.com>
  • Loading branch information
craig[bot] and jbowens committed Oct 30, 2023
2 parents 0574c0c + 9230393 commit 047f45e
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 27 deletions.
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ go_test(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/leaktest",
Expand Down
51 changes: 41 additions & 10 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,47 @@ var MaxConflictsPerLockConflictError = settings.RegisterIntSetting(
settings.WithName("storage.mvcc.max_conflicts_per_lock_conflict_error"),
)

var rocksdbConcurrency = envutil.EnvOrDefaultInt(
"COCKROACH_ROCKSDB_CONCURRENCY", func() int {
// Use up to min(numCPU, 4) threads for background RocksDB compactions per
// store.
const max = 4
if n := runtime.GOMAXPROCS(0); n <= max {
return n
}
return max
}())
// getMaxConcurrentCompactions wraps the maxConcurrentCompactions env var in a
// func that may be installed on Options.MaxConcurrentCompactions. It also
// imposes a floor on the max, so that an engine is always created with at least
// 1 slot for a compactions.
//
// NB: This function inspects the environment every time it's called. This is
// okay, because Engine construction in NewPebble will invoke it and store the
// value on the Engine itself.
func getMaxConcurrentCompactions() int {
n := envutil.EnvOrDefaultInt(
"COCKROACH_CONCURRENT_COMPACTIONS", func() int {
// The old COCKROACH_ROCKSDB_CONCURRENCY environment variable was never
// documented, but customers were told about it and use today in
// production. We don't want to break them, so if the new env var
// is unset but COCKROACH_ROCKSDB_CONCURRENCY is set, use the old env
// var's value. This old env var has a wart in that it's expressed as a
// number of concurrency slots to make available to both flushes and
// compactions (a vestige of the corresponding RocksDB option's
// mechanics). We need to adjust it to be in terms of just compaction
// concurrency by subtracting the flushing routine's dedicated slot.
//
// TODO(jackson): Should envutil expose its `getEnv` internal func for
// cases like this where we actually want to know whether it's present
// or not; not just fallback to a default?
if oldV := envutil.EnvOrDefaultInt("COCKROACH_ROCKSDB_CONCURRENCY", 0); oldV > 0 {
return oldV - 1
}

// By default use up to min(numCPU-1, 3) threads for background
// compactions per store (reserving the final process for flushes).
const max = 3
if n := runtime.GOMAXPROCS(0); n-1 < max {
return n - 1
}
return max
}())
if n < 1 {
return 1
}
return n
}

// l0SubLevelCompactionConcurrency is the sub-level threshold at which to
// allow an increase in compaction concurrency. The maximum is still
Expand Down
29 changes: 12 additions & 17 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,23 +600,18 @@ const MinimumSupportedFormatVersion = pebble.FormatPrePebblev1Marked

// DefaultPebbleOptions returns the default pebble options.
func DefaultPebbleOptions() *pebble.Options {
// In RocksDB, the concurrency setting corresponds to both flushes and
// compactions. In Pebble, there is always a slot for a flush, and
// compactions are counted separately.
maxConcurrentCompactions := rocksdbConcurrency - 1
if maxConcurrentCompactions < 1 {
maxConcurrentCompactions = 1
}

opts := &pebble.Options{
Comparer: EngineComparer,
FS: vfs.Default,
// A value of 2 triggers a compaction when there is 1 sub-level.
L0CompactionThreshold: 2,
L0StopWritesThreshold: 1000,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: make([]pebble.LevelOptions, 7),
MaxConcurrentCompactions: func() int { return maxConcurrentCompactions },
L0CompactionThreshold: 2,
L0StopWritesThreshold: 1000,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: make([]pebble.LevelOptions, 7),
// NB: Options.MaxConcurrentCompactions may be overidden in NewPebble to
// allow overriding the max at runtime through
// Engine.SetCompactionConcurrency.
MaxConcurrentCompactions: getMaxConcurrentCompactions,
MemTableSize: 64 << 20, // 64 MB
MemTableStopWritesThreshold: 4,
Merger: MVCCMerger,
Expand Down Expand Up @@ -1339,12 +1334,12 @@ func (p *Pebble) writePreventStartupFile(ctx context.Context, corruptionError er

preventStartupMsg := fmt.Sprintf(`ATTENTION:
this node is terminating because of sstable corruption.
Corruption may be a consequence of a hardware error.
this node is terminating because of sstable corruption.
Corruption may be a consequence of a hardware error.
Error: %s
Error: %s
A file preventing this node from restarting was placed at:
A file preventing this node from restarting was placed at:
%s`, corruptionError.Error(), path)

if err := fs.WriteFile(p.unencryptedFS, path, []byte(preventStartupMsg)); err != nil {
Expand Down
73 changes: 73 additions & 0 deletions pkg/storage/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"math/rand"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
"testing"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1520,3 +1522,74 @@ func TestConvertFilesToBatchAndCommit(t *testing.T) {
}
require.Equal(t, outputState(engs[ingestEngine]), outputState(engs[batchEngine]))
}

func TestCompactionConcurrencyEnvVars(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

maxProcsBefore := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(maxProcsBefore)

type testCase struct {
maxProcs int
rocksDBConcurrency string
cockroachConcurrency string
want int
}
cases := []testCase{
// Defaults
{32, "", "", 3},
{4, "", "", 3},
{3, "", "", 2},
{2, "", "", 1},
{1, "", "", 1},
// Old COCKROACH_ROCKSDB_CONCURRENCY env var is set. The user-provided
// value includes 1 slot for flushes, so resulting compaction
// concurrency is n-1.
{32, "4", "", 3},
{4, "4", "", 3},
{2, "4", "", 3},
{1, "4", "", 3},
{32, "8", "", 7},
{4, "8", "", 7},
{2, "8", "", 7},
{1, "8", "", 7},
// New COCKROACH_CONCURRENT_COMPACTIONS env var is set.
{32, "", "4", 4},
{4, "", "4", 4},
{2, "", "4", 4},
{1, "", "4", 4},
{32, "", "8", 8},
{4, "", "8", 8},
{2, "", "8", 8},
{1, "", "8", 8},
// Both settings are set; COCKROACH_CONCURRENT_COMPACTIONS supersedes
// COCKROACH_ROCKSDB_CONCURRENCY.
{32, "8", "4", 4},
{4, "1", "4", 4},
{2, "2", "4", 4},
{1, "5", "4", 4},
{32, "1", "8", 8},
{4, "2", "8", 8},
{2, "4", "8", 8},
{1, "1", "8", 8},
}
for _, tc := range cases {
t.Run(fmt.Sprintf("GOMAXPROCS=%d,old=%q,new=%q", tc.maxProcs, tc.rocksDBConcurrency, tc.cockroachConcurrency),
func(t *testing.T) {
runtime.GOMAXPROCS(tc.maxProcs)

if tc.rocksDBConcurrency == "" {
defer envutil.TestUnsetEnv(t, "COCKROACH_ROCKSDB_CONCURRENCY")()
} else {
defer envutil.TestSetEnv(t, "COCKROACH_ROCKSDB_CONCURRENCY", tc.rocksDBConcurrency)()
}
if tc.cockroachConcurrency == "" {
defer envutil.TestUnsetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS")()
} else {
defer envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_COMPACTIONS", tc.cockroachConcurrency)()
}
require.Equal(t, tc.want, getMaxConcurrentCompactions())
})
}
}
20 changes: 20 additions & 0 deletions pkg/util/envutil/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,23 @@ func TestSetEnv(t TB, name string, value string) func() {
ClearEnvCache()
}
}

// TestUnsetEnv unsets an environment variable and the cleanup function
// resets it to the original value.
func TestUnsetEnv(t TB, name string) func() {
t.Helper()
ClearEnvCache()
before, exists := os.LookupEnv(name)
if !exists {
return func() {}
}
if err := os.Unsetenv(name); err != nil {
t.Fatal(err)
}
return func() {
if err := os.Setenv(name, before); err != nil {
t.Fatal(err)
}
ClearEnvCache()
}
}

0 comments on commit 047f45e

Please sign in to comment.