bulk: memory monitor sst batching during RESTORE
This commit narrowly focuses on adding memory monitoring for the
buffering portion of a RESTORE. SSTs are buffered during restore before
being flushed with an AddSSTable request. The memory used to buffer these
SSTs is now accounted for under the bulk memory monitor.

Release note (enterprise change): Data buffered during RESTORE is now
counted towards the cluster's --max-sql-memory limit to help guard
against OOMs. If not enough memory is available, the RESTORE will fail.
pbardea committed May 18, 2021
1 parent 25e26a5 commit d0d67e0
Showing 12 changed files with 206 additions and 15 deletions.
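
The accounting pattern this change relies on is the util/mon bound-account API that appears throughout the diff below (mon.NewMonitor, MakeBoundAccount, Grow, Shrink, Close). What follows is a minimal sketch of that pattern only, not code from this commit: the trackedBuffer type, the example/main functions, and the budget and increment sizes are hypothetical stand-ins for the batcher's accounted SST buffer.

// Sketch only: illustrates the mon.BoundAccount pattern used by this commit.
// trackedBuffer is a hypothetical stand-in for the batcher's SST buffer.
package main

import (
	"context"
	"log"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// trackedBuffer reserves memory from a bound account before it grows and
// releases the reservation when the batch is reset.
type trackedBuffer struct {
	acc *mon.BoundAccount
	buf []byte
}

func (b *trackedBuffer) add(ctx context.Context, p []byte) error {
	// Reserve the additional bytes before buffering them; this typically fails
	// with a "memory budget exceeded" error once the monitor's budget is spent.
	if err := b.acc.Grow(ctx, int64(len(p))); err != nil {
		return err
	}
	b.buf = append(b.buf, p...)
	return nil
}

func (b *trackedBuffer) reset(ctx context.Context) {
	// Release everything reserved for the current batch.
	b.acc.Shrink(ctx, int64(len(b.buf)))
	b.buf = b.buf[:0]
}

func example(ctx context.Context) error {
	// A standalone monitor with an illustrative 64MB budget; in the real change
	// the monitor is a child of the bulk memory monitor wired through
	// execinfra.ServerConfig.
	monitor := mon.NewMonitor("restore-mon-sketch", mon.MemoryResource,
		nil /* curCount */, nil /* maxHist */, 4<<10, /* increment */
		1<<20 /* noteworthy */, cluster.MakeTestingClusterSettings())
	monitor.Start(ctx, nil, mon.MakeStandaloneBudget(64<<20))
	defer monitor.Stop(ctx)
	acc := monitor.MakeBoundAccount()
	defer acc.Close(ctx)

	buf := &trackedBuffer{acc: &acc}
	if err := buf.add(ctx, make([]byte, 1<<20)); err != nil {
		return err
	}
	buf.reset(ctx)
	return nil
}

func main() {
	if err := example(context.Background()); err != nil {
		log.Fatal(err)
	}
}

In the actual change, the equivalent reservations are made inside storage.MakeAccountedMemFile and, for the SSTs produced by createSplitSSTable, via the explicit Grow/Shrink calls shown in sst_batcher.go below.
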
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -201,6 +201,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
58 changes: 58 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -16,6 +16,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"math"
"math/rand"
"net/url"
"os"
@@ -71,6 +72,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -8390,3 +8392,59 @@ func TestBackupWorkerFailure(t *testing.T) {
sqlDB.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&actualCount)
require.Equal(t, expectedCount, actualCount)
}

// TestRestoreMemAccounting tests that RESTORE detects when its settings are set
// such that it will overshoot its memory budget.
func TestRestoreMemAccounting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Normally, RESTORE uses the BulkMon, but we create our own with limited
// memory. The allocation size is also reduced for testing.
//
// We set the limit to 1 byte less than 2*allocationSize so that the second
// allocation will fail. This lets a RESTORE with a small batch size, which
// only needs 1 allocation, pass, while a RESTORE with a larger batch size
// will fail.
//
// Pebble's SSTWriter makes allocations of about 4.5KB, so make the unit of
// allocation larger than that.
testAllocationSizeBytes := int64(5000)
memLimit := 2*testAllocationSizeBytes - 1
limitedMemMonitor := mon.NewMonitor(
"test-mm", mon.MemoryResource,
nil /* curCount */, nil, /* maxHist */
testAllocationSizeBytes, math.MaxInt64, /* noteworthy */
cluster.MakeTestingClusterSettings())
limitedMemMonitor.Start(context.Background(), nil, mon.MakeStandaloneBudget(memLimit))

params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RestoreMemMonitor: limitedMemMonitor,
}},
}
params.ServerArgs.Knobs = knobs

const numAccounts = 0
_, _, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts,
InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, `CREATE TABLE data.test_data AS SELECT * FROM generate_series(0,1000)`)
sqlDB.Exec(t, `BACKUP data.test_data TO $1;`, LocalFoo)
sqlDB.Exec(t, `CREATE DATABASE restoredb;`)

// Setting a large batch size will cause RESTORE to use more memory than
// allowed by the monitor.
sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '100MB';`)
sqlDB.ExpectErr(t, `memory budget exceeded`,
`RESTORE data.test_data FROM $1 WITH into_db='restoredb'`, LocalFoo)

// Reducing the batch size should ensure that RESTORE doesn't allocate more
// than what's permitted by the monitor.
sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '1KB';`)
sqlDB.Exec(t, `RESTORE data.test_data FROM $1 WITH into_db='restoredb'`, LocalFoo)
sqlDB.Exec(t, `DROP TABLE restoredb.test_data;`)
}
16 changes: 13 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -41,6 +41,8 @@ type restoreDataProcessor struct {

alloc rowenc.DatumAlloc
kr *KeyRewriter

knobs sql.BackupRestoreTestingKnobs
}

var _ execinfra.Processor = &restoreDataProcessor{}
@@ -106,6 +108,10 @@ func newRestoreDataProcessor(
func (rd *restoreDataProcessor) Start(ctx context.Context) {
ctx = rd.StartInternal(ctx, restoreDataProcName)
rd.input.Start(ctx)

if rdKnobs, ok := rd.flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
rd.knobs = *rdKnobs
}
}

// Next is part of the RowSource interface.
@@ -212,12 +218,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
iters = append(iters, iter)
}

batcher, err := bulk.MakeSSTBatcher(ctx, db, evalCtx.Settings,
func() int64 { return storageccl.MaxImportBatchSize(evalCtx.Settings) })
maxBatchSize := func() int64 { return storageccl.MaxImportBatchSize(evalCtx.Settings) }
memMonitor := rd.flowCtx.Cfg.RestoreMemMonitor
if rd.knobs.RestoreMemMonitor != nil {
memMonitor = rd.knobs.RestoreMemMonitor
}
batcher, err := bulk.MakeSSTBatcher(ctx, db, evalCtx.Settings, maxBatchSize, memMonitor)
if err != nil {
return summary, err
}
defer batcher.Close()
defer batcher.Close(ctx)

startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key},
storage.MVCCKey{Key: entry.Span.EndKey}
@@ -260,7 +260,7 @@ func (sip *streamIngestionProcessor) ConsumerClosed() {
func (sip *streamIngestionProcessor) close() {
if sip.InternalClose() {
if sip.batcher != nil {
sip.batcher.Close()
sip.batcher.Close(sip.Ctx)
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
2 changes: 1 addition & 1 deletion pkg/kv/bulk/buffering_adder.go
@@ -155,7 +155,7 @@ func (b *BufferingAdder) Close(ctx context.Context) {
b.sink.flushCounts.total, b.sink.flushCounts.files,
b.sink.flushCounts.split, b.sink.flushCounts.sstSize,
)
b.sink.Close()
b.sink.Close(ctx)

if b.bulkMon != nil {
b.memAcc.Close(ctx)
71 changes: 62 additions & 9 deletions pkg/kv/bulk/sst_batcher.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -100,7 +101,7 @@ type SSTBatcher struct {
// The rest of the fields are per-batch and are reset via Reset() before each
// batch is started.
sstWriter storage.SSTWriter
sstFile *storage.MemFile
sstFile *storage.AccountedMemFile
batchStartKey []byte
batchEndKey []byte
batchEndValue []byte
@@ -110,13 +111,30 @@
ms enginepb.MVCCStats
// rows written in the current batch.
rowCounter storage.RowCounter

// memMon is passed to underlying buffers to keep track of memory allocations
// made by the storage writers.
memMon *mon.BytesMonitor
// memAcc is a bound account that's used to track allocations made during
// splitting a given SST file into subfiles when an AddSSTable request runs
// into a range boundary.
memAcc *mon.BoundAccount
}

// MakeSSTBatcher makes a ready-to-use SSTBatcher.
func MakeSSTBatcher(
ctx context.Context, db SSTSender, settings *cluster.Settings, flushBytes func() int64,
ctx context.Context,
db SSTSender,
settings *cluster.Settings,
flushBytes func() int64,
bulkMon *mon.BytesMonitor,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, settings: settings, maxSize: flushBytes, disallowShadowing: true}
if bulkMon != nil {
b.memMon = bulkMon
memAcc := bulkMon.MakeBoundAccount()
b.memAcc = &memAcc
}
err := b.Reset(ctx)
return b, err
}
@@ -195,7 +213,8 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
// Reset clears all state in the batcher and prepares it for reuse.
func (b *SSTBatcher) Reset(ctx context.Context) error {
b.sstWriter.Close()
b.sstFile = &storage.MemFile{}
newMemFile := storage.MakeAccountedMemFile(ctx, b.memMon)
b.sstFile = &newMemFile
// Create "Ingestion" SSTs in the newer RocksDBv2 format only if all nodes
// in the cluster can support it. Until then, for backward compatibility,
// create SSTs in the leveldb format ("backup" ones).
@@ -314,7 +333,7 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke
}

beforeSend := timeutil.Now()
files, err := AddSSTable(ctx, b.db, start, end, b.sstFile.Data(), b.disallowShadowing, b.ms, b.settings, b.batchTS)
files, err := AddSSTable(ctx, b.db, start, end, b.sstFile.Data(), b.disallowShadowing, b.ms, b.settings, b.batchTS, b.memMon, b.memAcc)
if err != nil {
return err
}
@@ -359,7 +378,7 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke
}

// Close closes the underlying SST builder.
func (b *SSTBatcher) Close() {
func (b *SSTBatcher) Close(ctx context.Context) {
b.sstWriter.Close()
}

@@ -401,9 +420,12 @@ func AddSSTable(
ms enginepb.MVCCStats,
settings *cluster.Settings,
batchTs hlc.Timestamp,
memMon *mon.BytesMonitor,
memAcc *mon.BoundAccount,
) (int, error) {
var files int
now := timeutil.Now()
// sstBytes is already accounted for by the batcher's sstFile.
iter, err := storage.NewMemSSTIterator(sstBytes, true)
if err != nil {
return 0, err
@@ -420,6 +442,7 @@
stats = ms
}

firstWorkItem := true
work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes, disallowShadowing: disallowShadowing, stats: stats}}
const maxAddSSTableRetries = 10
for len(work) > 0 {
@@ -457,7 +480,7 @@
// should be using all of them to avoid further retries.
split := m.Ranges()[0].Desc.EndKey.AsRawKey()
log.Infof(ctx, "SSTable cannot be added spanning range bounds %v, retrying...", split)
left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter, settings)
left, right, err := createSplitSSTable(ctx, db, item.start, split, item.disallowShadowing, iter, settings, memMon, memAcc)
if err != nil {
return err
}
@@ -486,7 +509,19 @@
return files, err
}
files++
// explicitly deallocate SST. This will not deallocate the

if firstWorkItem {
// If this is the first work item, the item references the top level SST
// which is accounted for by the batcher's sstFile.
firstWorkItem = false
} else {
// If this was not the first item of work, then we are responsible for
// tracking it since it was created while splitting the original file.
if memAcc != nil {
memAcc.Shrink(ctx, int64(cap(item.sstBytes)))
}
}
// Explicitly deallocate SST. This will not deallocate the
// top level SST which is kept around to iterate over.
item.sstBytes = nil
}
@@ -503,8 +538,11 @@
disallowShadowing bool,
iter storage.SimpleMVCCIterator,
settings *cluster.Settings,
memMon *mon.BytesMonitor,
memAcc *mon.BoundAccount,
) (*sstSpan, *sstSpan, error) {
sstFile := &storage.MemFile{}
leftFile := storage.MakeAccountedMemFile(ctx, memMon)
sstFile := &leftFile
w := storage.MakeIngestionSSTWriter(sstFile)
defer w.Close()

@@ -528,13 +566,21 @@
if err != nil {
return nil, nil, err
}
// Responsibility for accounting for the memory in SSTFile now switches
// from the file to the batcher.
if memAcc != nil {
if err := memAcc.Grow(ctx, int64(sstFile.Cap())); err != nil {
return nil, nil, err
}
}

left = &sstSpan{
start: first,
end: last.PrefixEnd(),
sstBytes: sstFile.Data(),
disallowShadowing: disallowShadowing,
}
*sstFile = storage.MemFile{}
*sstFile = storage.MakeAccountedMemFile(ctx, memMon)
w = storage.MakeIngestionSSTWriter(sstFile)
split = true
first = nil
@@ -557,6 +603,13 @@
if err != nil {
return nil, nil, err
}
// Responsibility for accounting for the memory in SSTFile now switches
// from the file to the batcher.
if memAcc != nil {
if err := memAcc.Grow(ctx, int64(sstFile.Cap())); err != nil {
return nil, nil, err
}
}
right = &sstSpan{
start: first,
end: last.PrefixEnd(),
3 changes: 2 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
@@ -334,7 +334,8 @@ func TestAddBigSpanningSSTWithSplits(t *testing.T) {

t.Logf("Adding %dkb sst spanning %d splits from %v to %v", len(sst)/kb, len(splits), start, end)
if _, err := bulk.AddSSTable(
ctx, mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{}, cluster.MakeTestingClusterSettings(), hlc.Timestamp{},
ctx, mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{},
cluster.MakeTestingClusterSettings(), hlc.Timestamp{}, nil /* memMon */, nil, /* memAcc */
); err != nil {
t.Fatal(err)
}
2 changes: 2 additions & 0 deletions pkg/server/server_sql.go
@@ -350,6 +350,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{})

backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon")
restoreMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "restore-mon")

// Set up the DistSQL temp engine.

@@ -421,6 +422,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
VecFDSemaphore: semaphore.New(envutil.EnvOrDefaultInt("COCKROACH_VEC_MAX_OPEN_FDS", colexec.VecMaxOpenFDsLimit)),
ParentDiskMonitor: cfg.TempStorageConfig.Mon,
BackfillerMonitor: backfillMemoryMonitor,
RestoreMemMonitor: restoreMemoryMonitor,

ParentMemoryMonitor: rootSQLMemoryMonitor,
BulkAdder: func(
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
@@ -1056,6 +1056,11 @@ type BackupRestoreTestingKnobs struct {
// RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
// span has been exported.
RunAfterExportingSpanEntry func(ctx context.Context)

// RestoreMemMonitor is used to override the memory monitor used by RESTORE
// during testing. If not specified here, this is typically the bulk memory
// monitor.
RestoreMemMonitor *mon.BytesMonitor
}

var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
4 changes: 4 additions & 0 deletions pkg/sql/execinfra/server_config.go
@@ -96,6 +96,10 @@ type ServerConfig struct {
// used by the column and index backfillers.
BackfillerMonitor *mon.BytesMonitor

// Child monitor of the bulk monitor which will be used to monitor the memory
// used during RESTORE.
RestoreMemMonitor *mon.BytesMonitor

// ParentDiskMonitor is normally the root disk monitor. It should only be used
// when setting up a server, a child monitor (usually belonging to a sql
// execution flow), or in tests. It is used to monitor temporary storage disk
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
@@ -91,6 +91,7 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
