Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hlc: remove the Synthetic field from Timestamp and LegacyTimestamp #116830

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}
}

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection, feedTestNoForcedSyntheticTimestamps)
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
}

func TestAlterChangefeedInitialScan(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
if r.opts.resolvedField {
Expand All @@ -1040,7 +1040,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
for k := range meta {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6894,7 +6894,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
// TODO(ssd): Tenant testing disabled because of use of DB()
for _, sz := range []int64{100 << 20, 100} {
maxCheckpointSize = sz
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"), feedTestNoForcedSyntheticTimestamps)
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"))
}
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,3 @@ func getEncoder(
return nil, errors.AssertionFailedf(`unknown format: %s`, opts.Format)
}
}

// timestampToString converts an internal timestamp to the string form used in
// all encoders. This could be made more efficient. And/or it could be configurable
// to include the Synthetic flag when present, but that's unlikely to be needed.
func timestampToString(t hlc.Timestamp) string {
return t.WithSynthetic(false).AsOfSystemTime()
}
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ func (e *jsonEncoder) initRawEnvelope() error {
}

if e.updatedField {
if err := metaBuilder.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
if err := metaBuilder.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -355,13 +355,13 @@ func (e *jsonEncoder) initWrappedEnvelope() error {
}

if e.updatedField {
if err := b.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
if err := b.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := b.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
if err := b.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
return nil, err
}
}
Expand Down
31 changes: 0 additions & 31 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ type feedTestOptions struct {
externalIODir string
allowedSinkTypes []string
disabledSinkTypes []string
disableSyntheticTimestamps bool
settings *cluster.Settings
}

Expand All @@ -570,12 +569,6 @@ var feedTestNoExternalConnection = func(opts *feedTestOptions) { opts.forceNoExt
// has privileges to create changefeeds on tables in the default database `d` only.
var feedTestUseRootUserConnection = func(opts *feedTestOptions) { opts.forceRootUserConnection = true }

// feedTestNoForcedSyntheticTimestamps is a feedTestOption that will prevent
// the test from randomly forcing timestamps to be synthetic and offset five seconds into the future from
// what they would otherwise be. It doesn't prevent synthetic timestamps but they're otherwise unlikely to
// occur in tests.
var feedTestNoForcedSyntheticTimestamps = func(opts *feedTestOptions) { opts.disableSyntheticTimestamps = true }

var feedTestForceSink = func(sinkType string) feedTestOption {
return feedTestRestrictSinks(sinkType)
}
Expand Down Expand Up @@ -631,30 +624,6 @@ func makeOptions(opts ...feedTestOption) feedTestOptions {
for _, o := range opts {
o(&options)
}
if !options.disableSyntheticTimestamps && rand.Intn(2) == 0 {
// Offset all timestamps a random (but consistent per test) amount into the
// future to ensure we can handle that. Always chooses an integer number of
// seconds for easier debugging and so that 0 is a possibility.
offset := int64(rand.Intn(6)) * time.Second.Nanoseconds()
// TODO(#105053): Remove this line
_ = offset
oldKnobsFn := options.knobsFn
options.knobsFn = func(knobs *base.TestingKnobs) {
if oldKnobsFn != nil {
oldKnobsFn(knobs)
}
knobs.DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs).FeedKnobs.ModifyTimestamps = func(t *hlc.Timestamp) {
// NOTE(ricky): This line of code should be uncommented.
// It used to be just t.Add(offset, 0), but t.Add() has no side
// effects so this was a no-op. *t = t.Add(offset, 0) is correct,
// but causes test failures.
// TODO(#105053): Uncomment and fix test failures
//*t = t.Add(offset, 0)
t.Synthetic = true
}
}
}
return options
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
return err
}
}
if p.knobs.ModifyTimestamps != nil {
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Val.Value.Timestamp)
}
if err := p.memBuf.Add(
ctx, kvevent.MakeKVEvent(e.RangeFeedEvent),
); err != nil {
return err
}
case *kvpb.RangeFeedCheckpoint:
if p.knobs.ModifyTimestamps != nil {
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Checkpoint.ResolvedTS)
}
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
// RangeFeed happily forwards any closed timestamps it receives as
// soon as there are no outstanding intents under them.
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// TestingKnobs are the testing knobs for kvfeed.
Expand All @@ -30,9 +29,6 @@ type TestingKnobs struct {
// EndTimeReached is a callback that may return true to indicate the
// feed should exit because its end time has been reached.
EndTimeReached func() bool
// ModifyTimestamps is called on the timestamp for each RangefeedMessage
// before converting it into a kv event.
ModifyTimestamps func(*hlc.Timestamp)
// RangefeedOptions lets the kvfeed override rangefeed settings.
RangefeedOptions []kvcoord.RangeFeedOption
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ func (w *parquetWriter) populateDatums(
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if w.encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
datums = append(datums, tree.NewDString(updated.AsOfSystemTime()))
}
if w.encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
datums = append(datums, tree.NewDString(mvcc.AsOfSystemTime()))
}
if w.encodingOpts.Diff {
if prevRow.IsDeleted() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2300,8 +2300,8 @@ message SubsumeResponse {
// through a range merge, to make the merge less disruptive to writes on
// the post-merge range because the timestamp cache won't be bumped as
// high.
// 2. it can transfer information about reads with synthetic timestamps, which
// are not otherwise captured by the FreezeStart clock timestamp.
// 2. it can transfer information about reads with future-time timestamps,
// which are not otherwise captured by the FreezeStart clock timestamp.
kv.kvserver.readsummary.ReadSummary read_summary = 7;
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ func (e *ReadWithinUncertaintyIntervalError) RetryTimestamp() hlc.Timestamp {
// advance the txn's timestamp up to the local uncertainty limit on the node
// which hit the error. This ensures that no future read after the retry on
// this node (ignoring lease complications in ComputeLocalUncertaintyLimit
// and values with synthetic timestamps) will throw an uncertainty error,
// and values with future-time timestamps) will throw an uncertainty error,
// even when reading other keys.
//
// Note that if the request was not able to establish a local uncertainty
Expand All @@ -1153,9 +1153,9 @@ func (e *ReadWithinUncertaintyIntervalError) RetryTimestamp() hlc.Timestamp {
// In general, we expect the local uncertainty limit, if set, to be above
// the uncertainty value's timestamp. So we expect this Forward to advance
// ts. However, this is not always the case. The one exception is if the
// uncertain value had a synthetic timestamp, so it was compared against the
// global uncertainty limit to determine uncertainty (see IsUncertain). In
// such cases, we're ok advancing just past the value's timestamp. Either
// uncertain value had a future-time timestamp, so it was compared against
// the global uncertainty limit to determine uncertainty (see IsUncertain).
// In such cases, we're ok advancing just past the value's timestamp. Either
// way, we won't see the same value in our uncertainty interval on a retry.
ts.Forward(e.LocalUncertaintyLimit.ToTimestamp())
return ts
Expand Down
10 changes: 1 addition & 9 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
func(r *rand.Rand) protoutil.Message {
m := enginepb.NewPopulatedMVCCMetadata(r, false)
m.Txn = nil // never populated below Raft
m.Timestamp.Synthetic = nil // never populated below Raft
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
m.TxnDidNotUpdateMeta = nil // never populated below Raft
return m
},
Expand Down Expand Up @@ -85,11 +81,7 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
return roachpb.NewPopulatedInternalTimeSeriesData(r, false)
},
func(r *rand.Rand) protoutil.Message {
m := enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
return m
return enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
},
func(r *rand.Rand) protoutil.Message {
return kvserverpb.NewPopulatedRaftReplicaID(r, false)
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ import (

// TestReplicaClockUpdates verifies that the leaseholder updates its clocks
// when executing a command to the command's timestamp, as long as the
// request timestamp is from a clock (i.e. is not synthetic).
// request timestamp is from a clock (i.e. is not in the future).
func TestReplicaClockUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

run := func(t *testing.T, write bool, synthetic bool) {
run := func(t *testing.T, write bool, futureTime bool) {
const numNodes = 3
var manuals []*hlc.HybridManualClock
var clocks []*hlc.Clock
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestReplicaClockUpdates(t *testing.T) {
// MaxOffset.
reqTS := clocks[0].Now().Add(clocks[0].MaxOffset().Nanoseconds()/2, 0)
h := kvpb.Header{Timestamp: reqTS}
if !synthetic {
if !futureTime {
h.Now = hlc.ClockTimestamp(reqTS)
}

Expand All @@ -144,13 +144,13 @@ func TestReplicaClockUpdates(t *testing.T) {
// practice an assertion against followers' clocks being updated is very
// difficult to make without being flaky because it's difficult to prevent
// other channels (background work, etc.) from carrying the clock update.
expUpdated := !synthetic
expUpdated := !futureTime
require.Equal(t, expUpdated, reqTS.Less(clocks[0].Now()))
}

testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) {
run(t, write, synthetic)
testutils.RunTrueAndFalse(t, "future-time", func(t *testing.T, futureTime bool) {
run(t, write, futureTime)
})
})
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,6 @@ func (ir *IntentResolver) MaybePushTransactions(
return nil, b.MustPErr()
}

// TODO(nvanbenschoten): if we succeed because the transaction has already
// been pushed _past_ where we were pushing, we need to set the synthetic
// bit. This is part of #36431.

br := b.RawResponse()
pushedTxns := make(map[uuid.UUID]*roachpb.Transaction, len(br.Responses))
for _, resp := range br.Responses {
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
var maxRaftCommandFooterSize = (&RaftCommandFooter{
MaxLeaseIndex: math.MaxUint64,
ClosedTimestamp: hlc.Timestamp{
WallTime: math.MaxInt64,
Logical: math.MaxInt32,
Synthetic: true,
WallTime: math.MaxInt64,
Logical: math.MaxInt32,
},
}).Size()

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ message ReplicatedEvalResult {
// 1. it can transfer a higher-resolution snapshot of the reads on the range
// through a lease transfer, to make the lease transfers less disruptive to
// writes because the timestamp cache won't be bumped as high.
// 2. it can transfer information about reads with synthetic timestamps, which
// are not otherwise captured by the new lease's start time.
// 2. it can transfer information about reads with future-time timestamps,
// which are not otherwise captured by the new lease's start time.
//
// When a ReadSummary is set in a ReplicatedEvalResult, there is always also a
// write to the RangePriorReadSummaryKey in the RaftCommand.WriteBatch. The
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/protectedts/ptstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (p *storage) Protect(ctx context.Context, r *ptpb.Record) error {
sessiondata.NodeUserSessionDataOverride,
protectQuery,
s.maxSpans, s.maxBytes, len(r.DeprecatedSpans),
r.ID, r.Timestamp.WithSynthetic(false).AsOfSystemTime(),
r.ID, r.Timestamp.AsOfSystemTime(),
r.MetaType, meta,
len(r.DeprecatedSpans), encodedTarget, encodedTarget)
if err != nil {
Expand Down Expand Up @@ -230,7 +230,7 @@ func (p *storage) getRecords(ctx context.Context) ([]ptpb.Record, error) {
func (p storage) UpdateTimestamp(ctx context.Context, id uuid.UUID, timestamp hlc.Timestamp) error {
row, err := p.txn.QueryRowEx(ctx, "protectedts-update", p.txn.KV(),
sessiondata.NodeUserSessionDataOverride,
updateTimestampQuery, id.GetBytesMut(), timestamp.WithSynthetic(false).AsOfSystemTime())
updateTimestampQuery, id.GetBytesMut(), timestamp.AsOfSystemTime())
if err != nil {
return errors.Wrapf(err, "failed to update record %v", id)
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (p *storage) deprecatedProtect(ctx context.Context, r *ptpb.Record, meta []
sessiondata.NodeUserSessionDataOverride,
protectQueryWithoutTarget,
s.maxSpans, s.maxBytes, len(r.DeprecatedSpans),
r.ID, r.Timestamp.WithSynthetic(false).AsOfSystemTime(),
r.ID, r.Timestamp.AsOfSystemTime(),
r.MetaType, meta,
len(r.DeprecatedSpans), encodedSpans)
if err != nil {
Expand Down
29 changes: 0 additions & 29 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,35 +299,6 @@ var testCases = []testCase{
}),
},
},
{
name: "Protect using synthetic timestamp",
ops: []op{
funcOp(func(ctx context.Context, t *testing.T, tCtx *testContext) {
rec := newRecord(tCtx, tCtx.tc.Server(0).Clock().Now().WithSynthetic(true), "", nil, tableTarget(42),
tableSpan(42))
err := tCtx.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return tCtx.pts.WithTxn(txn).Protect(ctx, &rec)
})
require.NoError(t, err)
// Synthetic should be reset when writing timestamps to make it
// compatible with underlying sql schema.
rec.Timestamp.Synthetic = false
tCtx.state.Records = append(tCtx.state.Records, rec)
tCtx.state.Version++
tCtx.state.NumRecords++
tCtx.state.NumSpans += uint64(len(rec.DeprecatedSpans))
var encoded []byte
if tCtx.runWithDeprecatedSpans {
encoded, err = protoutil.Marshal(&ptstorage.Spans{Spans: rec.DeprecatedSpans})
require.NoError(t, err)
} else {
encoded, err = protoutil.Marshal(&ptpb.Target{Union: rec.Target.GetUnion()})
require.NoError(t, err)
}
tCtx.state.TotalBytes += uint64(len(encoded))
}),
},
},
}

type testContext struct {
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,6 @@ func CalcReplicaDigest(
result.PersistedMS = rangeAppliedState.RangeStats.ToStats()

if statsOnly {
// Unset the synthetic flag, to ease the migration of deleting this field.
rangeAppliedState.RaftClosedTimestamp.Synthetic = false
b, err := protoutil.Marshal(rangeAppliedState)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
16747849667884397839
10315043615536467344
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
12542053708208219209
12249944093974449552
Loading