Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
115888: sql: update pausable portal todo r=rharding6373 a=rharding6373

Epic: None
Informs: #115887

Release note: None

116830: hlc: remove the Synthetic field from Timestamp and LegacyTimestamp r=erikgrinaker a=nvanbenschoten

Closes #101938.

This PR completes the work to remove the `Synthetic` field from `Timestamp` and `LegacyTimestamp`. It removes the remaining uses, removes the fields from the proto definitions, and removes all access to the fields in methods.

Release note: None



117429: revertccl: ALTER VIRTUAL CLUSTER RESET DATA r=dt a=dt

This enables resetting a virtual cluster's data to a prior timestamp.

This is possible if the prior timestamp is still retained in the mvcc
history of the virtual cluster, the virtual cluster has stopped service,
and is run by a user with the MANAGEVIRTUALCLUSTER (or admin) privilege
in the system tenant.

Revisions of data in the system tenant newer than the target time to
which it is being reset are destroyed, reverting the tenant to the state
it was in as of the time reverted to. Destroyed revisions are not
recoverable; once a tenant has been reset to a timestamp, it cannot be
'un-reset' back to a higher timestamp.

Release note (cluster virtualization): Added a new 'flashback' command
to revert a virtual cluster to an earlier state using ALTER VIRTUAL
CLUSTER RESET DATA.


Epic: CRDB-34233.

117541: storage: fix a series of intent resolution bugs with ignored seq nums r=nvanbenschoten a=miraradeva

Previously, the logic in mvccResolveWriteIntent was structured in such a
way that if an intent contained both ignored and non-ignored seq nums
in its intent history, the intent may end up being updated instead of
aborted or unmodified (see examples in 9f00f2a5505).

This commit fixes the bugs by ensuring that the intent history is
modified only when an intent resolution update is not aborted, and the
update and the actual intent have the same epoch.

Fixes: #117553

Release note: None

117563: distsql: improve columnar operator test harness for decimals r=yuzefovich a=yuzefovich

We recently merged a change to add decimals with different numbers of trailing zeroes in the "interesting datums" set, and it made some existing tests fail because they used direct string comparison for equality. This commit adjusts the test harness to be smarter for decimals.

Fixes: #117543.

Release note: None

Co-authored-by: rharding6373 <rharding6373@users.noreply.github.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Mira Radeva <mira@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
6 people committed Jan 10, 2024
6 parents 8793a26 + 5e2fd95 + 6f48942 + 7d6d7f2 + 9bd8ca5 + 80fea80 commit 3bcf088
Show file tree
Hide file tree
Showing 55 changed files with 737 additions and 602 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}
}

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection, feedTestNoForcedSyntheticTimestamps)
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
}

func TestAlterChangefeedInitialScan(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
if r.opts.resolvedField {
Expand All @@ -1040,7 +1040,7 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), timestampToString(ts))
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
for k := range meta {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6894,7 +6894,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
// TODO(ssd): Tenant testing disabled because of use of DB()
for _, sz := range []int64{100 << 20, 100} {
maxCheckpointSize = sz
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"), feedTestNoForcedSyntheticTimestamps)
cdcTestNamedWithSystem(t, fmt.Sprintf("limit=%s", humanize.Bytes(uint64(sz))), testFn, feedTestForceSink("webhook"))
}
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,3 @@ func getEncoder(
return nil, errors.AssertionFailedf(`unknown format: %s`, opts.Format)
}
}

// timestampToString converts an internal timestamp to the string form used in
// all encoders. This could be made more efficient. And/or it could be configurable
// to include the Synthetic flag when present, but that's unlikely to be needed.
func timestampToString(t hlc.Timestamp) string {
return t.WithSynthetic(false).AsOfSystemTime()
}
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ func (e *jsonEncoder) initRawEnvelope() error {
}

if e.updatedField {
if err := metaBuilder.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
if err := metaBuilder.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
if err := metaBuilder.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -355,13 +355,13 @@ func (e *jsonEncoder) initWrappedEnvelope() error {
}

if e.updatedField {
if err := b.Set("updated", json.FromString(timestampToString(evCtx.updated))); err != nil {
if err := b.Set("updated", json.FromString(evCtx.updated.AsOfSystemTime())); err != nil {
return nil, err
}
}

if e.mvccTimestampField {
if err := b.Set("mvcc_timestamp", json.FromString(timestampToString(evCtx.mvcc))); err != nil {
if err := b.Set("mvcc_timestamp", json.FromString(evCtx.mvcc.AsOfSystemTime())); err != nil {
return nil, err
}
}
Expand Down
31 changes: 0 additions & 31 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ type feedTestOptions struct {
externalIODir string
allowedSinkTypes []string
disabledSinkTypes []string
disableSyntheticTimestamps bool
settings *cluster.Settings
}

Expand All @@ -570,12 +569,6 @@ var feedTestNoExternalConnection = func(opts *feedTestOptions) { opts.forceNoExt
// has privileges to create changefeeds on tables in the default database `d` only.
var feedTestUseRootUserConnection = func(opts *feedTestOptions) { opts.forceRootUserConnection = true }

// feedTestNoForcedSyntheticTimestamps is a feedTestOption that will prevent
// the test from randomly forcing timestamps to be synthetic and offset five seconds into the future from
// what they would otherwise be. It doesn't prevent synthetic timestamps but they're otherwise unlikely to
// occur in tests.
var feedTestNoForcedSyntheticTimestamps = func(opts *feedTestOptions) { opts.disableSyntheticTimestamps = true }

var feedTestForceSink = func(sinkType string) feedTestOption {
return feedTestRestrictSinks(sinkType)
}
Expand Down Expand Up @@ -631,30 +624,6 @@ func makeOptions(opts ...feedTestOption) feedTestOptions {
for _, o := range opts {
o(&options)
}
if !options.disableSyntheticTimestamps && rand.Intn(2) == 0 {
// Offset all timestamps a random (but consistent per test) amount into the
// future to ensure we can handle that. Always chooses an integer number of
// seconds for easier debugging and so that 0 is a possibility.
offset := int64(rand.Intn(6)) * time.Second.Nanoseconds()
// TODO(#105053): Remove this line
_ = offset
oldKnobsFn := options.knobsFn
options.knobsFn = func(knobs *base.TestingKnobs) {
if oldKnobsFn != nil {
oldKnobsFn(knobs)
}
knobs.DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs).FeedKnobs.ModifyTimestamps = func(t *hlc.Timestamp) {
// NOTE(ricky): This line of code should be uncommented.
// It used to be just t.Add(offset, 0), but t.Add() has no side
// effects so this was a no-op. *t = t.Add(offset, 0) is correct,
// but causes test failures.
// TODO(#105053): Uncomment and fix test failures
//*t = t.Add(offset, 0)
t.Synthetic = true
}
}
}
return options
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
return err
}
}
if p.knobs.ModifyTimestamps != nil {
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Val.Value.Timestamp)
}
if err := p.memBuf.Add(
ctx, kvevent.MakeKVEvent(e.RangeFeedEvent),
); err != nil {
return err
}
case *kvpb.RangeFeedCheckpoint:
if p.knobs.ModifyTimestamps != nil {
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Checkpoint.ResolvedTS)
}
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
// RangeFeed happily forwards any closed timestamps it receives as
// soon as there are no outstanding intents under them.
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// TestingKnobs are the testing knobs for kvfeed.
Expand All @@ -30,9 +29,6 @@ type TestingKnobs struct {
// EndTimeReached is a callback that may return true to indicate the
// feed should exit because its end time has been reached.
EndTimeReached func() bool
// ModifyTimestamps is called on the timestamp for each RangefeedMessage
// before converting it into a kv event.
ModifyTimestamps func(*hlc.Timestamp)
// RangefeedOptions lets the kvfeed override rangefeed settings.
RangefeedOptions []kvcoord.RangeFeedOption
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ func (w *parquetWriter) populateDatums(
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if w.encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
datums = append(datums, tree.NewDString(updated.AsOfSystemTime()))
}
if w.encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
datums = append(datums, tree.NewDString(mvcc.AsOfSystemTime()))
}
if w.encodingOpts.Diff {
if prevRow.IsDeleted() {
Expand Down
21 changes: 20 additions & 1 deletion pkg/ccl/revertccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,38 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "revertccl",
srcs = ["revert.go"],
srcs = [
"alter_reset_tenant.go",
"revert.go",
"revert_tenant.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/revertccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/utilccl",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/multitenant/mtinfopb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/clusterunique",
"//pkg/sql/exprutil",
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/asof",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessionprotectedts",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_x_sync//errgroup",
],
Expand Down
84 changes: 84 additions & 0 deletions pkg/ccl/revertccl/alter_reset_tenant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package revertccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/asof"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

const (
alterTenantResetOp = "ALTER VIRTUAL CLUSTER RESET"
)

func alterTenantResetHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
alterTenantStmt, ok := stmt.(*tree.AlterTenantReset)
if !ok {
return nil, nil, nil, false, nil
}
if !p.ExecCfg().Codec.ForSystemTenant() {
return nil, nil, nil, false, pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can alter tenant")
}

timestamp, err := asof.EvalSystemTimeExpr(ctx, &p.ExtendedEvalContext().Context, p.SemaCtx(), alterTenantStmt.Timestamp,
alterTenantResetOp, asof.ReplicationCutover)
if err != nil {
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
if err := sql.CanManageTenant(ctx, p); err != nil {
return err
}
if err := utilccl.CheckEnterpriseEnabled(p.ExecCfg().Settings, alterTenantResetOp); err != nil {
return err
}

tenInfo, err := p.LookupTenantInfo(ctx, alterTenantStmt.TenantSpec, alterTenantResetOp)
if err != nil {
return err
}
return RevertTenantToTimestamp(ctx, &p.ExtendedEvalContext().Context, tenInfo.Name, timestamp, p.ExtendedEvalContext().SessionID)
}
return fn, nil, nil, false, nil
}

func alterTenantResetHookTypeCheck(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (bool, colinfo.ResultColumns, error) {
alterStmt, ok := stmt.(*tree.AlterTenantReset)
if !ok {
return false, nil, nil
}
if err := exprutil.TypeCheck(
ctx, alterTenantResetOp, p.SemaCtx(), exprutil.TenantSpec{TenantSpec: alterStmt.TenantSpec},
); err != nil {
return false, nil, err
}
if _, err := asof.TypeCheckSystemTimeExpr(
ctx, p.SemaCtx(), alterStmt.Timestamp, alterTenantResetOp,
); err != nil {
return false, nil, err
}
return true, nil, nil
}

func init() {
sql.AddPlanHook("alter virtual cluster reset", alterTenantResetHook, alterTenantResetHookTypeCheck)
}
Loading

0 comments on commit 3bcf088

Please sign in to comment.