Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: improve row-level TTL performance using DistSQL #84728

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,24 @@ message AutoSQLStatsCompactionProgress {
}

message RowLevelTTLDetails {

// TableID is the ID of the table that the TTL job removes records from.
uint32 table_id = 1 [
(gogoproto.customname) = "TableID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
];

// Cutoff is compared against execinfrapb.TTLSpec.TTLExpr by the
// ttlProcessor to determine what records to delete. Records are deleted
// if TTLExpr <= Cutoff.
google.protobuf.Timestamp cutoff = 2 [(gogoproto.nullable)=false, (gogoproto.stdtime) = true];

// TableVersion is the table descriptor version of the table when the TTLJob
// started.
uint64 table_version = 3 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.DescriptorVersion"
];

}

message RowLevelTTLProgress {
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,18 @@ func NewMetadataCallbackWriter(
return &MetadataCallbackWriter{rowResultWriter: rowResultWriter, fn: metaFn}
}

// NewMetadataOnlyMetadataCallbackWriter creates a new MetadataCallbackWriter
// that uses errOnlyResultWriter and only supports receiving
// execinfrapb.ProducerMetadata.
func NewMetadataOnlyMetadataCallbackWriter() *MetadataCallbackWriter {
return NewMetadataCallbackWriter(
&errOnlyResultWriter{},
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
return nil
},
)
}

// errOnlyResultWriter is a rowResultWriter and batchResultWriter that only
// supports receiving an error. All other functions that deal with producing
// results panic.
Expand Down
18 changes: 11 additions & 7 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
Expand Down Expand Up @@ -1528,13 +1527,18 @@ type TTLTestingKnobs struct {
// AOSTDuration changes the AOST timestamp duration to add to the
// current time.
AOSTDuration *time.Duration
// ReturnStatsError causes stats errors to be returned instead of logged as warnings.
// RequireMultipleSpanPartitions is a flag to verify that the DistSQL will
// distribute the work across multiple nodes.
RequireMultipleSpanPartitions bool
// ReturnStatsError causes stats errors to be returned instead of logged as
// warnings.
ReturnStatsError bool
// MockTableDescriptorVersionDuringDelete is a version to mock the table descriptor
// as during delete.
MockTableDescriptorVersionDuringDelete *descpb.DescriptorVersion
// PreSelectDeleteStatement runs before the start of the TTL select-delete loop
PreSelectDeleteStatement string
// PreDeleteChangeTableVersion is a flag to change the table descriptor
// during a delete.
PreDeleteChangeTableVersion bool
// PreSelectStatement runs before the start of the TTL select-delete
// loop.
PreSelectStatement string
}

// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfrapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ proto_library(
"processors_changefeeds.proto",
"processors_sql.proto",
"processors_table_stats.proto",
"processors_ttl.proto",
],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,16 @@ func (s *ChangeFrontierSpec) summary() (string, []string) {
return "ChangeFrontier", []string{}
}

// summary implements the diagramCellType interface.
func (s *TTLSpec) summary() (string, []string) {
details := s.RowLevelTTLDetails
return "TTL", []string{
fmt.Sprintf("JobID: %d", s.JobID),
fmt.Sprintf("TableID: %d", details.TableID),
fmt.Sprintf("TableVersion: %d", details.TableVersion),
}
}

type diagramCell struct {
Title string `json:"title"`
Details []string `json:"details"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execinfrapb/processors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ option go_package = "execinfrapb";
import "sql/execinfrapb/data.proto";
import "sql/execinfrapb/processors_base.proto";
import "sql/execinfrapb/processors_sql.proto";
import "sql/execinfrapb/processors_ttl.proto";
import "sql/execinfrapb/processors_bulk_io.proto";
import "sql/execinfrapb/processors_changefeeds.proto";
import "sql/execinfrapb/processors_table_stats.proto";
Expand Down Expand Up @@ -120,6 +121,7 @@ message ProcessorCoreUnion {
optional StreamIngestionFrontierSpec streamIngestionFrontier = 36;
optional ExportSpec exporter = 37;
optional IndexBackfillMergerSpec indexBackfillMerger = 38;
optional TTLSpec ttl = 39;

reserved 6, 12, 14, 17, 18, 19, 20;
}
Expand Down
85 changes: 85 additions & 0 deletions pkg/sql/execinfrapb/processors_ttl.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//
// Processor definitions for distributed SQL APIs. See
// docs/RFCS/distributed_sql.md.
// All the concepts here are "physical plan" concepts.

syntax = "proto2";
// Beware! This package name must not be changed, even though it doesn't match
// the Go package name, because it defines the Protobuf message names which
// can't be changed without breaking backward compatibility.
package cockroach.sql.distsqlrun;
option go_package = "execinfrapb";

import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "roachpb/data.proto";
import "jobs/jobspb/jobs.proto";

message TTLSpec {

// JobID of the job that ran the ttlProcessor.
optional int64 job_id = 1 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "JobID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb.JobID"
];

// RowLevelTTLDetails are the details of the job that ran the ttlProcessor.
optional jobs.jobspb.RowLevelTTLDetails row_level_ttl_details = 2 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "RowLevelTTLDetails"
];

// AOST is the AS OF SYSTEM TIME value the ttlProcessor uses to select records.
optional google.protobuf.Timestamp aost = 3 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "AOST",
(gogoproto.stdtime) = true
];

// TTLExpr is compared against jobspb.RowLevelTTLDetails.Cutoff by the
// ttlProcessor to determine what records to delete. Records are deleted
// if TTLExpr <= Cutoff.
optional string ttl_expr = 4 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "TTLExpr",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.Expression"
];

// Spans determine which records are processed by which nodes in the DistSQL
// flow.
repeated roachpb.Span spans = 5 [(gogoproto.nullable) = false];

// RangeConcurrency controls how many ranges a single ttlProcessor processes
// in parallel.
optional int64 range_concurrency = 6 [(gogoproto.nullable) = false];

// SelectBatchSize controls the batch size for SELECTs.
optional int64 select_batch_size = 7 [(gogoproto.nullable) = false];

// DeleteBatchSize controls the batch size for DELETEs.
optional int64 delete_batch_size = 8 [(gogoproto.nullable) = false];

// DeleteRateLimit controls how many records can be deleted per second.
optional int64 delete_rate_limit = 9 [(gogoproto.nullable) = false];

// LabelMetrics controls if metrics are labeled with the name of the table being TTLed.
optional bool label_metrics = 10 [(gogoproto.nullable) = false];

// PreDeleteChangeTableVersion is a test flag to change the table
// descriptor before a delete.
optional bool pre_delete_change_table_version = 11 [(gogoproto.nullable) = false];

// PreSelectStatement is a test setting to run a SQL statement
// before selecting records.
optional string pre_select_statement = 12 [(gogoproto.nullable) = false];
}
9 changes: 9 additions & 0 deletions pkg/sql/rowexec/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ func NewProcessor(
}
return backfill.NewIndexBackfillMerger(ctx, flowCtx, *core.IndexBackfillMerger, outputs[0])
}
if core.Ttl != nil {
if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
return nil, err
}
return NewTTLProcessor(flowCtx, processorID, *core.Ttl, outputs[0])
}
return nil, errors.Errorf("unsupported processor core %q", core)
}

Expand Down Expand Up @@ -391,3 +397,6 @@ var NewChangeFrontierProcessor func(*execinfra.FlowCtx, int32, execinfrapb.Chang

// NewStreamIngestionFrontierProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
var NewStreamIngestionFrontierProcessor func(*execinfra.FlowCtx, int32, execinfrapb.StreamIngestionFrontierSpec, execinfra.RowSource, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)

// NewTTLProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
var NewTTLProcessor func(*execinfra.FlowCtx, int32, execinfrapb.TTLSpec, execinfra.RowReceiver) (execinfra.Processor, error)
7 changes: 6 additions & 1 deletion pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttljob",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand All @@ -27,12 +28,17 @@ go_library(
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/lexbase",
"//pkg/sql/physicalplan",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqltelemetry",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/log",
Expand Down Expand Up @@ -68,7 +74,6 @@ go_test(
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/lexbase",
"//pkg/sql/parser",
Expand Down