From bd1928056e2e7bc1b6cc9f9400e11bc31a286c5c Mon Sep 17 00:00:00 2001 From: Brendan Gerrity Date: Fri, 17 Apr 2026 13:17:40 -0400 Subject: [PATCH] importer: add checks and validations for row count validation failures This makes two temporary changes with the aim to debug #168396: After the `INSPECT` job finds inconsistencies, the `IMPORT` job runs an independent `SELECT count(*)` on the target table. This determines if the row-count discrepancy is due to the `INSPECT` job's row counting over the spans or the `IMPORT` job's calculation of the expected row count. The `INSPECT` resumer asserts that the spans it delegates to workers are not overlapping which would cause overcounting of rows in the overlap. Informs: #168396 Release note: None --- pkg/sql/importer/import_into_test.go | 2 +- pkg/sql/importer/import_job.go | 101 +++++++++++++++++++++++++-- pkg/sql/inspect/inspect_resumer.go | 18 +++++ pkg/util/besteffort/besteffort.go | 16 +++-- 4 files changed, 126 insertions(+), 11 deletions(-) diff --git a/pkg/sql/importer/import_into_test.go b/pkg/sql/importer/import_into_test.go index 41a61267458a..5f270ecf34dc 100644 --- a/pkg/sql/importer/import_into_test.go +++ b/pkg/sql/importer/import_into_test.go @@ -361,7 +361,7 @@ func TestImportIntoNonEmptyTableRowCountCheck(t *testing.T) { _, err := db.Exec(`IMPORT INTO foo (k, v) CSV DATA ('nodelocal://1/export2/export*-n*.0.csv')`) require.Error(t, err) - require.ErrorContains(t, err, "INSPECT found inconsistencies") + require.ErrorContains(t, err, "import row count validation failed") // Extract the inspect job ID from the error hint and verify the error // details contain the expected and actual row counts. diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 767bac475927..cbf0524511a4 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -417,8 +417,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { return err } - validationMode := importRowCountValidation.Get(&p.ExecCfg().Settings.SV) - switch validationMode { + switch validationMode := importRowCountValidation.Get(&p.ExecCfg().Settings.SV); validationMode { case ImportRowCountValidationOff: // No validation required. case ImportRowCountValidationAsync, ImportRowCountValidationSync: @@ -429,6 +428,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { var checks []*jobspb.InspectDetails_Check var dbName, schemaName, tableName string + var expectedRowCount uint64 if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func( ctx context.Context, txn descs.Txn, ) error { @@ -463,7 +463,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { if importProgress := prog.GetImport(); importProgress != nil { totalImportedRows = importProgress.Summary.EntryCounts[pkID] } - expectedRowCount := uint64(totalImportedRows + int64(table.InitialRowCount) + r.testingKnobs.expectedRowCountOffset) + expectedRowCount = uint64(totalImportedRows + int64(table.InitialRowCount) + r.testingKnobs.expectedRowCountOffset) checks, err = inspect.ChecksForTable(ctx, p.ExecCfg(), tblDesc, &expectedRowCount) return err }); err != nil { @@ -489,11 +489,102 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { r.inspectJobID = inspectJob.ID() log.Eventf(ctx, "triggered inspect job %d for import validation for table %s with AOST %s", inspectJob.ID(), tableName, setPublicTimestamp) - // For sync mode, wait for the inspect job to complete. - if validationMode == ImportRowCountValidationSync { + switch validationMode { + case ImportRowCountValidationSync: + // validateInspectRowCount runs a separate row count for + // debugging the importer's use of inspect. + validateInspectRowCount := func() (err error) { + if err := p.ExecCfg().JobRegistry.WaitForJobsIgnoringJobErrors(ctx, []jobspb.JobID{inspectJob.ID()}); err != nil { + return errors.Wrapf(err, "failed to wait for inspect job %d for table %s", inspectJob.ID(), tableName) + } + + completedJob, err := p.ExecCfg().JobRegistry.LoadJob(ctx, inspectJob.ID()) + if err != nil { + return errors.Wrapf(err, "failed to load inspect job %d for table %s", inspectJob.ID(), tableName) + } + + var inspectRowCount uint64 + completedProgress := completedJob.Progress() + if inspectProgress := completedProgress.GetInspect(); inspectProgress != nil { + inspectRowCount = inspectProgress.RowCount + } + + payload := completedJob.Payload() + if payload.FinalResumeError != nil { + decodedErr := errors.DecodeError(ctx, *payload.FinalResumeError) + log.Eventf(ctx, + "inspect job %d found issues for table %s (inspectRowCount=%d, expectedRowCount=%d)", + inspectJob.ID(), tableName, inspectRowCount, expectedRowCount, + ) + + // rowCountTable runs a count(*) to independently + // validate the inspect row count. + rowCountTable := func() error { + query := fmt.Sprintf( + `SELECT count(*) FROM [%d AS t] AS OF SYSTEM TIME '%s'`, + table.Desc.ID, setPublicTimestamp.AsOfSystemTime(), + ) + row, err := p.ExecCfg().InternalDB.Executor().QueryRowEx( + ctx, "import-count-validation", nil, /* txn */ + sessiondata.InternalExecutorOverride{ + User: username.NodeUserName(), + }, + query, + ) + if err != nil { + return err + } + if row == nil { + return errors.AssertionFailedf("row count query returned no rows") + } + if len(row) != 1 { + return errors.AssertionFailedf("row count query returned unexpected column count: %d", len(row)) + } + actualCount := uint64(tree.MustBeDInt(row[0])) + + prog := r.job.Progress() + var totalImportedRows int64 + if importProgress := prog.GetImport(); importProgress != nil { + totalImportedRows = importProgress.Summary.EntryCounts[pkID] + } + + log.Ops.Infof(ctx, + "import row count validation for table %s (id=%d): count(*)=%d, inspectRowCount=%d, expected=%d (imported=%d + initial=%d)", + tableName, table.Desc.ID, actualCount, inspectRowCount, expectedRowCount, + totalImportedRows, table.InitialRowCount, + ) + + if actualCount != expectedRowCount { + return errors.Errorf("import row count validation failed for table %s (id=%d): count(*)=%d, expected=%d", tableName, table.Desc.ID, actualCount, expectedRowCount) + } + + return nil + } + rowCountTableErr := rowCountTable() + + inspectErr := errors.Wrapf(decodedErr, + "inspect job %d found issues for table %s", inspectJob.ID(), tableName) + if rowCountTableErr != nil { + return errors.CombineErrors(errors.WithHintf(rowCountTableErr, + "Run 'SHOW INSPECT ERRORS FOR JOB %d WITH DETAILS' for more information.", + inspectJob.ID()), inspectErr) + } + return inspectErr + } + + return nil + } + if err := validateInspectRowCount(); err != nil { + return err + } + + // The broader second wait captures errors from the job being + // cancelled or paused that the `validateInspectRowCount` debug + // function lets through. if err := p.ExecCfg().JobRegistry.WaitForJobs(ctx, []jobspb.JobID{inspectJob.ID()}); err != nil { return errors.Wrapf(err, "failed to wait for inspect job %d for table %s", inspectJob.ID(), tableName) } + log.Eventf(ctx, "inspect job %d completed for table %s", inspectJob.ID(), tableName) } } diff --git a/pkg/sql/inspect/inspect_resumer.go b/pkg/sql/inspect/inspect_resumer.go index 6754170d1dfd..cc5cab873559 100644 --- a/pkg/sql/inspect/inspect_resumer.go +++ b/pkg/sql/inspect/inspect_resumer.go @@ -6,7 +6,9 @@ package inspect import ( + "bytes" "context" + "slices" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -88,6 +90,22 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error return err } + // Assert that the partitioned spans are non-overlapping. It is checked + // explicitly for debugging. + // + // It has the side effect of sorting the partitioned spans by start key. + slices.SortFunc(remainingPartitionedSpans, func(a, b roachpb.Span) int { + return bytes.Compare(a.Key, b.Key) + }) + for i := 1; i < len(remainingPartitionedSpans); i++ { + if bytes.Compare(remainingPartitionedSpans[i-1].EndKey, remainingPartitionedSpans[i].Key) > 0 { + return errors.AssertionFailedf( + "inspect spans overlapping: span %d [%q, %q) overlaps span %d [%q, %q)", + i-1, remainingPartitionedSpans[i-1].Key, remainingPartitionedSpans[i-1].EndKey, + i, remainingPartitionedSpans[i].Key, remainingPartitionedSpans[i].EndKey) + } + } + if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan, progressTracker); err != nil { return err } diff --git a/pkg/util/besteffort/besteffort.go b/pkg/util/besteffort/besteffort.go index fd6e90504f04..d11e456550ff 100644 --- a/pkg/util/besteffort/besteffort.go +++ b/pkg/util/besteffort/besteffort.go @@ -14,8 +14,11 @@ import ( // Warning executes a best-effort operation that logs a warning on failure. // // Best-effort operations are tasks that should be attempted but are not critical -// to system correctness. In production builds, failures are logged as warnings. -// In test builds, failures panic by default to catch regressions. +// to system correctness. +// In production builds, failures are logged as warnings. +// In test builds, failures panic by default to catch regressions and operations +// are randomly skipped 50% of the time to ensure the system remains correct +// when those operations don't run. // // Example usage: // @@ -40,8 +43,11 @@ func Warning(ctx context.Context, name string, do func(ctx context.Context) erro // Error executes a best-effort operation that logs an error on failure. // // Best-effort operations are tasks that should be attempted but are not critical -// to system correctness. In production builds, failures are logged as errors. -// In test builds, failures panic by default to catch regressions. +// to system correctness. +// In production builds, failures are logged as errors. In test builds, failures +// panic by default to catch regressions and operations are randomly skipped 50% +// of the time to ensure the system remains correct when those operations don't +// run. // // Example usage: // @@ -67,7 +73,7 @@ func Error(ctx context.Context, name string, do func(ctx context.Context) error) // // Unlike Warning and Error, Cleanup never skips execution in test builds. // This is appropriate for resource cleanup (closing files, release locks, -// etc.), where skipping would cause resource leaks. In production ubilds, +// etc.), where skipping would cause resource leaks. In production builds, // errors are logged as warnings. In test builds, errors panic by default to // catch regressions. //