Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sql/importer/import_into_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestImportIntoNonEmptyTableRowCountCheck(t *testing.T) {

_, err := db.Exec(`IMPORT INTO foo (k, v) CSV DATA ('nodelocal://1/export2/export*-n*.0.csv')`)
require.Error(t, err)
require.ErrorContains(t, err, "INSPECT found inconsistencies")
require.ErrorContains(t, err, "import row count validation failed")

// Extract the inspect job ID from the error hint and verify the error
// details contain the expected and actual row counts.
Expand Down
101 changes: 96 additions & 5 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}

validationMode := importRowCountValidation.Get(&p.ExecCfg().Settings.SV)
switch validationMode {
switch validationMode := importRowCountValidation.Get(&p.ExecCfg().Settings.SV); validationMode {
case ImportRowCountValidationOff:
// No validation required.
case ImportRowCountValidationAsync, ImportRowCountValidationSync:
Expand All @@ -429,6 +428,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {

var checks []*jobspb.InspectDetails_Check
var dbName, schemaName, tableName string
var expectedRowCount uint64
if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
Expand Down Expand Up @@ -463,7 +463,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
if importProgress := prog.GetImport(); importProgress != nil {
totalImportedRows = importProgress.Summary.EntryCounts[pkID]
}
expectedRowCount := uint64(totalImportedRows + int64(table.InitialRowCount) + r.testingKnobs.expectedRowCountOffset)
expectedRowCount = uint64(totalImportedRows + int64(table.InitialRowCount) + r.testingKnobs.expectedRowCountOffset)
checks, err = inspect.ChecksForTable(ctx, p.ExecCfg(), tblDesc, &expectedRowCount)
return err
}); err != nil {
Expand All @@ -489,11 +489,102 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
r.inspectJobID = inspectJob.ID()
log.Eventf(ctx, "triggered inspect job %d for import validation for table %s with AOST %s", inspectJob.ID(), tableName, setPublicTimestamp)

// For sync mode, wait for the inspect job to complete.
if validationMode == ImportRowCountValidationSync {
switch validationMode {
case ImportRowCountValidationSync:
// validateInspectRowCount runs a separate row count for
// debugging the importer's use of inspect.
validateInspectRowCount := func() (err error) {
if err := p.ExecCfg().JobRegistry.WaitForJobsIgnoringJobErrors(ctx, []jobspb.JobID{inspectJob.ID()}); err != nil {
return errors.Wrapf(err, "failed to wait for inspect job %d for table %s", inspectJob.ID(), tableName)
}

completedJob, err := p.ExecCfg().JobRegistry.LoadJob(ctx, inspectJob.ID())
if err != nil {
return errors.Wrapf(err, "failed to load inspect job %d for table %s", inspectJob.ID(), tableName)
}

var inspectRowCount uint64
completedProgress := completedJob.Progress()
if inspectProgress := completedProgress.GetInspect(); inspectProgress != nil {
inspectRowCount = inspectProgress.RowCount
}

payload := completedJob.Payload()
if payload.FinalResumeError != nil {
decodedErr := errors.DecodeError(ctx, *payload.FinalResumeError)
log.Eventf(ctx,
"inspect job %d found issues for table %s (inspectRowCount=%d, expectedRowCount=%d)",
inspectJob.ID(), tableName, inspectRowCount, expectedRowCount,
)

// rowCountTable runs a count(*) to independently
// validate the inspect row count.
rowCountTable := func() error {
query := fmt.Sprintf(
`SELECT count(*) FROM [%d AS t] AS OF SYSTEM TIME '%s'`,
table.Desc.ID, setPublicTimestamp.AsOfSystemTime(),
)
row, err := p.ExecCfg().InternalDB.Executor().QueryRowEx(
ctx, "import-count-validation", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.NodeUserName(),
},
query,
)
if err != nil {
return err
}
if row == nil {
return errors.AssertionFailedf("row count query returned no rows")
}
if len(row) != 1 {
return errors.AssertionFailedf("row count query returned unexpected column count: %d", len(row))
}
actualCount := uint64(tree.MustBeDInt(row[0]))

prog := r.job.Progress()
var totalImportedRows int64
if importProgress := prog.GetImport(); importProgress != nil {
totalImportedRows = importProgress.Summary.EntryCounts[pkID]
}

log.Ops.Infof(ctx,
"import row count validation for table %s (id=%d): count(*)=%d, inspectRowCount=%d, expected=%d (imported=%d + initial=%d)",
tableName, table.Desc.ID, actualCount, inspectRowCount, expectedRowCount,
totalImportedRows, table.InitialRowCount,
)

if actualCount != expectedRowCount {
return errors.Errorf("import row count validation failed for table %s (id=%d): count(*)=%d, expected=%d", tableName, table.Desc.ID, actualCount, expectedRowCount)
}

return nil
}
rowCountTableErr := rowCountTable()

inspectErr := errors.Wrapf(decodedErr,
"inspect job %d found issues for table %s", inspectJob.ID(), tableName)
if rowCountTableErr != nil {
return errors.CombineErrors(errors.WithHintf(rowCountTableErr,
"Run 'SHOW INSPECT ERRORS FOR JOB %d WITH DETAILS' for more information.",
inspectJob.ID()), inspectErr)
}
return inspectErr
}

return nil
}
if err := validateInspectRowCount(); err != nil {
return err
}

// The broader second wait captures errors from the job being
// cancelled or paused that the `validateInspectRowCount` debug
// function lets through.
if err := p.ExecCfg().JobRegistry.WaitForJobs(ctx, []jobspb.JobID{inspectJob.ID()}); err != nil {
return errors.Wrapf(err, "failed to wait for inspect job %d for table %s", inspectJob.ID(), tableName)
}

log.Eventf(ctx, "inspect job %d completed for table %s", inspectJob.ID(), tableName)
}
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/inspect/inspect_resumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package inspect

import (
"bytes"
"context"
"slices"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -88,6 +90,22 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
return err
}

// Assert that the partitioned spans are non-overlapping. It is checked
// explicitly for debugging.
//
// Note that the sort below reorders the partitioned spans in place by start key.
slices.SortFunc(remainingPartitionedSpans, func(a, b roachpb.Span) int {
return bytes.Compare(a.Key, b.Key)
})
for i := 1; i < len(remainingPartitionedSpans); i++ {
if bytes.Compare(remainingPartitionedSpans[i-1].EndKey, remainingPartitionedSpans[i].Key) > 0 {
return errors.AssertionFailedf(
"inspect spans overlapping: span %d [%q, %q) overlaps span %d [%q, %q)",
i-1, remainingPartitionedSpans[i-1].Key, remainingPartitionedSpans[i-1].EndKey,
i, remainingPartitionedSpans[i].Key, remainingPartitionedSpans[i].EndKey)
}
}

if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil {
return err
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/util/besteffort/besteffort.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
// Warning executes a best-effort operation that logs a warning on failure.
//
// Best-effort operations are tasks that should be attempted but are not critical
// to system correctness. In production builds, failures are logged as warnings.
// In test builds, failures panic by default to catch regressions.
// to system correctness.
// In production builds, failures are logged as warnings.
// In test builds, failures panic by default to catch regressions, and operations
// are randomly skipped 50% of the time to ensure the system remains correct
// when those operations don't run.
//
// Example usage:
//
Expand All @@ -40,8 +43,11 @@ func Warning(ctx context.Context, name string, do func(ctx context.Context) erro
// Error executes a best-effort operation that logs an error on failure.
//
// Best-effort operations are tasks that should be attempted but are not critical
// to system correctness. In production builds, failures are logged as errors.
// In test builds, failures panic by default to catch regressions.
// to system correctness.
// In production builds, failures are logged as errors. In test builds, failures
// panic by default to catch regressions, and operations are randomly skipped 50%
// of the time to ensure the system remains correct when those operations don't
// run.
//
// Example usage:
//
Expand All @@ -67,7 +73,7 @@ func Error(ctx context.Context, name string, do func(ctx context.Context) error)
//
// Unlike Warning and Error, Cleanup never skips execution in test builds.
// This is appropriate for resource cleanup (closing files, releasing locks,
// etc.), where skipping would cause resource leaks. In production ubilds,
// etc.), where skipping would cause resource leaks. In production builds,
// errors are logged as warnings. In test builds, errors panic by default to
// catch regressions.
//
Expand Down
Loading