[HUDI-5977] Fix Date to String column schema evolution #8280

Merged · 6 commits · Mar 28, 2023
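Root cause, in brief: when schema on read is enabled, Hudi's Spark parquet file formats rebuild each type-changed column with a Catalyst Cast from the on-file type to the current table type. Cast is a timezone-aware expression for DATE/STRING conversions, and it was previously constructed without a timezone, so a column evolved from date to string failed to read on the non-vectorized path. Below is a minimal sketch of the failure mode, pasteable into a Spark 3.x spark-shell (Catalyst is internal API; "UTC" stands in for the session-local timezone the fix actually passes). The test added below exercises exactly this path.

// Sketch: why Cast needs a timezone for DATE -> STRING (Spark 3.x internals).
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast}
import org.apache.spark.sql.types.{DateType, StringType}

val dateAttr = AttributeReference("date_to_string_col", DateType)()

// DATE -> STRING is a timezone-aware cast in the Spark versions this PR targets.
assert(Cast.needsTimeZone(DateType, StringType))

// Before the fix: timeZoneId was left as None, so the Cast never resolves and
// evaluating or code-generating it at read time fails.
val before = Cast(dateAttr, StringType, timeZoneId = None)
assert(!before.resolved)

// After the fix: the session-local timezone is threaded through.
val after = Cast(dateAttr, StringType, timeZoneId = Some("UTC"))
assert(after.resolved)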
TestSpark3DDL.scala
@@ -735,4 +735,52 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test DATE to STRING conversions when vectorized reading is not enabled") {
+    withTempDir { tmp =>
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          // adding a struct column to force reads to use non-vectorized readers
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  struct_col struct<f0: int, f1: string>,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | partitioned by (ts)
+             """.stripMargin)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+             """.stripMargin)
+          spark.sql(s"select * from $tableName")
+
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+             """.stripMargin)
+          spark.sql(s"alter table $tableName alter column date_to_string_col type string")
+
+          // the struct and the string (converted from date) columns must be read to ensure the non-vectorized reader is used
+          // results are not checked; we only need to ensure the table can be read without errors thrown
+          spark.sql(s"select * from $tableName")
+        }
+      }
+    }
+  }
 }
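Why the test adds struct_col: in the Spark versions this test targets, the parquet source only takes the vectorized (batch) path when every top-level field is of an atomic type, so a struct column reliably forces the row-based reader that this PR fixes. A simplified paraphrase of that schema check follows; supportsBatchSketch is an illustrative name, not Spark or Hudi API, and the real ParquetFileFormat.supportBatch also checks reader configs and field count.

import org.apache.spark.sql.types._

// Simplified paraphrase of the schema condition in Spark's supportBatch:
// any complex top-level field (struct/array/map) disables batch (vectorized) reading.
def supportsBatchSketch(schema: StructType): Boolean =
  !schema.exists(f => f.dataType match {
    case _: StructType | _: ArrayType | _: MapType => true
    case _ => false
  })

val flat = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val withStruct = flat.add(StructField("struct_col",
  StructType(Seq(StructField("f0", IntegerType), StructField("f1", StringType)))))

assert(supportsBatchSketch(flat))        // all-atomic schema: batch reader eligible
assert(!supportsBatchSketch(withStruct)) // struct column: row-based reader, the path under test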
Spark24HoodieParquetFileFormat.scala
@@ -107,6 +107,7 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean)
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)

     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -238,7 +239,10 @@
       }).toAttributes ++ partitionSchema.toAttributes
       val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
         if (implicitTypeChangeInfos.containsKey(i)) {
-          Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+          val srcType = implicitTypeChangeInfos.get(i).getRight
+          val dstType = implicitTypeChangeInfos.get(i).getLeft
+          val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+          Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
         } else attr
       }
       GenerateUnsafeProjection.generate(castSchema, newFullSchema)
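The timezone is passed conditionally because most casts produced by schema evolution (for example int to long) are not timezone-aware; for those, None is passed and the expression is built exactly as before. Cast.needsTimeZone also recurses into nested types, so a struct field evolved from date to string is covered too. The same conditional construction is applied in the Spark 2.4 reader above and in the Spark 3.1 and 3.2+ readers below. A quick spark-shell illustration (Spark 3.x internal API):

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

// Plain numeric widening: no timezone involved, reader behavior unchanged.
assert(!Cast.needsTimeZone(IntegerType, LongType))

// The case fixed by this PR: DATE -> STRING is timezone-aware.
assert(Cast.needsTimeZone(DateType, StringType))

// needsTimeZone recurses into nested types, so evolved struct fields are covered.
assert(Cast.needsTimeZone(
  StructType(Seq(StructField("f0", DateType))),
  StructType(Seq(StructField("f0", StringType)))))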
Spark31HoodieParquetFileFormat.scala
@@ -130,6 +130,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean)
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)

     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -319,7 +320,10 @@
       }).toAttributes ++ partitionSchema.toAttributes
       val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
         if (typeChangeInfos.containsKey(i)) {
-          Cast(attr, typeChangeInfos.get(i).getLeft)
+          val srcType = typeChangeInfos.get(i).getRight
+          val dstType = typeChangeInfos.get(i).getLeft
+          val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+          Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
         } else attr
       }
       GenerateUnsafeProjection.generate(castSchema, newFullSchema)
Spark32PlusHoodieParquetFileFormat.scala
@@ -132,6 +132,7 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean)
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)

     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -374,7 +375,10 @@
       }).toAttributes ++ partitionSchema.toAttributes
       val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
         if (typeChangeInfos.containsKey(i)) {
-          Cast(attr, typeChangeInfos.get(i).getLeft)
+          val srcType = typeChangeInfos.get(i).getRight
+          val dstType = typeChangeInfos.get(i).getLeft
+          val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+          Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
         } else attr
       }
       GenerateUnsafeProjection.generate(castSchema, newFullSchema)