Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-22109][SQL][BRANCH-2.2] Resolves type conflicts between strings and timestamps in partition column #19333

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object PartitioningUtils {
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")

val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)

// Creates the StructType which represents the partition columns.
val fields = {
Expand Down Expand Up @@ -322,7 +322,8 @@ object PartitioningUtils {
* }}}
*/
def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
pathsWithPartitionValues: Seq[(Path, PartitionValues)],
timeZone: TimeZone): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
Expand All @@ -337,7 +338,7 @@ object PartitioningUtils {
val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
resolveTypeConflicts(values.map(_.literals(i)), timeZone)
}

// Fills resolved literals back to each partition
Expand Down Expand Up @@ -474,15 +475,15 @@ object PartitioningUtils {
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
*/
private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
val desiredType = {
val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
// Falls back to string if all values of this column are null or empty string
if (topType == NullType) StringType else topType
}

literals.map { case l @ Literal(_, dataType) =>
Literal.create(Cast(l, desiredType).eval(), desiredType)
Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1022,4 +1022,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
}

test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
val df = Seq(
(1, "2015-01-01 00:00:00"),
(2, "2014-01-01 00:00:00"),
(3, "blah")).toDF("i", "str")

withTempPath { path =>
df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
checkAnswer(spark.read.load(path.getAbsolutePath), df)
}
}
}