MIT doesn't handle some spark types correctly #16634

@hudi-bot

Description

Here is a test that reproduces the issue:
{code:scala}
test("Test MergeInto For PreCombineField With Different Types") {
  spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
  withRecordType()(withTempDir { tmp =>
    spark.sql("set hoodie.payload.combined.schema.validate = true")
    Seq("mor").foreach { tableType =>
      val tableName1 = generateTableName
      spark.sql(
        s"""
           | create table $tableName1 (
           |  id int,
           |  name string,
           |  price double,
           |  v long,
           |  dt string
           | ) using hudi
           | tblproperties (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'v',
           |  hoodie.compaction.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
           | )
           | partitioned by(dt)
           | location '${tmp.getCanonicalPath}/$tableName1'
         """.stripMargin)

      // Insert data; the pre-combine field value type is long.
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |  select 1 as id, 'a1' as name, 10 as price, 1001L as v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.id
           | when not matched and s0.id % 2 = 1 then insert *
         """.stripMargin
      )
      checkAnswer(s"select id,name,price,dt,v from $tableName1")(
        Seq(1, "a1", 10, "2021-03-21", 1001)
      )

      // Merge again; this time the pre-combine field value type is short.
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |  select 1 as id, 'a1' as name, 12 as price, 1002S as v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.id
           | when matched then update set
           |  id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
           | when not matched then insert *
         """.stripMargin
      )
    }
  })
}
{code}
Running this test fails. When we convert from Spark to Avro, short values simply become integers, because Avro has no 16-bit type. Then, in the expression payload, we convert back from Avro to Spark in order to evaluate the merge conditions and assignments. However, at that point we only have the Avro schema, so we use:
{code:scala}
private def getAvroDeserializerFor(schema: Schema) = {
  avroDeserializerCache.get()
    .get(schema, new Function[Schema, HoodieAvroDeserializer] {
      override def apply(t: Schema): HoodieAvroDeserializer =
        sparkAdapter.createAvroDeserializer(schema, convertAvroSchemaToStructType(schema))
    })
}
{code}
Because we derive the Spark schema from the Avro schema, the field is now an int in the Spark schema, and evaluating the assignment against it fails.
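The lossy round trip can be sketched with a pair of hypothetical lookup tables (not Hudi code; the names are made up for illustration): Spark's ShortType is written to Avro as an int, and an Avro int can only map back to Spark's IntegerType, so the original ShortType is unrecoverable from the Avro schema alone.

```java
import java.util.Map;

public class TypeRoundTrip {
    // Hypothetical mapping tables summarizing the conversion described above.
    // Avro has no 16-bit primitive, so Spark's "short" is written as "int".
    static final Map<String, String> SPARK_TO_AVRO =
            Map.of("short", "int", "int", "int", "long", "long");
    // Converting the Avro schema back to a Spark schema can only yield "int".
    static final Map<String, String> AVRO_TO_SPARK =
            Map.of("int", "int", "long", "long");

    public static void main(String[] args) {
        String recovered = AVRO_TO_SPARK.get(SPARK_TO_AVRO.get("short"));
        // The original ShortType is lost in the round trip.
        System.out.println("short round-trips to: " + recovered); // prints "int"
    }
}
```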
This failure occurs because, during analysis, Spark saw that we were assigning a short to a long and inserted a Cast(short -> long) into the assignment expression. At runtime, however, the field in the deserialized data is an int. When the cast is evaluated, it expects a short, so the int value is cast as a short, which is an illegal operation and fails.
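A minimal JVM-level sketch of the failure mode (plain Java, not Spark's actual codegen): the deserialized field is a boxed Integer, but the planned Cast(ShortType -> LongType) unboxes it as a Short, which throws.

```java
public class ShortCastFailure {
    public static void main(String[] args) {
        // The row field came back from Avro as an int, so at runtime
        // it is a boxed java.lang.Integer.
        Object field = 1002;

        // The analyzer planned Cast(ShortType -> LongType), so evaluation
        // first treats the value as a Short -- but the object is an Integer.
        try {
            long v = (Short) field; // throws ClassCastException
            System.out.println(v);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```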
