[SPARK-22109][SQL] Resolves type conflicts between strings and timestamps in partition column #19331
Conversation
cc @ueshin, could you take a look when you have some time please?
Test build #82112 has finished for PR 19331 at commit
LGTM.
Thanks! merging to master.
Thank you @ueshin!
@HyukjinKwon Could you send a pr to backport this to branch-2.2 please?
Definitely.
…amps in partition column

This PR proposes to resolve the type conflicts in strings and timestamps in partition column values. It looks like we need to set the time zone, since resolving them requires a cast between strings and timestamps.

```scala
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```

**Before**

```
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:347)
  at scala.None$.get(Option.scala:345)
  at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
  at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
  at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
  at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
  at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```

**After**

```
+---+-------------------+
|  i|                str|
+---+-------------------+
|  2|2014-01-01 00:00:00|
|  1|2015-01-01 00:00:00|
|  3|               blah|
+---+-------------------+
```

Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#19331 from HyukjinKwon/SPARK-22109.
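For context, a minimal sketch of how the new `timeZone` parameter could be threaded into the cast that resolves conflicting partition types. This only illustrates the idea described above under stated assumptions; it is not necessarily the exact code merged in the PR.

```scala
import java.util.TimeZone

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.DataType

// Sketch: up-cast every partition literal to the desired type, passing the
// session time zone so string <-> timestamp casts no longer hit None.get.
def castLiterals(literals: Seq[Literal], desiredType: DataType, timeZone: TimeZone): Seq[Literal] =
  literals.map { literal =>
    Literal.create(Cast(literal, desiredType, Some(timeZone.getID)).eval(), desiredType)
  }
```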
@@ -470,15 +471,15 @@ object PartitioningUtils {
   * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
   * types.
   */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
thanks for such a quick fix!

but, don't we also need to update `upCastingOrder`? It seems this happens to work because `upCastingOrder.indexOf(TimestampType) == -1`. But I think that doesn't compare the right way with `NullType`, so if you add this to `ParquetPartitionDiscoverySuite.test("parse partitions")`, it fails:
check(Seq(
s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah",
s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo"),
PartitionSpec(
StructType(Seq(
StructField("a", TimestampType),
StructField("b", StringType))),
Seq(
Partition(InternalRow(null, "blah"),
s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah"),
Partition(InternalRow(Timestamp.valueOf("2014-01-01 00:00:00.0"), "foo"),
s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo"))))
(I have to admit, I don't totally understand what the ramifications of that fail are -- the behavior in the resulting dataframe seems fine to me, but I figure there is probably some case this would mess up ...)
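To make the `upCastingOrder` concern above concrete, here is a small, self-contained illustration. The listed order is only an assumption for the example, not necessarily the exact `upCastingOrder` in `PartitioningUtils`.

```scala
import org.apache.spark.sql.types._

// Assumed ordering for illustration only; TimestampType is not in it.
val upCastingOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, DoubleType, StringType)

// indexOf(TimestampType) == -1 while indexOf(NullType) == 0, so maxBy picks
// NullType as the "widest" type even though the timestamp should win.
val topType = Seq(NullType, TimestampType).maxBy(upCastingOrder.indexOf(_))
println(topType) // NullType
```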
I see. Actually, I believe this PR fixes only a (corner-case) regression between 2.1.0 and 2.2.0: if I understood correctly, we started to infer timestamps in partition columns in 2.1.0 (SPARK-17388), and this corner-case regression was introduced in 2.2.0 by SPARK-18939 (not tested).

The issue you described above, which I believe has existed since 2.1.0 (not tested), should also apply to `DecimalType`, `DateType` and `TimestampType`, which have been supported since SPARK-17388; it is closely related to this issue, SPARK-22109, but orthogonal.

To handle this properly, I guess we should port this logic:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
Lines 135 to 150 in 183d4cb
/**
 * Case 2 type widening (see the classdoc comment above for TypeCoercion).
 *
 * i.e. the main difference with [[findTightestCommonType]] is that here we allow some
 * loss of precision when widening decimal and double, and promotion to string.
 */
private[analysis] def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
  findTightestCommonType(t1, t2)
    .orElse(findWiderTypeForDecimal(t1, t2))
    .orElse(stringPromotion(t1, t2))
    .orElse((t1, t2) match {
      case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) =>
        findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2))
      case _ => None
    })
}
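A rough sketch of what porting that logic into partition type resolution could look like. This is hypothetical, and note that `findWiderTypeForTwo` is `private[analysis]` today, so it would need to be exposed before the `datasources` package could call it.

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical: reduce the partition literals' types with the widening rule
// instead of upCastingOrder, falling back to StringType when no wider type exists.
def desiredType(literals: Seq[Literal]): DataType =
  literals.map(_.dataType).reduce { (t1, t2) =>
    TypeCoercion.findWiderTypeForTwo(t1, t2).getOrElse(StringType)
  }
```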
because the problem is that `TimestampType`, `DateType` and `DecimalType` can't easily be upcast to other types by comparing numeric precedence. Of course, a few special cases would need handling, because we currently infer decimals only when the scale is <= 0 (e.g., not 1.1) and the value is castable to a decimal before we try a double:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
Lines 384 to 393 in 04975a6
val decimalTry = Try {
  // `BigDecimal` conversion can fail when the `field` is not a form of number.
  val bigDecimal = new JBigDecimal(raw)
  // It reduces the cases for decimals by disallowing values having scale (eg. `1.1`).
  require(bigDecimal.scale <= 0)
  // `DecimalType` conversion can fail when
  // 1. The precision is bigger than 38.
  // 2. scale is bigger than precision.
  Literal(bigDecimal)
}
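A quick, standalone illustration of the `scale <= 0` restriction described above (just the `require` check, not the full inference pipeline):

```scala
import java.math.{BigDecimal => JBigDecimal}
import scala.util.Try

// True only when the raw partition value parses as a big decimal with no
// fractional digits, mirroring the require(bigDecimal.scale <= 0) above.
def looksLikeInferableDecimal(raw: String): Boolean =
  Try { require(new JBigDecimal(raw).scale <= 0) }.isSuccess

looksLikeInferableDecimal("12345678901234567890") // true: scale 0
looksLikeInferableDecimal("1.1")                  // false: scale 1, not inferred as decimal
```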
This could actually also be a problem between `DateType` and `TimestampType`, which should be upcastable (from date to timestamp) but might end up as `DateType`.

Let me take a closer look and probably make a fix soon.
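The date/timestamp case can be illustrated the same way: with neither type present in the ordering, `maxBy` simply keeps the first tied element, so the result depends on encounter order rather than on widening. The order below is again only an assumption for the example.

```scala
import org.apache.spark.sql.types._

// Neither DateType nor TimestampType is in the assumed order, so both map to
// index -1 and maxBy returns whichever type comes first instead of widening.
val order: Seq[DataType] = Seq(NullType, IntegerType, LongType, DoubleType, StringType)

Seq(DateType, TimestampType).maxBy(order.indexOf(_)) // DateType
Seq(TimestampType, DateType).maxBy(order.indexOf(_)) // TimestampType
```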
What changes were proposed in this pull request?
This PR proposes to resolve the type conflicts in strings and timestamps in partition column values.
It looks like we need to set the time zone, since resolving them requires a cast between strings and timestamps.
Before
After
How was this patch tested?
Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests.
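For reference, a manual-test sketch taken from the reproduction in the commit message above; it mirrors the PR description, not the exact unit test added to `ParquetPartitionDiscoverySuite`.

```scala
// Reproduction from the PR description: partition by a column whose values
// mix timestamp-like strings and a plain string.
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, "blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)

// Before the fix: java.util.NoSuchElementException: None.get during partition
// discovery. After the fix: all three rows come back and `str` stays a string column.
spark.read.parquet(path).show()
```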