admission: adjust token computation during WAL failover
During WAL failover, possibly caused by a disk stall in the primary
location, flushes and compactions can also stall. This can cause
admission control to compute artificially low token counts, both for
compaction bandwidth out of L0 (when L0 has an elevated score) and for
flush tokens (which are meant to prevent memtable write stalls).

The solution outlined here detects WAL failover by looking at increases
in the pebble metric WAL.Failover.SecondaryWriteDuration. If an increase
happened in the last 15s interval (the token computation interval), the
current interval's flush and compaction bytes are excluded from
smoothing, and therefore from token computation. For regular work, the
previous smoothed compaction tokens continue to be used, and flush
tokens are unlimited. For elastic work, the tokens are reduced to near
zero. An alternative is to allow unlimited tokens during the stall, but
it runs the risk of over-admitting. This alternative can be selected by
setting the cluster setting
admission.wal.failover.unlimited_tokens.enabled to true.
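For illustration, here is a minimal Go sketch of the detection and the default token policy described above. All names and structure are simplified stand-ins rather than the actual CockroachDB code; the real logic lives in adjustTokensInner in pkg/util/admission/io_load_listener.go, shown in the diff below:

```go
package main

import (
	"fmt"
	"time"
)

// tokenState is an illustrative stand-in for ioLoadListenerState.
type tokenState struct {
	// Cumulative time spent writing to the secondary WAL location, from the
	// pebble metric WAL.Failover.SecondaryWriteDuration.
	cumWALSecondaryWriteDuration time.Duration
	smoothedCompactionTokens     int64
}

const unlimitedTokens = int64(1) << 62 // stand-in for "no limit"

// adjustTokens returns (regular, elastic, flush) token counts for the next
// 15s interval, applying the WAL-failover adjustment described above.
func (s *tokenState) adjustTokens(
	curWALSecondaryWriteDuration time.Duration,
	intCompactionTokens int64, // tokens derived from this interval's compactions
	unlimitedDuringFailover bool, // admission.wal.failover.unlimited_tokens.enabled
) (regular, elastic, flush int64) {
	// Failover is detected by an increase in the cumulative secondary-WAL
	// write duration over the last interval.
	intWALFailover := curWALSecondaryWriteDuration > s.cumWALSecondaryWriteDuration
	s.cumWALSecondaryWriteDuration = curWALSecondaryWriteDuration

	if !intWALFailover {
		// Normal path: exponentially smooth this interval's compaction tokens.
		// (The real flush-token computation is separate and elided here.)
		const alpha = 0.5
		s.smoothedCompactionTokens = int64(alpha*float64(intCompactionTokens) +
			(1-alpha)*float64(s.smoothedCompactionTokens))
		return s.smoothedCompactionTokens, s.smoothedCompactionTokens, unlimitedTokens
	}
	if unlimitedDuringFailover {
		// Override: unlimited tokens, at the risk of over-admitting into L0.
		return unlimitedTokens, unlimitedTokens, unlimitedTokens
	}
	// Default: ignore this interval's (possibly stalled) flush and compaction
	// bytes. Regular work keeps the previous smoothed compaction tokens, flush
	// tokens are left unlimited, and elastic work gets ~0 tokens.
	return s.smoothedCompactionTokens, 1, unlimitedTokens
}

func main() {
	s := &tokenState{smoothedCompactionTokens: 512 << 20}
	regular, elastic, flush := s.adjustTokens(3*time.Second, 100<<20, false)
	fmt.Println(regular, elastic, flush) // prior smoothed tokens, ~0 elastic, unlimited flush
}
```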

Informs cockroachdb/pebble#3230

Informs CRDB-35401

Epic: none

Release note (ops change): The cluster setting
admission.wal.failover.unlimited_tokens.enabled can be set to true to
grant unlimited admission tokens during WAL failover. It should not be
changed without consulting the admission control team, since the
default, which preserves the token counts from the preceding
non-WAL-failover interval, is expected to be safer.
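(As with any cluster setting, an operator would enable this via SQL, e.g.
SET CLUSTER SETTING admission.wal.failover.unlimited_tokens.enabled = true;)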
sumeerbhola committed Mar 8, 2024
1 parent 559994e commit 9f7a1f5
Showing 3 changed files with 182 additions and 116 deletions.
194 changes: 130 additions & 64 deletions pkg/util/admission/io_load_listener.go
@@ -109,6 +109,12 @@ var L0MinimumSizePerSubLevel = settings.RegisterIntSetting(
 	"when non-zero, this indicates the minimum size that is needed to count towards one sub-level",
 	5<<20, settings.NonNegativeInt)
 
+var walFailoverUnlimitedTokens = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"admission.wal.failover.unlimited_tokens.enabled",
+	"when true, during WAL failover, unlimited admission tokens are allocated",
+	false)
+
 // Experimental observations:
 // - Sub-level count of ~40 caused a node heartbeat latency p90, p99 of 2.5s,
 //   4s. With a setting that limits sub-level count to 10, before the system
@@ -208,7 +214,8 @@ type ioLoadListenerState struct {
 		bytesWritten     uint64
 		incomingLSMBytes uint64
 	}
-	cumCompactionStats cumStoreCompactionStats
+	cumCompactionStats           cumStoreCompactionStats
+	cumWALSecondaryWriteDuration time.Duration
 
 	// Exponentially smoothed per interval values.
 
@@ -464,11 +471,12 @@ func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, metrics StoreMetrics) {
 	io.perWorkTokenEstimator.updateEstimates(metrics.Levels[0], cumLSMIngestedBytes, sas)
 	io.adjustTokensResult = adjustTokensResult{
 		ioLoadListenerState: ioLoadListenerState{
-			cumL0AddedBytes:         m.Levels[0].BytesFlushed + m.Levels[0].BytesIngested,
-			curL0Bytes:              m.Levels[0].Size,
-			cumWriteStallCount:      metrics.WriteStallCount,
-			cumFlushWriteThroughput: metrics.Flush.WriteThroughput,
-			cumCompactionStats:      computeCumStoreCompactionStats(metrics.Metrics),
+			cumL0AddedBytes:              m.Levels[0].BytesFlushed + m.Levels[0].BytesIngested,
+			curL0Bytes:                   m.Levels[0].Size,
+			cumWriteStallCount:           metrics.WriteStallCount,
+			cumFlushWriteThroughput:      m.Flush.WriteThroughput,
+			cumCompactionStats:           computeCumStoreCompactionStats(m),
+			cumWALSecondaryWriteDuration: m.WAL.Failover.SecondaryWriteDuration,
 			// No initial limit, i.e, the first interval is unlimited.
 			totalNumByteTokens:        unlimitedTokens,
 			totalNumElasticByteTokens: unlimitedTokens,
@@ -618,7 +626,8 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics) {
 	cumCompactionStats := computeCumStoreCompactionStats(metrics.Metrics)
 
 	res := io.adjustTokensInner(ctx, io.ioLoadListenerState,
-		metrics.Levels[0], metrics.WriteStallCount, cumCompactionStats, wt,
+		metrics.Levels[0], metrics.WriteStallCount, cumCompactionStats,
+		metrics.WAL.Failover.SecondaryWriteDuration, wt,
 		L0FileCountOverloadThreshold.Get(&io.settings.SV),
 		L0SubLevelCountOverloadThreshold.Get(&io.settings.SV),
 		L0MinimumSizePerSubLevel.Get(&io.settings.SV),
@@ -692,6 +701,8 @@ type adjustTokensAuxComputations struct {
 	intFlushUtilization float64
 	intWriteStalls      int64
 
+	intWALFailover bool
+
 	prevTokensUsed              int64
 	prevTokensUsedByElasticWork int64
 	tokenKind                   tokenKind
@@ -714,6 +725,7 @@ func (io *ioLoadListener) adjustTokensInner(
 	l0Metrics pebble.LevelMetrics,
 	cumWriteStallCount int64,
 	cumCompactionStats cumStoreCompactionStats,
+	cumWALSecondaryWriteDuration time.Duration,
 	flushWriteThroughput pebble.ThroughputMetric,
 	threshNumFiles, threshNumSublevels int64,
 	l0MinSizePerSubLevel int64,
@@ -752,9 +764,43 @@ func (io *ioLoadListener) adjustTokensInner(
 
 	const alpha = 0.5
 
-	// Compaction scheduling can be uneven in prioritizing L0 for compactions,
-	// so smooth out what is being removed by compactions.
-	smoothedIntL0CompactedBytes := int64(alpha*float64(intL0CompactedBytes) + (1-alpha)*float64(prev.smoothedIntL0CompactedBytes))
+	// intWALFailover captures whether there were any writes to the secondary
+	// WAL location in the last interval. WAL failover indicates that the
+	// primary WAL location, which is also the location to which the store
+	// flushes and compacts, may be unhealthy. If it is unhealthy, flushes and
+	// compactions can stall, which can result in artificially low token counts
+	// for flushes and compactions, which can unnecessarily throttle work.
+	//
+	// We make the assumption that failover will be very aggressive compared to
+	// the interval at which this token computation is happening (15s). An
+	// UnhealthyOperationLatencyThreshold of 1s or lower means that an interval
+	// in which intWALFailover was false could at worst have had its last 1s
+	// have stalled flushes/compactions. So the throughput observed here will be
+	// 93.3% of what would have been possible with a healthy primary, which is
+	// considered acceptable.
+	//
+	// We also make the assumption that failback will be reasonably aggressive
+	// once the primary is considered healthy, say within 10s. So a disk stall
+	// in the primary that lasts 30s, will cause WAL failover for ~40s, and a
+	// disk stall for 1s will cause failover for ~11s. The latter (11s) is short
+	// enough that we could potentially allow unlimited tokens during failover.
+	// The concern is the former case, where unlimited tokens could result in
+	// excessive admission into L0. So the default behavior when intWALFailover
+	// is true is to (a) continue using the compaction tokens from before the
+	// failover, (b) not constrain flush tokens, (c) constrain elastic traffic
+	// to effectively 0 tokens. We allow this behavior to be overridden to have
+	// unlimited tokens.
+	intWALFailover := cumWALSecondaryWriteDuration-io.cumWALSecondaryWriteDuration > 0
+	var smoothedIntL0CompactedBytes int64
+	if intWALFailover {
+		// Reuse previous smoothed value.
+		smoothedIntL0CompactedBytes = prev.smoothedIntL0CompactedBytes
+	} else {
+		// Compaction scheduling can be uneven in prioritizing L0 for compactions,
+		// so smooth out what is being removed by compactions.
+		smoothedIntL0CompactedBytes = int64(alpha*float64(intL0CompactedBytes) +
+			(1-alpha)*float64(prev.smoothedIntL0CompactedBytes))
+	}
 
 	// Flush tokens:
 	//
@@ -886,7 +932,7 @@ func (io *ioLoadListener) adjustTokensInner(
 	doLogFlush := false
 	smoothedNumFlushTokens := prev.smoothedNumFlushTokens
 	const flushUtilIgnoreThreshold = 0.1
-	if intFlushUtilization > flushUtilIgnoreThreshold {
+	if intFlushUtilization > flushUtilIgnoreThreshold && !intWALFailover {
 		if smoothedNumFlushTokens == 0 {
 			// Initialization.
 			smoothedNumFlushTokens = intFlushTokens
@@ -927,14 +973,15 @@ func (io *ioLoadListener) adjustTokensInner(
 			doLogFlush = true
 		}
 		flushTokensFloat := flushUtilTargetFraction * smoothedNumFlushTokens
-		if flushTokensFloat < float64(math.MaxInt64) {
+		if flushTokensFloat < float64(unlimitedTokens) {
 			numFlushTokens = int64(flushTokensFloat)
 		}
 		// Else avoid overflow by using the previously set unlimitedTokens. This
 		// should not really happen.
 	}
-	// Else intFlushUtilization is too low. We don't want to make token
-	// determination based on a very low utilization, so we hand out unlimited
+	// Else intFlushUtilization is too low or WAL failover is active. We
+	// don't want to make token determination based on a very low utilization,
+	// or when flushes are stalled, so we hand out unlimited
 	// tokens. Note that flush utilization has been observed to fluctuate from
 	// 0.16 to 0.9 in a single interval, when compaction tokens are not limited,
 	// hence we have set flushUtilIgnoreThreshold to a very low value. If we've
@@ -967,47 +1014,54 @@ func (io *ioLoadListener) adjustTokensInner(
 	// Overload: Score is >= 2. We limit compaction tokens, and limit tokens to
 	// at most C/2 tokens.
 	if score < 0.5 {
-		// Underload. Maintain a smoothedCompactionByteTokens based on what was
-		// removed, so that when we go over the threshold we have some history.
-		// This is also useful when we temporarily dip below the threshold --
-		// we've seen extreme situations with alternating 15s intervals of above
-		// and below the threshold.
-		numTokens := intL0CompactedBytes
-		// Smooth it. This may seem peculiar since we are already using
-		// smoothedIntL0CompactedBytes, but the clauses below use different
-		// computations so we also want the history of smoothedCompactionByteTokens.
-		smoothedCompactionByteTokens = alpha*float64(numTokens) + (1-alpha)*prev.smoothedCompactionByteTokens
+		if intWALFailover {
+			smoothedCompactionByteTokens = prev.smoothedCompactionByteTokens
+		} else {
+			// Underload. Maintain a smoothedCompactionByteTokens based on what was
+			// removed, so that when we go over the threshold we have some history.
+			// This is also useful when we temporarily dip below the threshold --
+			// we've seen extreme situations with alternating 15s intervals of above
+			// and below the threshold.
+			numTokens := intL0CompactedBytes
+			// Smooth it. Unlike the else block below, we smooth using
+			// intL0CompactedBytes, to be more responsive.
+			smoothedCompactionByteTokens = alpha*float64(numTokens) + (1-alpha)*prev.smoothedCompactionByteTokens
+		}
 		totalNumByteTokens = unlimitedTokens
 	} else {
 		doLogFlush = true
-		var fTotalNumByteTokens float64
-		if score >= 2 {
-			// Overload.
-			//
-			// Don't admit more byte work than we can remove via compactions.
-			// totalNumByteTokens tracks our goal for admission. Scale down
-			// since we want to get under the thresholds over time.
-			fTotalNumByteTokens = float64(smoothedIntL0CompactedBytes / 2.0)
-		} else if score >= 0.5 && score < 1 {
-			// Low load. Score in [0.5, 1). Tokens should be
-			// smoothedIntL0CompactedBytes at 1, and 2 * smoothedIntL0CompactedBytes
-			// at 0.5.
-			fTotalNumByteTokens = -score*(2*float64(smoothedIntL0CompactedBytes)) + 3*float64(smoothedIntL0CompactedBytes)
-			if fTotalNumByteTokens < float64(l0CompactionTokensLowerBound) {
-				fTotalNumByteTokens = float64(l0CompactionTokensLowerBound)
-				usedCompactionTokensLowerBound = true
-			}
-		} else {
-			// Medium load. Score in [1, 2). We use linear interpolation from
-			// medium load to overload, to slowly give out fewer tokens as we
-			// move towards overload.
-			halfSmoothedBytes := float64(smoothedIntL0CompactedBytes / 2.0)
-			fTotalNumByteTokens = -score*halfSmoothedBytes + 3*halfSmoothedBytes
-		}
-		smoothedCompactionByteTokens = alpha*fTotalNumByteTokens + (1-alpha)*prev.smoothedCompactionByteTokens
-		if float64(math.MaxInt64) < smoothedCompactionByteTokens {
+		if intWALFailover {
+			smoothedCompactionByteTokens = prev.smoothedCompactionByteTokens
+		} else {
+			var fTotalNumByteTokens float64
+			if score >= 2 {
+				// Overload.
+				//
+				// Don't admit more byte work than we can remove via compactions.
+				// totalNumByteTokens tracks our goal for admission. Scale down
+				// since we want to get under the thresholds over time.
+				fTotalNumByteTokens = float64(smoothedIntL0CompactedBytes / 2.0)
+			} else if score >= 0.5 && score < 1 {
+				// Low load. Score in [0.5, 1). Tokens should be
+				// smoothedIntL0CompactedBytes at 1, and 2 * smoothedIntL0CompactedBytes
+				// at 0.5.
+				fTotalNumByteTokens = -score*(2*float64(smoothedIntL0CompactedBytes)) + 3*float64(smoothedIntL0CompactedBytes)
+				if fTotalNumByteTokens < float64(l0CompactionTokensLowerBound) {
+					fTotalNumByteTokens = float64(l0CompactionTokensLowerBound)
+					usedCompactionTokensLowerBound = true
+				}
+			} else {
+				// Medium load. Score in [1, 2). We use linear interpolation from
+				// medium load to overload, to slowly give out fewer tokens as we
+				// move towards overload.
+				halfSmoothedBytes := float64(smoothedIntL0CompactedBytes / 2.0)
+				fTotalNumByteTokens = -score*halfSmoothedBytes + 3*halfSmoothedBytes
+			}
+			smoothedCompactionByteTokens = alpha*fTotalNumByteTokens + (1-alpha)*prev.smoothedCompactionByteTokens
+		}
+		if float64(unlimitedTokens) < smoothedCompactionByteTokens {
 			// Avoid overflow. This should not really happen.
-			totalNumByteTokens = math.MaxInt64
+			totalNumByteTokens = unlimitedTokens
 		} else {
 			totalNumByteTokens = int64(smoothedCompactionByteTokens)
 		}
@@ -1019,21 +1073,24 @@ func (io *ioLoadListener) adjustTokensInner(
 	// means that we start shaping when there are 2 sublevels.
 	if score >= 0.2 {
 		doLogFlush = true
-		// Use a linear function with slope of -1.25 and compaction tokens of
-		// 1.25*compaction-bandwidth at score of 0.2. At a score of 0.6 (6
-		// sublevels) the tokens will be 0.75*compaction-bandwidth. Experimental
-		// results show the sublevels hovering around 4, as expected.
-		//
-		// NB: at score >= 1.2 (12 sublevels), there are 0 elastic tokens.
-		//
-		// NB: we are not using l0CompactionTokensLowerBound at all for elastic
-		// tokens. This is because that lower bound is useful primarily when L0 is
-		// not seeing compactions because a compaction backlog has accumulated in
-		// other levels. For elastic work, we would rather clear that compaction
-		// backlog than admit some elastic work.
-		totalNumElasticByteTokens = int64(float64(smoothedIntL0CompactedBytes) *
-			(1.25 - 1.25*(score-0.2)))
-
+		if intWALFailover {
+			totalNumElasticByteTokens = 1
+		} else {
+			// Use a linear function with slope of -1.25 and compaction tokens of
+			// 1.25*compaction-bandwidth at score of 0.2. At a score of 0.6 (6
+			// sublevels) the tokens will be 0.75*compaction-bandwidth. Experimental
+			// results show the sublevels hovering around 4, as expected.
+			//
+			// NB: at score >= 1.2 (12 sublevels), there are 0 elastic tokens.
+			//
+			// NB: we are not using l0CompactionTokensLowerBound at all for elastic
+			// tokens. This is because that lower bound is useful primarily when L0 is
+			// not seeing compactions because a compaction backlog has accumulated in
+			// other levels. For elastic work, we would rather clear that compaction
+			// backlog than admit some elastic work.
+			totalNumElasticByteTokens = int64(float64(smoothedIntL0CompactedBytes) *
+				(1.25 - 1.25*(score-0.2)))
+		}
 		totalNumElasticByteTokens = max(totalNumElasticByteTokens, 1)
 	}
 	// Use the minimum of the token count calculated using compactions and
@@ -1052,6 +1109,10 @@ func (io *ioLoadListener) adjustTokensInner(
 	if totalNumElasticByteTokens > totalNumByteTokens {
 		totalNumElasticByteTokens = totalNumByteTokens
 	}
+	if intWALFailover && walFailoverUnlimitedTokens.Get(&io.settings.SV) {
+		totalNumByteTokens = unlimitedTokens
+		totalNumElasticByteTokens = unlimitedTokens
+	}
 
 	io.l0TokensProduced.Inc(totalNumByteTokens)
 
@@ -1062,6 +1123,7 @@ func (io *ioLoadListener) adjustTokensInner(
 			curL0Bytes:                   curL0Bytes,
 			cumWriteStallCount:           cumWriteStallCount,
 			cumCompactionStats:           cumCompactionStats,
+			cumWALSecondaryWriteDuration: cumWALSecondaryWriteDuration,
 			smoothedIntL0CompactedBytes:  smoothedIntL0CompactedBytes,
 			smoothedCompactionByteTokens: smoothedCompactionByteTokens,
 			smoothedNumFlushTokens:       smoothedNumFlushTokens,
@@ -1079,6 +1141,7 @@ func (io *ioLoadListener) adjustTokensInner(
 			intFlushTokens:      intFlushTokens,
 			intFlushUtilization: intFlushUtilization,
 			intWriteStalls:      intWriteStalls,
+			intWALFailover:      intWALFailover,
 			prevTokensUsed:              prev.byteTokensUsed,
 			prevTokensUsedByElasticWork: prev.byteTokensUsedByElasticWork,
 			tokenKind:                   tokenKind,
Expand Down Expand Up @@ -1143,6 +1206,9 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
p.Printf("flushed %s [≈%s] (mult %.2f); ", ib(int64(res.aux.intFlushTokens)),
ib(int64(res.smoothedNumFlushTokens)), res.flushUtilTargetFraction)
p.Printf("admitting ")
if res.aux.intWALFailover {
p.Printf(" (WAL failover) ")
}
if n, m := res.ioLoadListenerState.totalNumByteTokens,
res.ioLoadListenerState.totalNumElasticByteTokens; n < unlimitedTokens {
p.Printf("%s (rate %s/s) (elastic %s rate %s/s)", ib(n), ib(n/adjustmentInterval), ib(m),
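As a quick sanity check of the elastic-token interpolation in the hunk above, here is a standalone sketch (not part of the commit) that evaluates the linear function at a few scores:

```go
package main

import "fmt"

// elasticTokens mirrors the elastic-work interpolation from the diff above:
// 1.25x the smoothed L0 compaction bandwidth at score 0.2, decreasing with
// slope -1.25, floored at 1 token.
func elasticTokens(smoothedIntL0CompactedBytes int64, score float64) int64 {
	t := int64(float64(smoothedIntL0CompactedBytes) * (1.25 - 1.25*(score-0.2)))
	return max(t, 1)
}

func main() {
	const c = int64(1) << 30 // assume 1 GiB of smoothed L0 compaction bytes
	for _, score := range []float64{0.2, 0.6, 1.2} {
		fmt.Printf("score=%.1f tokens=%d\n", score, elasticTokens(c, score))
	}
	// Prints 1.25x the bandwidth at score 0.2, 0.75x at 0.6, and the floor of
	// 1 token at 1.2, matching the comment in the diff.
}
```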
2 changes: 1 addition & 1 deletion pkg/util/admission/io_load_listener_test.go
@@ -310,7 +310,7 @@ func TestAdjustTokensInnerAndLogging(t *testing.T) {
 			l0TokensProduced: metric.NewCounter(l0TokensProduced),
 		}
 		res := ioll.adjustTokensInner(
-			ctx, tt.prev, tt.l0Metrics, 12, cumStoreCompactionStats{numOutLevelsGauge: 1},
+			ctx, tt.prev, tt.l0Metrics, 12, cumStoreCompactionStats{numOutLevelsGauge: 1}, 0,
 			pebble.ThroughputMetric{}, 100, 10, 0, 0.50)
 		buf.Printf("%s\n", res)
 	}
