admission: adjust token computation during WAL failover #120135

Merged · 1 commit · Mar 14, 2024
194 changes: 130 additions & 64 deletions pkg/util/admission/io_load_listener.go
@@ -109,6 +109,12 @@ var L0MinimumSizePerSubLevel = settings.RegisterIntSetting(
"when non-zero, this indicates the minimum size that is needed to count towards one sub-level",
5<<20, settings.NonNegativeInt)

var walFailoverUnlimitedTokens = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.wal.failover.unlimited_tokens.enabled",
"when true, during WAL failover, unlimited admission tokens are allocated",
false)

// Experimental observations:
// - Sub-level count of ~40 caused a node heartbeat latency p90, p99 of 2.5s,
// 4s. With a setting that limits sub-level count to 10, before the system
@@ -208,7 +214,8 @@ type ioLoadListenerState struct {
bytesWritten uint64
incomingLSMBytes uint64
}
cumCompactionStats cumStoreCompactionStats
cumCompactionStats cumStoreCompactionStats
cumWALSecondaryWriteDuration time.Duration

// Exponentially smoothed per interval values.

@@ -464,11 +471,12 @@ func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, metrics StoreMe
io.perWorkTokenEstimator.updateEstimates(metrics.Levels[0], cumLSMIngestedBytes, sas)
io.adjustTokensResult = adjustTokensResult{
ioLoadListenerState: ioLoadListenerState{
cumL0AddedBytes: m.Levels[0].BytesFlushed + m.Levels[0].BytesIngested,
curL0Bytes: m.Levels[0].Size,
cumWriteStallCount: metrics.WriteStallCount,
cumFlushWriteThroughput: metrics.Flush.WriteThroughput,
cumCompactionStats: computeCumStoreCompactionStats(metrics.Metrics),
cumL0AddedBytes: m.Levels[0].BytesFlushed + m.Levels[0].BytesIngested,
curL0Bytes: m.Levels[0].Size,
cumWriteStallCount: metrics.WriteStallCount,
cumFlushWriteThroughput: m.Flush.WriteThroughput,
cumCompactionStats: computeCumStoreCompactionStats(m),
cumWALSecondaryWriteDuration: m.WAL.Failover.SecondaryWriteDuration,
// No initial limit, i.e, the first interval is unlimited.
totalNumByteTokens: unlimitedTokens,
totalNumElasticByteTokens: unlimitedTokens,
@@ -618,7 +626,8 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics
cumCompactionStats := computeCumStoreCompactionStats(metrics.Metrics)

res := io.adjustTokensInner(ctx, io.ioLoadListenerState,
metrics.Levels[0], metrics.WriteStallCount, cumCompactionStats, wt,
metrics.Levels[0], metrics.WriteStallCount, cumCompactionStats,
metrics.WAL.Failover.SecondaryWriteDuration, wt,
L0FileCountOverloadThreshold.Get(&io.settings.SV),
L0SubLevelCountOverloadThreshold.Get(&io.settings.SV),
L0MinimumSizePerSubLevel.Get(&io.settings.SV),
@@ -692,6 +701,8 @@ type adjustTokensAuxComputations struct {
intFlushUtilization float64
intWriteStalls int64

intWALFailover bool

prevTokensUsed int64
prevTokensUsedByElasticWork int64
tokenKind tokenKind
@@ -714,6 +725,7 @@ func (io *ioLoadListener) adjustTokensInner(
l0Metrics pebble.LevelMetrics,
cumWriteStallCount int64,
cumCompactionStats cumStoreCompactionStats,
cumWALSecondaryWriteDuration time.Duration,
flushWriteThroughput pebble.ThroughputMetric,
threshNumFiles, threshNumSublevels int64,
l0MinSizePerSubLevel int64,
@@ -752,9 +764,43 @@ func (io *ioLoadListener) adjustTokensInner(

const alpha = 0.5

// Compaction scheduling can be uneven in prioritizing L0 for compactions,
// so smooth out what is being removed by compactions.
smoothedIntL0CompactedBytes := int64(alpha*float64(intL0CompactedBytes) + (1-alpha)*float64(prev.smoothedIntL0CompactedBytes))
// intWALFailover captures whether there were any writes to the secondary
// WAL location in the last interval. WAL failover indicates that the
// primary WAL location, which is also the location to which the store
// flushes and compacts, may be unhealthy. If it is unhealthy, flushes and
// compactions can stall, which can result in artificially low token counts
// for flushes and compactions, which can unnecessarily throttle work.
//
// We make the assumption that failover will be very aggressive compared to
// the interval at which this token computation is happening (15s). An
// UnhealthyOperationLatencyThreshold of 1s or lower means that an interval
// in which intWALFailover was false could at worst have had its last 1s
// have stalled flushes/compactions. So the throughput observed here will be
// 93.3% of what would have been possible with a healthy primary, which is
// considered acceptable.
//
// We also make the assumption that failback will be reasonably aggressive
// once the primary is considered healthy, say within 10s. So a disk stall
// in the primary that lasts 30s, will cause WAL failover for ~40s, and a
// disk stall for 1s will cause failover for ~11s. The latter (11s) is short
// enough that we could potentially allow unlimited tokens during failover.
// The concern is the former case, where unlimited tokens could result in
// excessive admission into L0. So the default behavior when intWALFailover
// is true is to (a) continue using the compaction tokens from before the
// failover, (b) not constrain flush tokens, (c) constrain elastic traffic
// to effectively 0 tokens. We allow this behavior to be overridden to have
// unlimited tokens.
intWALFailover := cumWALSecondaryWriteDuration-io.cumWALSecondaryWriteDuration > 0
var smoothedIntL0CompactedBytes int64
if intWALFailover {
// Reuse previous smoothed value.
smoothedIntL0CompactedBytes = prev.smoothedIntL0CompactedBytes
} else {
// Compaction scheduling can be uneven in prioritizing L0 for compactions,
// so smooth out what is being removed by compactions.
smoothedIntL0CompactedBytes = int64(alpha*float64(intL0CompactedBytes) +
(1-alpha)*float64(prev.smoothedIntL0CompactedBytes))
}

// Flush tokens:
//
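The comment above rests on two timing assumptions: with a 15s token-computation interval and an unhealthy-operation threshold of 1s or lower, an interval that did not observe failover has at worst its last 1s of flushes and compactions stalled (14s of 15s, about 93.3% of healthy throughput), and failback within roughly 10s bounds how long failover-mode token computation persists. The detection itself is just a delta on a cumulative counter. A minimal sketch, not part of the diff (the function name is illustrative):

```go
package admission // illustrative placement

import "time"

// intervalHadWALFailover is a sketch of the detection used above: any growth
// in the cumulative secondary-WAL write duration since the previous interval
// means the store wrote to the failover WAL location during that interval.
func intervalHadWALFailover(cumSecondaryWriteDur, prevCumSecondaryWriteDur time.Duration) bool {
	return cumSecondaryWriteDur-prevCumSecondaryWriteDur > 0
}
```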
@@ -886,7 +932,7 @@ func (io *ioLoadListener) adjustTokensInner(
doLogFlush := false
smoothedNumFlushTokens := prev.smoothedNumFlushTokens
const flushUtilIgnoreThreshold = 0.1
if intFlushUtilization > flushUtilIgnoreThreshold {
if intFlushUtilization > flushUtilIgnoreThreshold && !intWALFailover {
if smoothedNumFlushTokens == 0 {
// Initialization.
smoothedNumFlushTokens = intFlushTokens
Expand Down Expand Up @@ -927,14 +973,15 @@ func (io *ioLoadListener) adjustTokensInner(
doLogFlush = true
}
flushTokensFloat := flushUtilTargetFraction * smoothedNumFlushTokens
if flushTokensFloat < float64(math.MaxInt64) {
if flushTokensFloat < float64(unlimitedTokens) {
numFlushTokens = int64(flushTokensFloat)
}
// Else avoid overflow by using the previously set unlimitedTokens. This
// should not really happen.
}
// Else intFlushUtilization is too low. We don't want to make token
// determination based on a very low utilization, so we hand out unlimited
// Else intFlushUtilization is too low or WAL failover is active. We
// don't want to make token determination based on a very low utilization,
// or when flushes are stalled, so we hand out unlimited
// tokens. Note that flush utilization has been observed to fluctuate from
// 0.16 to 0.9 in a single interval, when compaction tokens are not limited,
// hence we have set flushUtilIgnoreThreshold to a very low value. If we've
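Putting this hunk together: flush-derived tokens are only trusted when flush utilization is meaningful and the primary WAL is healthy; otherwise they are left unlimited. A simplified sketch of that shape, assuming the names mirror the surrounding code (this is not the actual implementation):

```go
package admission // illustrative placement

import "math"

// flushBasedTokens is a sketch of the flush-token decision: ignore the flush
// signal when utilization is below the threshold or WAL failover is active,
// otherwise scale the smoothed flush throughput by the target fraction.
func flushBasedTokens(intFlushUtilization, flushUtilTargetFraction, smoothedNumFlushTokens float64,
	intWALFailover bool) int64 {
	const flushUtilIgnoreThreshold = 0.1
	const unlimited = math.MaxInt64 // stand-in for the package's unlimitedTokens
	if intFlushUtilization <= flushUtilIgnoreThreshold || intWALFailover {
		return unlimited
	}
	if tokens := flushUtilTargetFraction * smoothedNumFlushTokens; tokens < float64(unlimited) {
		return int64(tokens)
	}
	return unlimited // avoid overflow; should not really happen
}
```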
@@ -967,47 +1014,54 @@ func (io *ioLoadListener) adjustTokensInner(
// Overload: Score is >= 2. We limit compaction tokens, and limit tokens to
// at most C/2 tokens.
if score < 0.5 {
// Underload. Maintain a smoothedCompactionByteTokens based on what was
// removed, so that when we go over the threshold we have some history.
// This is also useful when we temporarily dip below the threshold --
// we've seen extreme situations with alternating 15s intervals of above
// and below the threshold.
numTokens := intL0CompactedBytes
// Smooth it. This may seem peculiar since we are already using
// smoothedIntL0CompactedBytes, but the clauses below use different
// computations so we also want the history of smoothedCompactionByteTokens.
smoothedCompactionByteTokens = alpha*float64(numTokens) + (1-alpha)*prev.smoothedCompactionByteTokens
if intWALFailover {
smoothedCompactionByteTokens = prev.smoothedCompactionByteTokens
} else {
// Underload. Maintain a smoothedCompactionByteTokens based on what was
// removed, so that when we go over the threshold we have some history.
// This is also useful when we temporarily dip below the threshold --
// we've seen extreme situations with alternating 15s intervals of above
// and below the threshold.
numTokens := intL0CompactedBytes
// Smooth it. Unlike the else block below, we smooth using
// intL0CompactedBytes, to be more responsive.
smoothedCompactionByteTokens = alpha*float64(numTokens) + (1-alpha)*prev.smoothedCompactionByteTokens
}
totalNumByteTokens = unlimitedTokens
} else {
doLogFlush = true
var fTotalNumByteTokens float64
if score >= 2 {
// Overload.
//
// Don't admit more byte work than we can remove via compactions.
// totalNumByteTokens tracks our goal for admission. Scale down
// since we want to get under the thresholds over time.
fTotalNumByteTokens = float64(smoothedIntL0CompactedBytes / 2.0)
} else if score >= 0.5 && score < 1 {
// Low load. Score in [0.5, 1). Tokens should be
// smoothedIntL0CompactedBytes at 1, and 2 * smoothedIntL0CompactedBytes
// at 0.5.
fTotalNumByteTokens = -score*(2*float64(smoothedIntL0CompactedBytes)) + 3*float64(smoothedIntL0CompactedBytes)
if fTotalNumByteTokens < float64(l0CompactionTokensLowerBound) {
fTotalNumByteTokens = float64(l0CompactionTokensLowerBound)
usedCompactionTokensLowerBound = true
}
if intWALFailover {
smoothedCompactionByteTokens = prev.smoothedCompactionByteTokens
} else {
// Medium load. Score in [1, 2). We use linear interpolation from
// medium load to overload, to slowly give out fewer tokens as we
// move towards overload.
halfSmoothedBytes := float64(smoothedIntL0CompactedBytes / 2.0)
fTotalNumByteTokens = -score*halfSmoothedBytes + 3*halfSmoothedBytes
var fTotalNumByteTokens float64
if score >= 2 {
// Overload.
//
// Don't admit more byte work than we can remove via compactions.
// totalNumByteTokens tracks our goal for admission. Scale down
// since we want to get under the thresholds over time.
fTotalNumByteTokens = float64(smoothedIntL0CompactedBytes / 2.0)
} else if score >= 0.5 && score < 1 {
// Low load. Score in [0.5, 1). Tokens should be
// smoothedIntL0CompactedBytes at 1, and 2 * smoothedIntL0CompactedBytes
// at 0.5.
fTotalNumByteTokens = -score*(2*float64(smoothedIntL0CompactedBytes)) + 3*float64(smoothedIntL0CompactedBytes)
if fTotalNumByteTokens < float64(l0CompactionTokensLowerBound) {
fTotalNumByteTokens = float64(l0CompactionTokensLowerBound)
usedCompactionTokensLowerBound = true
}
} else {
// Medium load. Score in [1, 2). We use linear interpolation from
// medium load to overload, to slowly give out fewer tokens as we
// move towards overload.
halfSmoothedBytes := float64(smoothedIntL0CompactedBytes / 2.0)
fTotalNumByteTokens = -score*halfSmoothedBytes + 3*halfSmoothedBytes
}
smoothedCompactionByteTokens = alpha*fTotalNumByteTokens + (1-alpha)*prev.smoothedCompactionByteTokens
}
smoothedCompactionByteTokens = alpha*fTotalNumByteTokens + (1-alpha)*prev.smoothedCompactionByteTokens
if float64(math.MaxInt64) < smoothedCompactionByteTokens {
if float64(unlimitedTokens) < smoothedCompactionByteTokens {
// Avoid overflow. This should not really happen.
totalNumByteTokens = math.MaxInt64
totalNumByteTokens = unlimitedTokens
} else {
totalNumByteTokens = int64(smoothedCompactionByteTokens)
}
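The score-to-token mapping in this hunk is piecewise linear in the L0 score, anchored to how many bytes compactions are removing from L0. A sketch of just that mapping, where c stands for smoothedIntL0CompactedBytes (the real code additionally smooths the result, applies l0CompactionTokensLowerBound in the low-load case, and reuses the previous smoothed value during WAL failover):

```go
package admission // illustrative placement

import "math"

// compactionTokenTarget is a sketch of the piecewise target used above.
func compactionTokenTarget(score, c float64) float64 {
	switch {
	case score < 0.5:
		// Underload: no limit on byte tokens.
		return math.Inf(1)
	case score < 1:
		// Low load: 2c at score 0.5, tapering to c at score 1.
		return -score*(2*c) + 3*c
	case score < 2:
		// Medium load: c at score 1, tapering to c/2 at score 2.
		return -score*(c/2) + 3*(c/2)
	default:
		// Overload: admit at most half of what compactions remove.
		return c / 2
	}
}
```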
@@ -1019,21 +1073,24 @@ func (io *ioLoadListener) adjustTokensInner(
// means that we start shaping when there are 2 sublevels.
if score >= 0.2 {
doLogFlush = true
// Use a linear function with slope of -1.25 and compaction tokens of
// 1.25*compaction-bandwidth at score of 0.2. At a score of 0.6 (6
// sublevels) the tokens will be 0.75*compaction-bandwidth. Experimental
// results show the sublevels hovering around 4, as expected.
//
// NB: at score >= 1.2 (12 sublevels), there are 0 elastic tokens.
//
// NB: we are not using l0CompactionTokensLowerBound at all for elastic
// tokens. This is because that lower bound is useful primarily when L0 is
// not seeing compactions because a compaction backlog has accumulated in
// other levels. For elastic work, we would rather clear that compaction
// backlog than admit some elastic work.
totalNumElasticByteTokens = int64(float64(smoothedIntL0CompactedBytes) *
(1.25 - 1.25*(score-0.2)))

if intWALFailover {
totalNumElasticByteTokens = 1
} else {
// Use a linear function with slope of -1.25 and compaction tokens of
// 1.25*compaction-bandwidth at score of 0.2. At a score of 0.6 (6
// sublevels) the tokens will be 0.75*compaction-bandwidth. Experimental
// results show the sublevels hovering around 4, as expected.
//
// NB: at score >= 1.2 (12 sublevels), there are 0 elastic tokens.
//
// NB: we are not using l0CompactionTokensLowerBound at all for elastic
// tokens. This is because that lower bound is useful primarily when L0 is
// not seeing compactions because a compaction backlog has accumulated in
// other levels. For elastic work, we would rather clear that compaction
// backlog than admit some elastic work.
totalNumElasticByteTokens = int64(float64(smoothedIntL0CompactedBytes) *
(1.25 - 1.25*(score-0.2)))
}
totalNumElasticByteTokens = max(totalNumElasticByteTokens, 1)
}
// Use the minimum of the token count calculated using compactions and
@@ -1052,6 +1109,10 @@ func (io *ioLoadListener) adjustTokensInner(
if totalNumElasticByteTokens > totalNumByteTokens {
totalNumElasticByteTokens = totalNumByteTokens
}
if intWALFailover && walFailoverUnlimitedTokens.Get(&io.settings.SV) {
totalNumByteTokens = unlimitedTokens
totalNumElasticByteTokens = unlimitedTokens
}

io.l0TokensProduced.Inc(totalNumByteTokens)
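Elastic tokens get the same treatment but are shaped much more aggressively, and the new cluster setting can override both counts at the end. A sketch of the elastic target, with c again standing for smoothedIntL0CompactedBytes (illustrative only; the real code also caps the result at the regular byte tokens before the WAL-failover override shown above):

```go
package admission // illustrative placement

// elasticTokenTarget is a sketch, not the actual implementation. unlimited is
// whatever value the caller uses for "no elastic shaping".
func elasticTokenTarget(score, c float64, intWALFailover bool, unlimited int64) int64 {
	if score < 0.2 {
		// Fewer than ~2 sublevels: elastic work is not shaped here.
		return unlimited
	}
	if intWALFailover {
		// Default during failover: effectively zero elastic tokens.
		return 1
	}
	// Linear with slope -1.25: 1.25*c at score 0.2, 0.75*c at score 0.6,
	// and 0 tokens at score >= 1.2.
	return max(int64(c*(1.25-1.25*(score-0.2))), 1)
}
```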

@@ -1062,6 +1123,7 @@ func (io *ioLoadListener) adjustTokensInner(
curL0Bytes: curL0Bytes,
cumWriteStallCount: cumWriteStallCount,
cumCompactionStats: cumCompactionStats,
cumWALSecondaryWriteDuration: cumWALSecondaryWriteDuration,
smoothedIntL0CompactedBytes: smoothedIntL0CompactedBytes,
smoothedCompactionByteTokens: smoothedCompactionByteTokens,
smoothedNumFlushTokens: smoothedNumFlushTokens,
@@ -1079,6 +1141,7 @@ func (io *ioLoadListener) adjustTokensInner(
intFlushTokens: intFlushTokens,
intFlushUtilization: intFlushUtilization,
intWriteStalls: intWriteStalls,
intWALFailover: intWALFailover,
prevTokensUsed: prev.byteTokensUsed,
prevTokensUsedByElasticWork: prev.byteTokensUsedByElasticWork,
tokenKind: tokenKind,
@@ -1143,6 +1206,9 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
p.Printf("flushed %s [≈%s] (mult %.2f); ", ib(int64(res.aux.intFlushTokens)),
ib(int64(res.smoothedNumFlushTokens)), res.flushUtilTargetFraction)
p.Printf("admitting ")
if res.aux.intWALFailover {
p.Printf("(WAL failover) ")
}
if n, m := res.ioLoadListenerState.totalNumByteTokens,
res.ioLoadListenerState.totalNumElasticByteTokens; n < unlimitedTokens {
p.Printf("%s (rate %s/s) (elastic %s rate %s/s)", ib(n), ib(n/adjustmentInterval), ib(m),
20 changes: 19 additions & 1 deletion pkg/util/admission/io_load_listener_test.go
@@ -45,6 +45,7 @@ func TestIOLoadListener(t *testing.T) {
var ioll *ioLoadListener
var cumFlushBytes int64
var cumFlushWork, cumFlushIdle time.Duration
var cumWALSecondaryWriteDuration time.Duration

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
@@ -53,6 +54,7 @@
switch d.Cmd {
case "init":
L0MinimumSizePerSubLevel.Override(ctx, &st.SV, 0)
walFailoverUnlimitedTokens.Override(ctx, &st.SV, false)
ioll = &ioLoadListener{
settings: st,
kvRequester: req,
@@ -71,6 +73,7 @@
cumFlushBytes = 0
cumFlushWork = time.Duration(0)
cumFlushIdle = time.Duration(0)
cumWALSecondaryWriteDuration = 0
return ""

case "prep-admission-stats":
@@ -103,6 +106,14 @@ func TestIOLoadListener(t *testing.T) {
MinFlushUtilizationFraction.Override(ctx, &st.SV, float64(percent)/100)
return ""

case "set-unlimited-wal-failover-tokens":
unlimitedTokensEnabled := true
if d.HasArg("enabled") {
d.ScanArgs(t, "enabled", &unlimitedTokensEnabled)
}
walFailoverUnlimitedTokens.Override(ctx, &st.SV, unlimitedTokensEnabled)
return ""

case "set-min-size-per-sub-level":
var minSize int64
d.ScanArgs(t, "size", &minSize)
Expand Down Expand Up @@ -164,6 +175,13 @@ func TestIOLoadListener(t *testing.T) {
metrics.Flush.WriteThroughput.IdleDuration = cumFlushIdle
metrics.Flush.WriteThroughput.WorkDuration = cumFlushWork

if d.HasArg("wal-secondary-write-sec") {
var writeDurSec int
d.ScanArgs(t, "wal-secondary-write-sec", &writeDurSec)
cumWALSecondaryWriteDuration += time.Duration(writeDurSec) * time.Second
}
metrics.WAL.Failover.SecondaryWriteDuration = cumWALSecondaryWriteDuration

var writeStallCount int
if d.HasArg("write-stall-count") {
d.ScanArgs(t, "write-stall-count", &writeStallCount)
Expand Down Expand Up @@ -310,7 +328,7 @@ func TestAdjustTokensInnerAndLogging(t *testing.T) {
l0TokensProduced: metric.NewCounter(l0TokensProduced),
}
res := ioll.adjustTokensInner(
ctx, tt.prev, tt.l0Metrics, 12, cumStoreCompactionStats{numOutLevelsGauge: 1},
ctx, tt.prev, tt.l0Metrics, 12, cumStoreCompactionStats{numOutLevelsGauge: 1}, 0,
pebble.ThroughputMetric{}, 100, 10, 0, 0.50)
buf.Printf("%s\n", res)
}
Expand Down