Closed
Labels
- area:writer (Write client and core write operations)
- issue:data-consistency (Data consistency issues (duplicates/phantoms))
- priority:critical (Production degraded; pipelines stalled)
Description
Describe the problem you faced
We use a MOR table. We found that updating an existing set of rows so that they move to another partition results in both a) a new parquet file being generated and b) an update being written to a log file. This produces duplicate records.
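For reference, the behaviour we expected can be written down as a small in-memory model. This is a sketch under stated assumptions, not Hudi's implementation: it assumes that when an upsert carries a partition path different from the one a key is currently stored in, the key is deleted from the old partition and inserted into the new one (the behaviour the Hudi configuration docs describe for `GLOBAL_BLOOM` with `hoodie.bloom.index.update.partition.path` enabled), so every record key lives in exactly one partition. `GlobalIndexModel`, `upsert` and `keyCounts` are hypothetical names used only for illustration.

```scala
// Hypothetical, simplified model of a global index (not Hudi code).
// The table maps partition path -> (record key -> row payload).
object GlobalIndexModel {
  type Row = Map[String, String]
  type Table = Map[String, Map[String, Row]]

  // Upsert a batch of (recordKey, partitionPath, row) triples.
  // If a key already lives in a different partition, it is removed there
  // first and then written to the new partition, so each key stays unique.
  def upsert(table: Table, batch: Seq[(String, String, Row)]): Table =
    batch.foldLeft(table) { case (t, (key, newPartition, row)) =>
      val oldPartition = t.collectFirst { case (p, rows) if rows.contains(key) => p }
      val withoutOld = oldPartition match {
        case Some(p) if p != newPartition => t.updated(p, t(p) - key)
        case _                            => t
      }
      val target = withoutOld.getOrElse(newPartition, Map.empty[String, Row])
      withoutOld.updated(newPartition, target.updated(key, row))
    }

  // Number of times each record key appears across all partitions.
  def keyCounts(table: Table): Map[String, Int] =
    table.values.toSeq.flatMap(_.keys).groupBy(identity).map { case (k, ks) => k -> ks.size }
}
```

Replaying the three reproduction steps through `upsert` leaves keys 1 to 6 in `2022-08-30`, each exactly once, which is the outcome described under Expected behavior.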
To Reproduce
// Action 1: Spark DataFrame write (run in spark-shell)
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions.lit
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable
val tableName = "f_schedule_test"
val basePath = "oss://datalake-poc/fred/warehouse/dw/f_schedule_test"
val spark = SparkSession.builder.enableHiveSupport.getOrCreate
import spark.implicits._
// spark-shell
val df = Seq(
("1", "10001", "2022-08-30","2022-08-30 12:00:00.000","2022-08-30"),
("2", "10002", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("3", "10003", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("4", "10004", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("5", "10005", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("6", "10006", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30")
).toDF("game_schedule_id", "game_id", "game_date_cn", "insert_date", "dt")
// df.show()
val hudiOptions = mutable.Map(
"hoodie.table.name" -> tableName, // the Hudi writer requires hoodie.table.name to be set
"hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "game_schedule_id",
"hoodie.datasource.write.precombine.field" -> "insert_date",
"hoodie.datasource.write.partitionpath.field" -> "dt",
"hoodie.index.type" -> "GLOBAL_BLOOM",
"hoodie.compact.inline" -> "true",
"hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator"
)
// Step 1: initial insert (no issue)
df.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
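// Step 2: move some of the rows to another partition (no issue)
// Assumes the table is synced to the Hive metastore as dw.f_schedule_test; the Hive sync options are not shown above.
val df1 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-30'").withColumn("dt",lit("2022-08-31")).limit(3)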
df1.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
// Step 3: move the same rows back to their original partition (duplicates appear)
//Updating an existing set of rows will result in either a) a companion log/delta file for an existing base parquet file generated from a previous compaction or b) an update written to a log/delta file in case no compaction ever happened for it.
val df2 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-31'").withColumn("dt",lit("2022-08-30")).limit(3)
df2.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
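One way to see how the observation in the description (a new parquet file plus an update in a log file) can surface as duplicate rows is a simplified model of a MOR snapshot read. The sketch assumes that records are merged with their log entries only inside the same file group, that log entries are replayed in order (precombine ordering is ignored), and that keys are not de-duplicated across file groups. `FileGroup`, `mergeFileGroup` and `snapshotRead` are hypothetical names, not Hudi APIs.

```scala
// Hypothetical, simplified model of a MOR snapshot read (not Hudi code).
// A file group has base-file records plus log records; a log value of None is a delete.
final case class FileGroup(
  partition: String,
  base: Map[String, String],         // record key -> value in the base parquet file
  log: Seq[(String, Option[String])] // (record key, Some(new value) or None for delete), oldest first
)

object SnapshotReadModel {
  // Merge one file group: replay its log entries over its base records, in order.
  def mergeFileGroup(fg: FileGroup): Map[String, String] =
    fg.log.foldLeft(fg.base) {
      case (acc, (key, Some(value))) => acc.updated(key, value)
      case (acc, (key, None))        => acc - key
    }

  // Each file group is merged independently and the results are concatenated;
  // nothing in this model removes a key that is live in two file groups.
  def snapshotRead(groups: Seq[FileGroup]): Seq[(String, String, String)] =
    groups.flatMap { fg =>
      mergeFileGroup(fg).toSeq.map { case (k, v) => (fg.partition, k, v) }
    }
}
```

In this model a key only disappears from its old partition if that file group's log carries a delete for it; a live copy in the old file group plus a live copy in the new one yields two rows for the same key.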
Queries used to check the result:
select * from dw.f_schedule_test where game_schedule_id = '1';
select _hoodie_file_name,count(*) as co from dw.f_schedule_test group by _hoodie_file_name;
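The two checking queries can also be expressed as a small in-memory check, which is handy for asserting on the result in a test. This is a hypothetical sketch: `MetaRow`, `countByFile` and `duplicateKeys` are illustrative names, and each row stands in for the `_hoodie_record_key`, `_hoodie_partition_path` and `_hoodie_file_name` meta columns. Against the table itself, the direct SQL form of the duplicate check is `select _hoodie_record_key, count(*) from dw.f_schedule_test group by _hoodie_record_key having count(*) > 1;`.

```scala
// Hypothetical in-memory version of the checking queries (not Hudi code).
// Each row carries the Hudi meta columns the queries look at.
final case class MetaRow(recordKey: String, partitionPath: String, fileName: String)

object DuplicateCheck {
  // Mirrors: select _hoodie_file_name, count(*) ... group by _hoodie_file_name
  def countByFile(rows: Seq[MetaRow]): Map[String, Int] =
    rows.groupBy(_.fileName).map { case (f, rs) => f -> rs.size }

  // Mirrors: select _hoodie_record_key, count(*) ... group by _hoodie_record_key having count(*) > 1
  // Returns each duplicated key together with the partitions it was found in.
  def duplicateKeys(rows: Seq[MetaRow]): Map[String, Set[String]] =
    rows.groupBy(_.recordKey)
      .filter { case (_, rs) => rs.size > 1 }
      .map { case (k, rs) => k -> rs.map(_.partitionPath).toSet }
}
```

A non-empty result from `duplicateKeys` corresponds to the duplicates reported here; listing the partitions per key shows whether both copies sit in the old and the new partition.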
Expected behavior
Moving records between partitions should not produce duplicate records.
Environment Description
- Hudi version: 0.10.1
- Spark version: 3.2.1
- Hive version: 3.1.2
- Hadoop version: 3.2.1
- Storage (HDFS/S3/GCS...): OSS
- Running on Docker? (yes/no): no
Attachment: hoodie.zip
Stacktrace

