
[HUDI-5759] Supports add column on mor table with log #7915

Conversation

@qidian99 (Contributor) commented Feb 10, 2023

Change Logs

Hudi does not support adding a column on a MOR table with log files when extractPartitionValuesFromPartitionPath is turned on.

If partition pruning is enabled, default values are pruned out of the Avro schema because of incorrect conversion logic in SchemaConverters::toAvroType. This PR fixes the issue by adding a null default value when converting a StructType field to an Avro field type.

With this change, HoodieBaseRelation::convertToAvroSchema behaves correctly and the MOR table is queryable. A sketch of the change is shown below.

Impact

None.

Risk level (write none, low, medium or high below)

Low.

Documentation Update

None.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@stream2000 (Contributor)

Could you please add some UTs?

@stream2000 (Contributor)

@alexeykudinkin @xiarixiaoyao @leesf Could you please help review this pr?

@qidian99 (Contributor, Author)

@stream2000 Thanks for providing the UT test case. As discussed, a UT has been added that fails before this change and passes with it.

@qidian99 qidian99 force-pushed the issue/#5759_support_add_column_on_mor_table_with_log branch from c520ca4 to 3609b74 Compare February 10, 2023 03:21
@qidian99 (Contributor, Author)

Commits squashed.

@@ -204,4 +204,48 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
}
})
}

test("Test Add Column and Update Table") {
withTempDir { tmp =>
@xiarixiaoyao (Contributor) commented Feb 10, 2023

@qidian99 thanks for your contribution. I ran this UT directly on the master branch, expecting it to fail, but it succeeded. Could you please check your UT? Thanks.

Contributor (Author):

Thanks for the timely reply. I changed the UT to manually set partition pruning to true.

@stream2000 and I both tested on the master branch, and the test fails there.

Contributor:

@qidian99 can you please paste the whole stack trace? I would like to better understand what exactly is failing.

Contributor:

I see you pasted the stack trace from querying your data via the server. Can you please paste the stack trace of this particular test failing?

I want to better understand which operation is failing in this test.

Contributor:

@qidian99 do only non-partitioned tables have this problem?

@qidian99 qidian99 force-pushed the issue/#5759_support_add_column_on_mor_table_with_log branch from 3609b74 to 7f2456c Compare February 10, 2023 04:06
@xiarixiaoyao (Contributor)

@qidian99 could you please check the compile failure? Thanks.

@xiarixiaoyao xiarixiaoyao self-assigned this Feb 10, 2023
} else {
fieldBuilder.noDefault()
}

fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
Contributor:

We need to remove this line now, right?

@@ -202,6 +202,13 @@ private[sql] object SchemaConverters {
st.foreach { f =>
val fieldAvroType =
toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
val fieldBuilder = fieldsAssembler.name(f.name).`type`(fieldAvroType)
Contributor:

This code is actually borrowed from Spark, and we try to avoid any changes to such code to make sure we're not diverging from Spark

Contributor (Author):

When extractPartitionValuesFromPartitionPath is turned on, the StructType schema and the Avro schema differ. convertToAvroSchema does not set a default value when a field is nullable, making the table not queryable. The two field shapes are illustrated below.
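For illustration only (using a hypothetical nullable string column named new_col1), the difference between the two Avro field shapes looks like this:

```scala
import org.apache.avro.Schema

// Illustration only: the field without a default cannot be resolved by the Avro
// reader for records written before the column existed, while the field with an
// explicit null default can.
val withoutDefault = new Schema.Parser().parse(
  """{"type": "record", "name": "Rec", "fields": [
    |  {"name": "new_col1", "type": ["null", "string"]}
    |]}""".stripMargin)

val withNullDefault = new Schema.Parser().parse(
  """{"type": "record", "name": "Rec", "fields": [
    |  {"name": "new_col1", "type": ["null", "string"], "default": null}
    |]}""".stripMargin)
```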

Contributor:

I don't think I understand why you believe this is an appropriate fix for the issue you're observing:

  • Spark's schemas don't have defaults at all
  • The fact that an Avro schema field is nullable doesn't entail that it should have null as its default value

Contributor:

From what I understand so far, the issue is not in the conversion but in the fact that we're not handling schema evolution properly in HoodieAvroDataBlock: whenever we decode a record from an existing data block, we should make sure that any nullable field actually has null as its default value so that the Avro reader is able to decode the data in case this particular field is not present. A sketch of that idea follows.
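A minimal sketch of that idea, assuming the Avro Java API and not the actual Hudi implementation: rebuild the reader schema used to decode a log block so every nullable field carries an explicit null default.

```scala
import org.apache.avro.{JsonProperties, Schema}
import scala.collection.JavaConverters._

// Sketch only (hypothetical helper, not Hudi code): for every nullable field
// without a default, move "null" to the front of the union (Avro requires this
// for a null default) and attach JsonProperties.NULL_VALUE as the default, so
// the ResolvingDecoder can fill the field in when it is missing from records
// written before the column was added.
def withNullDefaults(readerSchema: Schema): Schema = {
  val patched = readerSchema.getFields.asScala.map { f =>
    val branches = f.schema().getType match {
      case Schema.Type.UNION => f.schema().getTypes.asScala.toList
      case _                 => List(f.schema())
    }
    val isNullable = branches.exists(_.getType == Schema.Type.NULL)
    if (!isNullable || f.hasDefaultValue) {
      // Keep existing defaults and non-nullable fields as they are.
      new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())
    } else {
      val nullFirst = Schema.createUnion(
        (Schema.create(Schema.Type.NULL) ::
          branches.filter(_.getType != Schema.Type.NULL)).asJava)
      new Schema.Field(f.name(), nullFirst, f.doc(), JsonProperties.NULL_VALUE)
    }
  }.asJava
  Schema.createRecord(readerSchema.getName, readerSchema.getDoc,
    readerSchema.getNamespace, readerSchema.isError, patched)
}
```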


Contributor:

I agree with @alexeykudinkin. We should not change the code of SchemaConverters.scala; this is a bug in the log scanner.


@danny0405 danny0405 added writer-core Issues relating to core transactions/write actions spark Issues related to spark labels Feb 11, 2023
@qidian99 (Contributor, Author)

Here's the stack trace from when I tried to add a column named new_col1 to the MOR table:


Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:421)
	at org.apache.spark.sql.execution.HiveResult$.hiveResultString(HiveResult.scala:76)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.$anonfun$run$2(SparkSQLDriver.scala:69)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:69)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:287)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:378)
	at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.<init>(HoodieMergeOnReadRDD.scala:173)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:93)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroTypeException: Found record, expecting Record, missing required field new_col1
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
	at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:239)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:207)
	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:144)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:382)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:464)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
	... 23 more

@alexeykudinkin

@qidian99 qidian99 force-pushed the issue/#5759_support_add_column_on_mor_table_with_log branch from 7f2456c to 6b3cafb Compare February 13, 2023 02:53
@qidian99 qidian99 force-pushed the issue/#5759_support_add_column_on_mor_table_with_log branch from 6b3cafb to 52ff32a Compare February 13, 2023 04:31
@xiarixiaoyao (Contributor)

@qidian99 could you please help me reproduce this problem? Thanks. I used the latest master branch and tested on Spark 3.2 and Spark 3.3; everything is OK.

@stream2000 (Contributor)

> @qidian99 could you please help me reproduce this problem? Thanks. I used the latest master branch and tested on Spark 3.2 and Spark 3.3; everything is OK.

Hi, I have updated the test cases so that we can reproduce the error on the master branch. The key change is producing the log file before adding the column; a sketch of that flow follows.
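For reference, a hedged sketch of that reproduction idea (table layout, column names, and the way the config is set are illustrative; helpers such as generateTableName and checkAnswer come from HoodieSparkSqlTestBase, and the actual UT in this PR may differ): write a log file via an update before the column is added, then read with partition-value extraction enabled.

```scala
test("Test Add Column On MOR Table With Log") {
  withTempDir { tmp =>
    val tableName = generateTableName
    spark.sql(
      s"""
         |create table $tableName (
         |  id int, name string, price double, ts long, dt string
         |) using hudi
         |partitioned by (dt)
         |location '${tmp.getCanonicalPath}/$tableName'
         |tblproperties (type = 'mor', primaryKey = 'id', preCombineField = 'ts')
       """.stripMargin)

    // The insert creates the base file; the update writes a log file.
    spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000, '2023-02-10')")
    spark.sql(s"update $tableName set price = 11.0 where id = 1")

    // Add the column only after the log file exists, then query with
    // extractPartitionValuesFromPartitionPath turned on.
    spark.sql(s"alter table $tableName add columns (new_col1 string)")
    spark.sql("set hoodie.datasource.read.extract.partition.values.from.path=true")
    checkAnswer(s"select id, name, price, new_col1 from $tableName")(
      Seq(1, "a1", 11.0, null)
    )
  }
}
```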

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@stream2000 (Contributor)

@xiarixiaoyao Could we close this PR since #8026 is merged?
cc @qidian99

@xiarixiaoyao (Contributor)

> @xiarixiaoyao Could we close this PR since #8026 is merged? cc @qidian99

Yes. @qidian99 @stream2000, thank you again for your contribution.

@bvaradar bvaradar closed this Mar 7, 2023
Labels: spark (Issues related to spark), writer-core (Issues relating to core transactions/write actions)