bulk: add pre-flush delay setting
This is a very crude knob that can slow down all bulk data ingestion --
including IMPORTs, RESTOREs, index creation, etc. -- by applying a fixed
delay before each buffered batch of data is flushed from the producing
process to the storage layer.

This is almost never the 'right' way to throttle a process to reduce its
impact on the cluster's or nodes' ability to serve other traffic. It is
almost always preferable for the node receiving a request to choose to
delay or refuse that request, based on its own assessment of which
resources the request will consume and whether those resources are
scarce or at risk. For example, a node should delay ingesting an SST to
allow compactions to catch up, either on itself or on its raft
followers, if not doing so might make future reads or writes slower
(perhaps indirectly, if followers become too slow).

However, while that may be true -- a more granular limiter closer to the
limited resource is generally superior and should be preferred -- this
blunt knob on the higher-level sending process gives an operator an
option of last resort if, in production, some operation manages to
impact a cluster despite existing, more granular limiting. In such
cases, a knob like this can mitigate the impact via manual tuning for
the duration of the operation, until a better long-term solution can be
implemented.

Release note (ops change): A new setting, bulkio.ingest.flush_delay, has
been added as a last-resort option for manually slowing bulk-writing
processes if needed for cluster stability. It should only be used when
no better-suited back-pressure mechanism is available for the contended
resource.
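
As a usage sketch, a cluster setting like this would typically be adjusted from a SQL session; the 100ms value below is purely illustrative, not a recommendation:

```sql
-- Slow every bulk flush by 100ms (illustrative value, tune per workload).
SET CLUSTER SETTING bulkio.ingest.flush_delay = '100ms';

-- Restore the default (no delay) once the pressure subsides.
RESET CLUSTER SETTING bulkio.ingest.flush_delay;
```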
dt committed Dec 11, 2021
1 parent 39f0190 commit eed1b9c
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/kv/bulk/sst_batcher.go
@@ -36,6 +36,13 @@ var (
 		"size below which a 'bulk' write will be performed as a normal write instead",
 		400*1<<10, // 400 KiB
 	)
+
+	ingestDelay = settings.RegisterDurationSetting(
+		"bulkio.ingest.flush_delay",
+		"amount of time to wait before sending a file to the KV/Storage layer to ingest",
+		0,
+		settings.NonNegativeDuration,
+	)
 )
 
 type sz int64
@@ -264,6 +271,17 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke
 	}
 	b.flushCounts.total++
 
+	if delay := ingestDelay.Get(&b.settings.SV); delay != 0 {
+		if delay > time.Second || log.V(1) {
+			log.Infof(ctx, "delaying %s before flushing ingestion buffer...", delay)
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(delay):
+		}
+	}
+
 	hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}
 
 	start := roachpb.Key(append([]byte(nil), b.batchStartKey...))
