115472: admission: use compaction token lower bound when score is low r=aadityasondhi a=sumeerbhola

When the overload score is such that rate shaping is attempting to give out more tokens than the compaction bandwidth out of L0, we can get a bad interaction with Pebble's compaction picking, which deprioritizes compactions out of L0. This can cause tokens to drop to very low values, even to 0. This is bad for regular traffic, which is typically user facing. The problem was introduced when we started shaping below 20 sub-levels -- previously, by the time L0 reached 20 sub-levels, there were enough compactions happening out of L0.

By using a compaction token lower bound derived from the overall compaction bandwidth observed in the LSM, we ensure that there are sufficient admission tokens, even when the overload score is low.
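
A rough sketch of the idea (illustrative only -- the helper name and parameters below are stand-ins; the real implementation is computeL0CompactionTokensLowerBound and its callers in io_load_listener.go in this diff): spread the compaction bytes written across the whole LSM in the last 15s interval evenly over the levels that compactions move bytes out of, and clamp the score-based regular (non-elastic) byte tokens from below by that share.

// Illustrative sketch, not the committed code: derive a per-interval lower
// bound on L0 byte tokens from LSM-wide compaction bandwidth and never hand
// out fewer regular tokens than that.
func l0ByteTokens(scoreBasedTokens, intCompactedWriteBytes int64, numOutLevels int) int64 {
	if numOutLevels < 1 {
		// The real code guarantees numOutLevels >= 1.
		numOutLevels = 1
	}
	lowerBound := intCompactedWriteBytes / int64(numOutLevels)
	if scoreBasedTokens < lowerBound {
		return lowerBound
	}
	return scoreBasedTokens
}

For example, ~140MB/s of aggregate compaction bandwidth over a 15s interval is ~2100MB compacted; with 6 out-levels the lower bound works out to 2100MB/6 = 350MB of tokens, even if the L0 score alone would grant far fewer.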

Fixes #115373

Epic: none

Release note: None

115516: backup: use ingestion writer for backup data ssts r=dt a=dt

Release note: none.
Epic: none.

Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
3 people committed Dec 4, 2023
3 parents 17f1017 + 7cab821 + 0eb8c0d commit b4c4e21
Showing 5 changed files with 293 additions and 68 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/file_sst_sink.go
@@ -175,7 +175,8 @@ func (s *fileSSTSink) open(ctx context.Context) error {
}
s.out = e
}
s.sst = storage.MakeBackupSSTWriter(ctx, s.dest.Settings(), s.out)
// TODO(dt): make ExternalStorage.Writer return objstorage.Writable.
s.sst = storage.MakeIngestionSSTWriter(ctx, s.dest.Settings(), storage.NoopFinishAbortWritable(s.out))

return nil
}
6 changes: 6 additions & 0 deletions pkg/storage/sst_writer.go
@@ -40,6 +40,12 @@ var _ Writer = &SSTWriter{}
var _ ExportWriter = &SSTWriter{}
var _ InternalWriter = &SSTWriter{}

// NoopFinishAbortWritable wraps an io.Writer to make an objstorage.Writable that
// will ignore Finish and Abort calls.
func NoopFinishAbortWritable(w io.Writer) objstorage.Writable {
return &noopFinishAbort{Writer: w}
}

// noopFinishAbort is used to wrap io.Writers for sstable.Writer.
type noopFinishAbort struct {
io.Writer
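
The wrapper's only consumer in this commit is the file_sst_sink.go change above. A minimal, hypothetical usage sketch (ctx, settings, and out stand in for the sink's context, cluster settings, and ExternalStorage writer):

// Hypothetical sketch mirroring the call site in file_sst_sink.go. Finish and
// Abort on the wrapped writable are no-ops, so the caller stays responsible
// for closing the underlying writer.
w := storage.NoopFinishAbortWritable(out)
sst := storage.MakeIngestionSSTWriter(ctx, settings, w)
defer sst.Close()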
110 changes: 92 additions & 18 deletions pkg/util/admission/io_load_listener.go
@@ -208,6 +208,7 @@ type ioLoadListenerState struct {
bytesWritten uint64
incomingLSMBytes uint64
}
cumCompactionStats cumStoreCompactionStats

// Exponentially smoothed per interval values.

@@ -243,6 +244,56 @@ type ioLoadListenerState struct {
elasticDiskBWTokensAllocated int64
}

type cumStoreCompactionStats struct {
writeBytes uint64
// Not cumulative. This is the number of levels from which bytes will be
// moved out to lower levels via compactions.
numOutLevelsGauge int
}

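// computeCumStoreCompactionStats returns the cumulative bytes written by
// compactions across all levels, plus the number of levels from the base
// level (the first non-empty level below L0) to the bottom of the LSM. For
// example, with a 7-level LSM whose first non-empty level below L0 is L2 (the
// "base-l2" case in the new test below), numOutLevelsGauge = 7-2 = 5.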
func computeCumStoreCompactionStats(m *pebble.Metrics) cumStoreCompactionStats {
var compactedWriteBytes uint64
baseLevel := -1
for i := range m.Levels {
compactedWriteBytes += m.Levels[i].BytesCompacted
if i > 0 && m.Levels[i].Size > 0 && baseLevel < 0 {
baseLevel = i
}
}
if baseLevel < 0 {
baseLevel = len(m.Levels) - 1
}
return cumStoreCompactionStats{
writeBytes: compactedWriteBytes,
numOutLevelsGauge: len(m.Levels) - baseLevel,
}
}

// computeL0CompactionTokensLowerBound is used to give some tokens to L0 even
// if compactions out of L0 are not happening because the compaction score is
// not high enough. This allows admission until the compaction score is high
// enough, at which point we stop using l0CompactionTokensLowerBound.
//
// See https://github.com/cockroachdb/cockroach/issues/115373 for motivation.
func computeL0CompactionTokensLowerBound(
prev cumStoreCompactionStats, cur cumStoreCompactionStats,
) int64 {
// All levels are expected to have similar write amp, so we expect that each
// will be given equal compaction resources. This isn't actually true if the
// levels have very different scores, but it suffices for the purpose of
// this lower bound. Consider an actual incident where L0 was not
// being compacted because it only had 5 sub-levels, and
// numOutLevelsGauge=6, and the aggregate compaction bandwidth was ~140MB/s
// distributed over 7 concurrent compactions. The aggregate bytes compacted in
// 15 seconds is 2100MB. So we return 2100MB/6 = 350MB of tokens.
intCompactedWriteBytes := int64(cur.writeBytes) - int64(prev.writeBytes)
if intCompactedWriteBytes < 0 {
// Defensive: ignore bogus stats.
intCompactedWriteBytes = 0
}
return intCompactedWriteBytes / int64(cur.numOutLevelsGauge)
}

const unlimitedTokens = math.MaxInt64

// Token changes are made at a coarse time granularity of 15s since
@@ -408,9 +459,11 @@ func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, metrics StoreMe
io.perWorkTokenEstimator.updateEstimates(metrics.Levels[0], cumLSMIngestedBytes, sas)
io.adjustTokensResult = adjustTokensResult{
ioLoadListenerState: ioLoadListenerState{
cumL0AddedBytes: m.Levels[0].BytesFlushed + m.Levels[0].BytesIngested,
curL0Bytes: m.Levels[0].Size,
cumWriteStallCount: metrics.WriteStallCount,
cumL0AddedBytes: m.Levels[0].BytesFlushed + m.Levels[0].BytesIngested,
curL0Bytes: m.Levels[0].Size,
cumWriteStallCount: metrics.WriteStallCount,
cumFlushWriteThroughput: metrics.Flush.WriteThroughput,
cumCompactionStats: computeCumStoreCompactionStats(metrics.Metrics),
// No initial limit, i.e., the first interval is unlimited.
totalNumByteTokens: unlimitedTokens,
totalNumElasticByteTokens: unlimitedTokens,
@@ -429,7 +482,6 @@ func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, metrics StoreMe
io.diskBW.bytesRead = metrics.DiskStats.BytesRead
io.diskBW.bytesWritten = metrics.DiskStats.BytesWritten
io.diskBW.incomingLSMBytes = cumLSMIncomingBytes
io.cumFlushWriteThroughput = metrics.Flush.WriteThroughput
io.copyAuxEtcFromPerWorkEstimator()

// Assume system starts off unloaded.
@@ -554,9 +606,10 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics
cumDiskBW := io.ioLoadListenerState.diskBW
wt := metrics.Flush.WriteThroughput
wt.Subtract(io.cumFlushWriteThroughput)
cumCompactionStats := computeCumStoreCompactionStats(metrics.Metrics)

res := io.adjustTokensInner(ctx, io.ioLoadListenerState,
metrics.Levels[0], metrics.WriteStallCount, wt,
metrics.Levels[0], metrics.WriteStallCount, cumCompactionStats, wt,
L0FileCountOverloadThreshold.Get(&io.settings.SV),
L0SubLevelCountOverloadThreshold.Get(&io.settings.SV),
L0MinimumSizePerSubLevel.Get(&io.settings.SV),
@@ -630,9 +683,10 @@ type adjustTokensAuxComputations struct {
intFlushUtilization float64
intWriteStalls int64

prevTokensUsed int64
prevTokensUsedByElasticWork int64
tokenKind tokenKind
prevTokensUsed int64
prevTokensUsedByElasticWork int64
tokenKind tokenKind
usedCompactionTokensLowerBound bool

perWorkTokensAux perWorkTokensAux
doLogFlush bool
Expand All @@ -650,6 +704,7 @@ func (io *ioLoadListener) adjustTokensInner(
prev ioLoadListenerState,
l0Metrics pebble.LevelMetrics,
cumWriteStallCount int64,
cumCompactionStats cumStoreCompactionStats,
flushWriteThroughput pebble.ThroughputMetric,
threshNumFiles, threshNumSublevels int64,
l0MinSizePerSubLevel int64,
@@ -682,6 +737,9 @@ func (io *ioLoadListener) adjustTokensInner(
intL0CompactedBytes = 0
}
io.l0CompactedBytes.Inc(intL0CompactedBytes)
l0CompactionTokensLowerBound := computeL0CompactionTokensLowerBound(
io.cumCompactionStats, cumCompactionStats)
usedCompactionTokensLowerBound := false

const alpha = 0.5

@@ -922,6 +980,10 @@ func (io *ioLoadListener) adjustTokensInner(
// smoothedIntL0CompactedBytes at 1, and 2 * smoothedIntL0CompactedBytes
// at 0.5.
fTotalNumByteTokens = -score*(2*float64(smoothedIntL0CompactedBytes)) + 3*float64(smoothedIntL0CompactedBytes)
if fTotalNumByteTokens < float64(l0CompactionTokensLowerBound) {
fTotalNumByteTokens = float64(l0CompactionTokensLowerBound)
usedCompactionTokensLowerBound = true
}
} else {
// Medium load. Score in [1, 2). We use linear interpolation from
// medium load to overload, to slowly give out fewer tokens as we
Expand Down Expand Up @@ -950,6 +1012,12 @@ func (io *ioLoadListener) adjustTokensInner(
// results show the sublevels hovering around 4, as expected.
//
// NB: at score >= 1.2 (12 sublevels), there are 0 elastic tokens.
//
// NB: we are not using l0CompactionTokensLowerBound at all for elastic
// tokens. This is because that lower bound is useful primarily when L0 is
// not seeing compactions because a compaction backlog has accumulated in
// other levels. For elastic work, we would rather clear that compaction
// backlog than admit some elastic work.
totalNumElasticByteTokens = int64(float64(smoothedIntL0CompactedBytes) *
(1.25 - 1.25*(score-0.2)))
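// For example, at score=0.2 this grants 1.25*smoothedIntL0CompactedBytes
// elastic tokens, decreasing linearly to 0 at score=1.2, consistent with the
// NB above.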

@@ -974,6 +1042,7 @@
cumL0AddedBytes: cumL0AddedBytes,
curL0Bytes: curL0Bytes,
cumWriteStallCount: cumWriteStallCount,
cumCompactionStats: cumCompactionStats,
smoothedIntL0CompactedBytes: smoothedIntL0CompactedBytes,
smoothedCompactionByteTokens: smoothedCompactionByteTokens,
smoothedNumFlushTokens: smoothedNumFlushTokens,
Expand All @@ -986,15 +1055,16 @@ func (io *ioLoadListener) adjustTokensInner(
elasticByteTokensAllocated: 0,
},
aux: adjustTokensAuxComputations{
intL0AddedBytes: intL0AddedBytes,
intL0CompactedBytes: intL0CompactedBytes,
intFlushTokens: intFlushTokens,
intFlushUtilization: intFlushUtilization,
intWriteStalls: intWriteStalls,
prevTokensUsed: prev.byteTokensUsed,
prevTokensUsedByElasticWork: prev.byteTokensUsedByElasticWork,
tokenKind: tokenKind,
doLogFlush: doLogFlush,
intL0AddedBytes: intL0AddedBytes,
intL0CompactedBytes: intL0CompactedBytes,
intFlushTokens: intFlushTokens,
intFlushUtilization: intFlushUtilization,
intWriteStalls: intWriteStalls,
prevTokensUsed: prev.byteTokensUsed,
prevTokensUsedByElasticWork: prev.byteTokensUsedByElasticWork,
tokenKind: tokenKind,
usedCompactionTokensLowerBound: usedCompactionTokensLowerBound,
doLogFlush: doLogFlush,
},
ioThreshold: ioThreshold,
}
@@ -1063,7 +1133,11 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
// NB: res.smoothedCompactionByteTokens is the same as
// res.ioLoadListenerState.totalNumByteTokens (printed above) when
// res.aux.tokenKind == compactionTokenKind.
p.Printf(" due to L0 growth")
lowerBoundBoolStr := ""
if res.aux.usedCompactionTokensLowerBound {
lowerBoundBoolStr = "(used token lower bound)"
}
p.Printf(" due to L0 growth%s", lowerBoundBoolStr)
case flushTokenKind:
p.Printf(" due to memtable flush (multiplier %.3f)", res.flushUtilTargetFraction)
}
97 changes: 94 additions & 3 deletions pkg/util/admission/io_load_listener_test.go
@@ -25,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/datadriven"
@@ -46,11 +48,11 @@ func TestIOLoadListener(t *testing.T) {

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
L0MinimumSizePerSubLevel.Override(ctx, &st.SV, 0)
datadriven.RunTest(t, datapathutils.TestDataPath(t, "io_load_listener"),
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
L0MinimumSizePerSubLevel.Override(ctx, &st.SV, 0)
ioll = &ioLoadListener{
settings: st,
kvRequester: req,
Expand Down Expand Up @@ -140,6 +142,14 @@ func TestIOLoadListener(t *testing.T) {
d.ScanArgs(t, "flush-work-sec", &flushWorkSec)
d.ScanArgs(t, "flush-idle-sec", &flushIdleSec)
}
if d.HasArg("base-level") {
var baseLevel int
d.ScanArgs(t, "base-level", &baseLevel)
metrics.Levels[baseLevel].Size = 1000
var compactedBytes int
d.ScanArgs(t, "compacted-bytes", &compactedBytes)
metrics.Levels[baseLevel].BytesCompacted = uint64(compactedBytes)
}

cumFlushIdle += time.Duration(flushIdleSec) * time.Second
cumFlushWork += time.Duration(flushWorkSec) * time.Second
@@ -295,8 +305,8 @@ func TestAdjustTokensInnerAndLogging(t *testing.T) {
l0TokensProduced: metric.NewCounter(l0TokensProduced),
}
res := ioll.adjustTokensInner(
ctx, tt.prev, tt.l0Metrics, 12, pebble.ThroughputMetric{},
100, 10, 0, 0.50)
ctx, tt.prev, tt.l0Metrics, 12, cumStoreCompactionStats{numOutLevelsGauge: 1},
pebble.ThroughputMetric{}, 100, 10, 0, 0.50)
buf.Printf("%s\n", res)
}
echotest.Require(t, string(redact.Sprint(buf)), filepath.Join(datapathutils.TestDataPath(t, "format_adjust_tokens_stats.txt")))
@@ -546,3 +556,84 @@ func TestTokenAllocationTicker(t *testing.T) {
ticker.adjustmentIntervalStartTime = timeutil.Now().Add(-17 * time.Second)
require.Equal(t, 0, int(ticker.remainingTicks()))
}

func TestComputeCumStoreCompactionStats(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, tc := range []struct {
name string
baseLevel int
writeBytes int64
sizeBytes int64
expected cumStoreCompactionStats
}{
{
name: "base-l6-zero",
expected: cumStoreCompactionStats{
numOutLevelsGauge: 1,
},
},
{
name: "base-l6",
baseLevel: 6,
writeBytes: 50,
sizeBytes: 500,
expected: cumStoreCompactionStats{
writeBytes: 50,
numOutLevelsGauge: 1,
},
},
{
name: "base-l2",
baseLevel: 2,
writeBytes: 97,
sizeBytes: 397,
expected: cumStoreCompactionStats{
writeBytes: 97,
numOutLevelsGauge: 5,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
m := pebble.Metrics{}
var cumSizeBytes int64
var cumWriteBytes uint64
divisor := int64(len(m.Levels) - tc.baseLevel)
for i := tc.baseLevel; i < len(m.Levels); i++ {
m.Levels[i].Size = tc.sizeBytes / divisor
cumSizeBytes += m.Levels[i].Size
m.Levels[i].BytesCompacted = uint64(tc.writeBytes / divisor)
cumWriteBytes += m.Levels[i].BytesCompacted
}
if cumSizeBytes < tc.sizeBytes {
m.Levels[tc.baseLevel].Size += tc.sizeBytes - cumSizeBytes
}
if cumWriteBytes < uint64(tc.writeBytes) {
m.Levels[tc.baseLevel].BytesCompacted += uint64(tc.writeBytes) - cumWriteBytes
}
require.Equal(t, tc.expected, computeCumStoreCompactionStats(&m))
})
}
}

func TestComputeL0CompactionTokensLowerBound(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.Equal(t, int64(1000), computeL0CompactionTokensLowerBound(cumStoreCompactionStats{
writeBytes: 3000,
numOutLevelsGauge: 1,
}, cumStoreCompactionStats{
writeBytes: 8000,
numOutLevelsGauge: 5,
}))
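// 8000-3000 = 5000 interval write bytes spread over numOutLevelsGauge=5
// yields a 1000-byte token lower bound. The case below exercises a negative
// interval delta (bogus stats), which is clamped to zero.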

require.Equal(t, int64(0), computeL0CompactionTokensLowerBound(cumStoreCompactionStats{
writeBytes: 1000,
numOutLevelsGauge: 1,
}, cumStoreCompactionStats{
writeBytes: 500,
numOutLevelsGauge: 2,
}))
}