Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protectedts: add target field to pts record #74211

Merged
merged 1 commit into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/schedule_pts_chaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func getSpansProtectedByBackup(
return nil, err
}

return ptsRecord.Spans, nil
return ptsRecord.DeprecatedSpans, nil
}

func protectTimestampRecordForSchedule(
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3553,8 +3553,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
if ptr == nil {
return errors.Errorf("expected protected timestamp")
}
require.Equal(t, len(ptr.Spans), len(expectedKeys), ptr.Spans, expectedKeys)
for _, s := range ptr.Spans {
require.Equal(t, len(ptr.DeprecatedSpans), len(expectedKeys), ptr.DeprecatedSpans, expectedKeys)
for _, s := range ptr.DeprecatedSpans {
require.Contains(t, expectedKeys, string(s.Key))
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ func MakeRecord(
metaType MetaType,
) *ptpb.Record {
return &ptpb.Record{
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
Spans: spans,
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
DeprecatedSpans: spans,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestProtectedTimestamps(t *testing.T) {
ID: uuid.MakeV4().GetBytes(),
Timestamp: s0.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
Spans: []roachpb.Span{
DeprecatedSpans: []roachpb.Span{
{
Key: startKey,
EndKey: startKey.PrefixEnd(),
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3373,9 +3373,9 @@ func TestStrictGCEnforcement(t *testing.T) {
tableSpan = roachpb.Span{Key: tableKey, EndKey: tableKey.PrefixEnd()}
mkRecord = func() ptpb.Record {
return ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: tenSecondsAgo.Add(-10*time.Second.Nanoseconds(), 0),
Spans: []roachpb.Span{tableSpan},
ID: uuid.MakeV4().GetBytes(),
Timestamp: tenSecondsAgo.Add(-10*time.Second.Nanoseconds(), 0),
DeprecatedSpans: []roachpb.Span{tableSpan},
}
}
mkStaleTxn = func() *kv.Txn {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/protectedts/ptcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func (c *Cache) upToDate(asOf hlc.Timestamp) bool {
}

func overlaps(r *ptpb.Record, sp roachpb.Span) bool {
for i := range r.Spans {
if r.Spans[i].Overlaps(sp) {
for i := range r.DeprecatedSpans {
if r.DeprecatedSpans[i].Overlaps(sp) {
return true
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/protectedts/ptcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,10 @@ func protect(
) (r *ptpb.Record, createdAt hlc.Timestamp) {
protectTS := s.Clock().Now()
r = &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: protectTS,
Mode: ptpb.PROTECT_AFTER,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: protectTS,
Mode: ptpb.PROTECT_AFTER,
DeprecatedSpans: spans,
}
ctx := context.Background()
txn := s.DB().NewTxn(ctx, "test")
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/protectedts/ptpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:roachpb_proto",
"//pkg/sql/catalog/descpb:descpb_proto",
"//pkg/util/hlc:hlc_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@go_googleapis//google/api:annotations_proto",
Expand All @@ -22,6 +23,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog/descpb",
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
Expand Down
37 changes: 35 additions & 2 deletions pkg/kv/kvserver/protectedts/ptpb/protectedts.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "gogoproto/gogo.proto";
import "google/api/annotations.proto";
import "roachpb/data.proto";
import "util/hlc/timestamp.proto";
import "sql/catalog/descpb/structured.proto";


// TODO(ajwerner): Consider splitting up Record into two pieces. It would
Expand Down Expand Up @@ -88,6 +89,27 @@ message Metadata {

// Record corresponds to a protected timestamp.
message Record {
message SchemaObjectsTarget {
// IDs are the descriptor IDs of the schema objects being protected by this
// Record. This field will only contain database and table IDs.
repeated uint32 ids = 1 [(gogoproto.customname) = "IDs",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
}

message TenantsTarget {
// IDs correspond to the tenant keyspacs being protected by this Record.
repeated roachpb.TenantID ids = 1 [(gogoproto.customname) = "IDs"];
}

message ClusterTarget {
// ClusterTarget indicates that all SQL state in the cluster is being
// protected by this Record. This includes all user defined schema objects,
// as well as system tables used to configure the cluster. In a system
// tenant this target will also protect all secondary tenant keyspaces that
// exist in it.
//
// Today, this target is only used by cluster backups.
}

// ID uniquely identifies this row.
bytes id = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.Bytes", (gogoproto.customname) = "ID"];
Expand Down Expand Up @@ -119,8 +141,19 @@ message Record {
// change the Version of the subsystem.
bool verified = 6;

// Spans are the spans which this Record protects.
repeated roachpb.Span spans = 7 [(gogoproto.nullable) = false];
// DeprecatedSpans are the spans which this Record protects.
repeated roachpb.Span deprecated_spans = 7 [(gogoproto.nullable) = false];

// Target holds information about what this Record protects. The Record can
// either protect the entire cluster, a subset of tenants, or individual
// schema objects (database and table).
oneof target {
SchemaObjectsTarget schema_objects = 8;
TenantsTarget tenants = 9;
ClusterTarget cluster = 10;
}

// next ID: 11
}

// State is the complete system state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestReconciler(t *testing.T) {
Mode: ptpb.PROTECT_AFTER,
MetaType: testTaskType,
Meta: []byte(recMeta),
Spans: []roachpb.Span{
DeprecatedSpans: []roachpb.Span{
{Key: keys.MinKey, EndKey: keys.MaxKey},
},
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/protectedts/ptstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
if txn == nil {
return errNoTxn
}
encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.Spans})
encodedSpans, err := protoutil.Marshal(&Spans{Spans: r.DeprecatedSpans})
if err != nil { // how can this possibly fail?
return errors.Wrap(err, "failed to marshal spans")
}
Expand All @@ -95,10 +95,10 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
it, err := p.ex.QueryIteratorEx(ctx, "protectedts-protect", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
protectQuery,
s.maxSpans, s.maxBytes, len(r.Spans),
s.maxSpans, s.maxBytes, len(r.DeprecatedSpans),
r.ID, r.Timestamp.AsOfSystemTime(),
r.MetaType, meta,
len(r.Spans), encodedSpans)
len(r.DeprecatedSpans), encodedSpans)
if err != nil {
return errors.Wrapf(err, "failed to write record %v", r.ID)
}
Expand All @@ -115,10 +115,10 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
}
if failed := *row[0].(*tree.DBool); failed {
curNumSpans := int64(*row[1].(*tree.DInt))
if s.maxSpans > 0 && curNumSpans+int64(len(r.Spans)) > s.maxSpans {
if s.maxSpans > 0 && curNumSpans+int64(len(r.DeprecatedSpans)) > s.maxSpans {
return errors.WithHint(
errors.Errorf("protectedts: limit exceeded: %d+%d > %d spans", curNumSpans,
len(r.Spans), s.maxSpans),
len(r.DeprecatedSpans), s.maxSpans),
"SET CLUSTER SETTING kv.protectedts.max_spans to a higher value")
}
curBytes := int64(*row[2].(*tree.DInt))
Expand Down Expand Up @@ -271,7 +271,7 @@ func rowToRecord(ctx context.Context, row tree.Datums, r *ptpb.Record) error {
if err := protoutil.Unmarshal([]byte(*row[4].(*tree.DBytes)), &spans); err != nil {
return errors.Wrapf(err, "failed to unmarshal spans for %v", r.ID)
}
r.Spans = spans.Spans
r.DeprecatedSpans = spans.Spans
r.Verified = bool(*row[5].(*tree.DBool))
return nil
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func validateRecordForProtect(r *ptpb.Record) error {
if r.ID.GetUUID() == uuid.Nil {
return errZeroID
}
if len(r.Spans) == 0 {
if len(r.DeprecatedSpans) == 0 {
return errEmptySpans
}
if len(r.Meta) > 0 && len(r.MetaType) == 0 {
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ func (r releaseOp) run(ctx context.Context, t *testing.T, tCtx *testContext) {
}
tCtx.state.Version++
tCtx.state.NumRecords--
tCtx.state.NumSpans -= uint64(len(rec.Spans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.Spans})
tCtx.state.NumSpans -= uint64(len(rec.DeprecatedSpans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans})
require.NoError(t, err)
tCtx.state.TotalBytes -= uint64(len(encoded) + len(rec.Meta) + len(rec.MetaType))
}
Expand Down Expand Up @@ -392,7 +392,7 @@ func (p protectOp) run(ctx context.Context, t *testing.T, tCtx *testContext) {
tCtx.state.Records = append(tCtx.state.Records, tail...)
tCtx.state.Version++
tCtx.state.NumRecords++
tCtx.state.NumSpans += uint64(len(rec.Spans))
tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans))
encoded, err := protoutil.Marshal(&ptstorage.Spans{Spans: p.spans})
require.NoError(t, err)
tCtx.state.TotalBytes += uint64(len(encoded) + len(p.meta) + len(p.metaType))
Expand Down Expand Up @@ -503,12 +503,12 @@ func tableSpans(tableIDs ...uint32) []roachpb.Span {

func newRecord(ts hlc.Timestamp, metaType string, meta []byte, spans ...roachpb.Span) ptpb.Record {
return ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: ts,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaType,
Meta: meta,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: ts,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaType,
Meta: meta,
DeprecatedSpans: spans,
}
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/kv/kvserver/protectedts/ptstorage/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,38 @@ func TestValidateRecordForProtect(t *testing.T) {
}{
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: nil,
},
{
r: &ptpb.Record{
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errZeroID,
},
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
MetaType: "job",
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
MetaType: "job",
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errZeroTimestamp,
},
{
r: &ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
Meta: []byte("junk"),
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
Meta: []byte("junk"),
DeprecatedSpans: spans,
},
err: errInvalidMeta,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/protectedts/ptverifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getRecordWithTimestamp(

func makeVerificationBatch(r *ptpb.Record, aliveAt hlc.Timestamp) kv.Batch {
// Need to perform validation, build a batch and run it.
mergedSpans, _ := roachpb.MergeSpans(&r.Spans)
mergedSpans, _ := roachpb.MergeSpans(&r.DeprecatedSpans)
var b kv.Batch
for _, s := range mergedSpans {
var req roachpb.AdminVerifyProtectedTimestampRequest
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/protectedts/ptverifier/verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ func TestVerifier(t *testing.T) {
spans[i] = makeTableSpan(tid)
}
r := ptpb.Record{
ID: uuid.MakeV4().GetBytes(),
Timestamp: s.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
Spans: spans,
ID: uuid.MakeV4().GetBytes(),
Timestamp: s.Clock().Now(),
Mode: ptpb.PROTECT_AFTER,
DeprecatedSpans: spans,
}
require.Nil(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return pts.Protect(ctx, txn, &r)
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
}},
})
return &resp, nil
Expand All @@ -166,16 +166,16 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
Reason: "foo",
}},
})
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
VerificationFailedRanges: []roachpb.AdminVerifyProtectedTimestampResponse_FailedRange{{
RangeID: 12,
StartKey: roachpb.RKey(r.Spans[1].Key),
EndKey: roachpb.RKey(r.Spans[1].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[1].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[1].EndKey),
Reason: "bar",
}},
})
Expand Down Expand Up @@ -206,8 +206,8 @@ func TestVerifier(t *testing.T) {
resp.Add(&roachpb.AdminVerifyProtectedTimestampResponse{
DeprecatedFailedRanges: []roachpb.RangeDescriptor{{
RangeID: 42,
StartKey: roachpb.RKey(r.Spans[0].Key),
EndKey: roachpb.RKey(r.Spans[0].EndKey),
StartKey: roachpb.RKey(r.DeprecatedSpans[0].Key),
EndKey: roachpb.RKey(r.DeprecatedSpans[0].EndKey),
}},
})
return &resp, nil
Expand Down
Loading