Skip to content

Commit

Permalink
[SPARK-22109][SQL] Resolves type conflicts between strings and timest…
Browse files Browse the repository at this point in the history
…amps in partition column

## What changes were proposed in this pull request?

This PR proposes to resolve the type conflicts in strings and timestamps in partition column values.
It looks we need to set the timezone as it needs a cast between strings and timestamps.

```scala
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```

**Before**

```
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:347)
  at scala.None$.get(Option.scala:345)
  at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
  at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
  at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```

**After**

```
+---+-------------------+
|  i|                str|
+---+-------------------+
|  2|2014-01-01 00:00:00|
|  1|2015-01-01 00:00:00|
|  3|               blah|
+---+-------------------+
```

## How was this patch tested?

Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19331 from HyukjinKwon/SPARK-22109.
  • Loading branch information
HyukjinKwon authored and ueshin committed Sep 23, 2017
1 parent 50ada2a commit 04975a6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ object PartitioningUtils {
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")

val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)

// Creates the StructType which represents the partition columns.
val fields = {
Expand Down Expand Up @@ -318,7 +318,8 @@ object PartitioningUtils {
* }}}
*/
def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
pathsWithPartitionValues: Seq[(Path, PartitionValues)],
timeZone: TimeZone): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
Expand All @@ -333,7 +334,7 @@ object PartitioningUtils {
val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
resolveTypeConflicts(values.map(_.literals(i)), timeZone)
}

// Fills resolved literals back to each partition
Expand Down Expand Up @@ -470,15 +471,15 @@ object PartitioningUtils {
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
*/
private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
val desiredType = {
val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
// Falls back to string if all values of this column are null or empty string
if (topType == NullType) StringType else topType
}

literals.map { case l @ Literal(_, dataType) =>
Literal.create(Cast(l, desiredType).eval(), desiredType)
Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1055,4 +1055,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
}

test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
val df = Seq(
(1, "2015-01-01 00:00:00"),
(2, "2014-01-01 00:00:00"),
(3, "blah")).toDF("i", "str")

withTempPath { path =>
df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
checkAnswer(spark.read.load(path.getAbsolutePath), df)
}
}
}

0 comments on commit 04975a6

Please sign in to comment.