110048: c2c: write all span config updates from initial scan in one transaction r=stevendanna a=msbutler

Previously, the spanConfigIngestor treated updates from the rangefeed initial scan the same way as regular updates: the ingestor would incrementally update the destination side span config table by flushing updates that share the same source side transaction timestamp. To understand why this is problematic, recall that an initial scan contains all of the latest span config records for the relevant tenant, so some of these updates may have already been written to the destination side. Further, the initial scan does not surface source side span config updates that have since been overwritten by later updates. In short, it is not possible to replicate a consistent view of the span config table by writing incremental updates surfaced by the initial scan.

In this patch, the ingestor buffers all updates surfaced by the initial scan and flushes them in one transaction, which also deletes all existing span config records for the replicating tenant.
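
To illustrate the idea, here is a minimal, self-contained Go sketch of a snapshot-style flush. The types below (record, memAccessor) are illustrative stand-ins, not the real kv.Txn or spanconfig.KVAccessor machinery used in this change:

// Package main sketches the "flush the initial scan as one snapshot" idea:
// delete everything the destination currently stores for the tenant and
// write the freshly scanned records in a single atomic update.
package main

import "fmt"

// record is a simplified span config record: a span target plus a GC TTL.
type record struct {
	target     string
	ttlSeconds int
}

// memAccessor is an in-memory stand-in for spanconfig.KVAccessor; update
// applies deletes and upserts atomically (trivially so, in memory).
type memAccessor struct {
	records map[string]int
}

func (m *memAccessor) targets() []string {
	ts := make([]string, 0, len(m.records))
	for t := range m.records {
		ts = append(ts, t)
	}
	return ts
}

func (m *memAccessor) update(deletes []string, upserts []record) {
	for _, t := range deletes {
		delete(m.records, t)
	}
	for _, r := range upserts {
		m.records[r.target] = r.ttlSeconds
	}
}

// flushInitialScan installs the initial-scan snapshot. Applying the snapshot
// incrementally instead could leave behind destination records that the
// source has since deleted or overwritten.
func flushInitialScan(dst *memAccessor, snapshot []record) {
	dst.update(dst.targets(), snapshot)
}

func main() {
	// The destination holds a record that no longer exists on the source.
	dst := &memAccessor{records: map[string]int{"/Tenant/10/stale": 900}}
	// The initial scan surfaces only the source's current state.
	flushInitialScan(dst, []record{{"/Tenant/10/Table/104", 14400}})
	fmt.Println(dst.records) // the stale record is gone
}

The real flushInitialScan in this patch does the same thing transactionally: it reads the tenant's existing records, turns them into deletes, and applies the deletes plus the buffered snapshot in a single kv transaction.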

Fixes #106823

Release note: None

110215: changefeedccl: Fix test data race r=miretskiy a=miretskiy

Fixes #110155

Release note: None

Co-authored-by: Michael Butler <butler@cockroachlabs.com>
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
3 people committed Sep 8, 2023
3 parents dbab89e + 7b1505e + a3f0373 commit 1184f96
Showing 10 changed files with 550 additions and 172 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -120,7 +120,8 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
}
case *kvpb.RangeFeedCheckpoint:
if p.knobs.ModifyTimestamps != nil {
p.knobs.ModifyTimestamps(&t.ResolvedTS)
e = kvcoord.RangeFeedMessage{RangeFeedEvent: e.ShallowCopy(), RegisteredSpan: e.RegisteredSpan}
p.knobs.ModifyTimestamps(&e.Checkpoint.ResolvedTS)
}
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
// RangeFeed happily forwards any closed timestamps it receives as
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
@@ -26,12 +26,15 @@ go_library(
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/execinfra",
"//pkg/sql/isql",
"//pkg/sql/rowenc",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/tree",
"//pkg/storage",
"//pkg/testutils",
75 changes: 75 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/span_config_helpers.go
@@ -9,10 +9,21 @@
package replicationtestutils

import (
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/stretchr/testify/require"
)

@@ -42,3 +53,67 @@ func RecordToEntry(record spanconfig.Record) roachpb.SpanConfigEntry {
Config: c,
}
}

// PrettyRecords pretty-prints each record's span config target and GC TTL.
func PrettyRecords(records []spanconfig.Record) string {
var b strings.Builder
for _, update := range records {
b.WriteString(fmt.Sprintf(" %s: ttl %d,", update.GetTarget().GetSpan(), update.GetConfig().GCPolicy.TTLSeconds))
}
return b.String()
}

// NewReplicationHelperWithDummySpanConfigTable creates a new ReplicationHelper,
// a tenant, and a mock span config table that a spanConfigStreamClient can
// listen for updates on. To mimic tenant creation, this helper writes a
// spanConfig with the target [tenantPrefix,tenantPrefix.Next()). During tenant
// creation, this span config induces a range split on the tenant's start key.
func NewReplicationHelperWithDummySpanConfigTable(
ctx context.Context, t *testing.T, streamingTestKnobs *sql.StreamingTestingKnobs,
) (*ReplicationHelper, *spanconfigkvaccessor.KVAccessor, TenantState, func()) {
const dummySpanConfigurationsName = "dummy_span_configurations"
dummyFQN := tree.NewTableNameWithSchema("d", catconstants.PublicSchemaName, dummySpanConfigurationsName)

streamingTestKnobs.MockSpanConfigTableName = dummyFQN
h, cleanup := NewReplicationHelper(t, base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
Streaming: streamingTestKnobs,
},
})

h.SysSQL.Exec(t, `
CREATE DATABASE d;
USE d;`)
h.SysSQL.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyFQN))

sourceAccessor := spanconfigkvaccessor.New(
h.SysServer.DB(),
h.SysServer.InternalExecutor().(isql.Executor),
h.SysServer.ClusterSettings(),
h.SysServer.Clock(),
dummyFQN.String(),
nil, /* knobs */
)

sourceTenantID := roachpb.MustMakeTenantID(uint64(10))
sourceTenant, tenantCleanup := h.CreateTenant(t, sourceTenantID, "app")

// To mimic tenant creation, write the source tenant split key to the dummy
// span config table. For more info on this split key, read up on
// https://github.com/cockroachdb/cockroach/pull/104920
tenantPrefix := keys.MakeTenantPrefix(sourceTenantID)
tenantSplitSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}

require.NoError(t, sourceAccessor.UpdateSpanConfigRecords(
ctx,
[]spanconfig.Target{},
[]spanconfig.Record{MakeSpanConfigRecord(t, tenantSplitSpan, 14400)},
hlc.MinTimestamp,
hlc.MaxTimestamp))

return h, sourceAccessor, sourceTenant, func() {
tenantCleanup()
cleanup()
}
}
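
As a usage sketch, a hypothetical test body built on this helper might look like the following (only the helper's signature is taken from this change; the test name, empty knobs, and the usual imports for context, testing, pkg/sql, and this replicationtestutils package are assumed):

func TestSpanConfigReplicationSetup(t *testing.T) {
	ctx := context.Background()
	knobs := &sql.StreamingTestingKnobs{}
	h, sourceAccessor, sourceTenant, cleanup :=
		replicationtestutils.NewReplicationHelperWithDummySpanConfigTable(ctx, t, knobs)
	defer cleanup()

	// Records written through sourceAccessor land in the dummy span config
	// table, where a spanConfigStreamClient can observe them.
	_, _, _ = h, sourceAccessor, sourceTenant
}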
2 changes: 0 additions & 2 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -135,15 +135,13 @@ go_test(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/sql/physicalplan",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlliveness",
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -292,6 +292,8 @@ func TestDataDriven(t *testing.T) {
jobutils.WaitForJobToFail(t, runner, jobPBID)
case "reverting":
jobutils.WaitForJobReverting(t, runner, jobPBID)
case "running":
jobutils.WaitForJobToRun(t, runner, jobPBID)
default:
t.Fatalf("unknown state %s", state)
}
146 changes: 110 additions & 36 deletions pkg/ccl/streamingccl/streamingest/ingest_span_configs.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -48,24 +49,31 @@
// applies to) never overlaps with any other span configuration target. Else,
// C2C would break the span config reconciliation system.
//
// TODO(msbutler): on an initial scan, we need to buffer up all updates and
// write them to the span config table in one transaction, along with a delete
// over the whole tenant key span. Since C2C does not lay a PTS on the source
// side span config table, the initial scan on resumption may miss updates;
// therefore, the only way to cleanly update the destination side span config
// table is to write the latest state of the source table and delete all
// existing state, in one transaction.
// During the rangefeed initial scan, the spanConfigIngestor buffers up all
// updates and writes them to the span config table in one transaction, along
// with a delete over the whole tenant key span. The span configuration
// ingestion does not create a PTS record for the source side span configuration
// table to prevent an errant physical replication job from impacting a system
// table's GC. As a result, on resumption we must run a new
// initial scan, rebuilding the config from scratch, to avoid missing data that
// may have been GC'd.
type spanConfigIngestor struct {
accessor spanconfig.KVAccessor
// State passed at creation.
accessor spanconfig.KVAccessor
session sqlliveness.Session
stopperCh chan struct{}
settings *cluster.Settings
client streamclient.Client
rekeyer *backupccl.KeyRewriter
destinationTenantKeySpan roachpb.Span
db *kv.DB
testingKnobs *sql.StreamingTestingKnobs

// Dynamic state maintained during ingestion.
initialScanComplete bool
bufferedUpdates []spanconfig.Record
bufferedDeletes []spanconfig.Target
lastBufferedSourceTimestamp hlc.Timestamp
session sqlliveness.Session
stopperCh chan struct{}
settings *cluster.Settings
client streamclient.Client
rekeyer *backupccl.KeyRewriter
testingKnobs *sql.StreamingTestingKnobs
}

func makeSpanConfigIngestor(
@@ -93,15 +101,20 @@ func makeSpanConfigIngestor(
if err != nil {
return nil, err
}

destTenantStartKey := keys.MakeTenantPrefix(details.DestinationTenantID)
destTenantSpan := roachpb.Span{Key: destTenantStartKey, EndKey: destTenantStartKey.PrefixEnd()}
log.Infof(ctx, "initialized span config ingestor")
return &spanConfigIngestor{
accessor: execCfg.SpanConfigKVAccessor,
settings: execCfg.Settings,
session: ingestionJob.Session(),
client: client,
rekeyer: rekeyer,
stopperCh: stopperCh,
testingKnobs: execCfg.StreamingTestingKnobs,
accessor: execCfg.SpanConfigKVAccessor,
settings: execCfg.Settings,
session: ingestionJob.Session(),
client: client,
rekeyer: rekeyer,
stopperCh: stopperCh,
destinationTenantKeySpan: destTenantSpan,
db: execCfg.DB,
testingKnobs: execCfg.StreamingTestingKnobs,
}, nil
}

@@ -147,7 +160,7 @@ func (sc *spanConfigIngestor) consumeEvent(ctx context.Context, event streamingc
case streamingccl.SpanConfigEvent:
return sc.bufferRecord(ctx, event.GetSpanConfigEvent())
case streamingccl.CheckpointEvent:
return sc.maybeFlushEvents(ctx)
return sc.maybeFlushOnCheckpoint(ctx)
default:
return errors.AssertionFailedf("received non span config update %s", event)
}
@@ -179,11 +192,8 @@ func (sc *spanConfigIngestor) bufferRecord(
return nil
}
targetSpan := roachpb.Span{Key: destStartKey, EndKey: destEndKey}
if sc.lastBufferedSourceTimestamp.Less(update.Timestamp) {
// If this event was originally written at a later timestamp than what's in the buffer, flush the buffer.
if err := sc.maybeFlushEvents(ctx); err != nil {
return err
}
if err := sc.maybeFlushOnUpdate(ctx, update.Timestamp); err != nil {
return err
}
target := spanconfig.MakeTargetFromSpan(targetSpan)
if update.SpanConfig.Config.IsEmpty() {
Expand All @@ -198,9 +208,27 @@ func (sc *spanConfigIngestor) bufferRecord(
sc.lastBufferedSourceTimestamp = update.Timestamp
return nil
}
func (sc *spanConfigIngestor) maybeFlushEvents(ctx context.Context) error {
if len(sc.bufferedUpdates) != 0 || len(sc.bufferedDeletes) != 0 {

func (sc *spanConfigIngestor) bufferIsEmpty() bool {
return len(sc.bufferedUpdates) == 0 && len(sc.bufferedDeletes) == 0
}

func (sc *spanConfigIngestor) maybeFlushOnUpdate(
ctx context.Context, updateTimestamp hlc.Timestamp,
) error {
// If this event was originally written at a later timestamp and the initial
// scan has completed, flush the current buffer.
if sc.initialScanComplete &&
sc.lastBufferedSourceTimestamp.Less(updateTimestamp) &&
!sc.bufferIsEmpty() {
return sc.flushEvents(ctx)
}
return nil
}

func (sc *spanConfigIngestor) maybeFlushOnCheckpoint(ctx context.Context) error {
if !sc.bufferIsEmpty() {
return sc.flushEvents(ctx)
} else if !sc.initialScanComplete {
return errors.AssertionFailedf("a flush after the initial scan checkpoint must have data in it")
}
return nil
}
@@ -209,10 +237,6 @@ func (sc *spanConfigIngestor) maybeFlushEvents(ctx context.Context) error {
// in one transaction via KVAccessor.UpdateSpanConfigRecords.
func (sc *spanConfigIngestor) flushEvents(ctx context.Context) error {
log.VEventf(ctx, 2, "flushing span config %d updates and %d deletes", len(sc.bufferedUpdates), len(sc.bufferedDeletes))
if sc.testingKnobs != nil && sc.testingKnobs.BeforeIngestSpanConfigFlush != nil {
sc.testingKnobs.BeforeIngestSpanConfigFlush(ctx, sc.bufferedUpdates, sc.bufferedDeletes)
}

retryOpts := retry.Options{
InitialBackoff: 1 * time.Second,
MaxBackoff: 5 * time.Second,
Expand All @@ -224,9 +248,18 @@ func (sc *spanConfigIngestor) flushEvents(ctx context.Context) error {
if sessionExpiration.IsEmpty() {
return errors.Errorf("sqlliveness session has expired")
}
err := sc.accessor.UpdateSpanConfigRecords(
ctx, sc.bufferedDeletes, sc.bufferedUpdates, sessionStart, sessionExpiration,
)
var err error
if !sc.initialScanComplete {
// The first flush will always contain all span configs found during the initial scan.
err = sc.flushInitialScan(ctx, sessionStart, sessionExpiration)
} else {
err = sc.accessor.UpdateSpanConfigRecords(
ctx, sc.bufferedDeletes, sc.bufferedUpdates, sessionStart, sessionExpiration,
)
if sc.testingKnobs != nil && sc.testingKnobs.RightAfterSpanConfigFlush != nil {
sc.testingKnobs.RightAfterSpanConfigFlush(ctx, sc.bufferedUpdates, sc.bufferedDeletes)
}
}
if err != nil {
if spanconfig.IsCommitTimestampOutOfBoundsError(err) {
// We expect the underlying sqlliveness session's expiration to be
Expand All @@ -239,7 +272,48 @@ func (sc *spanConfigIngestor) flushEvents(ctx context.Context) error {
}
break
}

sc.bufferedUpdates = sc.bufferedUpdates[:0]
sc.bufferedDeletes = sc.bufferedDeletes[:0]
return nil
}

// flushInitialScan flushes all contents from the source side rangefeed's
// initial scan. The function assumes the buffer contains only updates from the
// initial scan. To obey destination side span config invariants, the function
// deletes all existing span config records related to the replicating tenant in
// the same transaction in which it writes the initial scan updates.
func (sc *spanConfigIngestor) flushInitialScan(
ctx context.Context, sessionStart, sessionExpiration hlc.Timestamp,
) error {
log.Infof(ctx, "flushing initial span configuration state (%d records)", len(sc.bufferedUpdates))

if len(sc.bufferedDeletes) != 0 {
return errors.AssertionFailedf("initial scan flush should not contain records to delete")
}
target := spanconfig.MakeTargetFromSpan(sc.destinationTenantKeySpan)
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
accessor := sc.accessor.WithTxn(ctx, txn)
existingRecords, err := accessor.GetSpanConfigRecords(ctx, []spanconfig.Target{target})
if err != nil {
return err
}
// Within the txn, we allocate a new buffer for deletes, instead of using
// sc.bufferedDeletes, because if the transaction retries, we don't want to
// worry about clearing the spanConfigIngestor's delete buffer.
bufferedDeletes := make([]spanconfig.Target, 0, len(existingRecords))
for _, record := range existingRecords {
bufferedDeletes = append(bufferedDeletes, record.GetTarget())
}
if err := accessor.UpdateSpanConfigRecords(
ctx, bufferedDeletes, sc.bufferedUpdates, sessionStart, sessionExpiration,
); err != nil {
return err
}
if sc.testingKnobs != nil && sc.testingKnobs.RightAfterSpanConfigFlush != nil {
sc.testingKnobs.RightAfterSpanConfigFlush(ctx, sc.bufferedUpdates, bufferedDeletes)
}
sc.initialScanComplete = true
return nil
})
}