Skip to content

INLINE blob + Lance basefile format write fail #18602

@voonhous

Description

@voonhous

Bug Description

What happened:

What you expected:

Steps to reproduce:

test("Test Query Log Only MOR Table With BLOB INLINE column triggers compaction (Lance)") {
    // Allow environments without Lance support to opt out of this suite entirely.
    assume(System.getProperty("lance.skip.tests") != "true",
      "Lance tests disabled via -Dlance.skip.tests=true")
    // Lance writer has no BLOB handling today (RFC-100 Phase 2). Expected to fail
    // until support lands in HoodieSparkLanceWriter; this test pins the gap.

    withRecordType()(withTempDir { tmp =>
      val tablePath = new File(tmp, "hudi").getCanonicalPath
      val tableName = generateTableName
      // MOR table with a BLOB column, inline compaction enabled, aggressive
      // cleaning (retain 1 commit), and LANCE as the base file format.
      // INMEMORY index keeps writes as log files until compaction runs.
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  data blob,
           |  ts long
           |) using hudi
           | location '$tablePath'
           | tblproperties (
           |  primaryKey = 'id',
           |  type = 'mor',
           |  preCombineField = 'ts',
           |  hoodie.index.type = 'INMEMORY',
           |  hoodie.compact.inline = 'true',
           |  hoodie.clean.commits.retained = '1',
           |  'hoodie.table.base.file.format' = 'LANCE'
           | )
       """.stripMargin)

      // Verify the LANCE config was actually persisted to hoodie.properties.
      assertResult(HoodieFileFormat.LANCE)(
        createMetaClient(spark, tablePath).getTableConfig.getBaseFileFormat)

      // Commits 1-3: plain inserts of inline-BLOB rows. With the INMEMORY
      // index these should land only in log files (no base files yet),
      // which the assertion below confirms.
      spark.sql(s"insert into $tableName values (1, ${inlineBlobLiteral("01")}, 1000)")
      spark.sql(s"insert into $tableName values (2, ${inlineBlobLiteral("02")}, 1000)")
      spark.sql(s"insert into $tableName values (3, ${inlineBlobLiteral("03")}, 1000)")
      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))

      // Commit 4: update id=1's blob (0x01 -> 0x11) via MERGE; file slice is
      // still expected to be log-only afterwards.
      spark.sql(
        s"""
           |merge into $tableName h0
           |using (
           |  select 1 as id, ${inlineBlobLiteral("11")} as data, 1001L as ts
           |) s0
           | on h0.id = s0.id
           | when matched then update set *
           |""".stripMargin)
      assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))

      // Commit 5: insert id=4 via MERGE. After this commit the table is no
      // longer log-only (assertion below), i.e. inline compaction has produced
      // base files — in LANCE format per the table config above.
      spark.sql(
        s"""
           |merge into $tableName h0
           |using (
           |  select 4 as id, ${inlineBlobLiteral("04")} as data, 1000L as ts
           |) s0
           | on h0.id = s0.id
           | when not matched then insert *
           |""".stripMargin)

      assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))

      // Post-compaction read path: verify all four rows' blob payloads,
      // including the updated value for id=1 from commit 4.
      val bytesById = spark.sql(
        s"select id, read_blob(data) as bytes from $tableName order by id"
      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
      assertResult(4)(bytesById.size)
      assert(bytesById(1).sameElements(Array(0x11.toByte)))
      assert(bytesById(2).sameElements(Array(0x02.toByte)))
      assert(bytesById(3).sameElements(Array(0x03.toByte)))
      assert(bytesById(4).sameElements(Array(0x04.toByte)))

      // Verify the blob struct shape survives compaction: INLINE type tag,
      // inline data populated, external reference null.
      spark.sql(s"select id, data from $tableName order by id").collect().foreach { row =>
        val blob = row.getStruct(1)
        assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE)))
        assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)))
        // NOTE(issue): this is the reported failure point — per the attached
        // stack trace, after Lance compaction the "reference" field comes back
        // non-null ("blob.isNullAt(blob.fieldIndex(\"reference\")) was false").
        assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)))
      }

      // Verify the BLOB logical-type marker is still present in the Spark
      // schema metadata of the "data" field after compaction.
      val blobField = spark.table(tableName).schema.find(_.name == "data").get
      assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
        s"Expected BLOB type metadata on data field after compaction, " +
          s"got: ${blobField.metadata}")
      assertResult(HoodieSchemaType.BLOB.name())(
        blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))

      // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
      spark.sql(
        s"""
           |merge into $tableName h0
           |using (
           |  select 2 as id, ${inlineBlobLiteral("22")} as data, 1002L as ts
           |) s0
           | on h0.id = s0.id
           | when matched then update set *
           |""".stripMargin)
      // Read-after-clean: the updated payload for id=2 (0x22) must be visible.
      val updatedBytesById = spark.sql(
        s"select id, read_blob(data) as bytes from $tableName order by id"
      ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
      assert(updatedBytesById(2).sameElements(Array(0x22.toByte)))

      // Confirm the cleaner actually ran by checking the active timeline for
      // at least one .clean instant (hoodie.clean.commits.retained = 1 above).
      val metaClient = createMetaClient(spark, tablePath)
      metaClient.reloadActiveTimeline()
      assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
        "Expected at least one .clean instant on the timeline after compaction")
    })
  }

Environment

Hudi version:
Query engine: (Spark/Flink/Trino etc)
Relevant configs:

Logs and Stack Trace

blob.isNullAt(blob.fieldIndex("reference")) was false
ScalaTestFailureLocation: org.apache.spark.sql.hudi.dml.schema.TestBlobDataType at (TestBlobDataType.scala:358)
org.scalatest.exceptions.TestFailedException: blob.isNullAt(blob.fieldIndex("reference")) was false
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
	at org.apache.spark.sql.hudi.dml.schema.TestBlobDataType.$anonfun$new$19(TestBlobDataType.scala:358)
	at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1323)
	at org.apache.spark.sql.hudi.dml.schema.TestBlobDataType.$anonfun$new$17(TestBlobDataType.scala:354)
	at org.apache.spark.sql.hudi.dml.schema.TestBlobDataType.$anonfun$new$17$adapted(TestBlobDataType.scala:291)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.withTempDir(HoodieSparkSqlTestBase.scala:102)
	at org.apache.spark.sql.hudi.dml.schema.TestBlobDataType.$anonfun$new$16(TestBlobDataType.scala:291)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$withRecordType$3(HoodieSparkSqlTestBase.scala:375)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.withSQLConf(HoodieSparkSqlTestBase.scala:326)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$withRecordType$1(HoodieSparkSqlTestBase.scala:374)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$withRecordType$1$adapted(HoodieSparkSqlTestBase.scala:366)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.withRecordType(HoodieSparkSqlTestBase.scala:366)
	at org.apache.spark.sql.hudi.dml.schema.TestBlobDataType.$anonfun$new$15(TestBlobDataType.scala:291)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$test$1(HoodieSparkSqlTestBase.scala:114)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
	at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.org$scalatest$BeforeAndAfterAll$$super$run(HoodieSparkSqlTestBase.scala:58)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.run(HoodieSparkSqlTestBase.scala:58)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)

Metadata

Metadata

Assignees

Labels

type:bugBug reports and fixes

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions