Skip to content

MERGE-INTO/UPDATE blob write error #18562

@voonhous

Description

@voonhous

Bug Description

What happened:

What you expected:

Steps to reproduce:

MERGE-INTO

// Reproduction for #18562: MERGE INTO on a table with a BLOB custom-type column.
// Creates a COW Hudi table keyed by `id`, seeds one row, then merges a source
// with one matching id (UPDATE path) and one new id (INSERT path).
// NOTE(review): the inline comment below claims MISSING_UNION_BRANCH, but the
// stack trace attached to this issue reports TYPE_MISMATCH on field `id`
// (reader 'LONG' vs writer 'NULL') — confirm which failure this test actually hits.
test("Test MergeInto preserves BLOB custom-type metadata") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // COW table with a custom BLOB column; primaryKey drives the MERGE match.
      spark.sql(
        s"""
           |create table $tableName (
           |  id bigint,
           |  payload BLOB
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  type = 'cow',
           |  primaryKey = 'id'
           | )
         """.stripMargin)

      // Seed row id=1 so the MERGE below has a MATCHED target. The named_struct
      // mirrors the BLOB record shape (type/data/reference) from the Avro schema
      // in the attached log.
      spark.sql(
        s"""
           |insert into $tableName values
           |  (1, named_struct(
           |        'type', 'INLINE',
           |        'data', cast(X'010203' as binary),
           |        'reference', cast(null as struct<external_path:string,offset:bigint,length:bigint,managed:boolean>)))
           """.stripMargin)

      // MERGE exercises both NOT MATCHED (new row with BLOB literal) and MATCHED
      // (UPDATE SET on the BLOB column). Without the metadata re-attach in
      // MergeIntoHoodieTableCommand, the Avro schema-compat check throws
      // MISSING_UNION_BRANCH.
      spark.sql(
        s"""
           |merge into $tableName t
           |using (
           |  select 1L as id, named_struct(
           |    'type', 'INLINE',
           |    'data', cast(X'0A0B0C' as binary),
           |    'reference', cast(null as struct<external_path:string,offset:bigint,length:bigint,managed:boolean>)) as payload
           |  union all
           |  select 2L as id, named_struct(
           |    'type', 'INLINE',
           |    'data', cast(X'0D0E0F' as binary),
           |    'reference', cast(null as struct<external_path:string,offset:bigint,length:bigint,managed:boolean>)) as payload
           |) s
           |on t.id = s.id
           |when matched then update set t.payload = s.payload
           |when not matched then insert (id, payload) values (s.id, s.payload)
           """.stripMargin)
    }
  }

UPDATE

// Reproduction for #18562: plain UPDATE of a BLOB custom-type column.
// Same table setup as the MERGE test above: COW Hudi table keyed by `id`,
// one seeded row, then an UPDATE that reassigns the whole BLOB struct.
test("Test UPDATE on BLOB column preserves custom-type metadata") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // COW table with a custom BLOB column; primaryKey identifies the row to update.
      spark.sql(
        s"""
           |create table $tableName (
           |  id bigint,
           |  payload BLOB
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  type = 'cow',
           |  primaryKey = 'id'
           | )
         """.stripMargin)

      // Seed row id=1; the struct fields (type/data/reference) match the BLOB
      // record shape shown in the Avro schemas of the attached log.
      spark.sql(
        s"""
           |insert into $tableName values
           |  (1, named_struct(
           |        'type', 'INLINE',
           |        'data', cast(X'010203' as binary),
           |        'reference', cast(null as struct<external_path:string,offset:bigint,length:bigint,managed:boolean>)))
           """.stripMargin)

      // Assigning a BLOB column goes through castIfNeeded; without the metadata
      // re-attach it would fail schema compat with MISSING_UNION_BRANCH.
      // NOTE(review): the attached stack trace instead shows TYPE_MISMATCH on
      // field `id` (reader 'LONG' vs writer 'NULL') — verify the expected error.
      spark.sql(
        s"""
           |update $tableName
           |set payload = named_struct(
           |  'type', 'INLINE',
           |  'data', cast(X'040506' as binary),
           |  'reference', cast(null as struct<external_path:string,offset:bigint,length:bigint,managed:boolean>))
           |where id = 1
           """.stripMargin)
    }
  }

Environment

Hudi version:
Query engine: (Spark/Flink/Trino etc)
Spark 3.5, Scala 2.12
Relevant configs:

Logs and Stack Trace

Schema validation backwards compatibility check failed with the following issues: {TYPE_MISMATCH: reader type 'LONG' not compatible with writer type 'NULL' for field 'htestmergeintotable_1_record.id'}
writerSchema: {"type":"record","name":"htestmergeintotable_1_record","namespace":"hoodie.htestmergeintotable_1","fields":[{"name":"id","type":["null","long"],"default":null},{"name":"payload","type":["null",{"type":"record","name":"blob","namespace":"","fields":[{"name":"type","type":{"type":"enum","name":"blob_storage_type","symbols":["INLINE","OUT_OF_LINE"]}},{"name":"data","type":["null","bytes"],"default":null},{"name":"reference","type":["null",{"type":"record","name":"reference","fields":[{"name":"external_path","type":"string"},{"name":"offset","type":["null","long"]},{"name":"length","type":["null","long"]},{"name":"managed","type":"boolean"}]}],"default":null}],"logicalType":"blob"}],"default":null}]}
tableSchema: {"type":"record","name":"htestmergeintotable_1_record","namespace":"hoodie.htestmergeintotable_1","fields":[{"name":"id","type":"long"},{"name":"payload","type":["null",{"type":"record","name":"blob","namespace":"","fields":[{"name":"type","type":{"type":"enum","name":"blob_storage_type","symbols":["INLINE","OUT_OF_LINE"]}},{"name":"data","type":["null","bytes"],"default":null},{"name":"reference","type":["null",{"type":"record","name":"reference","fields":[{"name":"external_path","type":"string"},{"name":"offset","type":["null","long"]},{"name":"length","type":["null","long"]},{"name":"managed","type":"boolean"}]}],"default":null}],"logicalType":"blob"}],"default":null}]}
org.apache.hudi.exception.SchemaBackwardsCompatibilityException: Schema validation backwards compatibility check failed with the following issues: {TYPE_MISMATCH: reader type 'LONG' not compatible with writer type 'NULL' for field 'htestmergeintotable_1_record.id'}
writerSchema: {"type":"record","name":"htestmergeintotable_1_record","namespace":"hoodie.htestmergeintotable_1","fields":[{"name":"id","type":["null","long"],"default":null},{"name":"payload","type":["null",{"type":"record","name":"blob","namespace":"","fields":[{"name":"type","type":{"type":"enum","name":"blob_storage_type","symbols":["INLINE","OUT_OF_LINE"]}},{"name":"data","type":["null","bytes"],"default":null},{"name":"reference","type":["null",{"type":"record","name":"reference","fields":[{"name":"external_path","type":"string"},{"name":"offset","type":["null","long"]},{"name":"length","type":["null","long"]},{"name":"managed","type":"boolean"}]}],"default":null}],"logicalType":"blob"}],"default":null}]}
tableSchema: {"type":"record","name":"htestmergeintotable_1_record","namespace":"hoodie.htestmergeintotable_1","fields":[{"name":"id","type":"long"},{"name":"payload","type":["null",{"type":"record","name":"blob","namespace":"","fields":[{"name":"type","type":{"type":"enum","name":"blob_storage_type","symbols":["INLINE","OUT_OF_LINE"]}},{"name":"data","type":["null","bytes"],"default":null},{"name":"reference","type":["null",{"type":"record","name":"reference","fields":[{"name":"external_path","type":"string"},{"name":"offset","type":["null","long"]},{"name":"length","type":["null","long"]},{"name":"managed","type":"boolean"}]}],"default":null}],"logicalType":"blob"}],"default":null}]}
	at org.apache.hudi.common.schema.HoodieSchemaCompatibility.checkValidEvolution(HoodieSchemaCompatibility.java:156)
	at org.apache.hudi.HoodieSchemaUtils$.deduceWriterSchemaWithoutReconcile(HoodieSchemaUtils.scala:180)
	at org.apache.hudi.HoodieSchemaUtils$.deduceWriterSchema(HoodieSchemaUtils.scala:147)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:469)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:187)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:205)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
	at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:122)
	at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:73)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
	at org.apache.spark.sql.hudi.dml.others.TestMergeIntoTable.$anonfun$new$110(TestMergeIntoTable.scala:1865)
	at org.apache.spark.sql.hudi.dml.others.TestMergeIntoTable.$anonfun$new$110$adapted(TestMergeIntoTable.scala:1848)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.withTempDir(HoodieSparkSqlTestBase.scala:102)
	at org.apache.spark.sql.hudi.dml.others.TestMergeIntoTable.$anonfun$new$109(TestMergeIntoTable.scala:1848)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$test$1(HoodieSparkSqlTestBase.scala:114)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
	at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.org$scalatest$BeforeAndAfterAll$$super$run(HoodieSparkSqlTestBase.scala:58)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.run(HoodieSparkSqlTestBase.scala:58)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1308)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1308)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1474)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)

Metadata

Metadata

Assignees

Labels

type:bugBug reports and fixes

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions