Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
108359: backupccl: hookup tracing aggregator events from the restore job r=stevendanna a=adityamaru

This change builds on top of #107994 and wires up each restore
data processor to emit TracingAggregatorEvents to the job coordinator.
These events are periodically flushed to files in the `job_info`
table and are consumable via the DBConsole Job Details page.

Fixes: #100126
Release note: None

109291: streamingccl: stream span config checkpoints r=stevendanna a=msbutler

This patch modifies the span config event stream to emit a checkpoint event
containing the rangefeed frontier after the event stream processes each
rangefeed cache flush.

The span config client can then use this information while processing updates.
Specifically, the subscription.Next() call may return a
checkpoint which indicates that all updates up to a given frontier have been
emitted by the rangefeed.

This patch also fixes two bugs:
- prevents sending an empty batch of updates
- prevents sending system target span config updates

Informs #106823

Release note: None

Co-authored-by: adityamaru <adityamaru@gmail.com>
Co-authored-by: Michael Butler <butler@cockroachlabs.com>
  • Loading branch information
3 people committed Aug 25, 2023
3 parents c215aba + e89c74f + 9077f50 commit 155dc00
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 82 deletions.
44 changes: 36 additions & 8 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"fmt"
"runtime"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
gogotypes "github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -76,7 +78,10 @@ type restoreDataProcessor struct {
// concurrent workers and sent down the flow by the processor.
progCh chan backuppb.RestoreProgress

agg *bulkutil.TracingAggregator
// Aggregator that aggregates StructuredEvents emitted in the
// restoreDataProcessors' trace recording.
agg *bulkutil.TracingAggregator
aggTimer *timeutil.Timer

// qp is a MemoryBackedQuotaPool that restricts the amount of memory that
// can be used by this processor to open iterators on SSTs.
Expand Down Expand Up @@ -206,7 +211,7 @@ func newRestoreDataProcessor(
InputsToDrain: []execinfra.RowSource{input},
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
rd.ConsumerClosed()
return nil
return []execinfrapb.ProducerMetadata{*rd.constructTracingAggregatorProducerMeta(ctx)}
},
}); err != nil {
return nil, err
Expand All @@ -222,6 +227,8 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {

ctx, cancel := context.WithCancel(ctx)
ctx, rd.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx, fmt.Sprintf("%s-aggregator", restoreDataProcName), rd.EvalCtx.Tracer)
rd.aggTimer = timeutil.NewTimer()
rd.aggTimer.Reset(15 * time.Second)

rd.cancelWorkersAndWait = func() {
cancel()
Expand Down Expand Up @@ -479,10 +486,6 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
return err
}

ctx, agg := bulkutil.MakeTracingAggregatorWithSpan(ctx,
fmt.Sprintf("%s-worker-%d-aggregator", restoreDataProcName, worker), rd.EvalCtx.Tracer)
defer agg.Close()

var sstIter mergedSST
for {
done, err := func() (done bool, _ error) {
Expand Down Expand Up @@ -680,6 +683,28 @@ func makeProgressUpdate(
return progDetails
}

// constructTracingAggregatorProducerMeta snapshots the events currently held
// by the processor's tracing aggregator and wraps them in a ProducerMetadata
// so they can be sent down the flow.
//
// The returned metadata is tagged with this processor's SQL instance ID and
// flow ID. Each aggregated event is converted to bytes and stored under its
// name; events that cannot be converted are logged and skipped, so the result
// may contain fewer events than the aggregator holds. The returned pointer is
// never nil.
func (rd *restoreDataProcessor) constructTracingAggregatorProducerMeta(
	ctx context.Context,
) *execinfrapb.ProducerMetadata {
	aggEvents := &execinfrapb.TracingAggregatorEvents{
		SQLInstanceID: rd.flowCtx.NodeID.SQLInstanceID(),
		FlowID:        rd.flowCtx.ID,
		Events:        make(map[string][]byte),
	}

	// In the code visible here the aggregator is only assigned in Start, while
	// this method is also reachable from the trailing-meta callback. Guard
	// against a processor that is drained without having been started so we
	// emit an empty event set instead of dereferencing a nil aggregator.
	if rd.agg != nil {
		rd.agg.ForEachAggregatedEvent(func(name string, event bulkutil.TracingAggregatorEvent) {
			data, err := bulkutil.TracingAggregatorEventToBytes(ctx, event)
			if err != nil {
				// This should never happen but if it does skip the aggregated event.
				log.Warningf(ctx, "failed to marshal aggregated event: %v", err)
				return
			}
			aggEvents.Events[name] = data
		})
	}

	return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
}

// Next is part of the RowSource interface.
func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
if rd.State != execinfra.StateRunning {
Expand All @@ -702,14 +727,17 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
return nil, rd.DrainHelper()
}
prog.ProgressDetails = *details
return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog}
case <-rd.aggTimer.C:
rd.aggTimer.Read = true
rd.aggTimer.Reset(15 * time.Second)
return nil, rd.constructTracingAggregatorProducerMeta(rd.Ctx())
case meta := <-rd.metaCh:
return nil, meta
case <-rd.Ctx().Done():
rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog}
}

// ConsumerClosed is part of the RowSource interface.
Expand Down
42 changes: 31 additions & 11 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -403,6 +404,21 @@ func restore(
}
tasks = append(tasks, generativeCheckpointLoop)

tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
tracingAggLoop := func(ctx context.Context) error {
if err := bulk.AggregateTracingStats(ctx, job.ID(),
execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
// Even if we fail to aggregate tracing stats, we must continue draining
// the channel so that the sender in the DistSQLReceiver does not block
// and the restore can continue uninterrupted.
for range tracingAggCh {
}
}
return nil
}
tasks = append(tasks, tracingAggLoop)

runRestore := func(ctx context.Context) error {
if details.ExperimentalOnline {
log.Warningf(ctx, "EXPERIMENTAL ONLINE RESTORE being used")
Expand All @@ -423,21 +439,25 @@ func restore(
genSpan,
)
}
md := restoreJobMetadata{
jobID: job.ID(),
dataToRestore: dataToRestore,
restoreTime: endTime,
encryption: encryption,
kmsEnv: kmsEnv,
uris: details.URIs,
backupLocalityInfo: backupLocalityInfo,
spanFilter: filter,
numImportSpans: numImportSpans,
useSimpleImportSpans: simpleImportSpans,
execLocality: details.ExecutionLocality,
}
return distRestore(
ctx,
execCtx,
job.ID(),
dataToRestore,
endTime,
encryption,
kmsEnv,
details.URIs,
backupLocalityInfo,
filter,
numImportSpans,
simpleImportSpans,
details.ExecutionLocality,
md,
progCh,
tracingAggCh,
)
}
tasks = append(tasks, runRestore)
Expand Down
92 changes: 51 additions & 41 deletions pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ var memoryMonitorSSTs = settings.RegisterBoolSetting(
settings.WithName("bulkio.restore.sst_memory_limit.enabled"),
)

// restoreJobMetadata bundles the per-job inputs that distRestore needs to plan
// and run the distributed restore flow, replacing what would otherwise be a
// long positional parameter list.
type restoreJobMetadata struct {
// jobID identifies the restore job; it is stamped into the processor specs
// and used when storing the plan diagram.
jobID jobspb.JobID
// dataToRestore supplies the rekeys, tenant rekeys, PK IDs, spans and the
// validate-only flag that are copied into the processor specs.
dataToRestore restorationData
// restoreTime is the timestamp passed to the processors as the restore
// time / end time.
restoreTime hlc.Timestamp
// encryption holds the backup encryption options and may be nil. When its
// mode is KMS, distRestore decrypts the data key and writes it back into
// encryption.Key through this pointer.
encryption *jobspb.BackupEncryptionOptions
// kmsEnv is the environment handed to cloud.KMSFromURI when a KMS must be
// opened to decrypt the data key.
kmsEnv cloud.KMSEnv
// uris is forwarded as the URIs field of the generative split-and-scatter
// spec.
uris []string
// backupLocalityInfo is forwarded unchanged to the generative
// split-and-scatter spec.
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo
// spanFilter provides the high-water mark, target size and frontier
// checkpointing state used to populate the split-and-scatter spec.
spanFilter spanCoveringFilter
// numImportSpans is the number of import spans; it drives the chunk size
// computation and is reported as NumEntries in the spec.
numImportSpans int
// useSimpleImportSpans is forwarded as UseSimpleImportSpans in the
// split-and-scatter spec.
useSimpleImportSpans bool
// execLocality is the locality passed to the DistSQL planner when choosing
// the nodes on which to plan.
execLocality roachpb.Locality
}

// distRestore plans a 2 stage distSQL flow for a distributed restore. It
// streams back progress updates over the given progCh. The first stage is a
// splitAndScatter processor on every node that is running a compatible version.
Expand All @@ -70,24 +84,16 @@ var memoryMonitorSSTs = settings.RegisterBoolSetting(
func distRestore(
ctx context.Context,
execCtx sql.JobExecContext,
jobID jobspb.JobID,
dataToRestore restorationData,
restoreTime hlc.Timestamp,
encryption *jobspb.BackupEncryptionOptions,
kmsEnv cloud.KMSEnv,
uris []string,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
spanFilter spanCoveringFilter,
numImportSpans int,
useSimpleImportSpans bool,
execLocality roachpb.Locality,
md restoreJobMetadata,
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
tracingAggCh chan *execinfrapb.TracingAggregatorEvents,
) error {
defer close(progCh)
defer close(tracingAggCh)
var noTxn *kv.Txn

if encryption != nil && encryption.Mode == jobspb.EncryptionMode_KMS {
kms, err := cloud.KMSFromURI(ctx, encryption.KMSInfo.Uri, kmsEnv)
if md.encryption != nil && md.encryption.Mode == jobspb.EncryptionMode_KMS {
kms, err := cloud.KMSFromURI(ctx, md.encryption.KMSInfo.Uri, md.kmsEnv)
if err != nil {
return err
}
Expand All @@ -98,7 +104,7 @@ func distRestore(
}
}()

encryption.Key, err = kms.Decrypt(ctx, encryption.KMSInfo.EncryptedDataKey)
md.encryption.Key, err = kms.Decrypt(ctx, md.encryption.KMSInfo.EncryptedDataKey)
if err != nil {
return errors.Wrap(err,
"failed to decrypt data key before starting BackupDataProcessor")
Expand All @@ -107,16 +113,16 @@ func distRestore(
// Wrap the relevant BackupEncryptionOptions to be used by the Restore
// processor.
var fileEncryption *kvpb.FileEncryptionOptions
if encryption != nil {
fileEncryption = &kvpb.FileEncryptionOptions{Key: encryption.Key}
if md.encryption != nil {
fileEncryption = &kvpb.FileEncryptionOptions{Key: md.encryption.Key}
}

memMonSSTs := memoryMonitorSSTs.Get(execCtx.ExecCfg().SV())
makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {

planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanningWithOracle(
ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg(),
physicalplan.DefaultReplicaChooser, execLocality,
physicalplan.DefaultReplicaChooser, md.execLocality,
)
if err != nil {
return nil, nil, err
Expand All @@ -126,13 +132,13 @@ func distRestore(
p := planCtx.NewPhysicalPlan()

restoreDataSpec := execinfrapb.RestoreDataSpec{
JobID: int64(jobID),
RestoreTime: restoreTime,
JobID: int64(md.jobID),
RestoreTime: md.restoreTime,
Encryption: fileEncryption,
TableRekeys: dataToRestore.getRekeys(),
TenantRekeys: dataToRestore.getTenantRekeys(),
PKIDs: dataToRestore.getPKIDs(),
ValidateOnly: dataToRestore.isValidateOnly(),
TableRekeys: md.dataToRestore.getRekeys(),
TenantRekeys: md.dataToRestore.getTenantRekeys(),
PKIDs: md.dataToRestore.getPKIDs(),
ValidateOnly: md.dataToRestore.isValidateOnly(),
MemoryMonitorSSTs: memMonSSTs,
}

Expand Down Expand Up @@ -176,34 +182,34 @@ func distRestore(
// It tries to take the cluster size into account so that larger clusters
// distribute more chunks amongst them so that after scattering there isn't
// a large variance in the distribution of entries.
chunkSize := int(math.Sqrt(float64(numImportSpans))) / numNodes
chunkSize := int(math.Sqrt(float64(md.numImportSpans))) / numNodes
if chunkSize == 0 {
chunkSize = 1
}

id := execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID()

spec := &execinfrapb.GenerativeSplitAndScatterSpec{
TableRekeys: dataToRestore.getRekeys(),
TenantRekeys: dataToRestore.getTenantRekeys(),
ValidateOnly: dataToRestore.isValidateOnly(),
URIs: uris,
Encryption: encryption,
EndTime: restoreTime,
Spans: dataToRestore.getSpans(),
BackupLocalityInfo: backupLocalityInfo,
HighWater: spanFilter.highWaterMark,
TableRekeys: md.dataToRestore.getRekeys(),
TenantRekeys: md.dataToRestore.getTenantRekeys(),
ValidateOnly: md.dataToRestore.isValidateOnly(),
URIs: md.uris,
Encryption: md.encryption,
EndTime: md.restoreTime,
Spans: md.dataToRestore.getSpans(),
BackupLocalityInfo: md.backupLocalityInfo,
HighWater: md.spanFilter.highWaterMark,
UserProto: execCtx.User().EncodeProto(),
TargetSize: spanFilter.targetSize,
TargetSize: md.spanFilter.targetSize,
ChunkSize: int64(chunkSize),
NumEntries: int64(numImportSpans),
NumEntries: int64(md.numImportSpans),
NumNodes: int64(numNodes),
UseSimpleImportSpans: useSimpleImportSpans,
UseFrontierCheckpointing: spanFilter.useFrontierCheckpointing,
JobID: int64(jobID),
UseSimpleImportSpans: md.useSimpleImportSpans,
UseFrontierCheckpointing: md.spanFilter.useFrontierCheckpointing,
JobID: int64(md.jobID),
}
if spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0)
if md.spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0)
}

proc := physicalplan.Processor{
Expand Down Expand Up @@ -292,6 +298,10 @@ func distRestore(
// Send the progress up a level to be written to the manifest.
progCh <- meta.BulkProcessorProgress
}

if meta.AggregatorEvents != nil {
tracingAggCh <- meta.AggregatorEvents
}
return nil
}

Expand All @@ -309,7 +319,7 @@ func distRestore(
defer recv.Release()

execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
Expand Down
29 changes: 20 additions & 9 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,17 +950,28 @@ USE d;`)
codec := source.mu.codec.(*partitionStreamDecoder)
updateCount := 0
for {
source.mu.Lock()
require.True(t, source.mu.rows.Next())
source.mu.codec.decode()
if codec.e.Batch != nil {
for _, cfg := range codec.e.Batch.SpanConfigs {
if receivedSpanConfigs.maybeAddNewRecord(cfg.SpanConfig, cfg.Timestamp.WallTime) {
updateCount++
func() {
// This code block is wrapped in an anonymous function to ensure the source
// gets unlocked if an assertion fails. Otherwise, the test can hang.
source.mu.Lock()
defer source.mu.Unlock()
require.True(t, source.mu.rows.Next())
source.mu.codec.decode()
if codec.e.Batch != nil {
require.Greater(t, len(codec.e.Batch.SpanConfigs), 0, "a non empty batch had zero span config updates")
for _, cfg := range codec.e.Batch.SpanConfigs {
if receivedSpanConfigs.maybeAddNewRecord(cfg.SpanConfig, cfg.Timestamp.WallTime) {
updateCount++
}
}
}
}
source.mu.Unlock()
if codec.e.Checkpoint != nil {
require.Equal(t, 1, len(codec.e.Checkpoint.ResolvedSpans))
// The frontier in the checkpoint must be greater or equal to the commit
// timestamp associated with the latest event.
require.LessOrEqual(t, receivedSpanConfigs.latestTime, codec.e.Checkpoint.ResolvedSpans[0].Timestamp.WallTime)
}
}()
if updateCount == len(expectedSpanConfigs.allUpdates) {
break
}
Expand Down
Loading

0 comments on commit 155dc00

Please sign in to comment.