Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: remove disabled experimental monitored mode #119926

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ build/Railroad.jar
*.pb.gw.go
pkg/ccl/kvccl/kvtenantccl/upgradeinterlockccl/generated_test.go
pkg/ccl/backupccl/data_driven_generated_test.go
pkg/ccl/backupccl/restore_memory_monitoring_generated_test.go
pkg/ccl/backupccl/restore_entry_cover_generated_test.go
pkg/ccl/backupccl/restore_mid_schema_change_generated_test.go
pkg/testutils/serverutils/*_generated.go
Expand Down
5 changes: 1 addition & 4 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ go_library(
"//pkg/util/mon",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
Expand Down Expand Up @@ -188,7 +187,6 @@ go_test(
"partitioned_backup_test.go",
"restore_data_processor_test.go",
"restore_entry_cover_generated_test.go", # keep
"restore_memory_monitoring_generated_test.go", # keep
"restore_mid_schema_change_generated_test.go", # keep
"restore_mid_schema_change_test.go",
"restore_multiregion_rbr_test.go",
Expand Down Expand Up @@ -362,13 +360,12 @@ genrule(
name = "gen-backupccl-tests",
srcs = glob(["testdata/**"]),
outs = [
"restore_memory_monitoring_generated_test.go",
"data_driven_generated_test.go",
"restore_entry_cover_generated_test.go",
"restore_mid_schema_change_generated_test.go",
],
cmd = """
$(location //pkg/ccl/backupccl/testgen) -restore-memory-monitoring=$(location restore_memory_monitoring_generated_test.go) \
$(location //pkg/ccl/backupccl/testgen) \
-data-driven=$(location data_driven_generated_test.go) -restore-entry-cover=$(location restore_entry_cover_generated_test.go) \
-restore-mid-schema-change=$(location restore_mid_schema_change_generated_test.go)
""",
Expand Down
92 changes: 0 additions & 92 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11422,95 +11422,3 @@ CREATE TABLE child_pk (k INT8 PRIMARY KEY REFERENCES parent);
sqlDB.Exec(t, `DROP DATABASE test`)
}
}

// Verify that restore with memory monitoring should be able to succeed with
// partial SST iterators that shadow previously written values.
func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 10
const numIncrementals = 10
// Number of SST files the processor's memory budget is sized to open at once.
const restoreProcessorMaxFiles = 5

// Counts invocations of the restore-span-entry testing knob; each partial
// pass over a span increments it once per processed entry.
restoreProcessorKnobCount := atomic.Uint32{}

args := base.TestServerArgs{
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) error {
restoreProcessorKnobCount.Add(1)
return nil
},
},
},
},
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

// Single-worker restore with SST memory monitoring enabled, so file-open
// accounting is deterministic for the assertion below.
sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency = 1")
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.sst_memory_limit.enabled=true")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Repeatedly alter a single row and do an incremental backup.
// Updating the same rows makes each incremental shadow values written by
// earlier layers, which is the scenario under test.
for i := 0; i < numIncrementals; i++ {
sqlDB.Exec(t, `UPDATE data.bank SET balance = $1 WHERE id = $2`, 1000+i, i)
sqlDB.Exec(t, "BACKUP data.bank INTO latest IN 'userfile:///backup'")
}

// Set the memory budget for the restore processor to be enough to open 5
// files.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")
files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'")
require.GreaterOrEqual(t, len(files), 11) // 1 file for full + 10 for 10 incrementals

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
require.Equal(t, 3, int(restoreProcessorKnobCount.Load())) // Ceiling(11/5)

// Verify data in the restored table.
// Fingerprints of source and restored tables must match despite the
// restore having been split into multiple partial iterator passes.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}

// TestRestoreMemoryMonitoringMinWorkerMemory tests that restore properly fails
// fast if there's not enough memory to reserve for the minimum number of
// workers.
func TestRestoreMemoryMonitoringMinWorkerMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
const numAccounts = 100

_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, base.TestClusterArgs{})
defer cleanupFn()

// 4 restore workers means we need minimum 2 workers to start restore.
sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency=4")
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.sst_memory_limit.enabled=true")

sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Set the budget to be 1 byte lower than minimum mem for 2 workers. This
// restore should fail.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", 2*minWorkerMemReservation-1)
sqlDB.Exec(t, "CREATE DATABASE restore_fail")
// The fail-fast path surfaces an "insufficient memory" error rather than
// hanging or partially restoring.
sqlDB.ExpectErr(t, "insufficient memory", "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH into_db='restore_fail'")

// Set the budget to be equal to the minimum mem for 2 workers. The restore
// should succeed.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", 2*minWorkerMemReservation)
sqlDB.Exec(t, "CREATE DATABASE restore")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH into_db='restore'")

// Verify data in the restored table.
// Matching fingerprints confirm the boundary-budget restore produced a
// byte-identical copy of the source table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE restore.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}
112 changes: 4 additions & 108 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@ import (
bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -103,12 +100,6 @@ const restoreDataProcName = "restoreDataProcessor"

const maxConcurrentRestoreWorkers = 32

// sstReaderOverheadBytesPerFile and sstReaderEncryptedOverheadBytesPerFile were obtained
// benchmarking external SST iterators on GCP and AWS and selecting the highest
// observed memory per file.
const sstReaderOverheadBytesPerFile = 5 << 20
const sstReaderEncryptedOverheadBytesPerFile = 8 << 20

// minWorkerMemReservation is the minimum amount of memory reserved per restore
// data processor worker. It should be greater than
// sstReaderOverheadBytesPerFile and sstReaderEncryptedOverheadBytesPerFile to
Expand Down Expand Up @@ -149,27 +140,6 @@ var numRestoreWorkers = settings.RegisterIntSetting(
settings.PositiveInt,
)

// restorePerProcessorMemoryLimit is the limit on the memory used by a
// restoreDataProcessor. The actual limit is the lowest of this setting
// and the limit determined by restorePerProcessorMemoryLimitSQLFraction
// and --max-sql-memory.
var restorePerProcessorMemoryLimit = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"bulkio.restore.per_processor_memory_limit",
"limit on the amount of memory that can be used by a restore processor",
1<<30, // 1 GiB
)

// restorePerProcessorMemoryLimitSQLFraction is the maximum percentage of the
// SQL memory pool that could be used by a restoreDataProcessor.
var restorePerProcessorMemoryLimitSQLFraction = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"bulkio.restore.per_processor_memory_limit_sql_fraction",
"limit on the amount of memory that can be used by a restore processor as a fraction of max SQL memory",
0.5,
settings.NonNegativeFloatWithMaximum(1.0),
)

func newRestoreDataProcessor(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand All @@ -185,28 +155,7 @@ func newRestoreDataProcessor(
progCh: make(chan backuppb.RestoreProgress, maxConcurrentRestoreWorkers),
}

var memMonitor *mon.BytesMonitor
var limit int64
if spec.MemoryMonitorSSTs {
limit = restorePerProcessorMemoryLimit.Get(&flowCtx.EvalCtx.Settings.SV)
sqlFraction := restorePerProcessorMemoryLimitSQLFraction.Get(&flowCtx.EvalCtx.Settings.SV)
sqlFractionLimit := int64(sqlFraction * float64(flowCtx.Cfg.RootSQLMemoryPoolSize))
if sqlFractionLimit < limit {
log.Infof(ctx, "using a maximum of %s memory per restore data processor (%f of max SQL memory %s)",
humanizeutil.IBytes(sqlFractionLimit), sqlFraction,
humanizeutil.IBytes(flowCtx.Cfg.RootSQLMemoryPoolSize))
limit = sqlFractionLimit
}

memMonitor = flowCtx.Cfg.BackupMonitor
if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if knobs.BackupMemMonitor != nil {
memMonitor = knobs.BackupMemMonitor
}
}
}

rd.qp = backuputils.NewMemoryBackedQuotaPool(ctx, memMonitor, "restore-mon", limit)
rd.qp = backuputils.NewMemoryBackedQuotaPool(ctx, flowCtx.Cfg.BackupMonitor, "restore-mon", 0)
if err := rd.Init(ctx, rd, post, restoreDataOutputTypes, flowCtx, processorID, nil, /* memMonitor */
execinfra.ProcStateOpts{
InputsToDrain: []execinfra.RowSource{input},
Expand Down Expand Up @@ -349,11 +298,7 @@ type resumeEntry struct {
}

// openSSTs opens all files in entry starting from the resumeIdx and returns a
// multiplexed SST iterator over the files. If memory monitoring is enabled and
// opening an additional file would exceed the current memory budget, a partial
// iterator over only the currently opened files would be returned, along with an
// updated resume idx, which the caller should use with openSSTs again to get an
// iterator over the remaining files.
// multiplexed SST iterator over the files.
func (rd *restoreDataProcessor) openSSTs(
ctx context.Context, entry execinfrapb.RestoreSpanEntry, resume *resumeEntry,
) (mergedSST, *resumeEntry, error) {
Expand All @@ -373,13 +318,12 @@ func (rd *restoreDataProcessor) openSSTs(

// getIter returns a multiplexed iterator covering the currently accumulated
// files over the channel.
getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage, iterAllocs []*quotapool.IntAlloc, completeUpTo hlc.Timestamp) (mergedSST, error) {
getIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage, completeUpTo hlc.Timestamp) (mergedSST, error) {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
log.VInfof(ctx, 1, "finished with and closing %d files in span %d [%s-%s)", len(entry.Files), entry.ProgressIdx, entry.Span.Key, entry.Span.EndKey)
readAsOfIter.Close()
rd.qp.Release(iterAllocs...)

for _, dir := range dirsToSend {
if err := dir.Close(); err != nil {
Expand All @@ -402,13 +346,6 @@ func (rd *restoreDataProcessor) openSSTs(
log.VEventf(ctx, 1, "ingesting %d files in span %d [%s-%s)", len(entry.Files), entry.ProgressIdx, entry.Span.Key, entry.Span.EndKey)

storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files))
iterAllocs := make([]*quotapool.IntAlloc, 0, len(entry.Files))
var sstOverheadBytesPerFile uint64
if rd.spec.Encryption != nil {
sstOverheadBytesPerFile = sstReaderEncryptedOverheadBytesPerFile
} else {
sstOverheadBytesPerFile = sstReaderOverheadBytesPerFile
}

idx := 0
if resume != nil {
Expand All @@ -419,47 +356,6 @@ func (rd *restoreDataProcessor) openSSTs(
file := entry.Files[idx]

log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key)

alloc, err := rd.qp.TryAcquireMaybeIncreaseCapacity(ctx, sstOverheadBytesPerFile)
if errors.Is(err, quotapool.ErrNotEnoughQuota) {
// If we failed to allocate more memory, send the iterator
// containing the files we have right now.
if len(storeFiles) > 0 {
iterOpts := storage.IterOptions{
RangeKeyMaskingBelow: rd.spec.RestoreTime,
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}
iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts)
if err != nil {
return mergedSST{}, nil, err
}

log.VInfof(ctx, 2, "sending iterator after %d out of %d files due to insufficient memory", idx, len(entry.Files))

// TODO(rui): this is a placeholder value to show that a span has been
// partially but not completely processed. Eventually this timestamp should
// be the actual timestamp that we have processed up to so far.
completeUpTo := hlc.Timestamp{Logical: 1}
mSST, err := getIter(iter, dirs, iterAllocs, completeUpTo)
res := &resumeEntry{
idx: idx,
done: false,
}
return mSST, res, err
}

alloc, err = rd.qp.Acquire(ctx, sstOverheadBytesPerFile)
if err != nil {
return mergedSST{}, nil, err
}
} else if err != nil {
return mergedSST{}, nil, err
}

iterAllocs = append(iterAllocs, alloc)

dir, err := rd.flowCtx.Cfg.ExternalStorage(ctx, file.Dir)
if err != nil {
return mergedSST{}, nil, err
Expand All @@ -479,7 +375,7 @@ func (rd *restoreDataProcessor) openSSTs(
return mergedSST{}, nil, err
}

mSST, err := getIter(iter, dirs, iterAllocs, rd.spec.RestoreTime)
mSST, err := getIter(iter, dirs, rd.spec.RestoreTime)
res := &resumeEntry{
idx: idx,
done: true,
Expand Down
25 changes: 7 additions & 18 deletions pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand All @@ -34,14 +33,6 @@ import (
"github.com/cockroachdb/errors"
)

var memoryMonitorSSTs = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"bulkio.restore.memory_monitor_ssts",
"if true, restore will limit number of simultaneously open SSTs to keep memory usage under the configured memory fraction",
false,
settings.WithName("bulkio.restore.sst_memory_limit.enabled"),
)

type restoreJobMetadata struct {
jobID jobspb.JobID
dataToRestore restorationData
Expand Down Expand Up @@ -101,7 +92,6 @@ func distRestore(
fileEncryption = &kvpb.FileEncryptionOptions{Key: md.encryption.Key}
}

memMonSSTs := memoryMonitorSSTs.Get(execCtx.ExecCfg().SV())
makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {

planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanningWithOracle(
Expand All @@ -116,14 +106,13 @@ func distRestore(
p := planCtx.NewPhysicalPlan()

restoreDataSpec := execinfrapb.RestoreDataSpec{
JobID: int64(md.jobID),
RestoreTime: md.restoreTime,
Encryption: fileEncryption,
TableRekeys: md.dataToRestore.getRekeys(),
TenantRekeys: md.dataToRestore.getTenantRekeys(),
PKIDs: md.dataToRestore.getPKIDs(),
ValidateOnly: md.dataToRestore.isValidateOnly(),
MemoryMonitorSSTs: memMonSSTs,
JobID: int64(md.jobID),
RestoreTime: md.restoreTime,
Encryption: fileEncryption,
TableRekeys: md.dataToRestore.getRekeys(),
TenantRekeys: md.dataToRestore.getTenantRekeys(),
PKIDs: md.dataToRestore.getPKIDs(),
ValidateOnly: md.dataToRestore.isValidateOnly(),
}

// Plan SplitAndScatter in a round-robin fashion.
Expand Down