jobs: only force jobs.MaybeGenerateForcedRetryableError in 23.1 #113864

Merged · 3 commits · Nov 6, 2023
Changes from all commits
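Every hunk below makes the same mechanical change: the helpers that read and write the job_info table (jobs.LoadJobProgress, jobs.ReadChunkedFileToJobInfo, jobs.WriteChunkedFileToJobInfo, the execution-detail writers, and the jobsprofiler and bulkutil callers above them) now take a clusterversion.Handle, threaded in from Settings.Version, so the forced retryable error named in the title can be gated on the active cluster version instead of being generated unconditionally. A minimal sketch of that kind of gate, assuming an illustrative helper name and gate key; the real logic lives in pkg/jobs and is not reproduced here:

package jobinfosketch // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/errors"
)

// errForcedRetry stands in for the retryable error that
// jobs.MaybeGenerateForcedRetryableError produces; the real constructor
// lives in pkg/jobs and is not shown here.
var errForcedRetry = errors.New("forced retryable job-info error")

// maybeForceRetryable sketches the version gate: a cluster still running
// 23.1 gets the forced retry, while a finalized 23.2 cluster sees the
// error unchanged. clusterversion.Handle and IsActive are real; using
// V23_2 as the gate key is an assumption for illustration.
func maybeForceRetryable(ctx context.Context, cv clusterversion.Handle, err error) error {
	if err == nil || cv.IsActive(ctx, clusterversion.V23_2) {
		return err
	}
	return errors.Mark(err, errForcedRetry)
}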
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_job.go

@@ -262,7 +262,7 @@ func backup(
return nil
}
jobsprofiler.StorePerNodeProcessorProgressFraction(
- ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog)
+ ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog, execCtx.ExecCfg().Settings.Version)
case <-ctx.Done():
return ctx.Err()
}

@@ -2095,7 +2095,7 @@ func (b *backupResumer) CollectProfile(ctx context.Context, execCtx interface{})
aggStatsCopy = b.mu.perNodeAggregatorStats.DeepCopy()
}()
return bulkutil.FlushTracingAggregatorStats(ctx, b.job.ID(),
- p.ExecCfg().InternalDB, aggStatsCopy)
+ p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version)
}

func (b *backupResumer) deleteCheckpoint(
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor_planning.go

@@ -219,7 +219,7 @@ func distBackup(
defer close(progCh)
defer close(tracingAggCh)
execCfg := execCtx.ExecCfg()
- jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
+ jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go

@@ -2544,7 +2544,7 @@ func (r *restoreResumer) CollectProfile(ctx context.Context, execCtx interface{})
aggStatsCopy = r.mu.perNodeAggregatorStats.DeepCopy()
}()
return bulkutil.FlushTracingAggregatorStats(ctx, r.job.ID(),
- p.ExecCfg().InternalDB, aggStatsCopy)
+ p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version)
}

// dropDescriptors implements the OnFailOrCancel logic.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go

@@ -317,7 +317,7 @@ func distRestore(
defer recv.Release()

execCfg := execCtx.ExecCfg()
- jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID)
+ jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID, execCfg.Settings.Version)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go

@@ -303,7 +303,7 @@ func startDistChangefeed(
finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) }
}

- jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
+ jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/replicationutils/BUILD.bazel

@@ -6,6 +6,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils",
visibility = ["//visibility:public"],
deps = [
+ "//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvpb",
9 changes: 5 additions & 4 deletions pkg/ccl/streamingccl/replicationutils/utils.go

@@ -14,6 +14,7 @@ import (
"fmt"
"testing"

+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"

@@ -174,9 +175,9 @@ func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp {
// LoadIngestionProgress loads the latest persisted stream ingestion progress.
// The method returns nil if the progress does not exist yet.
func LoadIngestionProgress(
- ctx context.Context, db isql.DB, jobID jobspb.JobID,
+ ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (*jobspb.StreamIngestionProgress, error) {
- progress, err := jobs.LoadJobProgress(ctx, db, jobID)
+ progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
if err != nil || progress == nil {
return nil, err
}

@@ -192,9 +193,9 @@ func LoadIngestionProgress(
// LoadReplicationProgress loads the latest persisted stream replication progress.
// The method returns nil if the progress does not exist yet.
func LoadReplicationProgress(
- ctx context.Context, db isql.DB, jobID jobspb.JobID,
+ ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (*jobspb.StreamReplicationProgress, error) {
- progress, err := jobs.LoadJobProgress(ctx, db, jobID)
+ progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
if err != nil || progress == nil {
return nil, err
}
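For callers, adopting the new signatures means threading one extra argument from the executor config. A sketch of the resulting call shape, with an illustrative wrapper function (the real call sites appear in the test hunks further down):

package streamingest // illustrative placement for this sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql"
)

// loadProgress shows the new call shape: the cluster-version handle rides
// along with the internal DB, both taken from the same ExecutorConfig.
func loadProgress(
	ctx context.Context, execCfg *sql.ExecutorConfig, jobID jobspb.JobID,
) (*jobspb.StreamIngestionProgress, error) {
	return replicationutils.LoadIngestionProgress(
		ctx, execCfg.InternalDB, jobID, execCfg.Settings.Version,
	)
}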
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel

@@ -28,6 +28,7 @@ go_library(
"//pkg/cloud",
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
+ "//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
@@ -16,6 +16,7 @@ import (
"text/tabwriter"

"github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"

@@ -104,15 +105,19 @@ func constructSpanFrontierExecutionDetails(
// - The snapshot of the frontier tracking how far each span has been replicated
// up to.
func generateSpanFrontierExecutionDetailFile(
- ctx context.Context, execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, skipBehindBy bool,
+ ctx context.Context,
+ execCfg *sql.ExecutorConfig,
+ ingestionJobID jobspb.JobID,
+ skipBehindBy bool,
+ cv clusterversion.Handle,
) error {
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var sb bytes.Buffer
w := tabwriter.NewWriter(&sb, 0, 0, 1, ' ', tabwriter.TabIndent)

// Read the StreamIngestionPartitionSpecs to get a mapping from spans to
// their source and destination SQL instance IDs.
- specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID)
+ specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID, cv)
if err != nil {
return err
}

@@ -124,7 +129,7 @@ func generateSpanFrontierExecutionDetailFile(

// Now, read the latest snapshot of the frontier that tells us what
// timestamp each span has been replicated up to.
- frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID)
+ frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID, cv)
if err != nil {
return err
}

@@ -157,7 +162,7 @@
if err := w.Flush(); err != nil {
return err
}
- return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID)
+ return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID, cv)
})
}

@@ -170,6 +175,7 @@ func persistStreamIngestionPartitionSpecs(
execCfg *sql.ExecutorConfig,
ingestionJobID jobspb.JobID,
streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec,
+ cv clusterversion.Handle,
) error {
err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0)

@@ -183,7 +189,7 @@
if err != nil {
return err
}
- return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID)
+ return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID, cv)
})
if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterPersistingPartitionSpecs != nil {
knobs.AfterPersistingPartitionSpecs()
@@ -344,7 +344,7 @@ func TestEndToEndFrontierExecutionDetailFile(t *testing.T) {

ingestionJobID := jobspb.JobID(123)
require.NoError(t, persistStreamIngestionPartitionSpecs(ctx, &execCfg,
- ingestionJobID, streamIngestionsSpecs))
+ ingestionJobID, streamIngestionsSpecs, execCfg.Settings.Version))

// Now, let's persist some frontier entries.
frontierEntries := execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{

@@ -369,9 +369,9 @@
frontierBytes, err := protoutil.Marshal(&frontierEntries)
require.NoError(t, err)
require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
- return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID)
+ return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID, execCfg.Settings.Version)
}))
- require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */))
+ require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */, execCfg.Settings.Version))
files := listExecutionDetails(t, srv, ingestionJobID)
require.Len(t, files, 1)
data, err := checkExecutionDetails(t, srv, ingestionJobID, files[0])
@@ -1114,12 +1114,14 @@ func TestLoadProducerAndIngestionProgress(t *testing.T) {
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID))

srcDB := c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
- producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID))
+ producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID),
+ c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version)
require.NoError(t, err)
require.Equal(t, jobspb.StreamReplicationProgress_NOT_FINISHED, producerProgress.StreamIngestionStatus)

destDB := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
- ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID))
+ ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID),
+ c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version)
require.NoError(t, err)
require.Equal(t, jobspb.Replicating, ingestionProgress.ReplicationStatus)
}
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go

@@ -113,7 +113,7 @@ func startDistIngestion(
return errors.Wrap(err, "failed to update job progress")
}
jobsprofiler.StorePlanDiagram(ctx, execCtx.ExecCfg().DistSQLSrv.Stopper, planner.initialPlan, execCtx.ExecCfg().InternalDB,
- ingestionJob.ID())
+ ingestionJob.ID(), execCtx.ExecCfg().Settings.Version)

replanOracle := sql.ReplanOnCustomFunc(
measurePlanChange,

@@ -463,7 +463,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
if !p.createdInitialPlan() {
// Only persist the initial plan as it's the only plan that actually gets
// executed.
- if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs); err != nil {
+ if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs, execCtx.ExecCfg().Settings.Version); err != nil {
return nil, nil, err
}
}
@@ -518,7 +518,7 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error {
}

if err = sf.FlowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
- return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID)
+ return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID, sf.EvalCtx.Settings.Version)
}); err != nil {
return err
}
13 changes: 8 additions & 5 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go

@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"

@@ -224,7 +225,7 @@ func ingestWithRetries(
}
status := redact.Sprintf("waiting before retrying error: %s", err)
updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
- newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob)
+ newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob, execCtx.ExecCfg().Settings.Version)
if lastReplicatedTime.Less(newReplicatedTime) {
r.Reset()
lastReplicatedTime = newReplicatedTime

@@ -241,8 +242,10 @@
return nil
}

- func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job) hlc.Timestamp {
- latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID())
+ func loadReplicatedTime(
+ ctx context.Context, db isql.DB, ingestionJob *jobs.Job, cv clusterversion.Handle,
+ ) hlc.Timestamp {
+ latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID(), cv)
if err != nil {
log.Warningf(ctx, "error loading job progress: %s", err)
return hlc.Timestamp{}

@@ -556,11 +559,11 @@ func (s *streamIngestionResumer) CollectProfile(ctx context.Context, execCtx interface{})

var combinedErr error
if err := bulkutil.FlushTracingAggregatorStats(ctx, s.job.ID(),
- p.ExecCfg().InternalDB, aggStatsCopy); err != nil {
+ p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version); err != nil {
combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to flush aggregator stats"))
}
if err := generateSpanFrontierExecutionDetailFile(ctx, p.ExecCfg(),
- s.job.ID(), false /* skipBehindBy */); err != nil {
+ s.job.ID(), false /* skipBehindBy */, p.ExecCfg().Settings.Version); err != nil {
combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to generate span frontier execution details"))
}
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"

@@ -318,6 +319,7 @@ func newStreamIngestionDataProcessor(
cutoverProvider: &cutoverFromJobProgress{
jobID: jobspb.JobID(spec.JobID),
db: flowCtx.Cfg.DB,
+ cv: flowCtx.Cfg.Settings.Version,
},
buffer: &streamIngestionBuffer{},
cutoverCh: make(chan struct{}),

@@ -1245,10 +1247,11 @@ type cutoverProvider interface {
type cutoverFromJobProgress struct {
db isql.DB
jobID jobspb.JobID
+ cv clusterversion.Handle
}

func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) {
- ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID)
+ ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID, c.cv)
if err != nil {
return false, err
}
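Across the hunks above, the handle is the same value reached through different configs: execCfg.Settings.Version in planners and resumers, flowCtx.Cfg.Settings.Version at processor construction, sf.EvalCtx.Settings.Version inside the frontier processor, and r.settings.Version in the registry below. A trivial sketch that makes the common source explicit (the function name is illustrative):

package streamingest // illustrative

import (
	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// versionHandle makes the shared source explicit: every call site in this
// PR ultimately passes Settings.Version, whichever config it reaches the
// cluster settings through.
func versionHandle(st *cluster.Settings) clusterversion.Handle {
	return st.Version
}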
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job.go

@@ -99,7 +99,7 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) error {
case <-p.timer.Ch():
p.timer.MarkRead()
p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV()))
- progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID())
+ progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID(), execCfg.Settings.Version)
if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterResumerJobLoad != nil {
err = knobs.AfterResumerJobLoad(err)
}
2 changes: 1 addition & 1 deletion pkg/jobs/adopt.go

@@ -87,7 +87,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j
r.ID().String(), timeutil.Now().Format("20060102_150405.00"))
td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()}
if err := r.db.Txn(dumpCtx, func(ctx context.Context, txn isql.Txn) error {
- return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID)
+ return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID, r.settings.Version)
}); err != nil {
log.Warning(dumpCtx, "failed to write trace on resumer trace file")
}