diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 77c50ffac26c..78792ea0898b 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -9,8 +9,10 @@ package cdctest import ( + "bytes" "context" gosql "database/sql" + "fmt" "math/rand" "strings" "time" @@ -55,35 +57,67 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, omitPause bool) (Validator, err eventPauseCount = 0 } ns := &nemeses{ - rowCount: 4, - db: db, - usingPoller: !usingRangeFeed, + maxTestColumnCount: 10, + rowCount: 4, + db: db, + usingPoller: !usingRangeFeed, // eventMix does not have to add to 100 eventMix: map[fsm.Event]int{ - // eventTransact opens an UPSERT transaction is there is not one open. If - // there is one open, it either commits it or rolls it back. - eventTransact{}: 50, + // We don't want `eventFinished` to ever be returned by `nextEvent` so we set + // its weight to 0. + eventFinished{}: 0, // eventFeedMessage reads a message from the feed, or if the state machine - // thinks there will be no message available, it falls back to - // eventTransact. + // thinks there will be no message available, it falls back to eventOpenTxn or + // eventCommit (if there is already a txn open). eventFeedMessage{}: 50, - // eventPause PAUSEs the changefeed. The state machine will handle - // RESUMEing it. - eventPause{}: eventPauseCount, + // eventSplit splits between two random rows (the split is a no-op if it + // already exists). + eventSplit{}: 5, - // eventPush pushes every open transaction by running a high priority - // SELECT. + // TRANSACTIONS + // eventOpenTxn opens an UPSERT or DELETE transaction. + eventOpenTxn{}: 10, + + // eventCommit commits the outstanding transaction. + eventCommit{}: 5, + + // eventRollback simply rolls the outstanding transaction back. + eventRollback{}: 5, + + // eventPush pushes every open transaction by running a high priority SELECT. eventPush{}: 5, - // eventAbort aborts every open transaction by running a high priority - // DELETE. + // eventAbort aborts every open transaction by running a high priority DELETE. eventAbort{}: 5, - // eventSplit splits between two random rows (the split is a no-op if it - // already exists). - eventSplit{}: 5, + // PAUSE / RESUME + // eventPause PAUSEs the changefeed. + eventPause{}: eventPauseCount, + + // eventResume RESUMEs the changefeed. + eventResume{}: 50, + + // SCHEMA CHANGES + // eventAddColumn performs a schema change by adding a new column with a default + // value in order to trigger a backfill. + eventAddColumn{ + CanAddColumnAfter: fsm.True, + }: 5, + + eventAddColumn{ + CanAddColumnAfter: fsm.False, + }: 5, + + // eventRemoveColumn performs a schema change by removing a column. + eventRemoveColumn{ + CanRemoveColumnAfter: fsm.True, + }: 5, + + eventRemoveColumn{ + CanRemoveColumnAfter: fsm.False, + }: 5, }, } @@ -98,14 +132,23 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, omitPause bool) (Validator, err return nil, err } - // Initialize table rows by repeatedly running the `transact` transition, - // which randomly starts, commits, and rolls back transactions. This will - // leave some committed rows and maybe an outstanding intent for the initial - // scan. + // Initialize table rows by repeatedly running the `openTxn` transition, + // then randomly either committing or rolling back transactions. This will + // leave some committed rows. 
for i := 0; i < ns.rowCount*5; i++ { - if err := transact(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { return nil, err } + // Randomly commit or rollback, but commit at least one row to the table. + if rand.Intn(3) < 2 || i == 0 { + if err := commit(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + return nil, err + } + } else { + if err := rollback(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + return nil, err + } + } } foo, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH updated, resolved`) @@ -115,10 +158,15 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, omitPause bool) (Validator, err ns.f = foo defer func() { _ = foo.Close() }() - if _, err := db.Exec(`CREATE TABLE fprint (id INT PRIMARY KEY, ts STRING)`); err != nil { + // Create scratch table with a pre-specified set of test columns to avoid having to + // accommodate schema changes on-the-fly. + scratchTableName := `fprint` + var createFprintStmtBuf bytes.Buffer + fmt.Fprintf(&createFprintStmtBuf, `CREATE TABLE %s (id INT PRIMARY KEY, ts STRING)`, scratchTableName) + if _, err := db.Exec(createFprintStmtBuf.String()); err != nil { return nil, err } - fprintV, err := NewFingerprintValidator(db, `foo`, `fprint`, foo.Partitions()) + fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount) if err != nil { return nil, err } @@ -127,38 +175,39 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, omitPause bool) (Validator, err fprintV, }) - // Initialize the actual row count, overwriting what `transact` did. - // `transact` has set this to the number of modified rows, which is correct - // during changefeed operation, but not for the initial scan, because some of - // the rows may have had the same primary key. + // Initialize the actual row count, overwriting what the initialization loop did. That + // loop has set this to the number of modified rows, which is correct during + // changefeed operation, but not for the initial scan, because some of the rows may + // have had the same primary key. if err := db.QueryRow(`SELECT count(*) FROM foo`).Scan(&ns.availableRows); err != nil { return nil, err } - // Kick everything off by reading the first message. This accomplishes two - // things. First, it maximizes the chance that we hit an unresolved intent - // during the initial scan. Second, it guarantees that the feed is running - // before anything else commits, which could mess up the availableRows count - // we just set. - if err := noteFeedMessage(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { - return nil, err - } - // Now push everything to make sure the initial scan can complete, otherwise - // we may deadlock. - if err := push(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { - return nil, err + txnOpenBeforeInitialScan := false + // Maybe open an intent. + if rand.Intn(2) < 1 { + txnOpenBeforeInitialScan = true + if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + return nil, err + } } // Run the state machine until it finishes. Exit criteria is in `nextEvent` // and is based on the number of rows that have been resolved and the number // of resolved timestamp messages. 
- m := fsm.MakeMachine(txnStateTransitions, stateRunning{fsm.False}, ns) + initialState := stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.FromBool(txnOpenBeforeInitialScan), + CanAddColumn: fsm.True, + CanRemoveColumn: fsm.False, + } + m := fsm.MakeMachine(compiledStateTransitions, initialState, ns) for { state := m.CurState() if _, ok := state.(stateDone); ok { return ns.v, nil } - event, err := ns.nextEvent(rng, state, foo) + event, err := ns.nextEvent(rng, state, foo, &m) if err != nil { return nil, err } @@ -176,130 +225,277 @@ const ( ) type nemeses struct { - rowCount int - eventMix map[fsm.Event]int - mixTotal int - usingPoller bool + rowCount int + maxTestColumnCount int + eventMix map[fsm.Event]int + usingPoller bool v *CountValidator db *gosql.DB f TestFeed - availableRows int - txn *gosql.Tx - openTxnType openTxnType - openTxnID int - openTxnTs string + availableRows int + currentTestColumnCount int + txn *gosql.Tx + openTxnType openTxnType + openTxnID int + openTxnTs string } // nextEvent selects the next state transition. -func (ns *nemeses) nextEvent(rng *rand.Rand, state fsm.State, f TestFeed) (fsm.Event, error) { +func (ns *nemeses) nextEvent( + rng *rand.Rand, state fsm.State, f TestFeed, m *fsm.Machine, +) (se fsm.Event, err error) { if ns.v.NumResolvedWithRows >= 6 && ns.v.NumResolvedRows >= 10 { return eventFinished{}, nil } - - if ns.mixTotal == 0 { - for _, weight := range ns.eventMix { - ns.mixTotal += weight + possibleEvents, ok := compiledStateTransitions.GetExpanded()[state] + if !ok { + return nil, errors.Errorf(`unknown state: %T %s`, state, state) + } + mixTotal := 0 + for event := range possibleEvents { + weight, ok := ns.eventMix[event] + if !ok { + return nil, errors.Errorf(`unknown event: %T`, event) } + mixTotal += weight } - - switch state { - case stateRunning{Paused: fsm.True}: - return eventResume{}, nil - case stateRunning{Paused: fsm.False}: - r, t := rng.Intn(ns.mixTotal), 0 - for event, weight := range ns.eventMix { - t += weight - if r >= t { - continue - } - if _, ok := event.(eventFeedMessage); ok { - break + r, t := rng.Intn(mixTotal), 0 + for event := range possibleEvents { + t += ns.eventMix[event] + if r >= t { + continue + } + if _, ok := event.(eventFeedMessage); ok { + // If there are no available rows, openTxn or commit outstanding txn instead + // of reading. + if ns.availableRows < 1 { + s := state.(stateRunning) + if s.TxnOpen.Get() { + return eventCommit{}, nil + } + return eventOpenTxn{}, nil } - return event, nil + return eventFeedMessage{}, nil } - - // If there are no available rows, transact instead of reading. 
- if ns.availableRows < 1 { - return eventTransact{}, nil + if e, ok := event.(eventAddColumn); ok { + e.CanAddColumnAfter = fsm.FromBool(ns.currentTestColumnCount < ns.maxTestColumnCount-1) + return e, nil } - return eventFeedMessage{}, nil - default: - return nil, errors.Errorf(`unknown state: %T %s`, state, state) + if e, ok := event.(eventRemoveColumn); ok { + e.CanRemoveColumnAfter = fsm.FromBool(ns.currentTestColumnCount > 1) + return e, nil + } + return event, nil } + + panic(`unreachable`) } type stateRunning struct { - Paused fsm.Bool + FeedPaused fsm.Bool + TxnOpen fsm.Bool + CanRemoveColumn fsm.Bool + CanAddColumn fsm.Bool } type stateDone struct{} func (stateRunning) State() {} func (stateDone) State() {} -type eventTransact struct{} +type eventOpenTxn struct{} type eventFeedMessage struct{} type eventPause struct{} type eventResume struct{} +type eventCommit struct{} type eventPush struct{} type eventAbort struct{} +type eventRollback struct{} type eventSplit struct{} +type eventAddColumn struct { + CanAddColumnAfter fsm.Bool +} +type eventRemoveColumn struct { + CanRemoveColumnAfter fsm.Bool +} type eventFinished struct{} -func (eventTransact) Event() {} -func (eventFeedMessage) Event() {} -func (eventPause) Event() {} -func (eventResume) Event() {} -func (eventPush) Event() {} -func (eventAbort) Event() {} -func (eventSplit) Event() {} -func (eventFinished) Event() {} - -var txnStateTransitions = fsm.Compile(fsm.Pattern{ - stateRunning{Paused: fsm.Any}: { +func (eventOpenTxn) Event() {} +func (eventFeedMessage) Event() {} +func (eventPause) Event() {} +func (eventResume) Event() {} +func (eventCommit) Event() {} +func (eventPush) Event() {} +func (eventAbort) Event() {} +func (eventRollback) Event() {} +func (eventSplit) Event() {} +func (eventAddColumn) Event() {} +func (eventRemoveColumn) Event() {} +func (eventFinished) Event() {} + +var stateTransitions = fsm.Pattern{ + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventSplit{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(split), + }, eventFinished{}: { Next: stateDone{}, Action: logEvent(cleanup), }, }, - stateRunning{Paused: fsm.False}: { - eventPause{}: { - Next: stateRunning{Paused: fsm.True}, - Action: logEvent(pause), + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.True, + CanRemoveColumn: fsm.Any, + }: { + eventAddColumn{ + CanAddColumnAfter: fsm.Var("CanAddColumnAfter"), + }: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumnAfter"), + CanRemoveColumn: fsm.True}, + Action: logEvent(addColumn), }, - eventTransact{}: { - Next: stateRunning{Paused: fsm.False}, - Action: logEvent(transact), + }, + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Any, + CanRemoveColumn: fsm.True, + }: { + eventRemoveColumn{ + CanRemoveColumnAfter: fsm.Var("CanRemoveColumnAfter"), + }: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.True, + CanRemoveColumn: fsm.Var("CanRemoveColumnAfter")}, + Action: logEvent(removeColumn), }, + }, + stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + 
CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { eventFeedMessage{}: { - Next: stateRunning{Paused: fsm.False}, + Next: stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, Action: logEvent(noteFeedMessage), }, - eventPush{}: { - Next: stateRunning{Paused: fsm.False}, - Action: logEvent(push), + }, + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventOpenTxn{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(openTxn), + }, + }, + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventCommit{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(commit), + }, + eventRollback{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(rollback), }, eventAbort{}: { - Next: stateRunning{Paused: fsm.False}, + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, Action: logEvent(abort), }, - eventSplit{}: { - Next: stateRunning{Paused: fsm.False}, - Action: logEvent(split), + eventPush{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(push), }, }, - stateRunning{Paused: fsm.True}: { + stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventPause{}: { + Next: stateRunning{ + FeedPaused: fsm.True, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(pause), + }, + }, + stateRunning{ + FeedPaused: fsm.True, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { eventResume{}: { - Next: stateRunning{Paused: fsm.False}, + Next: stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, Action: logEvent(resume), }, }, -}) +} + +var compiledStateTransitions = fsm.Compile(stateTransitions) func logEvent(fn func(fsm.Args) error) func(fsm.Args) error { return func(a fsm.Args) error { - log.Infof(a.Ctx, "%#v\n", a.Event) + if log.V(1) { + log.Infof(a.Ctx, "%#v\n", a.Event) + } return fn(a) } } @@ -311,76 +507,125 @@ func cleanup(a fsm.Args) error { return nil } -func transact(a fsm.Args) error { +func openTxn(a fsm.Args) error { ns := a.Extended.(*nemeses) - // If there are no transactions, create one. - if ns.txn == nil { - const noDeleteSentinel = int(-1) - // 10% of the time attempt a DELETE. 
- deleteID := noDeleteSentinel - if rand.Intn(10) == 0 { - rows, err := ns.db.Query(`SELECT id FROM foo ORDER BY random() LIMIT 1`) - if err != nil { - return err - } - defer func() { _ = rows.Close() }() - if rows.Next() { - if err := rows.Scan(&deleteID); err != nil { - return err - } - } - // If there aren't any rows, skip the DELETE this time. - } - - txn, err := ns.db.Begin() + const noDeleteSentinel = int(-1) + // 10% of the time attempt a DELETE. + deleteID := noDeleteSentinel + if rand.Intn(10) == 0 { + rows, err := ns.db.Query(`SELECT id FROM foo ORDER BY random() LIMIT 1`) if err != nil { return err } - if deleteID == noDeleteSentinel { - if err := txn.QueryRow( - `UPSERT INTO foo VALUES ((random() * $1)::int, cluster_logical_timestamp()::string) RETURNING *`, - ns.rowCount, - ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { - return err - } - ns.openTxnType = openTxnTypeUpsert - } else { - if err := txn.QueryRow( - `DELETE FROM foo WHERE id = $1 RETURNING *`, deleteID, - ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { + defer func() { _ = rows.Close() }() + if rows.Next() { + if err := rows.Scan(&deleteID); err != nil { return err } - ns.openTxnType = openTxnTypeDelete } - ns.txn = txn - return nil + // If there aren't any rows, skip the DELETE this time. } - // If there is an outstanding transaction, roll it back half the time and - // commit it the other half. - txn := ns.txn - ns.txn = nil - - if rand.Intn(2) < 1 { - return txn.Rollback() + txn, err := ns.db.Begin() + if err != nil { + return err + } + if deleteID == noDeleteSentinel { + if err := txn.QueryRow( + `UPSERT INTO foo VALUES ((random() * $1)::int, cluster_logical_timestamp()::string) RETURNING id, ts`, + ns.rowCount, + ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { + return err + } + ns.openTxnType = openTxnTypeUpsert + } else { + if err := txn.QueryRow( + `DELETE FROM foo WHERE id = $1 RETURNING id, ts`, deleteID, + ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { + return err + } + ns.openTxnType = openTxnTypeDelete } - if err := txn.Commit(); err != nil { + ns.txn = txn + return nil +} + +func commit(a fsm.Args) error { + ns := a.Extended.(*nemeses) + defer func() { ns.txn = nil }() + if err := ns.txn.Commit(); err != nil { // Don't error out if we got pushed, but don't increment availableRows no // matter what error was hit. if strings.Contains(err.Error(), `restart transaction`) { return nil } - return err } - log.Infof(a.Ctx, "%s (%d, %s)", ns.openTxnType, ns.openTxnID, ns.openTxnTs) ns.availableRows++ return nil } +func rollback(a fsm.Args) error { + ns := a.Extended.(*nemeses) + defer func() { ns.txn = nil }() + return ns.txn.Rollback() +} + +func addColumn(a fsm.Args) error { + ns := a.Extended.(*nemeses) + + if ns.currentTestColumnCount >= ns.maxTestColumnCount { + return errors.Errorf(`addColumn should be called when`+ + `there are less than %d columns.`, ns.maxTestColumnCount) + } + + if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d STRING DEFAULT 'x'`, + ns.currentTestColumnCount)); err != nil { + return err + } + ns.currentTestColumnCount++ + var rows int + // Adding a column should trigger a full table scan. + if err := ns.db.QueryRow(`SELECT count(*) FROM foo`).Scan(&rows); err != nil { + return err + } + // We expect one table scan that corresponds to the schema change backfill, and one + // scan that corresponds to the changefeed level backfill. 
+	ns.availableRows += 2 * rows
+	return nil
+}
+
+func removeColumn(a fsm.Args) error {
+	ns := a.Extended.(*nemeses)
+
+	if ns.currentTestColumnCount == 0 {
+		return errors.Errorf(`removeColumn should be called with ` +
+			`at least one test column.`)
+	}
+	if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo DROP COLUMN test%d`,
+		ns.currentTestColumnCount-1)); err != nil {
+		return err
+	}
+	ns.currentTestColumnCount--
+	var rows int
+	// Dropping a column should trigger a full table scan.
+	if err := ns.db.QueryRow(`SELECT count(*) FROM foo`).Scan(&rows); err != nil {
+		return err
+	}
+	// We expect one table scan that corresponds to the schema change backfill, and one
+	// scan that corresponds to the changefeed level backfill.
+	ns.availableRows += 2 * rows
+	return nil
+}
+
 func noteFeedMessage(a fsm.Args) error {
 	ns := a.Extended.(*nemeses)
+	if ns.availableRows <= 0 {
+		return errors.Errorf(`noteFeedMessage should be called with ` +
+			`at least one available row.`)
+	}
+
 	// The poller works by continually selecting a timestamp to be the next
 	// high-water and polling for changes between the last high-water and the new
 	// one. It doesn't push any unresolved intents (it would enter the txnwaitq,
@@ -437,12 +682,6 @@ func resume(a fsm.Args) error {
 	return a.Extended.(*nemeses).f.Resume()
 }
 
-func push(a fsm.Args) error {
-	ns := a.Extended.(*nemeses)
-	_, err := ns.db.Exec(`BEGIN TRANSACTION PRIORITY HIGH; SELECT * FROM foo; COMMIT`)
-	return err
-}
-
 func abort(a fsm.Args) error {
 	ns := a.Extended.(*nemeses)
 	const delete = `BEGIN TRANSACTION PRIORITY HIGH; ` +
@@ -456,6 +695,12 @@ func abort(a fsm.Args) error {
 	return nil
 }
 
+func push(a fsm.Args) error {
+	ns := a.Extended.(*nemeses)
+	_, err := ns.db.Exec(`BEGIN TRANSACTION PRIORITY HIGH; SELECT * FROM foo; COMMIT`)
+	return err
+}
+
 func split(a fsm.Args) error {
 	ns := a.Extended.(*nemeses)
 	_, err := ns.db.Exec(`ALTER TABLE foo SPLIT AT VALUES ((random() * $1)::int)`, ns.rowCount)
diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go
index 83feb05c3bc6..614813ced8fe 100644
--- a/pkg/ccl/changefeedccl/cdctest/testfeed.go
+++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go
@@ -518,6 +518,25 @@ func (c *cloudFeed) Partitions() []string {
 	return []string{cloudFeedPartition}
 }
 
+// ReformatJSON marshals a golang stdlib based JSON value into a byte slice,
+// preserving whitespace in accordance with the crdb json library.
+func ReformatJSON(j interface{}) ([]byte, error) {
+	printed, err := gojson.Marshal(j)
+	if err != nil {
+		return nil, err
+	}
+	// The golang stdlib json library prints whitespace differently than our
+	// internal one. Roundtrip through the crdb json library to get the
+	// whitespace back to where it started.
+	parsed, err := json.ParseJSON(string(printed))
+	if err != nil {
+		return nil, err
+	}
+	var buf bytes.Buffer
+	parsed.Format(&buf)
+	return buf.Bytes(), nil
+}
+
 // extractKeyFromJSONValue extracts the `WITH key_in_value` key from a `WITH
 // format=json, envelope=wrapped` value.
 func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error) {
@@ -528,28 +547,11 @@ func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error)
 	keyParsed := parsed[`key`]
 	delete(parsed, `key`)
 
-	reformatJSON := func(j interface{}) ([]byte, error) {
-		printed, err := gojson.Marshal(j)
-		if err != nil {
-			return nil, err
-		}
-		// The golang stdlib json library prints whitespace differently than our
-		// internal one.
Roundtrip through the crdb json library to get the - // whitespace back to where it started. - parsed, err := json.ParseJSON(string(printed)) - if err != nil { - return nil, err - } - var buf bytes.Buffer - parsed.Format(&buf) - return buf.Bytes(), nil - } - var err error - if key, err = reformatJSON(keyParsed); err != nil { + if key, err = ReformatJSON(keyParsed); err != nil { return nil, nil, err } - if value, err = reformatJSON(parsed); err != nil { + if value, err = ReformatJSON(parsed); err != nil { return nil, nil, err } return key, value, nil diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index bb34f0405444..67afb10b6315 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -126,17 +126,29 @@ type fingerprintValidator struct { // exists, which is valid but complicates the way fingerprintValidator works. // Don't create a fingerprint earlier than the first seen row. firstRowTimestamp hlc.Timestamp - - buffer []validatorRow + // previousRowUpdateTs keeps track of the timestamp of the most recently processed row + // update. Before starting to process row updates belonging to a particular timestamp + // X, we want to fingerprint at `X.Prev()` to catch any "missed" row updates. + // Maintaining `previousRowUpdateTs` allows us to do this. See `NoteResolved()` for + // more details. + previousRowUpdateTs hlc.Timestamp + + // `fprintOrigColumns` keeps track of the number of non test columns in `fprint`. + fprintOrigColumns int + fprintTestColumns int + buffer []validatorRow failures []string } -// NewFingerprintValidator returns a new FingerprintValidator that uses -// `fprintTable` as scratch space to recreate `origTable`. `fprintTable` must -// exist before calling this constructor. +// NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as +// scratch space to recreate `origTable`. `fprintTable` must exist before calling this +// constructor. `maxTestColumnCount` indicates the maximum number of columns that can be +// expected in `origTable` due to test-related schema changes. This fingerprint validator +// will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to +// accommodate schema changes on the fly. func NewFingerprintValidator( - sqlDB *gosql.DB, origTable, fprintTable string, partitions []string, + sqlDB *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int, ) (Validator, error) { // Fetch the primary keys though information_schema schema inspections so we // can use them to construct the SQL for DELETEs and also so we can verify @@ -153,6 +165,16 @@ func NewFingerprintValidator( if err != nil { return nil, err } + // Record the non-test%d columns in `fprint`. + var fprintOrigColumns int + if err := sqlDB.QueryRow(` + SELECT count(column_name) + FROM information_schema.columns + WHERE table_name=$1 + `, fprintTable).Scan(&fprintOrigColumns); err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() for rows.Next() { var primaryKeyCol string @@ -165,11 +187,28 @@ func NewFingerprintValidator( return nil, errors.Errorf("no primary key information found for %s", fprintTable) } + // Add test columns to fprint. 
+ if maxTestColumnCount > 0 { + var addColumnStmt bytes.Buffer + addColumnStmt.WriteString(`ALTER TABLE fprint `) + for i := 0; i < maxTestColumnCount; i++ { + if i != 0 { + addColumnStmt.WriteString(`, `) + } + fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i) + } + if _, err := sqlDB.Query(addColumnStmt.String()); err != nil { + return nil, err + } + } + v := &fingerprintValidator{ - sqlDB: sqlDB, - origTable: origTable, - fprintTable: fprintTable, - primaryKeyCols: primaryKeyCols, + sqlDB: sqlDB, + origTable: origTable, + fprintTable: fprintTable, + primaryKeyCols: primaryKeyCols, + fprintOrigColumns: fprintOrigColumns, + fprintTestColumns: maxTestColumnCount, } v.partitionResolved = make(map[string]hlc.Timestamp) for _, partition := range partitions { @@ -192,6 +231,93 @@ func (v *fingerprintValidator) NoteRow( }) } +// applyRowUpdate applies the update represented by `row` to the scratch table. +func (v *fingerprintValidator) applyRowUpdate(row validatorRow) error { + txn, err := v.sqlDB.Begin() + if err != nil { + return err + } + var args []interface{} + + var primaryKeyDatums []interface{} + if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil { + return err + } + if len(primaryKeyDatums) != len(v.primaryKeyCols) { + return errors.Errorf(`expected primary key columns %s got datums %s`, + v.primaryKeyCols, primaryKeyDatums) + } + + var stmtBuf bytes.Buffer + type wrapper struct { + After map[string]interface{} `json:"after"` + } + var value wrapper + if err := gojson.Unmarshal([]byte(row.value), &value); err != nil { + return err + } + if value.After != nil { + // UPDATE or INSERT + fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable) + for col, colValue := range value.After { + if len(args) != 0 { + stmtBuf.WriteString(`,`) + } + stmtBuf.WriteString(col) + args = append(args, colValue) + } + for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ { + fmt.Fprintf(&stmtBuf, `, test%d`, i) + args = append(args, nil) + } + stmtBuf.WriteString(`) VALUES (`) + for i := range args { + if i != 0 { + stmtBuf.WriteString(`,`) + } + fmt.Fprintf(&stmtBuf, `$%d`, i+1) + } + stmtBuf.WriteString(`)`) + + // Also verify that the key matches the value. + primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols)) + for idx, primaryKeyCol := range v.primaryKeyCols { + primaryKeyDatums[idx] = value.After[primaryKeyCol] + } + primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums) + if err != nil { + return err + } + + if string(primaryKeyJSON) != row.key { + v.failures = append(v.failures, + fmt.Sprintf(`key %s did not match expected key %s for value %s`, + row.key, primaryKeyJSON, row.value)) + } + + if _, err := txn.Exec(stmtBuf.String(), args...); err != nil { + return err + } + } else { + // DELETE + fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable) + for i, datum := range primaryKeyDatums { + if len(args) != 0 { + stmtBuf.WriteString(`,`) + } + fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1) + args = append(args, datum) + } + if _, err := txn.Exec(stmtBuf.String(), args...); err != nil { + return err + } + } + if err := txn.Commit(); err != nil { + return err + } + return nil +} + // NoteResolved implements the Validator interface. 
func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { if r, ok := v.partitionResolved[partition]; !ok { @@ -213,7 +339,6 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times if !v.resolved.Less(newResolved) { return nil } - initialScanComplete := v.resolved != (hlc.Timestamp{}) v.resolved = newResolved // NB: Intentionally not stable sort because it shouldn't matter. @@ -221,7 +346,11 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times return v.buffer[i].updated.Less(v.buffer[j].updated) }) - var lastUpdated hlc.Timestamp + var lastFingerprintedAt hlc.Timestamp + // We apply all the row updates we received in the time window between the last + // resolved timestamp and this one. We process all row updates belonging to a given + // timestamp and then `fingerprint` to ensure the scratch table and the original table + // match. for len(v.buffer) > 0 { if v.resolved.Less(v.buffer[0].updated) { break @@ -229,93 +358,32 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times row := v.buffer[0] v.buffer = v.buffer[1:] - // If we have already completed the initial scan, verify the fingerprint at - // every point in time. Before the initial scan is complete, the fingerprint - // table might not have the earliest version of every row present in the - // table. - if initialScanComplete { - if row.updated != lastUpdated { - if lastUpdated != (hlc.Timestamp{}) { - if err := v.fingerprint(lastUpdated); err != nil { - return err - } - } - if err := v.fingerprint(row.updated.Prev()); err != nil { - return err - } + // If we've processed all row updates belonging to the previous row's timestamp, + // we fingerprint at `updated.Prev()` since we want to catch cases where one or + // more row updates are missed. For example: If k1 was written at t1, t2, t3 and + // the update for t2 was missed. + if v.previousRowUpdateTs != (hlc.Timestamp{}) && v.previousRowUpdateTs.Less(row.updated) { + if err := v.fingerprint(row.updated.Prev()); err != nil { + return err } - lastUpdated = row.updated - } - - type wrapper struct { - After map[string]interface{} `json:"after"` } - var value wrapper - if err := gojson.Unmarshal([]byte(row.value), &value); err != nil { + if err := v.applyRowUpdate(row); err != nil { return err } - var stmtBuf bytes.Buffer - var args []interface{} - if value.After != nil { - // UPDATE or INSERT - fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable) - for col, colValue := range value.After { - if len(args) != 0 { - stmtBuf.WriteString(`,`) - } - stmtBuf.WriteString(col) - args = append(args, colValue) - } - stmtBuf.WriteString(`) VALUES (`) - for i := range args { - if i != 0 { - stmtBuf.WriteString(`,`) - } - fmt.Fprintf(&stmtBuf, `$%d`, i+1) - } - stmtBuf.WriteString(`)`) - - // Also verify that the key matches the value. - primaryKeyDatums := make([]interface{}, len(v.primaryKeyCols)) - for idx, primaryKeyCol := range v.primaryKeyCols { - primaryKeyDatums[idx] = value.After[primaryKeyCol] - } - primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums) - if err != nil { + // If any updates have exactly the same timestamp, we have to apply them all + // before fingerprinting. 
+ if len(v.buffer) == 0 || v.buffer[0].updated != row.updated { + lastFingerprintedAt = row.updated + if err := v.fingerprint(row.updated); err != nil { return err } - if string(primaryKeyJSON) != row.key { - v.failures = append(v.failures, fmt.Sprintf( - `key %s did not match expected key %s for value %s`, row.key, primaryKeyJSON, row.value)) - } - } else { - // DELETE - var primaryKeyDatums []interface{} - if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil { - return err - } - if len(primaryKeyDatums) != len(v.primaryKeyCols) { - return errors.Errorf( - `expected primary key columns %s got datums %s`, v.primaryKeyCols, primaryKeyDatums) - } - fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable) - for i, datum := range primaryKeyDatums { - if len(args) != 0 { - stmtBuf.WriteString(`,`) - } - fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1) - args = append(args, datum) - } - } - if len(args) > 0 { - if _, err := v.sqlDB.Exec(stmtBuf.String(), args...); err != nil { - return errors.Wrap(err, stmtBuf.String()) - } } + v.previousRowUpdateTs = row.updated } - if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) { + if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) && + lastFingerprintedAt != resolved { return v.fingerprint(resolved) } return nil diff --git a/pkg/ccl/changefeedccl/cdctest/validator_test.go b/pkg/ccl/changefeedccl/cdctest/validator_test.go index 32ea8dd6fe85..2869d7ef2774 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator_test.go +++ b/pkg/ccl/changefeedccl/cdctest/validator_test.go @@ -10,6 +10,7 @@ package cdctest import ( "context" + "fmt" "reflect" "testing" @@ -126,16 +127,21 @@ func TestFingerprintValidator(t *testing.T) { } } + createTableStmt := func(tableName string) string { + return fmt.Sprintf(`CREATE TABLE %s (k INT PRIMARY KEY, v INT)`, tableName) + } + testColumns := 0 + t.Run(`empty`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE empty (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`empty`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) assertValidatorFailures(t, v) }) t.Run(`wrong_data`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE wrong_data (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`wrong_data`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":10}}`, ts[1]) noteResolved(t, v, `p`, ts[1]) @@ -145,8 +151,8 @@ func TestFingerprintValidator(t *testing.T) { ) }) t.Run(`all_resolved`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE all_resolved (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`all_resolved`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`p`, ts[0]); err != nil { t.Fatal(err) @@ -164,8 +170,8 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`rows_unsorted`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE rows_unsorted (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, 
`rows_unsorted`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`rows_unsorted`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) @@ -176,27 +182,30 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`missed_initial`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE missed_initial (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`missed_initial`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // Intentionally missing {"k":1,"v":1} at ts[1]. + // Insert a fake row since we don't fingerprint earlier than the first seen row. + v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[2].Prev()) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[2]) - noteResolved(t, v, `p`, ts[2]) + noteResolved(t, v, `p`, ts[2].Prev()) assertValidatorFailures(t, v, `fingerprints did not match at `+ts[2].Prev().AsOfSystemTime()+ - `: 590700560494856539 vs EMPTY`, + `: 590700560494856539 vs 590699460983228293`, ) }) t.Run(`missed_middle`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE missed_middle (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`missed_middle`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) // Intentionally missing {"k":1,"v":2} at ts[2]. 
v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[2]) + noteResolved(t, v, `p`, ts[2]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) noteResolved(t, v, `p`, ts[3]) assertValidatorFailures(t, v, @@ -207,8 +216,8 @@ func TestFingerprintValidator(t *testing.T) { ) }) t.Run(`missed_end`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE missed_end (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`missed_end`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) @@ -222,8 +231,8 @@ func TestFingerprintValidator(t *testing.T) { ) }) t.Run(`initial_scan`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE initial_scan (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`initial_scan`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[3]) @@ -231,16 +240,16 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`unknown_partition`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE unknown_partition (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`unknown_partition`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`nope`, ts[1]); !testutils.IsError(err, `unknown partition`) { t.Fatalf(`expected "unknown partition" error got: %+v`, err) } }) t.Run(`resolved_unsorted`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE resolved_unsorted (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`resolved_unsorted`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) noteResolved(t, v, `p`, ts[1]) @@ -249,8 +258,8 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`two_partitions`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE two_partitions (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}) + sqlDB.Exec(t, createTableStmt(`two_partitions`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 322564b0487a..b4a3a5fd4674 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -27,15 +27,21 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/internal/client" 
"github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -565,23 +571,47 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(db) + // Expected semantics: + // + // 1) DROP COLUMN + // If the table descriptor is at version 1 when the `ALTER TABLE` stmt is issued, + // we expect the changefeed level backfill to be triggered at the `ModificationTime` of + // version 2 of the said descriptor. This is because this is the descriptor + // version at which the dropped column stops being visible to SELECTs. Note that + // this means we will see row updates resulting from the schema-change level + // backfill _after_ the changefeed level backfill. + // + // 2) ADD COLUMN WITH DEFAULT & ADD COLUMN AS ... STORED + // If the table descriptor is at version 1 when the `ALTER TABLE` stmt is issued, + // we expect the changefeed level backfill to be triggered at the + // `ModificationTime` of version 4 of said descriptor. This is because this is the + // descriptor version which makes the schema-change level backfill for the + // newly-added column public. This means we wil see row updates resulting from the + // schema-change level backfill _before_ the changefeed level backfill. + t.Run(`add column with default`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_column_def (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (2)`) - addColumnDef := feed(t, f, `CREATE CHANGEFEED FOR add_column_def`) + addColumnDef := feed(t, f, `CREATE CHANGEFEED FOR add_column_def WITH updated`) defer closeFeed(t, addColumnDef) - assertPayloads(t, addColumnDef, []string{ + assertPayloadsStripTs(t, addColumnDef, []string{ `add_column_def: [1]->{"after": {"a": 1}}`, `add_column_def: [2]->{"after": {"a": 2}}`, }) sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) + ts := fetchDescVersionModificationTime(t, db, f, `add_column_def`, 4) + // Schema change backfill + assertPayloadsStripTs(t, addColumnDef, []string{ + `add_column_def: [1]->{"after": {"a": 1}}`, + `add_column_def: [2]->{"after": {"a": 2}}`, + }) + // Changefeed level backfill assertPayloads(t, addColumnDef, []string{ - // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed. 
- // `add_column_def: [1]->{"after": {"a": 1}}`, - // `add_column_def: [2]->{"after": {"a": 2}}`, - `add_column_def: [1]->{"after": {"a": 1, "b": "d"}}`, - `add_column_def: [2]->{"after": {"a": 2, "b": "d"}}`, + fmt.Sprintf(`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}`, + ts.AsOfSystemTime()), + fmt.Sprintf(`add_column_def: [2]->{"after": {"a": 2, "b": "d"}, "updated": "%s"}`, + ts.AsOfSystemTime()), }) }) @@ -589,19 +619,23 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`) - addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp`) + addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`) defer closeFeed(t, addColComp) - assertPayloads(t, addColComp, []string{ + assertPayloadsStripTs(t, addColComp, []string{ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, `add_col_comp: [2]->{"after": {"a": 2, "b": 7}}`, }) sqlDB.Exec(t, `ALTER TABLE add_col_comp ADD COLUMN c INT AS (a + 10) STORED`) + assertPayloadsStripTs(t, addColComp, []string{ + `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, + `add_col_comp: [2]->{"after": {"a": 2, "b": 7}}`, + }) + ts := fetchDescVersionModificationTime(t, db, f, `add_col_comp`, 4) assertPayloads(t, addColComp, []string{ - // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed. - // `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, - // `add_col_comp: [2]->{"after": {"a": 2, "b": 7}}`, - `add_col_comp: [1]->{"after": {"a": 1, "b": 6, "c": 11}}`, - `add_col_comp: [2]->{"after": {"a": 2, "b": 7, "c": 12}}`, + fmt.Sprintf(`add_col_comp: [1]->{"after": {"a": 1, "b": 6, "c": 11}, "updated": "%s"}`, + ts.AsOfSystemTime()), + fmt.Sprintf(`add_col_comp: [2]->{"after": {"a": 2, "b": 7, "c": 12}, "updated": "%s"}`, + ts.AsOfSystemTime()), }) }) @@ -609,22 +643,23 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`) - dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column`) + dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`) defer closeFeed(t, dropColumn) - assertPayloads(t, dropColumn, []string{ + assertPayloadsStripTs(t, dropColumn, []string{ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}`, `drop_column: [2]->{"after": {"a": 2, "b": "2"}}`, }) sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`) - sqlDB.Exec(t, `INSERT INTO drop_column VALUES (3)`) - // Dropped columns are immediately invisible. + ts := fetchDescVersionModificationTime(t, db, f, `drop_column`, 2) assertPayloads(t, dropColumn, []string{ + fmt.Sprintf(`drop_column: [1]->{"after": {"a": 1}, "updated": "%s"}`, ts.AsOfSystemTime()), + fmt.Sprintf(`drop_column: [2]->{"after": {"a": 2}, "updated": "%s"}`, ts.AsOfSystemTime()), + }) + sqlDB.Exec(t, `INSERT INTO drop_column VALUES (3)`) + assertPayloadsStripTs(t, dropColumn, []string{ + `drop_column: [3]->{"after": {"a": 3}}`, `drop_column: [1]->{"after": {"a": 1}}`, `drop_column: [2]->{"after": {"a": 2}}`, - `drop_column: [3]->{"after": {"a": 3}}`, - // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed. 
- // `drop_column: [1]->{"after": {"a": 1}}`, - // `drop_column: [2]->{"after": {"a": 2}}`, }) }) @@ -644,9 +679,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = waitSinkHook - multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters`) + multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`) defer closeFeed(t, multipleAlters) - assertPayloads(t, multipleAlters, []string{ + assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`, `multiple_alters: [2]->{"after": {"a": 2, "b": "2"}}`, }) @@ -659,28 +694,39 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { waitForSchemaChange(t, sqlDB, `ALTER TABLE multiple_alters ADD COLUMN d STRING DEFAULT 'dee'`) wg.Done() + ts := fetchDescVersionModificationTime(t, db, f, `multiple_alters`, 2) + // Changefeed level backfill for DROP COLUMN b. assertPayloads(t, multipleAlters, []string{ - // Backfill no-ops for DROP. Dropped columns are immediately invisible. + fmt.Sprintf(`multiple_alters: [1]->{"after": {"a": 1}, "updated": "%s"}`, ts.AsOfSystemTime()), + fmt.Sprintf(`multiple_alters: [2]->{"after": {"a": 2}, "updated": "%s"}`, ts.AsOfSystemTime()), + }) + assertPayloadsStripTs(t, multipleAlters, []string{ + // Schema-change backfill for DROP COLUMN b. `multiple_alters: [1]->{"after": {"a": 1}}`, `multiple_alters: [2]->{"after": {"a": 2}}`, - // Scan output for DROP - // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed. - // `multiple_alters: [1]->{"after": {"a": 1}}`, - // `multiple_alters: [2]->{"after": {"a": 2}}`, - // Backfill no-ops for column C - // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed. - // `multiple_alters: [1]->{"after": {"a": 1}}`, - // `multiple_alters: [2]->{"after": {"a": 2}}`, - // Scan output for column C + // Schema-change backfill for ADD COLUMN c. + `multiple_alters: [1]->{"after": {"a": 1}}`, + `multiple_alters: [2]->{"after": {"a": 2}}`, + }) + ts = fetchDescVersionModificationTime(t, db, f, `multiple_alters`, 7) + // Changefeed level backfill for ADD COLUMN c. + assertPayloads(t, multipleAlters, []string{ + fmt.Sprintf(`multiple_alters: [1]->{"after": {"a": 1, "c": "cee"}, "updated": "%s"}`, ts.AsOfSystemTime()), + fmt.Sprintf(`multiple_alters: [2]->{"after": {"a": 2, "c": "cee"}, "updated": "%s"}`, ts.AsOfSystemTime()), + }) + // Schema change level backfill for ADD COLUMN d. + assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "c": "cee"}}`, `multiple_alters: [2]->{"after": {"a": 2, "c": "cee"}}`, + }) + ts = fetchDescVersionModificationTime(t, db, f, `multiple_alters`, 10) + // Changefeed level backfill for ADD COLUMN d. + assertPayloads(t, multipleAlters, []string{ // Backfill no-ops for column D (C schema change is complete) // TODO(dan): Track duplicates more precisely in sinklessFeed/tableFeed. 
- // `multiple_alters: [1]->{"after": {"a": 1, "c": "cee"}}`, - // `multiple_alters: [2]->{"after": {"a": 2, "c": "cee"}}`, // Scan output for column C - `multiple_alters: [1]->{"after": {"a": 1, "c": "cee", "d": "dee"}}`, - `multiple_alters: [2]->{"after": {"a": 2, "c": "cee", "d": "dee"}}`, + fmt.Sprintf(`multiple_alters: [1]->{"after": {"a": 1, "c": "cee", "d": "dee"}, "updated": "%s"}`, ts.AsOfSystemTime()), + fmt.Sprintf(`multiple_alters: [2]->{"after": {"a": 2, "c": "cee", "d": "dee"}, "updated": "%s"}`, ts.AsOfSystemTime()), }) }) } @@ -698,6 +744,74 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { } } +// fetchDescVersionModificationTime fetches the `ModificationTime` of the specified +// `version` of `tableName`'s table descriptor. +func fetchDescVersionModificationTime( + t testing.TB, db *gosql.DB, f cdctest.TestFeedFactory, tableName string, version int, +) hlc.Timestamp { + tblKey := roachpb.Key(keys.MakeTablePrefix(keys.DescriptorTableID)) + header := roachpb.RequestHeader{ + Key: tblKey, + EndKey: tblKey.PrefixEnd(), + } + dropColTblID := sqlutils.QueryTableID(t, db, `d`, tableName) + req := &roachpb.ExportRequest{ + RequestHeader: header, + MVCCFilter: roachpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + ReturnSST: true, + } + clock := hlc.NewClock(hlc.UnixNano, time.Minute) + hh := roachpb.Header{Timestamp: clock.Now()} + res, pErr := client.SendWrappedWith(context.Background(), + f.Server().DB().NonTransactionalSender(), hh, req) + if pErr != nil { + t.Fatal(pErr.GoError()) + } + for _, file := range res.(*roachpb.ExportResponse).Files { + it, err := engine.NewMemSSTIterator(file.SST, false /* verify */) + if err != nil { + t.Fatal(err) + } + defer it.Close() + for it.Seek(engine.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + continue + } + k := it.UnsafeKey() + remaining, _, _, err := sqlbase.DecodeTableIDIndexID(k.Key) + if err != nil { + t.Fatal(err) + } + _, tableID, err := encoding.DecodeUvarintAscending(remaining) + if err != nil { + t.Fatal(err) + } + if tableID != uint64(dropColTblID) { + continue + } + unsafeValue := it.UnsafeValue() + if unsafeValue == nil { + t.Fatal(errors.New(`value was dropped or truncated`)) + } + value := roachpb.Value{RawBytes: unsafeValue} + var desc sqlbase.Descriptor + if err := value.GetProto(&desc); err != nil { + t.Fatal(err) + } + if tableDesc := desc.GetTable(); tableDesc != nil { + if int(tableDesc.Version) == version { + return tableDesc.ModificationTime + } + } + } + } + t.Fatal(errors.New(`couldn't find table desc for given version`)) + return hlc.Timestamp{} +} + // Regression test for #34314 func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index efe674fcf78a..38b4117c7ec8 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -51,11 +51,13 @@ func waitForSchemaChange( }) } -func assertPayloads(t testing.TB, f cdctest.TestFeed, expected []string) { +func readNextMessages(t testing.TB, f cdctest.TestFeed, numMessages int, stripTs bool) []string { t.Helper() var actual []string - for len(actual) < len(expected) { + var value []byte + var message map[string]interface{} + for len(actual) < numMessages { m, err := f.Next() if log.V(1) { log.Infof(context.TODO(), `%v %s: %s->%s`, err, m.Topic, m.Key, m.Value) @@ -65,12 +67,27 @@ func assertPayloads(t testing.TB, f cdctest.TestFeed, expected 
[]string) { } else if m == nil { t.Fatal(`expected message`) } else if len(m.Key) > 0 || len(m.Value) > 0 { - actual = append(actual, fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value)) + if stripTs { + if err := gojson.Unmarshal(m.Value, &message); err != nil { + t.Fatalf(`%s: %s`, m.Value, err) + } + delete(message, "updated") + value, err = cdctest.ReformatJSON(message) + if err != nil { + t.Fatal(err) + } + } else { + value = m.Value + } + actual = append(actual, fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, value)) } } + return actual +} - // The tests that use this aren't concerned with order, just that these are - // the next len(expected) messages. +func assertPayloadsBase(t testing.TB, f cdctest.TestFeed, expected []string, stripTs bool) { + t.Helper() + actual := readNextMessages(t, f, len(expected), stripTs) sort.Strings(expected) sort.Strings(actual) if !reflect.DeepEqual(expected, actual) { @@ -79,6 +96,16 @@ func assertPayloads(t testing.TB, f cdctest.TestFeed, expected []string) { } } +func assertPayloads(t testing.TB, f cdctest.TestFeed, expected []string) { + t.Helper() + assertPayloadsBase(t, f, expected, false) +} + +func assertPayloadsStripTs(t testing.TB, f cdctest.TestFeed, expected []string) { + t.Helper() + assertPayloadsBase(t, f, expected, true) +} + func avroToJSON(t testing.TB, reg *testSchemaRegistry, avroBytes []byte) []byte { if len(avroBytes) == 0 { return nil diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 10850d2297ba..7ff90b60db47 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -734,7 +734,7 @@ func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescripto if desc.ModificationTime.Less(lastVersion.ModificationTime) { return nil } - if lastVersion.HasColumnBackfillMutation() && !desc.HasColumnBackfillMutation() { + if shouldAddScanBoundary(lastVersion, desc) { boundaryTime := desc.GetModificationTime() // Only mutations that happened after the changefeed started are // interesting here. @@ -769,6 +769,33 @@ func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescripto return nil } +func shouldAddScanBoundary( + lastVersion *sqlbase.TableDescriptor, desc *sqlbase.TableDescriptor, +) (res bool) { + return newColumnBackfillComplete(lastVersion, desc) || + hasNewColumnDropBackfillMutation(lastVersion, desc) +} + +func hasNewColumnDropBackfillMutation(oldDesc, newDesc *sqlbase.TableDescriptor) (res bool) { + dropMutationExists := func(desc *sqlbase.TableDescriptor) bool { + for _, m := range desc.Mutations { + if m.Direction == sqlbase.DescriptorMutation_DROP && + m.State == sqlbase.DescriptorMutation_DELETE_AND_WRITE_ONLY { + return true + } + } + return false + } + // Make sure that the old descriptor *doesn't* have the same mutation to avoid adding + // the same scan boundary more than once. 
+ return !dropMutationExists(oldDesc) && dropMutationExists(newDesc) +} + +func newColumnBackfillComplete(oldDesc, newDesc *sqlbase.TableDescriptor) (res bool) { + return len(oldDesc.Columns) < len(newDesc.Columns) && + oldDesc.HasColumnBackfillMutation() && !newDesc.HasColumnBackfillMutation() +} + func fetchSpansForTargets( ctx context.Context, db *client.DB, targets jobspb.ChangefeedTargets, ts hlc.Timestamp, ) ([]roachpb.Span, error) { diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index e6b423a4faf9..7f3eff97f291 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -324,10 +324,10 @@ func (s *cloudStorageSink) flushFile( fileID := s.fileID s.fileID++ // Pad file ID to maintain lexical ordering among files from the same sink. - // Note that we use `-` here to delimit the filename because we want - // `%d.RESOLVED` files to lexicographically succeed data files that have the - // same timestamp. This works because ascii `-` < ascii '.'. - filename := fmt.Sprintf(`%s-%s-%d-%d-%d-%08d%s`, s.dataFileTs, + // Ditto for schema ID. Note that we use `-` here to delimit the filename + // because we want `%d.RESOLVED` files to lexicographically succeed data files + // that have the same timestamp. This works because ascii `-` < ascii '.'. + filename := fmt.Sprintf(`%s-%s-%08d-%d-%d-%08d%s`, s.dataFileTs, key.Topic, key.SchemaID, s.nodeID, s.sinkID, fileID, s.ext) return s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())) } diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 2c05ac1c20f2..c6652d8166f1 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -85,7 +85,7 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, s.Flush(ctx)) dataFile, err := ioutil.ReadFile(filepath.Join( - dir, sinkDir, `1970-01-01`, `197001010000000000000000000000000-t1-0-1-7-00000000.ndjson`)) + dir, sinkDir, `1970-01-01`, `197001010000000000000000000000000-t1-00000000-1-7-00000000.ndjson`)) require.NoError(t, err) require.Equal(t, "v1\n", string(dataFile)) diff --git a/pkg/ccl/changefeedccl/validations_test.go b/pkg/ccl/changefeedccl/validations_test.go index 417e3f3dc452..e5f362b8d33a 100644 --- a/pkg/ccl/changefeedccl/validations_test.go +++ b/pkg/ccl/changefeedccl/validations_test.go @@ -70,7 +70,7 @@ func TestValidations(t *testing.T) { const requestedResolved = 7 sqlDB.Exec(t, `CREATE TABLE fprint (id INT PRIMARY KEY, balance INT, payload STRING)`) - fprintV, err := cdctest.NewFingerprintValidator(db, `bank`, `fprint`, bankFeed.Partitions()) + fprintV, err := cdctest.NewFingerprintValidator(db, `bank`, `fprint`, bankFeed.Partitions(), 0) require.NoError(t, err) v := cdctest.MakeCountValidator(cdctest.Validators{ cdctest.NewOrderValidator(`bank`), diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 1f9d770386fb..d299c44699cb 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -298,7 +298,7 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) { } const requestedResolved = 100 - fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions) + fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0) if err != nil { return err } diff --git a/pkg/util/fsm/fsm.go b/pkg/util/fsm/fsm.go index 35d04843a022..43b1c28e6d20 
100644
--- a/pkg/util/fsm/fsm.go
+++ b/pkg/util/fsm/fsm.go
@@ -84,6 +84,11 @@ type Transitions struct {
 	expanded Pattern
 }
 
+// GetExpanded returns the expanded map of transitions.
+func (t Transitions) GetExpanded() Pattern {
+	return t.expanded
+}
+
 // Compile creates a set of state Transitions from a Pattern. This is relatively
 // expensive so it's expected that Compile is called once for each transition
 // graph and assigned to a static variable. This variable can then be given to