[HUDI-6579] Adding support for upsert and deletes with spark datasource for pk less table #9261
Conversation
...spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
```scala
private def canRemoveMetaFields(optParams: Map[String, String]): Boolean = {
  !(optParams.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean
    || optParams.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
    || !optParams.containsKey(RECORDKEY_FIELD.key()))
}
```
For pk-less tables, RECORDKEY_FIELD may not be present, and this condition can be true. Do we want to drop the meta fields in such cases? From the comment it seems like for pk-less we don't want to drop the meta fields.
not sure I get it. We will drop the meta fields down the line while creating the HoodieRecordPayload.
Here we are just trying to gauge whether we can go with prepped writes or regular non-prepped writes. If the incoming df has meta fields and it's a pk-less table, we go with the prepped flow.
My point was that for pk-less this condition would be true and `canRemoveMetaFields` can return true, isn't it? However, we need the meta fields for pk-less:

`!optParams.containsKey(RECORDKEY_FIELD.key())`
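The semantics under discussion can be sketched as a self-contained predicate. This is a simplified illustration, not the actual Hudi code: the config key strings below are hypothetical stand-ins for the real Hudi constants, and the point is only the truth table the reviewers are debating.

```scala
// Simplified sketch (NOT the actual Hudi implementation) of the intended
// canRemoveMetaFields semantics: meta fields may be dropped only for a
// regular non-prepped write on a table that has a configured record key.
object MetaFieldCheck {
  // Hypothetical stand-ins for the real Hudi config constants.
  val SparkSqlWritesPrepped = "hoodie.spark.sql.writes.prepped"
  val SparkSqlMergeIntoPrepped = "hoodie.spark.sql.merge.into.prepped"
  val RecordKeyField = "hoodie.datasource.write.recordkey.field"

  def canRemoveMetaFields(optParams: Map[String, String]): Boolean = {
    val prepped =
      optParams.getOrElse(SparkSqlWritesPrepped, "false").toBoolean ||
        optParams.getOrElse(SparkSqlMergeIntoPrepped, "false").toBoolean
    // Pk-less table: no record key field configured, so the generated keys
    // live in the meta columns and those columns must be kept.
    val pkLess = !optParams.contains(RecordKeyField)
    !(prepped || pkLess)
  }
}
```

With this shape, a pk-less table (no record key in the options) makes the function return false, which matches the reviewer's requirement that meta fields are retained for pk-less tables.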
@codope : addressed all comments.
codope
left a comment
Can you please look into the CI failures?
> My point was that for pk-less this condition would be true and `canRemoveMetaFields` can return true, isn't it? However, we need the meta fields for pk-less.
oops, my bad. not sure how I missed that. will fix it
@codope : Updated
codope
left a comment
Will land once the CI succeeds.
force-pushed from 7d85ec6 to 7e7efc7
force-pushed from 7e7efc7 to 3968bf3
```scala
val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto) {
  sourceDf
} else {
  sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS: _*)
}
```
Dropping meta cols here caused a problem with HoodieStreamingSink: with `val sourceDF = spark.readStream.format("hudi").load(...)` followed by `sourceDF.writeStream.format("hudi").start()`, the source DF is a streaming source, and dropping the meta cols failed the assertion "Queries with streaming sources must be executed with writeStream.start()" because internally Hudi writes the DF as a batch.
We should either keep `df` here the same as `sourceDF` when using `sourceDF.writeStream.format("hudi").start()`, or use `sourceDF.writeStream.foreachBatch {}`.
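The second workaround can be sketched as follows. This is an illustrative shape, not the actual fix in the PR; the table paths, table name, and option values are placeholders:

```scala
// Sketch of the foreachBatch workaround: each micro-batch arrives as a
// regular (non-streaming) DataFrame, so dropping meta columns and writing
// through the batch datasource path no longer trips the
// "Queries with streaming sources must be executed with writeStream.start()"
// assertion. Paths and options below are placeholders.
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("hudi-streaming-sketch").getOrCreate()

val sourceDF = spark.readStream.format("hudi").load("/tmp/hudi/source_table")

sourceDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Inside foreachBatch, batchDF is a plain batch DataFrame.
    batchDF.write
      .format("hudi")
      .option("hoodie.table.name", "target_table")
      .mode("append")
      .save("/tmp/hudi/target_table")
  }
  .start()
  .awaitTermination()
```

The alternative is simply not to drop the meta columns when the incoming DataFrame is a streaming source, keeping `df` identical to `sourceDF`.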
Change Logs
Adding support for upsert and deletes with spark datasource for pk less table.
Impact
This patch opens up the possibility of updating or deleting records with spark datasource writes for a primary-key-less table.
For example, a user may want to delete all records for a given employee.
This is now feasible for a pk-less table.
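A sketch of what this enables, assuming the documented `hoodie.datasource.write.operation` config; the base path, table name, and column name below are placeholders:

```scala
// Illustrative sketch: deleting all records for a given employee from a
// pk-less Hudi table via the Spark datasource. Paths, table name, and the
// employee_id column are placeholders for this example.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pk-less-delete-sketch").getOrCreate()

val basePath = "/tmp/hudi/employees"

// Read back the records to delete. For a pk-less table the generated keys
// live in the Hudi meta columns, so they must be retained on the read side.
val toDelete = spark.read.format("hudi")
  .load(basePath)
  .filter("employee_id = 'emp_123'")

toDelete.write.format("hudi")
  .option("hoodie.datasource.write.operation", "delete")
  .option("hoodie.table.name", "employees")
  .mode("append")
  .save(basePath)
```

Because the records carry their meta columns, the write goes through the prepped flow and Hudi can locate the rows to delete without a user-supplied record key.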
Risk level (write none, low, medium or high below)
low.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change; add a ticket number here and follow the instructions to make changes to the website.
Contributor's checklist