Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: restore: limit restore spans to 200 files #119883

Merged
merged 1 commit into from Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/bench_covering_test.go
Expand Up @@ -39,7 +39,7 @@ func BenchmarkCoverageChecks(b *testing.B) {
b.Run(fmt.Sprintf("numFiles=%d", baseFiles), func(b *testing.B) {
for _, hasExternalFilesList := range []bool{true, false} {
b.Run(fmt.Sprintf("slim=%t", hasExternalFilesList), func(b *testing.B) {
backups, err := MockBackupChain(ctx, numBackups, numSpans, baseFiles, r, hasExternalFilesList, execCfg)
backups, err := MockBackupChain(ctx, numBackups, numSpans, baseFiles, 1<<20, r, hasExternalFilesList, execCfg)
require.NoError(b, err)
b.ResetTimer()

Expand Down Expand Up @@ -75,7 +75,7 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
for _, hasExternalFilesList := range []bool{true, false} {
b.Run(fmt.Sprintf("hasExternalFilesList=%t", hasExternalFilesList),
func(b *testing.B) {
backups, err := MockBackupChain(ctx, numBackups, numSpans, baseFiles, r, hasExternalFilesList, execCfg)
backups, err := MockBackupChain(ctx, numBackups, numSpans, baseFiles, 1<<20, r, hasExternalFilesList, execCfg)
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
20 changes: 17 additions & 3 deletions pkg/ccl/backupccl/restore_span_covering.go
Expand Up @@ -292,6 +292,7 @@ func generateAndSendImportSpans(
// lastCovSpanSize is the size of files added to the right-most span of
// the cover so far.
var lastCovSpanSize int64
var lastCovSpanCount int
var lastCovSpan roachpb.Span
var covFilesByLayer [][]*backuppb.BackupManifest_File
var firstInSpan bool
Expand Down Expand Up @@ -374,8 +375,8 @@ func generateAndSendImportSpans(
}

var filesByLayer [][]*backuppb.BackupManifest_File
var covSize int64
var newCovFilesSize int64
var covSize, newCovFilesSize int64
var covCount, newCovFilesCount int

for layer := range newFilesByLayer {
for _, file := range newFilesByLayer[layer] {
Expand All @@ -385,6 +386,7 @@ func generateAndSendImportSpans(
}
newCovFilesSize += sz
}
newCovFilesCount += len(newFilesByLayer[layer])
filesByLayer = append(filesByLayer, newFilesByLayer[layer])
}

Expand All @@ -397,6 +399,7 @@ func generateAndSendImportSpans(

if inclusiveOverlap(coverSpan, file.Span) {
covSize += sz
covCount++
filesByLayer[layer] = append(filesByLayer[layer], file)
}
}
Expand All @@ -406,8 +409,17 @@ func generateAndSendImportSpans(
covFilesByLayer = newFilesByLayer
lastCovSpan = coverSpan
lastCovSpanSize = newCovFilesSize
lastCovSpanCount = newCovFilesCount
} else {
if (newCovFilesSize == 0 || lastCovSpanSize+newCovFilesSize <= filter.targetSize) && !firstInSpan {
// We have room to add to the last span if doing so would remain below
// both the target byte size and 200 total files. We limit the number
// of files since we default to running multiple concurrent workers so
// we want to bound sum total open files across all of them to <= 1k.
// We bound the span byte size to improve work distribution and make
// the progress more granular.
fits := lastCovSpanSize+newCovFilesSize <= filter.targetSize && lastCovSpanCount+newCovFilesCount <= 200

if (newCovFilesCount == 0 || fits) && !firstInSpan {
// If there are no new files that cover this span or if we can add the
// files in the new span's cover to the last span's cover and still stay
// below targetSize, then we should merge the two spans.
Expand All @@ -416,13 +428,15 @@ func generateAndSendImportSpans(
}
lastCovSpan.EndKey = coverSpan.EndKey
lastCovSpanSize = lastCovSpanSize + newCovFilesSize
lastCovSpanCount = lastCovSpanCount + newCovFilesCount
} else {
if err := flush(ctx); err != nil {
return err
}
lastCovSpan = coverSpan
covFilesByLayer = filesByLayer
lastCovSpanSize = covSize
lastCovSpanCount = covCount
}
}
firstInSpan = false
Expand Down
26 changes: 21 additions & 5 deletions pkg/ccl/backupccl/restore_span_covering_test.go
Expand Up @@ -47,7 +47,7 @@ import (
// Files spans are ordered by start key but may overlap.
func MockBackupChain(
ctx context.Context,
length, spans, baseFiles int,
length, spans, baseFiles, fileSize int,
r *rand.Rand,
hasExternalFilesList bool,
execCfg sql.ExecutorConfig,
Expand Down Expand Up @@ -109,7 +109,7 @@ func MockBackupChain(
backups[i].Files[f].Span.Key = encoding.EncodeVarintAscending(k, int64(start))
backups[i].Files[f].Span.EndKey = encoding.EncodeVarintAscending(k, int64(end))
backups[i].Files[f].Path = fmt.Sprintf("12345-b%d-f%d.sst", i, f)
backups[i].Files[f].EntryCounts.DataSize = 1 << 20
backups[i].Files[f].EntryCounts.DataSize = int64(fileSize)
}

es, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx,
Expand Down Expand Up @@ -226,6 +226,9 @@ func checkRestoreCovering(
}
var spanIdx int
for _, c := range cov {
if len(c.Files) > 500 {
return errors.Errorf("%d files in span %v", len(c.Files), c.Span)
}
for _, f := range c.Files {
if requireSpan, ok := required[f.Path]; ok {
requireSpan.Sub(c.Span)
Expand Down Expand Up @@ -951,8 +954,21 @@ func sanityCheckFileIterator(
}
}

// TestRestoreEntryCoverTinyFiles exercises the restore span covering using
// very small (5 KiB) files at high per-span file counts, stressing the
// per-span file-count cap rather than the byte-size target.
func TestRestoreEntryCoverTinyFiles(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const (
		numBackups = 5
		fileSize   = 5 << 10 // 5 KiB per mock SST
	)
	spanCounts := []int{5}
	fileCounts := []int{1000, 5000}
	runTestRestoreEntryCoverForSpanAndFileCounts(t, numBackups, fileSize, spanCounts, fileCounts)
}

// runTestRestoreEntryCover runs the restore entry-cover test across the
// default grid of span and file counts, using 1 MiB mock files.
//
//lint:ignore U1000 unused
func runTestRestoreEntryCover(t *testing.T, numBackups int) {
	runTestRestoreEntryCoverForSpanAndFileCounts(
		t,
		numBackups,
		1<<20, // 1 MiB per mock SST
		[]int{1, 2, 3, 5, 9, 11, 12},
		[]int{0, 1, 2, 3, 4, 10, 12, 50},
	)
}

func runTestRestoreEntryCoverForSpanAndFileCounts(
t *testing.T, numBackups, fileSize int, spanCounts, fileCounts []int,
) {
r, _ := randutil.NewTestRand()
ctx := context.Background()
tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication)
Expand Down Expand Up @@ -982,10 +998,10 @@ func runTestRestoreEntryCover(t *testing.T, numBackups int) {
return merged
}

for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} {
for _, files := range []int{0, 1, 2, 3, 4, 10, 12, 50} {
for _, spans := range spanCounts {
for _, files := range fileCounts {
for _, hasExternalFilesList := range []bool{true, false} {
backups, err := MockBackupChain(ctx, numBackups, spans, files, r, hasExternalFilesList, execCfg)
backups, err := MockBackupChain(ctx, numBackups, spans, files, fileSize, r, hasExternalFilesList, execCfg)
require.NoError(t, err)
layerToIterFactory, err := backupinfo.GetBackupManifestIterFactories(ctx,
execCfg.DistSQLSrv.ExternalStorage, backups, nil, nil)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/utils_test.go
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -580,6 +581,7 @@ func requireRecoveryEvent(
//
//lint:ignore U1000 unused
func runTestRestoreMemoryMonitoring(t *testing.T, numSplits, numInc, restoreProcessorMaxFiles int) {
skip.WithIssue(t, 119836, "this functionality was never enabled and will likely be removed rather than enabled")
const splitSize = 10
numAccounts := numSplits * splitSize
var expectedNumFiles int
Expand Down