Skip to content

Commit

Permalink
bulk,importccl: re-read settings during ingestion
Browse files Browse the repository at this point in the history
Prior to this patch, the bulk-ingestion helpers were configured with size limits read from settings,
but then executed, sometimes for hours, against those values that were passed in, not reacting to
later changes in the underlying setting.

This change plumbs those values through as func() int64 instead of int64, re-reading the setting value
each time it is used and thus reflecting any changes to the underlying setting during execution.

Also, while I'm here: switch uint64 to int64 to match our usual style guidelines.

Release note (bug fix): fix a bug where changes to some settings were not picked up by currently running IMPORTs.
  • Loading branch information
dt committed Nov 7, 2019
1 parent df74d2a commit aaf5a86
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 62 deletions.
18 changes: 9 additions & 9 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func ingestKvs(

writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos}

flushSize := storageccl.MaxImportBatchSize(flowCtx.Cfg.Settings)
flushSize := func() int64 { return storageccl.MaxImportBatchSize(flowCtx.Cfg.Settings) }

// We create two bulk adders so as to combat the excessive flushing of small
// SSTs which was observed when using a single adder for both primary and
Expand All @@ -179,10 +179,10 @@ func ingestKvs(
Name: "pkAdder",
DisallowShadowing: true,
SkipDuplicates: true,
MinBufferSize: uint64(minBufferSize),
MaxBufferSize: uint64(maxBufferSize),
StepBufferSize: uint64(stepSize),
SSTSize: uint64(flushSize),
MinBufferSize: minBufferSize,
MaxBufferSize: maxBufferSize,
StepBufferSize: stepSize,
SSTSize: flushSize,
})
if err != nil {
return nil, err
Expand All @@ -194,10 +194,10 @@ func ingestKvs(
Name: "indexAdder",
DisallowShadowing: true,
SkipDuplicates: true,
MinBufferSize: uint64(minBufferSize),
MaxBufferSize: uint64(maxBufferSize),
StepBufferSize: uint64(stepSize),
SSTSize: uint64(flushSize),
MinBufferSize: minBufferSize,
MaxBufferSize: maxBufferSize,
StepBufferSize: stepSize,
SSTSize: flushSize,
})
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/storageccl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ func MaxImportBatchSize(st *cluster.Settings) int64 {

// ImportBufferConfigSizes determines the minimum, maximum and step size for the
// BulkAdder buffer used in import.
func ImportBufferConfigSizes(st *cluster.Settings, isPKAdder bool) (int64, int64, int64) {
func ImportBufferConfigSizes(st *cluster.Settings, isPKAdder bool) (int64, func() int64, int64) {
if isPKAdder {
return importPKAdderBufferSize.Get(&st.SV),
importPKAdderMaxBufferSize.Get(&st.SV),
func() int64 { return importPKAdderMaxBufferSize.Get(&st.SV) },
importBufferIncrementSize.Get(&st.SV)
}
return importIndexAdderBufferSize.Get(&st.SV),
importIndexAdderMaxBufferSize.Get(&st.SV),
func() int64 { return importIndexAdderMaxBufferSize.Get(&st.SV) },
importBufferIncrementSize.Get(&st.SV)
}

Expand Down Expand Up @@ -191,7 +191,7 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
iters = append(iters, iter)
}

batcher, err := bulk.MakeSSTBatcher(ctx, db, uint64(MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings())))
batcher, err := bulk.MakeSSTBatcher(ctx, db, func() int64 { return MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()) })
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ func newIndexBackfiller(

func (ib *indexBackfiller) prepare(ctx context.Context) error {
minBufferSize := backfillerBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV)
maxBufferSize := backfillerMaxBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV)
sstSize := backillerSSTSize.Get(&ib.flowCtx.Cfg.Settings.SV)
maxBufferSize := func() int64 { return backfillerMaxBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV) }
sstSize := func() int64 { return backillerSSTSize.Get(&ib.flowCtx.Cfg.Settings.SV) }
stepSize := backfillerBufferIncrementSize.Get(&ib.flowCtx.Cfg.Settings.SV)
opts := storagebase.BulkAdderOptions{
SSTSize: uint64(sstSize),
MinBufferSize: uint64(minBufferSize),
MaxBufferSize: uint64(maxBufferSize),
StepBufferSize: uint64(stepSize),
SSTSize: sstSize,
MinBufferSize: minBufferSize,
MaxBufferSize: maxBufferSize,
StepBufferSize: stepSize,
SkipDuplicates: ib.ContainsInvertedIndex(),
}
adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.ReadAsOf, opts)
Expand Down
24 changes: 12 additions & 12 deletions pkg/storage/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type BufferingAdder struct {
timestamp hlc.Timestamp

// threshold at which buffered entries will be flushed to SSTBatcher.
curBufferSize uint64
curBufferSize int64

// ceiling till which we can grow curBufferSize if bulkMon permits.
maxBufferSize uint64
maxBufferSize func() int64

// unit by which we increment the curBufferSize.
incrementBufferSize uint64
incrementBufferSize int64

// currently buffered kvs.
curBuf kvBuf
Expand Down Expand Up @@ -81,19 +81,19 @@ func MakeBulkAdder(
if opts.MinBufferSize == 0 {
opts.MinBufferSize = 32 << 20
}
if opts.MaxBufferSize == 0 {
opts.MaxBufferSize = 128 << 20
if opts.MaxBufferSize == nil {
opts.MaxBufferSize = func() int64 { return 128 << 20 }
}
if opts.StepBufferSize == 0 {
opts.StepBufferSize = 32 << 20
}
if opts.SSTSize == 0 {
opts.SSTSize = 16 << 20
if opts.SSTSize == nil {
opts.SSTSize = func() int64 { return 16 << 20 }
}
if opts.SplitAndScatterAfter == 0 {
if opts.SplitAndScatterAfter == nil {
// splitting _before_ hitting max reduces the chance of an auto-split after
// the range is full, when it is more expensive to split/move.
opts.SplitAndScatterAfter = 48 << 20
opts.SplitAndScatterAfter = func() int64 { return 48 << 20 }
}

b := &BufferingAdder{
Expand Down Expand Up @@ -127,7 +127,7 @@ func MakeBulkAdder(
// TODO(adityamaru): IMPORT should also reserve memory for a single SST which
// it will store in-memory before sending it to RocksDB.
b.memAcc = bulkMon.MakeBoundAccount()
if err := b.memAcc.Grow(ctx, int64(b.curBufferSize)); err != nil {
if err := b.memAcc.Grow(ctx, b.curBufferSize); err != nil {
return nil, errors.Wrap(err, "Not enough memory available to create a BulkAdder. Try setting a higher --max-sql-memory.")
}

Expand Down Expand Up @@ -171,8 +171,8 @@ func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte)
//
// To prevent a single import from growing its buffer indefinitely we check
// if it has exceeded its upper bound.
if b.bulkMon != nil && b.curBufferSize < b.maxBufferSize {
if err := b.memAcc.Grow(ctx, int64(b.incrementBufferSize)); err != nil {
if b.bulkMon != nil && b.curBufferSize < b.maxBufferSize() {
if err := b.memAcc.Grow(ctx, b.incrementBufferSize); err != nil {
// If we are unable to reserve the additional memory then flush the
// buffer, and continue as normal.
b.flushCounts.bufferSize++
Expand Down
46 changes: 25 additions & 21 deletions pkg/storage/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ type SSTBatcher struct {
db SSTSender
rc *kv.RangeDescriptorCache
settings *cluster.Settings
maxSize uint64
splitAfter uint64
maxSize func() int64
splitAfter func() int64

// allows ingestion of keys where the MVCC.Key would shadow an existing row.
disallowShadowing bool
Expand Down Expand Up @@ -85,7 +85,7 @@ type SSTBatcher struct {
splitWait time.Duration
}
// Tracking for if we have "filled" a range in case we want to split/scatter.
flushedToCurrentRange uint64
flushedToCurrentRange int64
lastFlushKey []byte

// The rest of the fields are per-batch and are reset via Reset() before each
Expand All @@ -103,7 +103,9 @@ type SSTBatcher struct {
}

// MakeSSTBatcher makes a ready-to-use SSTBatcher.
func MakeSSTBatcher(ctx context.Context, db SSTSender, flushBytes uint64) (*SSTBatcher, error) {
func MakeSSTBatcher(
ctx context.Context, db SSTSender, flushBytes func() int64,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, maxSize: flushBytes, disallowShadowing: true}
err := b.Reset()
return b, err
Expand Down Expand Up @@ -220,7 +222,7 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
return b.Reset()
}

if b.sstWriter.DataSize >= b.maxSize {
if b.sstWriter.DataSize >= b.maxSize() {
if err := b.doFlush(ctx, sizeFlush, nextKey); err != nil {
return err
}
Expand Down Expand Up @@ -249,7 +251,7 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke

size := b.sstWriter.DataSize
if reason == sizeFlush {
log.VEventf(ctx, 3, "flushing %s SST due to size > %s", sz(size), sz(b.maxSize))
log.VEventf(ctx, 3, "flushing %s SST due to size > %s", sz(size), sz(b.maxSize()))
b.flushCounts.sstSize++

// On first flush, if it is due to size, we introduce one split at the start
Expand Down Expand Up @@ -304,27 +306,29 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke
b.lastFlushKey = append(b.lastFlushKey[:0], b.flushKey...)
b.flushedToCurrentRange = size
}
if b.splitAfter > 0 && b.flushedToCurrentRange > b.splitAfter && nextKey != nil {
if splitAt, err := keys.EnsureSafeSplitKey(nextKey); err != nil {
log.Warning(ctx, err)
} else {
beforeSplit := timeutil.Now()

log.VEventf(ctx, 2, "%s added since last split, splitting/scattering for next range at %v", sz(b.flushedToCurrentRange), end)
// NB: Passing 'hour' here is technically illegal until 19.2 is
// active, but the value will be ignored before that, and we don't
// have access to the cluster version here.
if err := b.db.SplitAndScatter(ctx, splitAt, hour); err != nil {
log.Warningf(ctx, "failed to split and scatter during ingest: %+v", err)
if b.splitAfter != nil {
if splitAfter := b.splitAfter(); b.flushedToCurrentRange > splitAfter && nextKey != nil {
if splitAt, err := keys.EnsureSafeSplitKey(nextKey); err != nil {
log.Warning(ctx, err)
} else {
beforeSplit := timeutil.Now()

log.VEventf(ctx, 2, "%s added since last split, splitting/scattering for next range at %v", sz(b.flushedToCurrentRange), end)
// NB: Passing 'hour' here is technically illegal until 19.2 is
// active, but the value will be ignored before that, and we don't
// have access to the cluster version here.
if err := b.db.SplitAndScatter(ctx, splitAt, hour); err != nil {
log.Warningf(ctx, "failed to split and scatter during ingest: %+v", err)
}
b.flushCounts.splitWait += timeutil.Since(beforeSplit)
}
b.flushCounts.splitWait += timeutil.Since(beforeSplit)
b.flushedToCurrentRange = 0
}
b.flushedToCurrentRange = 0
}
}

b.totalRows.Add(b.rowCounter.BulkOpSummary)
b.totalRows.DataSize += int64(b.sstWriter.DataSize)
b.totalRows.DataSize += b.sstWriter.DataSize
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ func TestAddBatched(t *testing.T) {
})
}

func runTestImport(t *testing.T, batchSize uint64) {
func runTestImport(t *testing.T, batchSizeValue int64) {

ctx := context.Background()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

batchSize := func() int64 { return batchSizeValue }

const split1, split2 = 3, 5

// Each test case consists of some number of batches of keys, represented as
Expand Down Expand Up @@ -134,7 +136,7 @@ func runTestImport(t *testing.T, batchSize uint64) {

ts := hlc.Timestamp{WallTime: 100}
b, err := bulk.MakeBulkAdder(
ctx, kvDB, mockCache, s.ClusterSettings(), ts, storagebase.BulkAdderOptions{MinBufferSize: batchSize, SSTSize: batchSize}, nil, /* bulkMon */
ctx, kvDB, mockCache, s.ClusterSettings(), ts, storagebase.BulkAdderOptions{MinBufferSize: batchSize(), SSTSize: batchSize}, nil, /* bulkMon */
)
if err != nil {
t.Fatal(err)
Expand All @@ -160,7 +162,7 @@ func runTestImport(t *testing.T, batchSize uint64) {
// if our adds is batching multiple keys and we've previously added
// a key prior to split2 and are now adding one after split2, then we
// should expect this batch to span split2 and thus cause a retry.
if batchSize > 1 && idx > 0 && batch[idx-1] < split2 && batch[idx-1] >= split1 && batch[idx] >= split2 {
if batchSize() > 1 && idx > 0 && batch[idx-1] < split2 && batch[idx-1] >= split1 && batch[idx] >= split2 {
expectedSplitRetries = 1
}
v := roachpb.MakeValueFromString(fmt.Sprintf("value-%d", x))
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/engine/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type SSTWriter struct {
fw *sstable.Writer
f *memFile
// DataSize tracks the total key and value bytes added so far.
DataSize uint64
DataSize int64
scratch []byte
}

Expand All @@ -48,7 +48,7 @@ func (fw *SSTWriter) Add(kv MVCCKeyValue) error {
if fw.fw == nil {
return errors.New("cannot call Open on a closed writer")
}
fw.DataSize += uint64(len(kv.Key.Key)) + uint64(len(kv.Value))
fw.DataSize += int64(len(kv.Key.Key)) + int64(len(kv.Value))
fw.scratch = EncodeKeyToBuf(fw.scratch[:0], kv.Key)
return fw.fw.Set(fw.scratch, kv.Value)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/storagebase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ type BulkAdderOptions struct {
// SSTSize is the size at which an SST will be flushed and a new one started.
// SSTs are also split during a buffer flush to avoid spanning range bounds so
// they may be smaller than this limit.
SSTSize uint64
SSTSize func() int64

// SplitAndScatterAfter is the number of bytes which if added without hitting
// an existing split will cause the adder to split and scatter the next span.
SplitAndScatterAfter uint64
SplitAndScatterAfter func() int64

// MinBufferSize is the initial size of the BulkAdder buffer. It indicates the
// amount of memory we require to be able to buffer data before flushing for
// SST creation.
MinBufferSize uint64
MinBufferSize int64

// MaxBufferSize is the maximum size we can grow the BulkAdder buffer to.
MaxBufferSize uint64
MaxBufferSize func() int64

// StepBufferSize is the increment in which we will attempt to grow the
// BulkAdder buffer if the memory monitor permits.
StepBufferSize uint64
StepBufferSize int64

// SkipLocalDuplicates configures handling of duplicate keys within a local
// sorted batch. When true if the same key/value pair is added more than once
Expand Down

0 comments on commit aaf5a86

Please sign in to comment.