[SPARK-21739][SQL] Cast expression should initialize timezoneId when it is called statically to convert something into TimestampType

## What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21739

This issue was introduced with `TimeZoneAwareExpression`.
When a **Cast** expression converts a value to `TimestampType`, it must be resolved with its `timezoneId` set. In general, this happens while the LogicalPlan is resolved.

However, there are still places that construct a `Cast` expression statically to convert data types without setting `timezoneId`. In such cases, evaluating the cast for `TimestampType` throws `NoSuchElementException: None.get`.
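
For illustration, a minimal sketch of the failure mode, assuming the `Cast(child, dataType, timeZoneId)` constructor introduced with `TimeZoneAwareExpression` and an example literal value:

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

// Built statically, this Cast has timeZoneId = None; evaluating it for
// TimestampType forces timeZoneId.get and fails with
// java.util.NoSuchElementException: None.get.
val broken = Cast(Literal("2010-01-01 00:00:00"), TimestampType)
// broken.eval(null)  // throws at runtime

// Supplying a timezoneId up front makes the same expression evaluable.
val fixed = Cast(Literal("2010-01-01 00:00:00"), TimestampType,
  Option(java.util.TimeZone.getDefault.getID))
fixed.eval(null)  // timestamp as microseconds since the epoch (Long)
```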

This PR fixes the issue. We checked the whole project and found two such usages (in `TableReader` and `HiveTableScanExec`).
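
The fix reuses the existing `CastSupport` helper: the affected classes mix in the trait, provide the session `SQLConf`, and build casts through `cast(...)`, which attaches the session-local timezone when the expression is constructed. A rough sketch of the pattern (the `PartitionValueReader` class below is hypothetical, for illustration only):

```scala
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType

// Hypothetical helper mirroring the change in TableReader and HiveTableScanExec:
// CastSupport.cast builds a Cast with conf.sessionLocalTimeZone already set,
// so evaluating it for TimestampType no longer hits None.get.
class PartitionValueReader(override val conf: SQLConf) extends CastSupport {
  def castPartitionValue(raw: String, dataType: DataType): Any =
    cast(Literal(raw), dataType).eval(null)
}
```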

## How was this patch tested?

unit test

Author: donnyzone <wellfengzhu@gmail.com>

Closes #18960 from DonnyZone/spark-21739.

(cherry picked from commit 310454b)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
DonnyZone authored and gatorsmile committed Aug 18, 2017
1 parent 2a96975 commit fdea642
Showing 3 changed files with 29 additions and 4 deletions.
@@ -39,8 +39,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{SerializableConfiguration, Utils}

@@ -65,7 +67,7 @@ class HadoopTableReader(
@transient private val tableDesc: TableDesc,
@transient private val sparkSession: SparkSession,
hadoopConf: Configuration)
extends TableReader with Logging {
extends TableReader with CastSupport with Logging {

// Hadoop honors "mapreduce.job.maps" as hint,
// but will ignore when mapreduce.jobtracker.address is "local".
@@ -86,6 +88,8 @@ private val _broadcastedHadoopConf
private val _broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

override def conf: SQLConf = sparkSession.sessionState.conf

override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
hiveTable,
@@ -227,7 +231,7 @@ class HadoopTableReader(
def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
partitionKeyAttrs.foreach { case (attr, ordinal) =>
val partOrdinal = partitionKeys.indexOf(attr)
row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
row(ordinal) = cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
}
}

@@ -30,13 +30,15 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, DataType}
import org.apache.spark.util.Utils

@@ -53,11 +55,13 @@ case class HiveTableScanExec(
relation: HiveTableRelation,
partitionPruningPred: Seq[Expression])(
@transient private val sparkSession: SparkSession)
extends LeafExecNode {
extends LeafExecNode with CastSupport {

require(partitionPruningPred.isEmpty || relation.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")

override def conf: SQLConf = sparkSession.sessionState.conf

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

@@ -104,7 +108,7 @@ case class HiveTableScanExec(
hadoopConf)

private def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
cast(Literal(value), dataType).eval(null)
}

private def addColumnMetadataToConf(hiveConf: Configuration): Unit = {
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.File
import java.sql.Timestamp

import com.google.common.io.Files
import org.apache.hadoop.fs.FileSystem
@@ -68,4 +69,20 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
sql("DROP TABLE IF EXISTS createAndInsertTest")
}
}

test("SPARK-21739: Cast expression should initialize timezoneId") {
withTable("table_with_timestamp_partition") {
sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)")
sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
"PARTITION (ts = '2010-01-01 00:00:00.000') VALUES (1)")

// test for Cast expression in TableReader
checkAnswer(sql("SELECT * FROM table_with_timestamp_partition"),
Seq(Row(1, Timestamp.valueOf("2010-01-01 00:00:00.000"))))

// test for Cast expression in HiveTableScanExec
checkAnswer(sql("SELECT value FROM table_with_timestamp_partition " +
"WHERE ts = '2010-01-01 00:00:00.000'"), Row(1))
}
}
}
