Skip to content

[HUDI-7035] Fix CDC Incremental Read When First Write Contains Delete And Upsert#10071

Merged
danny0405 merged 1 commit intoapache:masterfrom
watermelon12138:HUDI-7035
Nov 14, 2023
Merged

[HUDI-7035] Fix CDC Incremental Read When First Write Contains Delete And Upsert#10071
danny0405 merged 1 commit intoapache:masterfrom
watermelon12138:HUDI-7035

Conversation

@watermelon12138
Copy link
Contributor

@watermelon12138 watermelon12138 commented Nov 12, 2023

Change Logs

Incremental read when first write contains delete and upsert will fail. It will return precombined result after fix the issue.
Lated Issue:
https://issues.apache.org/jira/browse/HUDI-7035
#9987
Error Stack Following:
org.apache.hudi.exception.HoodieException: There should be a cdc log file.
at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.parseWriteStat(HoodieCDCExtractor.java:276)
at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.lambda$extractCDCFileSplits$1(HoodieCDCExtractor.java:131)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.extractCDCFileSplits(HoodieCDCExtractor.java:126)
at org.apache.hudi.cdc.CDCRelation.buildScan0(CDCRelation.scala:105)
at org.apache.hudi.cdc.CDCRelation.buildScan(CDCRelation.scala:87)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:366)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:400)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:456)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:399)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:366)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:514)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:184)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:181)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:177)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:316)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:692)
at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:316)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:331)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:285)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:264)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4232)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3205)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3426)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:286)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:325)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)

Impact

None

Risk level (write none, low medium or high below)

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@watermelon12138
Copy link
Contributor Author

@danny0405 Hi, can you help review this code? Thank you.

@watermelon12138
Copy link
Contributor Author

@hudi-bot run azure

@hudi-bot
Copy link
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@danny0405 danny0405 merged commit 7667af6 into apache:master Nov 14, 2023
nsivabalan pushed a commit to nsivabalan/hudi that referenced this pull request Nov 23, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:cdc Change data capture area:streaming Streaming operations

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

3 participants