Skip to content

Commit

Permalink
changefeedccl: fix bug with avro encoding and zero-scale decimal cols
Browse files Browse the repository at this point in the history
This patch fixes a bug where creating a changefeed that targeted
tables with a `DECIMAL(n)` column (i.e. zero-scale `DECIMAL` column),
`format='avro'`, and `diff` would cause a panic.

The cause of this panic was the fact that the third-party `goavro`
library we use expected the JSON encoding of the schema to have
a numeric `scale` field for decimal types, but we omitted this
field whenever it was zero (using `omitempty`), which led to a
runtime type assertion failure. We've updated the field to a pointer
type in our type definition so that we can distinguish between an
unset value and a zero value.

Release note (enterprise change): Fixed a bug where creating a
changefeed that targeted tables with a `DECIMAL(n)` column
(i.e. zero-scale `DECIMAL` column), `format='avro'`, and `diff`
would cause a panic.
  • Loading branch information
andyyang890 committed Feb 6, 2024
1 parent 715628a commit 907aaa6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ const (
type avroLogicalType struct {
SchemaType avroSchemaType `json:"type"`
LogicalType string `json:"logicalType"`
Precision int `json:"precision,omitempty"`
Scale int `json:"scale,omitempty"`
Precision *int `json:"precision,omitempty"`
Scale *int `json:"scale,omitempty"`
}

type avroArrayType struct {
Expand Down Expand Up @@ -528,8 +528,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
decimalType := avroLogicalType{
SchemaType: avroSchemaBytes,
LogicalType: `decimal`,
Precision: prec,
Scale: width,
Precision: &prec,
Scale: &width,
}
setNullableWithStringFallback(
decimalType,
Expand Down
24 changes: 24 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9247,3 +9247,27 @@ func TestPubsubAttributes(t *testing.T) {

cdcTest(t, testFn, feedTestForceSink("pubsub"))
}

// TestChangefeedAvroDecimalColumnWithDiff is a regression test for
// https://github.com/cockroachdb/cockroach/issues/118647.
func TestChangefeedAvroDecimalColumnWithDiff(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE test1 (c1 INT PRIMARY KEY, c2 INT, c3 DECIMAL(19, 0))`)
sqlDB.Exec(t, `INSERT INTO test1 VALUES (1, 2, 3);`)

schemaReg := cdctest.StartTestSchemaRegistry()
defer schemaReg.Close()
str := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE test1 WITH OPTIONS (avro_schema_prefix = 'crdb_cdc_', diff, confluent_schema_registry ="%s", format = 'avro', on_error = 'pause', updated);`, schemaReg.URL())
testFeed := feed(t, f, str)
defer closeFeed(t, testFeed)

_, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)
}

cdcTest(t, testFn, feedTestForceSink("kafka"))
}

0 comments on commit 907aaa6

Please sign in to comment.