Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.1: changefeed schema change nemeses and fixes #43037

Merged
merged 3 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
569 changes: 407 additions & 162 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go

Large diffs are not rendered by default.

40 changes: 21 additions & 19 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,25 @@ func (c *cloudFeed) Partitions() []string {
return []string{cloudFeedPartition}
}

// ReformatJSON serializes `j` with the golang stdlib JSON encoder and then
// re-renders the result with the crdb json library, so the returned bytes use
// the crdb library's whitespace conventions rather than the stdlib's.
//
// An error is returned if `j` cannot be marshaled by the stdlib encoder or if
// the marshaled text is rejected by the crdb json parser.
func ReformatJSON(j interface{}) ([]byte, error) {
	stdlibBytes, marshalErr := gojson.Marshal(j)
	if marshalErr != nil {
		return nil, marshalErr
	}

	// The two libraries disagree on whitespace. Parsing the stdlib output with
	// the crdb library and formatting it again restores the crdb layout.
	crdbJSON, parseErr := json.ParseJSON(string(stdlibBytes))
	if parseErr != nil {
		return nil, parseErr
	}

	out := new(bytes.Buffer)
	crdbJSON.Format(out)
	return out.Bytes(), nil
}

// extractKeyFromJSONValue extracts the `WITH key_in_value` key from a `WITH
// format=json, envelope=wrapped` value.
func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error) {
Expand All @@ -528,28 +547,11 @@ func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error)
keyParsed := parsed[`key`]
delete(parsed, `key`)

reformatJSON := func(j interface{}) ([]byte, error) {
printed, err := gojson.Marshal(j)
if err != nil {
return nil, err
}
// The golang stdlib json library prints whitespace differently than our
// internal one. Roundtrip through the crdb json library to get the
// whitespace back to where it started.
parsed, err := json.ParseJSON(string(printed))
if err != nil {
return nil, err
}
var buf bytes.Buffer
parsed.Format(&buf)
return buf.Bytes(), nil
}

var err error
if key, err = reformatJSON(keyParsed); err != nil {
if key, err = ReformatJSON(keyParsed); err != nil {
return nil, nil, err
}
if value, err = reformatJSON(parsed); err != nil {
if value, err = ReformatJSON(parsed); err != nil {
return nil, nil, err
}
return key, value, nil
Expand Down
246 changes: 157 additions & 89 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,29 @@ type fingerprintValidator struct {
// exists, which is valid but complicates the way fingerprintValidator works.
// Don't create a fingerprint earlier than the first seen row.
firstRowTimestamp hlc.Timestamp

buffer []validatorRow
// previousRowUpdateTs keeps track of the timestamp of the most recently processed row
// update. Before starting to process row updates belonging to a particular timestamp
// X, we want to fingerprint at `X.Prev()` to catch any "missed" row updates.
// Maintaining `previousRowUpdateTs` allows us to do this. See `NoteResolved()` for
// more details.
previousRowUpdateTs hlc.Timestamp

// `fprintOrigColumns` keeps track of the number of non-test columns in `fprint`.
fprintOrigColumns int
fprintTestColumns int
buffer []validatorRow

failures []string
}

// NewFingerprintValidator returns a new FingerprintValidator that uses
// `fprintTable` as scratch space to recreate `origTable`. `fprintTable` must
// exist before calling this constructor.
// NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as
// scratch space to recreate `origTable`. `fprintTable` must exist before calling this
// constructor. `maxTestColumnCount` indicates the maximum number of columns that can be
// expected in `origTable` due to test-related schema changes. This fingerprint validator
// will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to
// accommodate schema changes on the fly.
func NewFingerprintValidator(
sqlDB *gosql.DB, origTable, fprintTable string, partitions []string,
sqlDB *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int,
) (Validator, error) {
// Fetch the primary keys through information_schema schema inspections so we
// can use them to construct the SQL for DELETEs and also so we can verify
Expand All @@ -153,6 +165,16 @@ func NewFingerprintValidator(
if err != nil {
return nil, err
}
// Record the non-test%d columns in `fprint`.
var fprintOrigColumns int
if err := sqlDB.QueryRow(`
SELECT count(column_name)
FROM information_schema.columns
WHERE table_name=$1
`, fprintTable).Scan(&fprintOrigColumns); err != nil {
return nil, err
}

defer func() { _ = rows.Close() }()
for rows.Next() {
var primaryKeyCol string
Expand All @@ -165,11 +187,28 @@ func NewFingerprintValidator(
return nil, errors.Errorf("no primary key information found for %s", fprintTable)
}

// Add test columns to fprint.
if maxTestColumnCount > 0 {
var addColumnStmt bytes.Buffer
addColumnStmt.WriteString(`ALTER TABLE fprint `)
for i := 0; i < maxTestColumnCount; i++ {
if i != 0 {
addColumnStmt.WriteString(`, `)
}
fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i)
}
if _, err := sqlDB.Query(addColumnStmt.String()); err != nil {
return nil, err
}
}

v := &fingerprintValidator{
sqlDB: sqlDB,
origTable: origTable,
fprintTable: fprintTable,
primaryKeyCols: primaryKeyCols,
sqlDB: sqlDB,
origTable: origTable,
fprintTable: fprintTable,
primaryKeyCols: primaryKeyCols,
fprintOrigColumns: fprintOrigColumns,
fprintTestColumns: maxTestColumnCount,
}
v.partitionResolved = make(map[string]hlc.Timestamp)
for _, partition := range partitions {
Expand All @@ -192,6 +231,93 @@ func (v *fingerprintValidator) NoteRow(
})
}

// applyRowUpdate applies the update represented by `row` to the scratch table.
//
// `row.key` is decoded as a JSON array that must hold exactly one datum per
// primary key column. `row.value` is decoded as a JSON object; when its
// `after` field is present the row is UPSERTed into the scratch table,
// otherwise the row identified by `row.key` is DELETEd.
//
// A mismatch between `row.key` and the primary key columns found in `after` is
// recorded in `v.failures` rather than returned as an error. Errors are
// returned for transaction failures, malformed JSON, a primary key of the
// wrong arity, and failed SQL statements.
func (v *fingerprintValidator) applyRowUpdate(row validatorRow) error {
	txn, err := v.sqlDB.Begin()
	if err != nil {
		return err
	}
	// Make sure the transaction is released on every early return below. Once
	// Commit has run, this Rollback is a no-op whose error is deliberately
	// ignored.
	defer func() { _ = txn.Rollback() }()

	var args []interface{}

	var primaryKeyDatums []interface{}
	if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
		return err
	}
	if len(primaryKeyDatums) != len(v.primaryKeyCols) {
		return errors.Errorf(`expected primary key columns %s got datums %s`,
			v.primaryKeyCols, primaryKeyDatums)
	}

	var stmtBuf bytes.Buffer
	type wrapper struct {
		After map[string]interface{} `json:"after"`
	}
	var value wrapper
	if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
		return err
	}
	if value.After != nil {
		// UPDATE or INSERT
		fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable)
		for col, colValue := range value.After {
			if len(args) != 0 {
				stmtBuf.WriteString(`,`)
			}
			stmtBuf.WriteString(col)
			args = append(args, colValue)
		}
		// Explicitly write NULL into the test columns that `after` did not
		// mention.
		// NOTE(review): this assumes any columns in `after` beyond the
		// original ones are exactly test0..test{k-1} -- confirm against the
		// schema changes the nemeses issue.
		for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {
			fmt.Fprintf(&stmtBuf, `, test%d`, i)
			args = append(args, nil)
		}
		stmtBuf.WriteString(`) VALUES (`)
		for i := range args {
			if i != 0 {
				stmtBuf.WriteString(`,`)
			}
			fmt.Fprintf(&stmtBuf, `$%d`, i+1)
		}
		stmtBuf.WriteString(`)`)

		// Also verify that the key matches the value.
		primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols))
		for idx, primaryKeyCol := range v.primaryKeyCols {
			primaryKeyDatums[idx] = value.After[primaryKeyCol]
		}
		primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums)
		if err != nil {
			return err
		}

		if string(primaryKeyJSON) != row.key {
			v.failures = append(v.failures,
				fmt.Sprintf(`key %s did not match expected key %s for value %s`,
					row.key, primaryKeyJSON, row.value))
		}

		if _, err := txn.Exec(stmtBuf.String(), args...); err != nil {
			return err
		}
	} else {
		// DELETE
		fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable)
		for i, datum := range primaryKeyDatums {
			if len(args) != 0 {
				// The per-column predicates must be combined with AND; a
				// comma here is invalid SQL for composite primary keys.
				stmtBuf.WriteString(` AND `)
			}
			fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1)
			args = append(args, datum)
		}
		if _, err := txn.Exec(stmtBuf.String(), args...); err != nil {
			return err
		}
	}
	return txn.Commit()
}

// NoteResolved implements the Validator interface.
func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
if r, ok := v.partitionResolved[partition]; !ok {
Expand All @@ -213,109 +339,51 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
if !v.resolved.Less(newResolved) {
return nil
}
initialScanComplete := v.resolved != (hlc.Timestamp{})
v.resolved = newResolved

// NB: Intentionally not stable sort because it shouldn't matter.
sort.Slice(v.buffer, func(i, j int) bool {
return v.buffer[i].updated.Less(v.buffer[j].updated)
})

var lastUpdated hlc.Timestamp
var lastFingerprintedAt hlc.Timestamp
// We apply all the row updates we received in the time window between the last
// resolved timestamp and this one. We process all row updates belonging to a given
// timestamp and then `fingerprint` to ensure the scratch table and the original table
// match.
for len(v.buffer) > 0 {
if v.resolved.Less(v.buffer[0].updated) {
break
}
row := v.buffer[0]
v.buffer = v.buffer[1:]

// If we have already completed the initial scan, verify the fingerprint at
// every point in time. Before the initial scan is complete, the fingerprint
// table might not have the earliest version of every row present in the
// table.
if initialScanComplete {
if row.updated != lastUpdated {
if lastUpdated != (hlc.Timestamp{}) {
if err := v.fingerprint(lastUpdated); err != nil {
return err
}
}
if err := v.fingerprint(row.updated.Prev()); err != nil {
return err
}
// If we've processed all row updates belonging to the previous row's timestamp,
// we fingerprint at `updated.Prev()` since we want to catch cases where one or
// more row updates are missed. For example: If k1 was written at t1, t2, t3 and
// the update for t2 was missed.
if v.previousRowUpdateTs != (hlc.Timestamp{}) && v.previousRowUpdateTs.Less(row.updated) {
if err := v.fingerprint(row.updated.Prev()); err != nil {
return err
}
lastUpdated = row.updated
}

type wrapper struct {
After map[string]interface{} `json:"after"`
}
var value wrapper
if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
if err := v.applyRowUpdate(row); err != nil {
return err
}

var stmtBuf bytes.Buffer
var args []interface{}
if value.After != nil {
// UPDATE or INSERT
fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable)
for col, colValue := range value.After {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
stmtBuf.WriteString(col)
args = append(args, colValue)
}
stmtBuf.WriteString(`) VALUES (`)
for i := range args {
if i != 0 {
stmtBuf.WriteString(`,`)
}
fmt.Fprintf(&stmtBuf, `$%d`, i+1)
}
stmtBuf.WriteString(`)`)

// Also verify that the key matches the value.
primaryKeyDatums := make([]interface{}, len(v.primaryKeyCols))
for idx, primaryKeyCol := range v.primaryKeyCols {
primaryKeyDatums[idx] = value.After[primaryKeyCol]
}
primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums)
if err != nil {
// If any updates have exactly the same timestamp, we have to apply them all
// before fingerprinting.
if len(v.buffer) == 0 || v.buffer[0].updated != row.updated {
lastFingerprintedAt = row.updated
if err := v.fingerprint(row.updated); err != nil {
return err
}
if string(primaryKeyJSON) != row.key {
v.failures = append(v.failures, fmt.Sprintf(
`key %s did not match expected key %s for value %s`, row.key, primaryKeyJSON, row.value))
}
} else {
// DELETE
var primaryKeyDatums []interface{}
if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
return err
}
if len(primaryKeyDatums) != len(v.primaryKeyCols) {
return errors.Errorf(
`expected primary key columns %s got datums %s`, v.primaryKeyCols, primaryKeyDatums)
}
fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable)
for i, datum := range primaryKeyDatums {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum)
}
}
if len(args) > 0 {
if _, err := v.sqlDB.Exec(stmtBuf.String(), args...); err != nil {
return errors.Wrap(err, stmtBuf.String())
}
}
v.previousRowUpdateTs = row.updated
}

if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) {
if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) &&
lastFingerprintedAt != resolved {
return v.fingerprint(resolved)
}
return nil
Expand Down
Loading