
[SUPPORT] Handling of DELETE operation using Debezium Kafka connector #10181

@seethb

Description


Hi, I am performing CRUD operations in YugabyteDB, and my Debezium Kafka connector captures the changes as shown below.

INSERT operation:

"before_id_value":null
"after_id_value":1000
"op": "c"

UPDATE operation:

"before_id_value":null
"after_id_value":1000
"op": "u"

DELETE operation:

"before_id_value":1000
"after_id_value":null
"op": "d"

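A minimal Python sketch of the distinction in the events above, assuming each change event has already been flattened into a dict with `before_<column>_value`, `after_<column>_value` and `op` keys exactly as shown. The helper `image_value` is hypothetical: it reads from the after image for creates and updates and from the before image for deletes, which is the lookup a delete-safe record key would need.

```python
from typing import Any, Mapping, Optional


def image_value(event: Mapping[str, Any], column: str) -> Optional[Any]:
    """Return the column value from the row image that actually exists.

    Creates ("c"), updates ("u") and snapshot reads ("r") carry the new row
    in the after image; deletes ("d") only carry the old row in the before image.
    """
    op = event["op"]
    if op == "d":
        return event.get(f"before_{column}_value")
    if op in ("c", "u", "r"):
        return event.get(f"after_{column}_value")
    raise ValueError(f"unsupported op: {op!r}")


# The three events from the issue, flattened as shown above.
events = [
    {"before_id_value": None, "after_id_value": 1000, "op": "c"},
    {"before_id_value": None, "after_id_value": 1000, "op": "u"},
    {"before_id_value": 1000, "after_id_value": None, "op": "d"},
]

for e in events:
    print(e["op"], image_value(e, "id"))  # prints "c 1000", "u 1000", "d 1000"
```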
The HoodieSinkConnector handles INSERT and UPDATE operations correctly using after_id_value as the record key. It fails on DELETE operations because after_id_value is NULL in those events; a delete carries only before_id_value. How can the record key be assigned dynamically (taking the before value for deletes), or is there another recommended way to handle NULL key values in the Hudi Kafka sink? My current key settings are:

            "hoodie.datasource.write.recordkey.field": "after_id_value",
            "hoodie.datasource.write.partitionpath.field": "after_product_id_value",

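One possible workaround, sketched under assumptions: flatten each event upstream of the Hudi sink (for example in a custom Kafka Connect SMT or a small stream processor, neither of which is shown here) into a row whose key and partition columns are filled from the before image on deletes, plus a boolean `_hoodie_is_deleted` column. The partition field presumably needs the same treatment as the key, since after_product_id_value is also NULL when the after image is empty. The helper `to_hudi_row`, the column names `id` and `product_id`, and the sample values are made up for illustration. Hudi's `OverwriteWithLatestAvroPayload` treats `_hoodie_is_deleted = true` as a delete; whether the Kafka Connect sink in 0.13.1 uses a payload that honors this flag is an assumption you would need to verify.

```python
from typing import Any, Dict, Iterable, Mapping


def to_hudi_row(event: Mapping[str, Any], columns: Iterable[str]) -> Dict[str, Any]:
    """Flatten one change event into a single row for the Hudi sink (hypothetical helper).

    Deletes read from the before image so that the record key and partition
    path still identify the row being removed; other ops read the after image.
    """
    is_delete = event["op"] == "d"
    prefix = "before" if is_delete else "after"
    row = {col: event.get(f"{prefix}_{col}_value") for col in columns}
    # Assumption: the sink's payload class treats this flag as a delete marker.
    row["_hoodie_is_deleted"] = is_delete
    return row


# Illustrative delete event; the product id value is made up.
delete_event = {
    "before_id_value": 1000,
    "after_id_value": None,
    "before_product_id_value": 42,
    "after_product_id_value": None,
    "op": "d",
}

print(to_hudi_row(delete_event, ["id", "product_id"]))
# {'id': 1000, 'product_id': 42, '_hoodie_is_deleted': True}
```

With rows of this shape, hoodie.datasource.write.recordkey.field and hoodie.datasource.write.partitionpath.field would point at `id` and `product_id`, and the registered table schema would need the extra boolean column.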
My full Kafka Connect sink configuration is below:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8085/connectors/ -d '{
"name": "hudi-mors-reviews",
"config": {
"bootstrap.servers": "localhost:9092",
"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
"tasks.max": "1",
"topics": "review1.public.reviews",
"hoodie.table.name": "public-mors-review-table",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.base.path": "file:///tmp/hoodie/public-mors-review-table",
"hoodie.datasource.write.recordkey.field": "after_id_value",
"hoodie.datasource.write.partitionpath.field": "after_product_id_value",
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/review1.public.reviews-value/versions/latest",
"hoodie.kafka.commit.interval.secs": 60
}
}'
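If the topic carried flattened rows whose key column is never NULL, the configuration above would only need its key and partition fields changed. The hypothetical Python helper below builds the JSON body for `POST /connectors` with those two fields swapped (the column names `id` and `product_id` are assumptions) and converts every value to a string, because Kafka Connect stores connector configs as string-to-string maps; the unquoted `60` is likely accepted through type coercion, but quoting it is safer.

```python
import json
from typing import Any, Dict


def sink_request_body(name: str, config: Dict[str, Any]) -> str:
    """Return the JSON body for POST /connectors with all config values as strings."""
    # str() is sufficient for the strings and integers used in this config.
    return json.dumps(
        {"name": name, "config": {k: str(v) for k, v in config.items()}},
        indent=2,
    )


config = {
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "tasks.max": 1,
    "topics": "review1.public.reviews",
    "hoodie.table.name": "public-mors-review-table",
    "hoodie.table.type": "MERGE_ON_READ",
    # Hypothetical flattened columns that stay non-null for deletes as well.
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": "product_id",
    "hoodie.kafka.commit.interval.secs": 60,
    # Remaining settings from the original request are omitted for brevity.
}

print(sink_request_body("hudi-mors-reviews", config))
```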

I am getting the error below:

[2023-11-27 08:31:47,194] WARN [hudi-mors-baltest|task-0] Error received while writing records for transaction 20231127083047010 in partition 0 (org.apache.hudi.connect.transaction.ConnectTransactionParticipant:238)
org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "after_sno_value" cannot be null or empty.
at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:146)
at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:69)
at org.apache.hudi.connect.writers.AbstractConnectWriter.writeRecord(AbstractConnectWriter.java:85)
at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.writeRecords(ConnectTransactionParticipant.java:219)
at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.processRecords(ConnectTransactionParticipant.java:137)
at org.apache.hudi.connect.HoodieSinkTask.put(HoodieSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
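The exception above comes from Hudi's key generator refusing a null or empty record key. The Python model below approximates that check (it is not Hudi's code; `RecordKeyError` and `record_key` are hypothetical names) to show why a delete event fails when the key field points at the after image and succeeds when the value is read from the before image.

```python
from typing import Any, Mapping


class RecordKeyError(ValueError):
    """Stand-in for Hudi's HoodieKeyException in this model."""


def record_key(record: Mapping[str, Any], field: str) -> str:
    """Return the record key as a string, rejecting null or empty values."""
    value = record.get(field)
    key = None if value is None else str(value)
    if key is None or key == "":
        shown = "null" if key is None else key
        raise RecordKeyError(
            f'recordKey value: "{shown}" for field: "{field}" cannot be null or empty.'
        )
    return key


delete_event = {"before_id_value": 1000, "after_id_value": None, "op": "d"}

try:
    record_key(delete_event, "after_id_value")
except RecordKeyError as err:
    print(err)  # recordKey value: "null" for field: "after_id_value" cannot be null or empty.

print(record_key(delete_event, "before_id_value"))  # 1000
```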

Environment Description

  • Hudi version : 0.13.1

  • Spark version : 3.2

  • Hive version : N/A

  • Hadoop version : N/A

  • Storage (HDFS/S3/GCS..) : Local Storage /tmp

  • Running on Docker? (yes/no) : No

Metadata


Assignees

No one assigned

    Labels

    area:cdc (Change data capture), area:ingest (Ingestion into Hudi), priority:high (Significant impact; potential bugs)

    Type

    No type

    Projects

    Status

    ✅ Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
