Skip to content

Commit

Permalink
changefeedccl: add scan boundaries based on change in set of columns
Browse files Browse the repository at this point in the history
Currently, the changefeed poller detects a scan boundary when it
detects that the last version of a table descriptor has a pending
mutation but the current version doesn't. In case of an `ALTER TABLE
DROP COLUMN` statement, the point at which this happens is the point at
which the schema change backfill completes. However, this is incorrect
since the dropped column gets logically dropped before that.

This PR corrects this problem by instead using the set of column
descriptors within the current and previous table descriptors to detect a
scan boundary.

Fixes #41961

Release note (bug fix): Changefeeds now emit backfill row updates for
                        dropped column when the table descriptor drops
                        that column.
  • Loading branch information
aayushshah15 authored and danhhz committed Dec 10, 2019
1 parent 316a514 commit d976707
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 93 deletions.
24 changes: 12 additions & 12 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/pkg/errors"
)

// RunNemesis runs a jepsen-style validation of whether a changefeed meets our
Expand Down Expand Up @@ -57,10 +57,10 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, omitPause bool) (Validator, err
eventPauseCount = 0
}
ns := &nemeses{
maxTestColumnCount: 10,
rowCount: 4,
db: db,
usingPoller: !usingRangeFeed,
maxTestColumnCount: 10,
rowCount: 4,
db: db,
usingPoller: !usingRangeFeed,
// eventMix does not have to add to 100
eventMix: map[fsm.Event]int{
// We don't want `eventFinished` to ever be returned by `nextEvent` so we set
Expand Down Expand Up @@ -225,11 +225,11 @@ const (
)

type nemeses struct {
rowCount int
rowCount int
maxTestColumnCount int
eventMix map[fsm.Event]int
mixTotal int
usingPoller bool
eventMix map[fsm.Event]int
mixTotal int
usingPoller bool

v *CountValidator
db *gosql.DB
Expand Down Expand Up @@ -576,7 +576,7 @@ func addColumn(a fsm.Args) error {
ns := a.Extended.(*nemeses)

if ns.currentTestColumnCount >= ns.maxTestColumnCount {
return errors.AssertionFailedf(`addColumn should be called when`+
return errors.Errorf(`addColumn should be called when`+
`there are less than %d columns.`, ns.maxTestColumnCount)
}

Expand All @@ -600,7 +600,7 @@ func removeColumn(a fsm.Args) error {
ns := a.Extended.(*nemeses)

if ns.currentTestColumnCount == 0 {
return errors.AssertionFailedf(`removeColumn should be called with` +
return errors.Errorf(`removeColumn should be called with` +
`at least one test column.`)
}
if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo DROP COLUMN test%d`,
Expand All @@ -623,7 +623,7 @@ func noteFeedMessage(a fsm.Args) error {
ns := a.Extended.(*nemeses)

if ns.availableRows <= 0 {
return errors.AssertionFailedf(`noteFeedMessage should be called with at` +
return errors.Errorf(`noteFeedMessage should be called with at` +
`least one available row.`)
}

Expand Down
40 changes: 21 additions & 19 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,25 @@ func (c *cloudFeed) Partitions() []string {
return []string{cloudFeedPartition}
}

// ReformatJSON marshals a golang stdlib based JSON into a byte slice preserving
// whitespace in accordance with the crdb json library.
func ReformatJSON(j interface{}) ([]byte, error) {
printed, err := gojson.Marshal(j)
if err != nil {
return nil, err
}
// The golang stdlib json library prints whitespace differently than our
// internal one. Roundtrip through the crdb json library to get the
// whitespace back to where it started.
parsed, err := json.ParseJSON(string(printed))
if err != nil {
return nil, err
}
var buf bytes.Buffer
parsed.Format(&buf)
return buf.Bytes(), nil
}

// extractKeyFromJSONValue extracts the `WITH key_in_value` key from a `WITH
// format=json, envelope=wrapped` value.
func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error) {
Expand All @@ -528,28 +547,11 @@ func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error)
keyParsed := parsed[`key`]
delete(parsed, `key`)

reformatJSON := func(j interface{}) ([]byte, error) {
printed, err := gojson.Marshal(j)
if err != nil {
return nil, err
}
// The golang stdlib json library prints whitespace differently than our
// internal one. Roundtrip through the crdb json library to get the
// whitespace back to where it started.
parsed, err := json.ParseJSON(string(printed))
if err != nil {
return nil, err
}
var buf bytes.Buffer
parsed.Format(&buf)
return buf.Bytes(), nil
}

var err error
if key, err = reformatJSON(keyParsed); err != nil {
if key, err = ReformatJSON(keyParsed); err != nil {
return nil, nil, err
}
if value, err = reformatJSON(parsed); err != nil {
if value, err = ReformatJSON(parsed); err != nil {
return nil, nil, err
}
return key, value, nil
Expand Down
19 changes: 2 additions & 17 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,23 +403,8 @@ func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error {
return err
}
if orig != check {
// Ignore the fingerprint mismatch if there was an in-progress schema change job
// on the table.
// TODO(aayush): We currently need to have this hack here since we emit changefeed
// level backfill row updates at the wrong time in the `DROP COLUMN` case. See
// issue #41961 for more details.
var pendingJobs int
var countJobsStmt bytes.Buffer
fmt.Fprintf(&countJobsStmt, `SELECT count(*) from [show jobs] AS OF SYSTEM TIME '%s'`+
`where job_type = 'SCHEMA CHANGE' and status = 'running' or status = 'pending'`,
ts.AsOfSystemTime())
if err := v.sqlDB.QueryRow(countJobsStmt.String()).Scan(&pendingJobs); err != nil {
return err
}
if pendingJobs == 0 {
v.failures = append(v.failures, fmt.Sprintf(
`fingerprints did not match at %s: %s vs %s`, ts.AsOfSystemTime(), orig, check))
}
v.failures = append(v.failures, fmt.Sprintf(
`fingerprints did not match at %s: %s vs %s`, ts.AsOfSystemTime(), orig, check))
}
return nil
}
Expand Down
Loading

0 comments on commit d976707

Please sign in to comment.