Skip to content

Commit

Permalink
Merge pull request #43037 from danhhz/backport19.1-41842-42053
Browse files Browse the repository at this point in the history
release-19.1: changefeed schema change nemeses and fixes
  • Loading branch information
danhhz committed Dec 17, 2019
2 parents feda2d6 + da56d30 commit f8a9923
Show file tree
Hide file tree
Showing 12 changed files with 842 additions and 345 deletions.
569 changes: 407 additions & 162 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go

Large diffs are not rendered by default.

40 changes: 21 additions & 19 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,25 @@ func (c *cloudFeed) Partitions() []string {
return []string{cloudFeedPartition}
}

// ReformatJSON marshals a golang stdlib based JSON into a byte slice preserving
// whitespace in accordance with the crdb json library.
func ReformatJSON(j interface{}) ([]byte, error) {
printed, err := gojson.Marshal(j)
if err != nil {
return nil, err
}
// The golang stdlib json library prints whitespace differently than our
// internal one. Roundtrip through the crdb json library to get the
// whitespace back to where it started.
parsed, err := json.ParseJSON(string(printed))
if err != nil {
return nil, err
}
var buf bytes.Buffer
parsed.Format(&buf)
return buf.Bytes(), nil
}

// extractKeyFromJSONValue extracts the `WITH key_in_value` key from a `WITH
// format=json, envelope=wrapped` value.
func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error) {
Expand All @@ -528,28 +547,11 @@ func extractKeyFromJSONValue(wrapped []byte) (key []byte, value []byte, _ error)
keyParsed := parsed[`key`]
delete(parsed, `key`)

reformatJSON := func(j interface{}) ([]byte, error) {
printed, err := gojson.Marshal(j)
if err != nil {
return nil, err
}
// The golang stdlib json library prints whitespace differently than our
// internal one. Roundtrip through the crdb json library to get the
// whitespace back to where it started.
parsed, err := json.ParseJSON(string(printed))
if err != nil {
return nil, err
}
var buf bytes.Buffer
parsed.Format(&buf)
return buf.Bytes(), nil
}

var err error
if key, err = reformatJSON(keyParsed); err != nil {
if key, err = ReformatJSON(keyParsed); err != nil {
return nil, nil, err
}
if value, err = reformatJSON(parsed); err != nil {
if value, err = ReformatJSON(parsed); err != nil {
return nil, nil, err
}
return key, value, nil
Expand Down
246 changes: 157 additions & 89 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,29 @@ type fingerprintValidator struct {
// exists, which is valid but complicates the way fingerprintValidator works.
// Don't create a fingerprint earlier than the first seen row.
firstRowTimestamp hlc.Timestamp

buffer []validatorRow
// previousRowUpdateTs keeps track of the timestamp of the most recently processed row
// update. Before starting to process row updates belonging to a particular timestamp
// X, we want to fingerprint at `X.Prev()` to catch any "missed" row updates.
// Maintaining `previousRowUpdateTs` allows us to do this. See `NoteResolved()` for
// more details.
previousRowUpdateTs hlc.Timestamp

// `fprintOrigColumns` keeps track of the number of non test columns in `fprint`.
fprintOrigColumns int
fprintTestColumns int
buffer []validatorRow

failures []string
}

// NewFingerprintValidator returns a new FingerprintValidator that uses
// `fprintTable` as scratch space to recreate `origTable`. `fprintTable` must
// exist before calling this constructor.
// NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as
// scratch space to recreate `origTable`. `fprintTable` must exist before calling this
// constructor. `maxTestColumnCount` indicates the maximum number of columns that can be
// expected in `origTable` due to test-related schema changes. This fingerprint validator
// will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to
// accommodate schema changes on the fly.
func NewFingerprintValidator(
sqlDB *gosql.DB, origTable, fprintTable string, partitions []string,
sqlDB *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int,
) (Validator, error) {
// Fetch the primary keys though information_schema schema inspections so we
// can use them to construct the SQL for DELETEs and also so we can verify
Expand All @@ -153,6 +165,16 @@ func NewFingerprintValidator(
if err != nil {
return nil, err
}
// Record the non-test%d columns in `fprint`.
var fprintOrigColumns int
if err := sqlDB.QueryRow(`
SELECT count(column_name)
FROM information_schema.columns
WHERE table_name=$1
`, fprintTable).Scan(&fprintOrigColumns); err != nil {
return nil, err
}

defer func() { _ = rows.Close() }()
for rows.Next() {
var primaryKeyCol string
Expand All @@ -165,11 +187,28 @@ func NewFingerprintValidator(
return nil, errors.Errorf("no primary key information found for %s", fprintTable)
}

// Add test columns to fprint.
if maxTestColumnCount > 0 {
var addColumnStmt bytes.Buffer
addColumnStmt.WriteString(`ALTER TABLE fprint `)
for i := 0; i < maxTestColumnCount; i++ {
if i != 0 {
addColumnStmt.WriteString(`, `)
}
fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i)
}
if _, err := sqlDB.Query(addColumnStmt.String()); err != nil {
return nil, err
}
}

v := &fingerprintValidator{
sqlDB: sqlDB,
origTable: origTable,
fprintTable: fprintTable,
primaryKeyCols: primaryKeyCols,
sqlDB: sqlDB,
origTable: origTable,
fprintTable: fprintTable,
primaryKeyCols: primaryKeyCols,
fprintOrigColumns: fprintOrigColumns,
fprintTestColumns: maxTestColumnCount,
}
v.partitionResolved = make(map[string]hlc.Timestamp)
for _, partition := range partitions {
Expand All @@ -192,6 +231,93 @@ func (v *fingerprintValidator) NoteRow(
})
}

// applyRowUpdate applies the update represented by `row` to the scratch table.
func (v *fingerprintValidator) applyRowUpdate(row validatorRow) error {
txn, err := v.sqlDB.Begin()
if err != nil {
return err
}
var args []interface{}

var primaryKeyDatums []interface{}
if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
return err
}
if len(primaryKeyDatums) != len(v.primaryKeyCols) {
return errors.Errorf(`expected primary key columns %s got datums %s`,
v.primaryKeyCols, primaryKeyDatums)
}

var stmtBuf bytes.Buffer
type wrapper struct {
After map[string]interface{} `json:"after"`
}
var value wrapper
if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
return err
}
if value.After != nil {
// UPDATE or INSERT
fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable)
for col, colValue := range value.After {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
stmtBuf.WriteString(col)
args = append(args, colValue)
}
for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {
fmt.Fprintf(&stmtBuf, `, test%d`, i)
args = append(args, nil)
}
stmtBuf.WriteString(`) VALUES (`)
for i := range args {
if i != 0 {
stmtBuf.WriteString(`,`)
}
fmt.Fprintf(&stmtBuf, `$%d`, i+1)
}
stmtBuf.WriteString(`)`)

// Also verify that the key matches the value.
primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols))
for idx, primaryKeyCol := range v.primaryKeyCols {
primaryKeyDatums[idx] = value.After[primaryKeyCol]
}
primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums)
if err != nil {
return err
}

if string(primaryKeyJSON) != row.key {
v.failures = append(v.failures,
fmt.Sprintf(`key %s did not match expected key %s for value %s`,
row.key, primaryKeyJSON, row.value))
}

if _, err := txn.Exec(stmtBuf.String(), args...); err != nil {
return err
}
} else {
// DELETE
fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable)
for i, datum := range primaryKeyDatums {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum)
}
if _, err := txn.Exec(stmtBuf.String(), args...); err != nil {
return err
}
}
if err := txn.Commit(); err != nil {
return err
}
return nil
}

// NoteResolved implements the Validator interface.
func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
if r, ok := v.partitionResolved[partition]; !ok {
Expand All @@ -213,109 +339,51 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
if !v.resolved.Less(newResolved) {
return nil
}
initialScanComplete := v.resolved != (hlc.Timestamp{})
v.resolved = newResolved

// NB: Intentionally not stable sort because it shouldn't matter.
sort.Slice(v.buffer, func(i, j int) bool {
return v.buffer[i].updated.Less(v.buffer[j].updated)
})

var lastUpdated hlc.Timestamp
var lastFingerprintedAt hlc.Timestamp
// We apply all the row updates we received in the time window between the last
// resolved timestamp and this one. We process all row updates belonging to a given
// timestamp and then `fingerprint` to ensure the scratch table and the original table
// match.
for len(v.buffer) > 0 {
if v.resolved.Less(v.buffer[0].updated) {
break
}
row := v.buffer[0]
v.buffer = v.buffer[1:]

// If we have already completed the initial scan, verify the fingerprint at
// every point in time. Before the initial scan is complete, the fingerprint
// table might not have the earliest version of every row present in the
// table.
if initialScanComplete {
if row.updated != lastUpdated {
if lastUpdated != (hlc.Timestamp{}) {
if err := v.fingerprint(lastUpdated); err != nil {
return err
}
}
if err := v.fingerprint(row.updated.Prev()); err != nil {
return err
}
// If we've processed all row updates belonging to the previous row's timestamp,
// we fingerprint at `updated.Prev()` since we want to catch cases where one or
// more row updates are missed. For example: If k1 was written at t1, t2, t3 and
// the update for t2 was missed.
if v.previousRowUpdateTs != (hlc.Timestamp{}) && v.previousRowUpdateTs.Less(row.updated) {
if err := v.fingerprint(row.updated.Prev()); err != nil {
return err
}
lastUpdated = row.updated
}

type wrapper struct {
After map[string]interface{} `json:"after"`
}
var value wrapper
if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
if err := v.applyRowUpdate(row); err != nil {
return err
}

var stmtBuf bytes.Buffer
var args []interface{}
if value.After != nil {
// UPDATE or INSERT
fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable)
for col, colValue := range value.After {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
stmtBuf.WriteString(col)
args = append(args, colValue)
}
stmtBuf.WriteString(`) VALUES (`)
for i := range args {
if i != 0 {
stmtBuf.WriteString(`,`)
}
fmt.Fprintf(&stmtBuf, `$%d`, i+1)
}
stmtBuf.WriteString(`)`)

// Also verify that the key matches the value.
primaryKeyDatums := make([]interface{}, len(v.primaryKeyCols))
for idx, primaryKeyCol := range v.primaryKeyCols {
primaryKeyDatums[idx] = value.After[primaryKeyCol]
}
primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums)
if err != nil {
// If any updates have exactly the same timestamp, we have to apply them all
// before fingerprinting.
if len(v.buffer) == 0 || v.buffer[0].updated != row.updated {
lastFingerprintedAt = row.updated
if err := v.fingerprint(row.updated); err != nil {
return err
}
if string(primaryKeyJSON) != row.key {
v.failures = append(v.failures, fmt.Sprintf(
`key %s did not match expected key %s for value %s`, row.key, primaryKeyJSON, row.value))
}
} else {
// DELETE
var primaryKeyDatums []interface{}
if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
return err
}
if len(primaryKeyDatums) != len(v.primaryKeyCols) {
return errors.Errorf(
`expected primary key columns %s got datums %s`, v.primaryKeyCols, primaryKeyDatums)
}
fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable)
for i, datum := range primaryKeyDatums {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum)
}
}
if len(args) > 0 {
if _, err := v.sqlDB.Exec(stmtBuf.String(), args...); err != nil {
return errors.Wrap(err, stmtBuf.String())
}
}
v.previousRowUpdateTs = row.updated
}

if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) {
if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) &&
lastFingerprintedAt != resolved {
return v.fingerprint(resolved)
}
return nil
Expand Down
Loading

0 comments on commit f8a9923

Please sign in to comment.