Skip to content

Commit

Permalink
changefeedccl: add nil checking for avroDataRecord.refreshTypeMetadata
Browse files Browse the repository at this point in the history
Previously, the avro encoder could call `refreshTypeMetadata` on
`avroDataRecord` without proper nil checking. This could lead to node panics
because `avroDataRecord` could sometimes be nil. For example,
`registered.schema.after` is set only when using the wrapped envelope. Thus,
avro encoder could lead to panics when using with other envelope formats. This
patch addresses this issue by adding a defensive nil check when invoking
`refreshTypeMetadata`.

Fixes: #119428
Release note: None
  • Loading branch information
wenyihu6 committed Feb 26, 2024
1 parent a2260aa commit 23d44c9
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,9 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
// Refresh the metadata for user-defined types on a cached schema
// The only user-defined type is enum, so this is usually a no-op.
func (r *avroDataRecord) refreshTypeMetadata(row cdcevent.Row) error {
if r == nil {
return nil
}
return row.ForEachUDTColumn().Col(func(col cdcevent.ResultColumn) error {
if fieldIdx, ok := r.fieldIdxByName[col.Name]; ok {
r.Fields[fieldIdx].typ = col.Typ
Expand Down
63 changes: 63 additions & 0 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,3 +1224,66 @@ func TestJsonRountrip(t *testing.T) {
})
}
}

// TestAvroWithRegionalTable tests how the avro encoder works with regional
// tables and with different envelope formats. This is a regression test for
// #119428.
func TestAvroWithRegionalTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
tests := []struct {
format string
payload []string
}{
{
format: "wrapped",
payload: []string{
`table1: {"a":{"long":1},"crdb_region":{"string":"us-east1"}}->{"after":{"table1":{"a":{"long":1},"crdb_region":{"string":"us-east1"}}}}`,
`table1: {"a":{"long":2},"crdb_region":{"string":"us-east1"}}->{"after":{"table1":{"a":{"long":2},"crdb_region":{"string":"us-east1"}}}}`,
`table1: {"a":{"long":3},"crdb_region":{"string":"us-east1"}}->{"after":{"table1":{"a":{"long":3},"crdb_region":{"string":"us-east1"}}}}`,
},
},
{
format: "bare",
payload: []string{
`table1: {"a":{"long":1},"crdb_region":{"string":"us-east1"}}->{"record":{"table1":{"a":{"long":1},"crdb_region":{"string":"us-east1"}}}}`,
`table1: {"a":{"long":2},"crdb_region":{"string":"us-east1"}}->{"record":{"table1":{"a":{"long":2},"crdb_region":{"string":"us-east1"}}}}`,
`table1: {"a":{"long":3},"crdb_region":{"string":"us-east1"}}->{"record":{"table1":{"a":{"long":3},"crdb_region":{"string":"us-east1"}}}}`,
},
},
{
format: "key_only",
payload: []string{
`table1: {"a":{"long":1},"crdb_region":{"string":"us-east1"}}->`,
`table1: {"a":{"long":2},"crdb_region":{"string":"us-east1"}}->`,
`table1: {"a":{"long":3},"crdb_region":{"string":"us-east1"}}->`,
},
},
}
for _, test := range tests {
t.Run(test.format, func(t *testing.T) {
cluster, db, cleanup := startTestCluster(t)
defer cleanup()
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE table1 (a INT PRIMARY KEY) LOCALITY REGIONAL BY ROW`)
schemaReg := cdctest.StartTestSchemaRegistry()
defer schemaReg.Close()
stmt := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE table1 WITH format = avro, envelope = %s,
confluent_schema_registry = "%s", schema_change_events = column_changes, schema_change_policy = nobackfill`,
test.format, schemaReg.URL())

f := makeKafkaFeedFactory(cluster, db)
testFeed := feed(t, f, stmt)
defer closeFeed(t, testFeed)

sqlDB.Exec(t, `INSERT INTO table1(a) values(1)`)
sqlDB.Exec(t, `INSERT INTO table1(a) values(2)`)
sqlDB.Exec(t, `INSERT INTO table1(a) values(3)`)
assertPayloads(t, testFeed, test.payload)
})
}
}
cdcTest(t, testFn)
}

0 comments on commit 23d44c9

Please sign in to comment.