Skip to content

Commit

Permalink
protectedts: add target field to pts record
Browse files Browse the repository at this point in the history
Protected timestamp records are moving away from
the notion of protecting spans, and instead will operate
on objects. Objects will be defined as:

- Cluster
- Tenant
- Schema objects (database and table)

This change deprecates the Spans field on `ptpb.Record`.
Additionally, it adds a `oneof` target field that reflects
which of the above objects the record corresponds to.
This information will be needed by the SQLTranslator/Reconciler
in future work to emit SpanConfigurations based on the type
of object the job is protecting.

Informs: #73727

Release note: None
  • Loading branch information
adityamaru committed Dec 22, 2021
1 parent bf8034e commit 2ed96a6
Show file tree
Hide file tree
Showing 15 changed files with 809 additions and 120 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/schedule_pts_chaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func getSpansProtectedByBackup(
return nil, err
}

return ptsRecord.Spans, nil
return ptsRecord.DeprecatedSpans, nil
}

func protectTimestampRecordForSchedule(
Expand Down
12 changes: 6 additions & 6 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ func MakeRecord(
metaType MetaType,
) *ptpb.Record {
return &ptpb.Record{
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
Spans: spans,
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
DeprecatedSpans: spans,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestProtectedTimestamps(t *testing.T) {
ID: uuid.MakeV4().GetBytes(),
Timestamp: s0.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
Spans: []roachpb.Span{
DeprecatedSpans: []roachpb.Span{
{
Key: startKey,
EndKey: startKey.PrefixEnd(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/protectedts/ptcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func (c *Cache) upToDate(asOf hlc.Timestamp) bool {
}

func overlaps(r *ptpb.Record, sp roachpb.Span) bool {
for i := range r.Spans {
if r.Spans[i].Overlaps(sp) {
for i := range r.DeprecatedSpans {
if r.DeprecatedSpans[i].Overlaps(sp) {
return true
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/protectedts/ptcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,10 @@ func protect(
) (r *ptpb.Record, createdAt hlc.Timestamp) {
protectTS := s.Clock().Now()
r = &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: protectTS,
Mode: ptpb.PROTECT_AFTER,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: protectTS,
Mode: ptpb.PROTECT_AFTER,
DeprecatedSpans: spans,
}
ctx := context.Background()
txn := s.DB().NewTxn(ctx, "test")
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/protectedts/ptpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:roachpb_proto",
"//pkg/sql/catalog/descpb:descpb_proto",
"//pkg/util/hlc:hlc_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@go_googleapis//google/api:annotations_proto",
Expand All @@ -22,6 +23,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog/descpb",
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
Expand Down
760 changes: 712 additions & 48 deletions pkg/kv/kvserver/protectedts/ptpb/protectedts.pb.go

Large diffs are not rendered by default.

27 changes: 25 additions & 2 deletions pkg/kv/kvserver/protectedts/ptpb/protectedts.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "gogoproto/gogo.proto";
import "google/api/annotations.proto";
import "roachpb/data.proto";
import "util/hlc/timestamp.proto";
import "sql/catalog/descpb/structured.proto";


// TODO(ajwerner): Consider splitting up Record into two pieces. It would
Expand Down Expand Up @@ -86,6 +87,18 @@ message Metadata {
uint64 total_bytes = 4;
}

message SchemaObjectsTarget {
// IDs are the descriptor IDs of the schema objects in this target.
// egs: tables, databases.
repeated uint32 ids = 1 [(gogoproto.customname) = "IDs",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
}

message TenantsTarget {
// IDs are the tenant IDs in this target.
repeated roachpb.TenantID ids = 1 [(gogoproto.customname) = "IDs"];
}

// Record corresponds to a protected timestamp.
message Record {

Expand Down Expand Up @@ -119,8 +132,18 @@ message Record {
// change the Version of the subsystem.
bool verified = 6;

// Spans are the spans which this Record protects.
repeated roachpb.Span spans = 7 [(gogoproto.nullable) = false];
// DeprecatedSpans are the spans which this Record protects.
repeated roachpb.Span deprecated_spans = 7 [(gogoproto.nullable) = false];

// Target holds information about the objects on which the protected timestamp
// record applies.
oneof target {
SchemaObjectsTarget schema_objects = 8;
TenantsTarget tenants = 9;
bool cluster = 10;
}

// next ID: 11
}

// State is the complete system state.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestReconciler(t *testing.T) {
Mode: ptpb.PROTECT_AFTER,
MetaType: testTaskType,
Meta: []byte(recMeta),
Spans: []roachpb.Span{
DeprecatedSpans: []roachpb.Span{
{Key: keys.MinKey, EndKey: keys.MaxKey},
},
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/protectedts/ptstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
if txn == nil {
return errNoTxn
}
encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.Spans})
encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.DeprecatedSpans})
if err != nil { // how can this possibly fail?
return errors.Wrap(err, "failed to marshal spans")
}
Expand All @@ -95,10 +95,10 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
it, err := p.ex.QueryIteratorEx(ctx, "protectedts-protect", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
protectQuery,
s.maxSpans, s.maxBytes, len(r.Spans),
s.maxSpans, s.maxBytes, len(r.DeprecatedSpans),
r.ID, r.Timestamp.AsOfSystemTime(),
r.MetaType, meta,
len(r.Spans), encodedSpans)
len(r.DeprecatedSpans), encodedSpans)
if err != nil {
return errors.Wrapf(err, "failed to write record %v", r.ID)
}
Expand All @@ -115,10 +115,10 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
}
if failed := *row[0].(*tree.DBool); failed {
curNumSpans := int64(*row[1].(*tree.DInt))
if s.maxSpans > 0 && curNumSpans+int64(len(r.Spans)) > s.maxSpans {
if s.maxSpans > 0 && curNumSpans+int64(len(r.DeprecatedSpans)) > s.maxSpans {
return errors.WithHint(
errors.Errorf("protectedts: limit exceeded: %d+%d > %d spans", curNumSpans,
len(r.Spans), s.maxSpans),
len(r.DeprecatedSpans), s.maxSpans),
"SET CLUSTER SETTING kv.protectedts.max_spans to a higher value")
}
curBytes := int64(*row[2].(*tree.DInt))
Expand Down Expand Up @@ -271,7 +271,7 @@ func rowToRecord(ctx context.Context, row tree.Datums, r *ptpb.Record) error {
if err := protoutil.Unmarshal([]byte(*row[4].(*tree.DBytes)), &spans); err != nil {
return errors.Wrapf(err, "failed to unmarshal spans for %v", r.ID)
}
r.Spans = spans.Spans
r.DeprecatedSpans = spans.Spans
r.Verified = bool(*row[5].(*tree.DBool))
return nil
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func validateRecordForProtect(r *ptpb.Record) error {
if r.ID.GetUUID() == uuid.Nil {
return errZeroID
}
if len(r.Spans) == 0 {
if len(r.DeprecatedSpans) == 0 {
return errEmptySpans
}
if len(r.Meta) > 0 && len(r.MetaType) == 0 {
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ func (r releaseOp) run(ctx context.Context, t *testing.T, tCtx *testContext) {
}
tCtx.state.Version++
tCtx.state.NumRecords--
tCtx.state.NumSpans -= uint64(len(rec.Spans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.Spans})
tCtx.state.NumSpans -= uint64(len(rec.DeprecatedSpans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans})
require.NoError(t, err)
tCtx.state.TotalBytes -= uint64(len(encoded) + len(rec.Meta) + len(rec.MetaType))
}
Expand Down Expand Up @@ -392,7 +392,7 @@ func (p protectOp) run(ctx context.Context, t *testing.T, tCtx *testContext) {
tCtx.state.Records = append(tCtx.state.Records, tail...)
tCtx.state.Version++
tCtx.state.NumRecords++
tCtx.state.NumSpans += uint64(len(rec.Spans))
tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: p.spans})
require.NoError(t, err)
tCtx.state.TotalBytes += uint64(len(encoded) + len(p.meta) + len(p.metaType))
Expand Down Expand Up @@ -503,12 +503,12 @@ func tableSpans(tableIDs ...uint32) []roachpb.Span {

func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb.Span) ptpb.Record {
return ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: ts,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaType,
Meta: meta,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: ts,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaType,
Meta: meta,
DeprecatedSpans: spans,
}
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/kv/kvserver/protectedts/ptstorage/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,38 @@ func TestValidateRecordForProtect(t *testing.T) {
}{
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: nil,
},
{
r: &ptpb.Record{
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errZeroID,
},
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errZeroTimestamp,
},
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errInvalidMeta,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/protectedts/ptverifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getRecordWithTimestamp(

func makeVerificationBatch(r *ptpb.Record, aliveAt hlc.Timestamp) kv.Batch {
// Need to perform validation, build a batch and run it.
mergedSpans, _ := roachpb.MergeSpans(&r.Spans)
mergedSpans, _ := roachpb.MergeSpans(&r.DeprecatedSpans)
var b kv.Batch
for _, s := range mergedSpans {
var req roachpb.AdminVerifyProtectedTimestampRequest
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ func TestVerifier(t *testing.T) {
spans[i] = makeTableSpan(tid)
}
r := ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: s.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: s.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
DeprecatedSpans: spans,
}
require.Nil(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return pts.Protect(ctx, txn, &r)
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
}},
})
return &resp, nil
Expand All @@ -166,16 +166,16 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
Reason: "foo",
}},
})
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 12,
StartKey: roachpb.RKey(r.Spans[1].Key),
EndKey: roachpb.RKey(r.Spans[1].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[1].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[1].EndKey),
Reason: "bar",
}},
})
Expand Down Expand Up @@ -206,8 +206,8 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
DeprecatedFailedRanges: []roachpb.RangeDescriptor{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
}},
})
return &resp, nil
Expand Down
Loading

0 comments on commit 2ed96a6

Please sign in to comment.