Description
What happened?
The debeziumRecordInstant() method in KafkaConnectUtils throws a NullPointerException when processing Debezium DELETE events because it doesn't handle the case where valueSchema is null.
Steps to Reproduce
- Set up a Debezium pipeline using DebeziumIO.read()
- Perform a DELETE operation on a record in the source database
- The pipeline crashes with a NullPointerException
Expected Behavior
The method should handle DELETE operations gracefully by extracting the timestamp from the sourceOffset metadata, specifically from the ts_usec field.
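A minimal sketch of the fallback described above, not the actual Beam patch. To stay self-contained it avoids the Kafka Connect classes: the valueSchema parameter is a plain Object standing in for SourceRecord.valueSchema(), the class and method names are hypothetical, and it returns java.time.Instant rather than whatever timestamp type KafkaConnectUtils uses. It also assumes ts_usec is a numeric value in microseconds since the Unix epoch.

```java
import java.time.Instant;
import java.util.Map;

public class DeleteTimestampSketch {

  /**
   * Hypothetical helper: derives an event time from the sourceOffset map
   * when the record carries no value schema (as with DELETE events).
   * Assumes ts_usec holds microseconds since the Unix epoch.
   */
  static Instant instantFromSourceOffset(Map<String, ?> sourceOffset) {
    if (sourceOffset == null) {
      throw new IllegalArgumentException("sourceOffset is null; no timestamp available");
    }
    Object tsUsec = sourceOffset.get("ts_usec");
    if (!(tsUsec instanceof Number)) {
      throw new IllegalArgumentException("sourceOffset has no numeric ts_usec: " + sourceOffset);
    }
    long micros = ((Number) tsUsec).longValue();
    // Split into whole seconds and the leftover microseconds, then convert to nanos.
    long seconds = Math.floorDiv(micros, 1_000_000L);
    long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
    return Instant.ofEpochSecond(seconds, nanos);
  }

  /**
   * Hypothetical guard: valueSchema stands in for SourceRecord.valueSchema();
   * it is typed as Object so the sketch compiles without Kafka Connect.
   */
  static Instant recordInstant(Object valueSchema, Map<String, ?> sourceOffset) {
    if (valueSchema == null) {
      // DELETE path: no schema to inspect, fall back to the offset metadata.
      return instantFromSourceOffset(sourceOffset);
    }
    // Non-DELETE path: the existing schema-based extraction would go here.
    throw new UnsupportedOperationException("schema-based path omitted from this sketch");
  }
}
```

The point of the guard is that the null check happens before anything calls type() on the schema, which is where the current code fails.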
Actual Behavior
NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value
of "org.apache.kafka.connect.source.SourceRecord.valueSchema()" is null
at org.apache.beam.io.debezium.KafkaConnectUtils.debeziumRecordInstant(KafkaConnectUtils.java:83)
Root Cause
For DELETE operations, Debezium sets value and valueSchema to null, but the timestamp information is available in sourceOffset.ts_usec:
SourceRecord{
sourcePartition={server=beam-debezium-connector},
sourceOffset={lsn_proc=42600784, messageType=DELETE, lsn_commit=42600728, lsn=42600784, txId=1073, ts_usec=1772459804615258}
ConnectRecord{..., value=null, valueSchema=null, ...}
}
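As a quick units check on the dump above, the following sketch converts the ts_usec value to a readable instant. It assumes the field is microseconds since the Unix epoch; the class and method names are made up for illustration. It also shows the value truncated to epoch milliseconds, which is what a millisecond-precision timestamp type would retain.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class TsUsecUnitsCheck {
  // Value copied from the sourceOffset dump in this issue.
  static final long TS_USEC = 1772459804615258L;

  /** Interprets the value as microseconds since the Unix epoch (an assumption). */
  static Instant asInstant(long tsUsec) {
    long seconds = TimeUnit.MICROSECONDS.toSeconds(tsUsec);
    long leftoverMicros = tsUsec - TimeUnit.SECONDS.toMicros(seconds);
    return Instant.ofEpochSecond(seconds, TimeUnit.MICROSECONDS.toNanos(leftoverMicros));
  }

  /** Truncates to epoch milliseconds, dropping the sub-millisecond digits. */
  static long asEpochMillis(long tsUsec) {
    return TimeUnit.MICROSECONDS.toMillis(tsUsec);
  }

  public static void main(String[] args) {
    System.out.println(asInstant(TS_USEC));     // 2026-03-02T13:56:44.615258Z
    System.out.println(asEpochMillis(TS_USEC)); // 1772459804615
  }
}
```

The result is a plausible wall-clock time, which supports reading the field as microseconds rather than milliseconds or nanoseconds.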
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)