test(s"test delete with primary key, partial-update, remove-record-on-delete, lookup") {
spark.sql(s"""
|CREATE TABLE T (item_id INT NOT NULL, sku_id INT NOT NULL)
|TBLPROPERTIES ('primary-key' = 'item_id', 'bucket' = '4',
|'merge-engine' = 'partial-update',
|'partial-update.remove-record-on-delete' = 'true',
|'changelog-producer' = 'lookup')
|""".stripMargin)
spark.sql("INSERT INTO T VALUES (1, 2), (3, 4)")
spark.sql("DELETE FROM T WHERE item_id = 3")
val rows1 = spark.sql("SELECT * FROM T").collectAsList()
assertThat(rows1.toString).isEqualTo("[[1,2]]")
val rows2 = spark.sql("SELECT * FROM `T$audit_log`").collectAsList()
assertThat(rows2.toString).isEqualTo("[[+I,1,2], [-D,3,4]]")
}
Search before asking
Paimon version
release-1.2
Compute Engine
spark 3.4.2
flink 1.17.0
Minimal reproduce step
org.apache.paimon.spark.sql.DeleteFromTableTestBase
What doesn't meet your expectations?
When deleting rows from a primary key table via Spark-SQL (
DELETE FROM), theaudit_logdoes not record the-Doperation. However, the same operation in Flink correctly logs-D.Anything else?
reference to #3294
vs flink:
org.apache.paimon.flink.PartialUpdateITCase#testRemoveRecordOnDeleteLookupAre you willing to submit a PR?