Skip to content

Commit

Permalink
protectedts: add target field to pts record
Browse files Browse the repository at this point in the history
Protected timestamp records are moving away from
the notion of protecting spans, and instead will operate
on objects. Objects will be defined as:

- Cluster
- Tenant
- Schema objects (database and table)

This change deprecates the Spans field on `ptpb.Record`.
Additionally, it adds a `oneof` target field that reflects
which of the above objects the record corresponds to.
This information will be needed by the SQLTranslator/Reconciler
in future work to emit SpanConfigurations based on the type
of object the job is protecting.

Informs: cockroachdb#73727

Release note: None
  • Loading branch information
adityamaru committed Dec 22, 2021
1 parent bf8034e commit 826929d
Show file tree
Hide file tree
Showing 16 changed files with 950 additions and 120 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/schedule_pts_chaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func getSpansProtectedByBackup(
return nil, err
}

return ptsRecord.Spans, nil
return ptsRecord.DeprecatedSpans, nil
}

func protectTimestampRecordForSchedule(
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3527,8 +3527,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
if ptr == nil {
return errors.Errorf("expected protected timestamp")
}
require.Equal(t, len(ptr.Spans), len(expectedKeys), ptr.Spans, expectedKeys)
for _, s := range ptr.Spans {
require.Equal(t, len(ptr.DeprecatedSpans), len(expectedKeys), ptr.DeprecatedSpans, expectedKeys)
for _, s := range ptr.DeprecatedSpans {
require.Contains(t, expectedKeys, string(s.Key))
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ func MakeRecord(
metaType MetaType,
) *ptpb.Record {
return &ptpb.Record{
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
Spans: spans,
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
DeprecatedSpans: spans,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestProtectedTimestamps(t *testing.T) {
ID: uuid.MakeV4().GetBytes(),
Timestamp: s0.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
Spans: []roachpb.Span{
DeprecatedSpans: []roachpb.Span{
{
Key: startKey,
EndKey: startKey.PrefixEnd(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/protectedts/ptcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func (c *Cache) upToDate(asOf hlc.Timestamp) bool {
}

func overlaps(r *ptpb.Record, sp roachpb.Span) bool {
for i := range r.Spans {
if r.Spans[i].Overlaps(sp) {
for i := range r.DeprecatedSpans {
if r.DeprecatedSpans[i].Overlaps(sp) {
return true
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/protectedts/ptcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,10 @@ func protect(
) (r *ptpb.Record, createdAt hlc.Timestamp) {
protectTS := s.Clock().Now()
r = &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: protectTS,
Mode: ptpb.PROTECT_AFTER,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: protectTS,
Mode: ptpb.PROTECT_AFTER,
DeprecatedSpans: spans,
}
ctx := context.Background()
txn := s.DB().NewTxn(ctx, "test")
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/protectedts/ptpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:roachpb_proto",
"//pkg/sql/catalog/descpb:descpb_proto",
"//pkg/util/hlc:hlc_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@go_googleapis//google/api:annotations_proto",
Expand All @@ -22,6 +23,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog/descpb",
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
Expand Down
893 changes: 847 additions & 46 deletions pkg/kv/kvserver/protectedts/ptpb/protectedts.pb.go

Large diffs are not rendered by default.

31 changes: 29 additions & 2 deletions pkg/kv/kvserver/protectedts/ptpb/protectedts.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "gogoproto/gogo.proto";
import "google/api/annotations.proto";
import "roachpb/data.proto";
import "util/hlc/timestamp.proto";
import "sql/catalog/descpb/structured.proto";


// TODO(ajwerner): Consider splitting up Record into two pieces. It would
Expand Down Expand Up @@ -88,6 +89,21 @@ message Metadata {

// Record corresponds to a protected timestamp.
message Record {
message SchemaObjectsTarget {
// IDs are the descriptor IDs of the schema objects being protected by this
// Record. This field will only contain database and table IDs.
repeated uint32 ids = 1 [(gogoproto.customname) = "IDs",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
}

message TenantsTarget {
// IDs are the tenant IDs being protected by this Record.
repeated roachpb.TenantID ids = 1 [(gogoproto.customname) = "IDs"];
}

message ClusterTarget {
// Indicates that the entire cluster is being protected by this Record.
}

// ID uniquely identifies this row.
bytes id = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.Bytes", (gogoproto.customname) = "ID"];
Expand Down Expand Up @@ -119,8 +135,19 @@ message Record {
// change the Version of the subsystem.
bool verified = 6;

// Spans are the spans which this Record protects.
repeated roachpb.Span spans = 7 [(gogoproto.nullable) = false];
// DeprecatedSpans are the spans which this Record protects.
repeated roachpb.Span deprecated_spans = 7 [(gogoproto.nullable) = false];

// Target holds information about what this Record protects. The Record can
// either protect the entire cluster, a subset of tenants, or individual
// schema objects (database and table).
oneof target {
SchemaObjectsTarget schema_objects = 8;
TenantsTarget tenants = 9;
ClusterTarget cluster = 10;
}

// next ID: 11
}

// State is the complete system state.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestReconciler(t *testing.T) {
Mode: ptpb.PROTECT_AFTER,
MetaType: testTaskType,
Meta: []byte(recMeta),
Spans: []roachpb.Span{
DeprecatedSpans: []roachpb.Span{
{Key: keys.MinKey, EndKey: keys.MaxKey},
},
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/protectedts/ptstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
if txn == nil {
return errNoTxn
}
encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.Spans})
encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.DeprecatedSpans})
if err != nil { // how can this possibly fail?
return errors.Wrap(err, "failed to marshal spans")
}
Expand All @@ -95,10 +95,10 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
it, err := p.ex.QueryIteratorEx(ctx, "protectedts-protect", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
protectQuery,
s.maxSpans, s.maxBytes, len(r.Spans),
s.maxSpans, s.maxBytes, len(r.DeprecatedSpans),
r.ID, r.Timestamp.AsOfSystemTime(),
r.MetaType, meta,
len(r.Spans), encodedSpans)
len(r.DeprecatedSpans), encodedSpans)
if err != nil {
return errors.Wrapf(err, "failed to write record %v", r.ID)
}
Expand All @@ -115,10 +115,10 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
}
if failed := *row[0].(*tree.DBool); failed {
curNumSpans := int64(*row[1].(*tree.DInt))
if s.maxSpans > 0 && curNumSpans+int64(len(r.Spans)) > s.maxSpans {
if s.maxSpans > 0 && curNumSpans+int64(len(r.DeprecatedSpans)) > s.maxSpans {
return errors.WithHint(
errors.Errorf("protectedts: limit exceeded: %d+%d > %d spans", curNumSpans,
len(r.Spans), s.maxSpans),
len(r.DeprecatedSpans), s.maxSpans),
"SET CLUSTER SETTING kv.protectedts.max_spans to a higher value")
}
curBytes := int64(*row[2].(*tree.DInt))
Expand Down Expand Up @@ -271,7 +271,7 @@ func rowToRecord(ctx context.Context, row tree.Datums, r *ptpb.Record) error {
if err := protoutil.Unmarshal([]byte(*row[4].(*tree.DBytes)), &spans); err != nil {
return errors.Wrapf(err, "failed to unmarshal spans for %v", r.ID)
}
r.Spans = spans.Spans
r.DeprecatedSpans = spans.Spans
r.Verified = bool(*row[5].(*tree.DBool))
return nil
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func validateRecordForProtect(r *ptpb.Record) error {
if r.ID.GetUUID() == uuid.Nil {
return errZeroID
}
if len(r.Spans) == 0 {
if len(r.DeprecatedSpans) == 0 {
return errEmptySpans
}
if len(r.Meta) > 0 && len(r.MetaType) == 0 {
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ func (r releaseOp) run(ctx context.Context, t *testing.T, tCtx *testContext) {
}
tCtx.state.Version++
tCtx.state.NumRecords--
tCtx.state.NumSpans -= uint64(len(rec.Spans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.Spans})
tCtx.state.NumSpans -= uint64(len(rec.DeprecatedSpans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans})
require.NoError(t, err)
tCtx.state.TotalBytes -= uint64(len(encoded) + len(rec.Meta) + len(rec.MetaType))
}
Expand Down Expand Up @@ -392,7 +392,7 @@ func (p protectOp) run(ctx context.Context, t *testing.T, tCtx *testContext) {
tCtx.state.Records = append(tCtx.state.Records, tail...)
tCtx.state.Version++
tCtx.state.NumRecords++
tCtx.state.NumSpans += uint64(len(rec.Spans))
tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: p.spans})
require.NoError(t, err)
tCtx.state.TotalBytes += uint64(len(encoded) + len(p.meta) + len(p.metaType))
Expand Down Expand Up @@ -503,12 +503,12 @@ func tableSpans(tableIDs ...uint32) []roachpb.Span {

func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb.Span) ptpb.Record {
return ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: ts,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaType,
Meta: meta,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: ts,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaType,
Meta: meta,
DeprecatedSpans: spans,
}
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/kv/kvserver/protectedts/ptstorage/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,38 @@ func TestValidateRecordForProtect(t *testing.T) {
}{
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: nil,
},
{
r: &ptpb.Record{
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errZeroID,
},
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errZeroTimestamp,
},
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errInvalidMeta,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/protectedts/ptverifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getRecordWithTimestamp(

func makeVerificationBatch(r *ptpb.Record, aliveAt hlc.Timestamp) kv.Batch {
// Need to perform validation, build a batch and run it.
mergedSpans, _ := roachpb.MergeSpans(&r.Spans)
mergedSpans, _ := roachpb.MergeSpans(&r.DeprecatedSpans)
var b kv.Batch
for _, s := range mergedSpans {
var req roachpb.AdminVerifyProtectedTimestampRequest
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ func TestVerifier(t *testing.T) {
spans[i] = makeTableSpan(tid)
}
r := ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: s.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: s.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
DeprecatedSpans: spans,
}
require.Nil(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return pts.Protect(ctx, txn, &r)
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
}},
})
return &resp, nil
Expand All @@ -166,16 +166,16 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
Reason: "foo",
}},
})
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 12,
StartKey: roachpb.RKey(r.Spans[1].Key),
EndKey: roachpb.RKey(r.Spans[1].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[1].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[1].EndKey),
Reason: "bar",
}},
})
Expand Down Expand Up @@ -206,8 +206,8 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
DeprecatedFailedRanges: []roachpb.RangeDescriptor{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
}},
})
return &resp, nil
Expand Down
Loading

0 comments on commit 826929d

Please sign in to comment.