Skip to content

Commit

Permalink
[SPARK-22109][SQL][BRANCH-2.2] Resolves type conflicts between strings and timestamps in partition column
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

This PR backports 04975a6 into branch-2.2.

## How was this patch tested?

Unit tests in `ParquetPartitionDiscoverySuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19333 from HyukjinKwon/SPARK-22109-backport-2.2.
  • Loading branch information
HyukjinKwon authored and ueshin committed Sep 23, 2017
1 parent 1a829df commit 211d81b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object PartitioningUtils {
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")

val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)

// Creates the StructType which represents the partition columns.
val fields = {
Expand Down Expand Up @@ -322,7 +322,8 @@ object PartitioningUtils {
* }}}
*/
def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
pathsWithPartitionValues: Seq[(Path, PartitionValues)],
timeZone: TimeZone): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
Expand All @@ -337,7 +338,7 @@ object PartitioningUtils {
val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
resolveTypeConflicts(values.map(_.literals(i)), timeZone)
}

// Fills resolved literals back to each partition
Expand Down Expand Up @@ -474,15 +475,15 @@ object PartitioningUtils {
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
*/
private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
val desiredType = {
val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
// Falls back to string if all values of this column are null or empty string
if (topType == NullType) StringType else topType
}

literals.map { case l @ Literal(_, dataType) =>
Literal.create(Cast(l, desiredType).eval(), desiredType)
Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1022,4 +1022,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
}

test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
  // The partition column mixes two timestamp-formatted strings with a plain
  // string ("blah"); after a write/read round trip through partitioned
  // Parquet the loaded data is expected to equal the original DataFrame.
  val rows = Seq(
    (1, "2015-01-01 00:00:00"),
    (2, "2014-01-01 00:00:00"),
    (3, "blah"))
  val expected = rows.toDF("i", "str")

  withTempPath { dir =>
    val location = dir.getAbsolutePath
    // Partition on the string column so its values are encoded in directory names.
    expected.write.format("parquet").partitionBy("str").save(location)
    checkAnswer(spark.read.load(location), expected)
  }
}
}

0 comments on commit 211d81b

Please sign in to comment.