backupccl: hookup tracing aggregator events from the restore job #108359

Merged 1 commit on Aug 25, 2023
44 changes: 36 additions & 8 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -13,6 +13,7 @@ import (
"context"
"fmt"
"runtime"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils"
@@ -41,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
gogotypes "github.com/gogo/protobuf/types"
@@ -76,7 +78,10 @@ type restoreDataProcessor struct {
// concurrent workers and sent down the flow by the processor.
progCh chan backuppb.RestoreProgress

agg *bulkutil.TracingAggregator
// Aggregator that aggregates StructuredEvents emitted in the
// restoreDataProcessors' trace recording.
agg *bulkutil.TracingAggregator
aggTimer *timeutil.Timer

// qp is a MemoryBackedQuotaPool that restricts the amount of memory that
// can be used by this processor to open iterators on SSTs.
@@ -206,7 +211,7 @@ func newRestoreDataProcessor(
InputsToDrain: []execinfra.RowSource{input},
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
rd.ConsumerClosed()
return nil
return []execinfrapb.ProducerMetadata{*rd.constructTracingAggregatorProducerMeta(ctx)}
},
}); err != nil {
return nil, err
@@ -222,6 +227,8 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) {

ctx, cancel := context.WithCancel(ctx)
ctx, rd.agg = bulkutil.MakeTracingAggregatorWithSpan(ctx, fmt.Sprintf("%s-aggregator", restoreDataProcName), rd.EvalCtx.Tracer)
rd.aggTimer = timeutil.NewTimer()
rd.aggTimer.Reset(15 * time.Second)

rd.cancelWorkersAndWait = func() {
cancel()
@@ -479,10 +486,6 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
return err
}

ctx, agg := bulkutil.MakeTracingAggregatorWithSpan(ctx,
fmt.Sprintf("%s-worker-%d-aggregator", restoreDataProcName, worker), rd.EvalCtx.Tracer)
defer agg.Close()

var sstIter mergedSST
for {
done, err := func() (done bool, _ error) {
@@ -680,6 +683,28 @@ func makeProgressUpdate(
return progDetails
}

func (rd *restoreDataProcessor) constructTracingAggregatorProducerMeta(
ctx context.Context,
) *execinfrapb.ProducerMetadata {
aggEvents := &execinfrapb.TracingAggregatorEvents{
SQLInstanceID: rd.flowCtx.NodeID.SQLInstanceID(),
FlowID: rd.flowCtx.ID,
Events: make(map[string][]byte),
}
rd.agg.ForEachAggregatedEvent(func(name string, event bulkutil.TracingAggregatorEvent) {
var data []byte
var err error
if data, err = bulkutil.TracingAggregatorEventToBytes(ctx, event); err != nil {
// This should never happen but if it does skip the aggregated event.
log.Warningf(ctx, "failed to unmarshal aggregated event: %v", err.Error())
return
}
aggEvents.Events[name] = data
})

return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
}

// Next is part of the RowSource interface.
func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
if rd.State != execinfra.StateRunning {
@@ -702,14 +727,17 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
return nil, rd.DrainHelper()
}
prog.ProgressDetails = *details
return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog}
case <-rd.aggTimer.C:
rd.aggTimer.Read = true
rd.aggTimer.Reset(15 * time.Second)
return nil, rd.constructTracingAggregatorProducerMeta(rd.Ctx())
case meta := <-rd.metaCh:
return nil, meta
case <-rd.Ctx().Done():
rd.MoveToDraining(rd.Ctx().Err())
return nil, rd.DrainHelper()
}

return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog}
}

// ConsumerClosed is part of the RowSource interface.
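The pattern this file wires up — workers record into a shared TracingAggregator, the processor flushes a snapshot of the aggregated events as producer metadata every 15 seconds from `Next()`, and one final snapshot is emitted from `TrailingMetaCallback` when the processor drains — can be illustrated with a small, self-contained sketch. Everything below uses only the standard library and made-up names (`aggregator`, `snapshot`, `flushEvery`); it is not the CockroachDB API, just the shape of the timer-driven flush.

```go
// Simplified sketch of the flush pattern used by restoreDataProcessor:
// aggregate events locally, emit a snapshot on an interval, and emit one
// final snapshot at shutdown. Names are illustrative.
package main

import (
	"fmt"
	"time"
)

// aggregator collects named counters, standing in for the
// TracingAggregator's per-event aggregation.
type aggregator struct {
	events map[string]int64
}

func (a *aggregator) record(name string, delta int64) {
	a.events[name] += delta
}

// snapshot stands in for the ProducerMetadata carrying aggregated events.
func (a *aggregator) snapshot() map[string]int64 {
	out := make(map[string]int64, len(a.events))
	for k, v := range a.events {
		out[k] = v
	}
	return out
}

func main() {
	agg := &aggregator{events: map[string]int64{}}
	work := make(chan string)
	done := make(chan struct{})

	// Producer: simulate workers recording events into the aggregator.
	go func() {
		for i := 0; i < 10; i++ {
			work <- "ssts.ingested"
			time.Sleep(30 * time.Millisecond)
		}
		close(done)
	}()

	flushEvery := 100 * time.Millisecond
	timer := time.NewTimer(flushEvery)
	defer timer.Stop()

	for {
		select {
		case ev := <-work:
			agg.record(ev, 1)
		case <-timer.C:
			// Periodic flush, mirroring the aggTimer case in Next().
			fmt.Println("periodic flush:", agg.snapshot())
			timer.Reset(flushEvery)
		case <-done:
			// Final flush, mirroring TrailingMetaCallback.
			fmt.Println("final flush:", agg.snapshot())
			return
		}
	}
}
```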
42 changes: 31 additions & 11 deletions pkg/ccl/backupccl/restore_job.go
@@ -63,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -402,6 +403,21 @@ func restore(
}
tasks = append(tasks, generativeCheckpointLoop)

tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
tracingAggLoop := func(ctx context.Context) error {
if err := bulk.AggregateTracingStats(ctx, job.ID(),
execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
// Even if we fail to aggregate tracing stats, we must continue draining
// the channel so that the sender in the DistSQLReceiver does not block
// and allows the backup to continue uninterrupted.
for range tracingAggCh {
}
}
return nil
}
tasks = append(tasks, tracingAggLoop)

runRestore := func(ctx context.Context) error {
if details.ExperimentalOnline {
log.Warningf(ctx, "EXPERIMENTAL ONLINE RESTORE being used")
@@ -422,21 +438,25 @@ func restore(
genSpan,
)
}
md := restoreJobMetadata{
jobID: job.ID(),
dataToRestore: dataToRestore,
restoreTime: endTime,
encryption: encryption,
kmsEnv: kmsEnv,
uris: details.URIs,
backupLocalityInfo: backupLocalityInfo,
spanFilter: filter,
numImportSpans: numImportSpans,
useSimpleImportSpans: simpleImportSpans,
execLocality: details.ExecutionLocality,
}
return distRestore(
ctx,
execCtx,
job.ID(),
dataToRestore,
endTime,
encryption,
kmsEnv,
details.URIs,
backupLocalityInfo,
filter,
numImportSpans,
simpleImportSpans,
details.ExecutionLocality,
md,
progCh,
tracingAggCh,
)
}
tasks = append(tasks, runRestore)
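The comment inside `tracingAggLoop` is the key correctness point: the DistSQL receiver forwards every TracingAggregatorEvents message into `tracingAggCh`, so a consumer that returned early without draining would leave the sender blocked on an unbuffered channel. A minimal sketch of that drain-on-error pattern, with a made-up `process` function standing in for `bulk.AggregateTracingStats` (not its real signature):

```go
// Sketch: if the consumer fails partway through, keep receiving until the
// producer closes the channel, so the producer never blocks on send.
package main

import (
	"errors"
	"fmt"
)

// process is a stand-in for persisting one aggregated tracing event.
func process(v int) error {
	if v == 3 {
		return errors.New("boom")
	}
	return nil
}

func consume(ch <-chan int) error {
	for v := range ch {
		if err := process(v); err != nil {
			// Stop processing, but keep draining so the sender can finish
			// and close the channel (mirrors tracingAggLoop's behavior,
			// which logs the error instead of returning it).
			for range ch {
			}
			return err
		}
	}
	return nil
}

func main() {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < 6; i++ {
			ch <- i // would block forever if the consumer stopped receiving
		}
	}()
	fmt.Println("consume error:", consume(ch))
}
```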
92 changes: 51 additions & 41 deletions pkg/ccl/backupccl/restore_processor_planning.go
@@ -57,6 +57,20 @@ var memoryMonitorSSTs = settings.RegisterBoolSetting(
false,
)

type restoreJobMetadata struct {
jobID jobspb.JobID
dataToRestore restorationData
restoreTime hlc.Timestamp
encryption *jobspb.BackupEncryptionOptions
kmsEnv cloud.KMSEnv
uris []string
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo
spanFilter spanCoveringFilter
numImportSpans int
useSimpleImportSpans bool
execLocality roachpb.Locality
}
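The restoreJobMetadata struct above collapses what were eleven positional arguments to distRestore into a single named-field value, so the call in restore() reads field by field and adding a field no longer ripples through every caller. A toy version of the same refactor; the names below are illustrative, not the real signature:

```go
package main

import "fmt"

// restoreMeta is a toy stand-in for restoreJobMetadata: one struct instead
// of a long positional parameter list.
type restoreMeta struct {
	jobID      int64
	numSpans   int
	execRegion string
}

// Before the refactor, the equivalent signature would have read
// distRestoreToy(jobID int64, numSpans int, execRegion string, ...).
func distRestoreToy(md restoreMeta) error {
	fmt.Printf("restoring job %d across %d spans in %q\n",
		md.jobID, md.numSpans, md.execRegion)
	return nil
}

func main() {
	// Named fields make the call site self-documenting.
	_ = distRestoreToy(restoreMeta{jobID: 42, numSpans: 128, execRegion: "us-east1"})
}
```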

// distRestore plans a 2 stage distSQL flow for a distributed restore. It
// streams back progress updates over the given progCh. The first stage is a
// splitAndScatter processor on every node that is running a compatible version.
@@ -69,24 +83,16 @@ var memoryMonitorSSTs = settings.RegisterBoolSetting(
func distRestore(
ctx context.Context,
execCtx sql.JobExecContext,
jobID jobspb.JobID,
dataToRestore restorationData,
restoreTime hlc.Timestamp,
encryption *jobspb.BackupEncryptionOptions,
kmsEnv cloud.KMSEnv,
uris []string,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
spanFilter spanCoveringFilter,
numImportSpans int,
useSimpleImportSpans bool,
execLocality roachpb.Locality,
md restoreJobMetadata,
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
tracingAggCh chan *execinfrapb.TracingAggregatorEvents,
) error {
defer close(progCh)
defer close(tracingAggCh)
var noTxn *kv.Txn

if encryption != nil && encryption.Mode == jobspb.EncryptionMode_KMS {
kms, err := cloud.KMSFromURI(ctx, encryption.KMSInfo.Uri, kmsEnv)
if md.encryption != nil && md.encryption.Mode == jobspb.EncryptionMode_KMS {
kms, err := cloud.KMSFromURI(ctx, md.encryption.KMSInfo.Uri, md.kmsEnv)
if err != nil {
return err
}
@@ -97,7 +103,7 @@ func distRestore(
}
}()

encryption.Key, err = kms.Decrypt(ctx, encryption.KMSInfo.EncryptedDataKey)
md.encryption.Key, err = kms.Decrypt(ctx, md.encryption.KMSInfo.EncryptedDataKey)
if err != nil {
return errors.Wrap(err,
"failed to decrypt data key before starting BackupDataProcessor")
@@ -106,16 +112,16 @@ func distRestore(
// Wrap the relevant BackupEncryptionOptions to be used by the Restore
// processor.
var fileEncryption *kvpb.FileEncryptionOptions
if encryption != nil {
fileEncryption = &kvpb.FileEncryptionOptions{Key: encryption.Key}
if md.encryption != nil {
fileEncryption = &kvpb.FileEncryptionOptions{Key: md.encryption.Key}
}

memMonSSTs := memoryMonitorSSTs.Get(execCtx.ExecCfg().SV())
makePlan := func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {

planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanningWithOracle(
ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg(),
physicalplan.DefaultReplicaChooser, execLocality,
physicalplan.DefaultReplicaChooser, md.execLocality,
)
if err != nil {
return nil, nil, err
@@ -125,13 +131,13 @@ func distRestore(
p := planCtx.NewPhysicalPlan()

restoreDataSpec := execinfrapb.RestoreDataSpec{
JobID: int64(jobID),
RestoreTime: restoreTime,
JobID: int64(md.jobID),
RestoreTime: md.restoreTime,
Encryption: fileEncryption,
TableRekeys: dataToRestore.getRekeys(),
TenantRekeys: dataToRestore.getTenantRekeys(),
PKIDs: dataToRestore.getPKIDs(),
ValidateOnly: dataToRestore.isValidateOnly(),
TableRekeys: md.dataToRestore.getRekeys(),
TenantRekeys: md.dataToRestore.getTenantRekeys(),
PKIDs: md.dataToRestore.getPKIDs(),
ValidateOnly: md.dataToRestore.isValidateOnly(),
MemoryMonitorSSTs: memMonSSTs,
}

@@ -175,34 +181,34 @@ func distRestore(
// It tries to take the cluster size into account so that larger clusters
// distribute more chunks amongst them so that after scattering there isn't
// a large variance in the distribution of entries.
chunkSize := int(math.Sqrt(float64(numImportSpans))) / numNodes
chunkSize := int(math.Sqrt(float64(md.numImportSpans))) / numNodes
if chunkSize == 0 {
chunkSize = 1
}

id := execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID()

spec := &execinfrapb.GenerativeSplitAndScatterSpec{
TableRekeys: dataToRestore.getRekeys(),
TenantRekeys: dataToRestore.getTenantRekeys(),
ValidateOnly: dataToRestore.isValidateOnly(),
URIs: uris,
Encryption: encryption,
EndTime: restoreTime,
Spans: dataToRestore.getSpans(),
BackupLocalityInfo: backupLocalityInfo,
HighWater: spanFilter.highWaterMark,
TableRekeys: md.dataToRestore.getRekeys(),
TenantRekeys: md.dataToRestore.getTenantRekeys(),
ValidateOnly: md.dataToRestore.isValidateOnly(),
URIs: md.uris,
Encryption: md.encryption,
EndTime: md.restoreTime,
Spans: md.dataToRestore.getSpans(),
BackupLocalityInfo: md.backupLocalityInfo,
HighWater: md.spanFilter.highWaterMark,
UserProto: execCtx.User().EncodeProto(),
TargetSize: spanFilter.targetSize,
TargetSize: md.spanFilter.targetSize,
ChunkSize: int64(chunkSize),
NumEntries: int64(numImportSpans),
NumEntries: int64(md.numImportSpans),
NumNodes: int64(numNodes),
UseSimpleImportSpans: useSimpleImportSpans,
UseFrontierCheckpointing: spanFilter.useFrontierCheckpointing,
JobID: int64(jobID),
UseSimpleImportSpans: md.useSimpleImportSpans,
UseFrontierCheckpointing: md.spanFilter.useFrontierCheckpointing,
JobID: int64(md.jobID),
}
if spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0)
if md.spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0)
}

proc := physicalplan.Processor{
@@ -291,6 +297,10 @@ func distRestore(
// Send the progress up a level to be written to the manifest.
progCh <- meta.BulkProcessorProgress
}

if meta.AggregatorEvents != nil {
tracingAggCh <- meta.AggregatorEvents
}
return nil
}

@@ -308,7 +318,7 @@ func distRestore(
defer recv.Release()

execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
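One detail worth calling out in the GenerativeSplitAndScatterSpec hunk above is the chunk-size heuristic: the square root of the import-span count divided by the node count, clamped to at least 1, so larger restores distribute more chunks per node while small restores never round down to a zero-sized chunk. A quick worked example (the numbers are made up):

```go
package main

import (
	"fmt"
	"math"
)

// chunkSize mirrors the heuristic in distRestore: sqrt of the span count
// divided by the node count, clamped to a minimum of 1.
func chunkSize(numImportSpans, numNodes int) int {
	c := int(math.Sqrt(float64(numImportSpans))) / numNodes
	if c == 0 {
		c = 1
	}
	return c
}

func main() {
	// 10,000 import spans on a 10-node cluster: sqrt(10000) = 100, / 10 = 10.
	fmt.Println(chunkSize(10000, 10)) // 10
	// Small restores are clamped so chunks never degenerate to size zero.
	fmt.Println(chunkSize(16, 10)) // 1
}
```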
1 change: 1 addition & 0 deletions pkg/kv/bulk/bulkpb/BUILD.bazel
@@ -38,6 +38,7 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/log",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@io_opentelemetry_go_otel//attribute",
],
)
2 changes: 2 additions & 0 deletions pkg/kv/bulk/bulkpb/bulkpb.proto
@@ -19,6 +19,8 @@ import "util/hlc/timestamp.proto";
// IngestionPerformanceStats is a message containing information about the
// creation of SSTables by an SSTBatcher or BufferingAdder.
message IngestionPerformanceStats {
option (gogoproto.goproto_stringer) = false;

// LogicalDataSize is the total byte size of all the KVs ingested.
int64 logical_data_size = 1;

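Setting goproto_stringer to false tells gogoproto not to emit the default String() method for IngestionPerformanceStats, which is the usual prerequisite for supplying a hand-written one (the new @com_github_gogo_protobuf//proto dependency in the BUILD file points the same way). A hedged sketch of what such a custom Stringer can look like; the type and field below are stand-ins, not the actual bulkpb implementation:

```go
package main

import "fmt"

// ingestionPerformanceStats is a stand-in for the generated proto struct;
// only one of its fields is shown here.
type ingestionPerformanceStats struct {
	LogicalDataSize int64
}

// With goproto_stringer disabled, the package supplies String() itself,
// typically to print human-readable or redaction-safe output instead of
// the default proto text format.
func (s *ingestionPerformanceStats) String() string {
	return fmt.Sprintf("ingested %d logical bytes", s.LogicalDataSize)
}

func main() {
	fmt.Println(&ingestionPerformanceStats{LogicalDataSize: 1 << 20})
}
```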