[HUDI-5977] Fix Date to String column schema evolution (#8280)
voonhous committed Mar 28, 2023
1 parent 192ee43 · commit 04ec593
Showing 4 changed files with 63 additions and 3 deletions.
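
Background on the fix: Spark's `Cast` expression is time-zone-aware for conversions such as DATE to STRING, and a time-zone-aware cast built without a `timeZoneId` stays unresolved, which breaks downstream code generation. The schema-evolution projection in each Parquet reader shim built exactly such a cast, so a DATE column evolved to STRING could not be read back. The change below passes the session-local time zone only when `Cast.needsTimeZone` reports that the conversion requires it. A minimal standalone sketch of that behavior (illustrative only, not code from this commit; the object and column names are made up):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast}
import org.apache.spark.sql.types.{DateType, IntegerType, StringType}

object CastTimeZoneSketch {
  def main(args: Array[String]): Unit = {
    // Rendering a DATE as a STRING depends on the session time zone...
    println(Cast.needsTimeZone(DateType, StringType))    // true
    // ...while INT to STRING does not.
    println(Cast.needsTimeZone(IntegerType, StringType)) // false

    val attr = AttributeReference("date_to_string_col", DateType)()
    // Without a time zone the cast stays unresolved, so code generation
    // (GenerateUnsafeProjection) cannot consume it.
    println(Cast(attr, StringType, None).resolved)        // false
    // Supplying a time zone makes the cast resolvable.
    println(Cast(attr, StringType, Some("UTC")).resolved) // true
  }
}

The guard `if (needTimeZone) timeZoneId else None` in the diffs below applies this rule per column, so casts that do not involve a time zone behave exactly as before.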
@@ -735,4 +735,52 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test DATE to STRING conversions when vectorized reading is not enabled") {
+    withTempDir { tmp =>
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          // adding a struct column to force reads to use non-vectorized readers
+          spark.sql(
+            s"""
+               | create table $tableName (
+               | id int,
+               | name string,
+               | price double,
+               | struct_col struct<f0: int, f1: string>,
+               | ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               | type = '$tableType',
+               | primaryKey = 'id',
+               | preCombineField = 'ts'
+               | )
+               | partitioned by (ts)
+             """.stripMargin)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+             """.stripMargin)
+          spark.sql(s"select * from $tableName")
+
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+             """.stripMargin)
+          spark.sql(s"alter table $tableName alter column date_to_string_col type string")
+
+          // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
+          // not checking results as we just need to ensure that the table can be read without any errors thrown
+          spark.sql(s"select * from $tableName")
+        }
+      }
+    }
+  }
 }
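
A note on the test design: Spark only enables the vectorized Parquet reader when every projected column is of an atomic type, so the struct_col field above forces the row-based reader, which is the code path patched in the three shims below. A rough paraphrase of the gating check (an assumption modeled on Spark's ParquetFileFormat.supportBatch, not code from this commit):

import org.apache.spark.sql.types.{AtomicType, StructType}

// Paraphrased sketch: a struct field fails the AtomicType test, so the scan
// falls back to the non-vectorized reader that this commit fixes.
def supportsVectorizedRead(schema: StructType, vectorizedReaderEnabled: Boolean): Boolean =
  vectorizedReaderEnabled && schema.fields.forall(_.dataType.isInstanceOf[AtomicType])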
@@ -107,6 +107,7 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -238,7 +239,10 @@
           }).toAttributes ++ partitionSchema.toAttributes
           val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
             if (implicitTypeChangeInfos.containsKey(i)) {
-              Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+              val srcType = implicitTypeChangeInfos.get(i).getRight
+              val dstType = implicitTypeChangeInfos.get(i).getLeft
+              val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+              Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
             } else attr
           }
           GenerateUnsafeProjection.generate(castSchema, newFullSchema)
@@ -130,6 +130,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -319,7 +320,10 @@
           }).toAttributes ++ partitionSchema.toAttributes
           val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
             if (typeChangeInfos.containsKey(i)) {
-              Cast(attr, typeChangeInfos.get(i).getLeft)
+              val srcType = typeChangeInfos.get(i).getRight
+              val dstType = typeChangeInfos.get(i).getLeft
+              val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+              Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
             } else attr
           }
           GenerateUnsafeProjection.generate(castSchema, newFullSchema)
@@ -132,6 +132,7 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -374,7 +375,10 @@
           }).toAttributes ++ partitionSchema.toAttributes
           val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
             if (typeChangeInfos.containsKey(i)) {
-              Cast(attr, typeChangeInfos.get(i).getLeft)
+              val srcType = typeChangeInfos.get(i).getRight
+              val dstType = typeChangeInfos.get(i).getLeft
+              val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+              Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
             } else attr
           }
           GenerateUnsafeProjection.generate(castSchema, newFullSchema)
