[SPARK-27252][SQL] Make current_date() independent from time zones
## What changes were proposed in this pull request?

This makes the `CurrentDate` expression and the `current_date` function independent of time zone settings. The new result is the number of days since the epoch in the `UTC` time zone. Previously, Spark shifted the current date (in the `UTC` time zone) according to the session time zone, which violates the definition of `DateType` as the number of days since the epoch, where the epoch is an absolute point in time (midnight of Jan 1 1970, UTC).

This makes `CurrentDate` consistent with `CurrentTimestamp`, which is likewise independent of time zones.
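
Below is a minimal sketch (not part of the patch) contrasting the two behaviors. The `oldCurrentDate` helper is hypothetical, written to mirror what `DateTimeUtils.millisToDays` did with the session time zone:

```scala
import java.time.{LocalDate, ZoneOffset}
import java.util.TimeZone

// New behavior: days since the epoch, computed in UTC and unaffected by
// any session-level time zone setting.
val utcDays: Int = LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt

// Old behavior (hypothetical reconstruction): the current millis were shifted
// by the session zone's offset before truncating to whole days, so the result
// depended on the zone.
def oldCurrentDate(sessionTz: TimeZone): Int = {
  val millis = System.currentTimeMillis()
  Math.floorDiv(millis + sessionTz.getOffset(millis), 86400000L).toInt
}

// In the UTC evening (e.g. 20:00), oldCurrentDate(TimeZone.getTimeZone("Asia/Tokyo"))
// already reported the next day, while utcDays still reports the UTC date.
```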

## How was this patch tested?

The changes were tested by existing test suites such as `DateExpressionsSuite`.

Closes #24185 from MaxGekk/current-date.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and cloud-fan committed Mar 29, 2019
1 parent 50cded5 commit 06abd06
Showing 8 changed files with 39 additions and 39 deletions.
4 changes: 3 additions & 1 deletion docs/sql-migration-guide-upgrade.md
@@ -103,7 +103,9 @@ displayTitle: Spark SQL Upgrading Guide

- In Spark version 2.4 and earlier, the `current_timestamp` function returns a timestamp with millisecond resolution only. Since Spark 3.0, the function can return the result with microsecond resolution if the underlying clock available on the system offers such resolution.

-- In Spark version 2.4 abd earlier, when reading a Hive Serde table with Spark native data sources(parquet/orc), Spark will infer the actual file schema and update the table schema in metastore. Since Spark 3.0, Spark doesn't infer the schema anymore. This should not cause any problems to end users, but if it does, please set `spark.sql.hive.caseSensitiveInferenceMode` to `INFER_AND_SAVE`.
+- In Spark version 2.4 and earlier, when reading a Hive Serde table with Spark native data sources(parquet/orc), Spark will infer the actual file schema and update the table schema in metastore. Since Spark 3.0, Spark doesn't infer the schema anymore. This should not cause any problems to end users, but if it does, please set `spark.sql.hive.caseSensitiveInferenceMode` to `INFER_AND_SAVE`.

+- In Spark version 2.4 and earlier, the `current_date` function returns the current date shifted according to the SQL config `spark.sql.session.timeZone`. Since Spark 3.0, the function always returns the current date in the `UTC` time zone.

- Since Spark 3.0, `TIMESTAMP` literals are converted to strings using the SQL config `spark.sql.session.timeZone`, and `DATE` literals are formatted using the UTC time zone. In Spark version 2.4 and earlier, both conversions use the default time zone of the Java virtual machine.

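To make the migration note on `current_date` concrete, here is a hypothetical illustration, assuming a `SparkSession` named `spark`:

```scala
// UTC+14: the first zone to roll over to the next calendar day.
spark.conf.set("spark.sql.session.timeZone", "Pacific/Kiritimati")

// Spark 2.4: any time after 10:00 UTC this already shows tomorrow's UTC date,
// because the result is shifted into the session time zone.
// Spark 3.0: this shows the current UTC date regardless of the session zone.
spark.sql("SELECT current_date()").show()
```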
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp
-import java.time.{Instant, LocalDate, ZoneId}
+import java.time.{Instant, LocalDate, ZoneId, ZoneOffset}
import java.time.temporal.IsoFields
import java.util.{Locale, TimeZone}

@@ -52,30 +52,26 @@ trait TimeZoneAwareExpression extends Expression {
@transient lazy val zoneId: ZoneId = DateTimeUtils.getZoneId(timeZoneId.get)
}

+// scalastyle:off line.size.limit
/**
- * Returns the current date at the start of query evaluation.
+ * Returns the current date in the UTC time zone at the start of query evaluation.
* All calls of current_date within the same query return the same value.
*
* There is no code generation since this expression should get constant folded by the optimizer.
*/
@ExpressionDescription(
usage = "_FUNC_() - Returns the current date at the start of query evaluation.",
usage = "_FUNC_() - Returns the current date in the UTC time zone at the start of query evaluation.",
since = "1.5.0")
-case class CurrentDate(timeZoneId: Option[String] = None)
-extends LeafExpression with TimeZoneAwareExpression with CodegenFallback {
-
-def this() = this(None)
+// scalastyle:on line.size.limit
+case class CurrentDate() extends LeafExpression with CodegenFallback {

override def foldable: Boolean = true
override def nullable: Boolean = false

override def dataType: DataType = DateType

-override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
-copy(timeZoneId = Option(timeZoneId))

override def eval(input: InternalRow): Any = {
-DateTimeUtils.millisToDays(System.currentTimeMillis(), timeZone)
+LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt
}

override def prettyName: String = "current_date"
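As a sanity sketch (not from the patch), the new `eval` body based on `LocalDate` agrees with a plain millisecond-based day count:

```scala
import java.time.{LocalDate, ZoneOffset}

// Both count whole days since 1970-01-01 in UTC; they can differ by at most
// one if the two clock reads straddle a day boundary.
val fromLocalDate = LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt
val fromMillis    = Math.floorDiv(System.currentTimeMillis(), 86400000L).toInt
assert(math.abs(fromLocalDate - fromMillis) <= 1)
```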
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -58,21 +58,20 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
*/
object ComputeCurrentTime extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
-val currentDates = mutable.Map.empty[String, Literal]
-val timeExpr = CurrentTimestamp()
-val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
-val currentTime = Literal.create(timestamp, timeExpr.dataType)
+val currentDate = {
+val dateExpr = CurrentDate()
+val date = dateExpr.eval(EmptyRow).asInstanceOf[Int]
+Literal.create(date, dateExpr.dataType)
+}
+val currentTimestamp = {
+val timeExpr = CurrentTimestamp()
+val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
+Literal.create(timestamp, timeExpr.dataType)
+}

plan transformAllExpressions {
-case CurrentDate(Some(timeZoneId)) =>
-currentDates.getOrElseUpdate(timeZoneId, {
-Literal.create(
-DateTimeUtils.millisToDays(
-MICROSECONDS.toMillis(timestamp),
-DateTimeUtils.getTimeZone(timeZoneId)),
-DateType)
-})
-case CurrentTimestamp() => currentTime
+case CurrentDate() => currentDate
+case CurrentTimestamp() => currentTimestamp
}
}
}
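A rough sketch (not part of the patch) of the folding pattern the rule now applies; it assumes the wildcard import below brings `EmptyRow` into scope, as it does in the catalyst sources:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.DateType

// Evaluate the clock exactly once per plan...
val date    = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
val literal = Literal.create(date, DateType)
// ...then substitute the literal for every occurrence, e.g.
//   plan transformAllExpressions { case CurrentDate() => literal }
// so all current_date() calls in one query agree even across midnight.
```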
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -54,12 +54,12 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

test("datetime function current_date") {
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
-val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
+val cd = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)

-val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
-val cdpst = CurrentDate(pstId).eval(EmptyRow).asInstanceOf[Int]
+val cdjst = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
+val cdpst = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
assert(cdpst <= cd && cd <= cdjst)
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -514,7 +514,7 @@ class MicroBatchExecution(
ct.dataType, Some("Dummy TimeZoneId"))
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-cd.dataType, cd.timeZoneId)
+cd.dataType, Some("UTC"))
}

val triggerLogicalPlan = sink match {
sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2572,7 +2572,7 @@ object functions {
}

/**
- * Returns the current date as a date column.
+ * Returns the current date in the UTC time zone as a date column.
*
* @group datetime_funcs
* @since 1.5.0
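A typical use of the column function, assuming a `SparkSession` named `spark`; after this change the produced value is the current UTC date:

```scala
import org.apache.spark.sql.functions.current_date

// `today` holds the current date in UTC, no longer shifted by
// spark.sql.session.timeZone.
val df = spark.range(3).withColumn("today", current_date())
df.show()
```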
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -720,7 +720,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
assert(testData.groupBy(col("key")).toString.contains(
"[grouping expressions: [key], value: [key: int, value: string], type: GroupBy]"))
assert(testData.groupBy(current_date()).toString.contains(
"grouping expressions: [current_date(None)], value: [key: int, value: string], " +
"grouping expressions: [current_date()], value: [key: int, value: string], " +
"type: GroupBy]"))
}

sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -19,25 +19,28 @@ package org.apache.spark.sql

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
+import java.time.LocalDate
import java.util.Locale
import java.util.concurrent.TimeUnit

-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.unsafe.types.CalendarInterval

class DateFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._

test("function current_date") {
-val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
-val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
-val d1 = DateTimeUtils.fromJavaDate(df1.select(current_date()).collect().head.getDate(0))
-val d2 = DateTimeUtils.fromJavaDate(
-sql("""SELECT CURRENT_DATE()""").collect().head.getDate(0))
-val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis())
-assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
+withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
+val d0 = System.currentTimeMillis() / MILLIS_PER_DAY
+val d1 = localDateToDays(df1.select(current_date()).collect().head.getAs[LocalDate](0))
+val d2 = localDateToDays(sql("""SELECT CURRENT_DATE()""").collect().head.getAs[LocalDate](0))
+val d3 = System.currentTimeMillis() / MILLIS_PER_DAY
+assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
+}
}

test("function current_timestamp and now") {
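One note on the rewritten test: enabling `SQLConf.DATETIME_JAVA8API_ENABLED` makes collected date values come back as `java.time.LocalDate` rather than `java.sql.Date`, which is why `getAs[LocalDate]` and `localDateToDays` work here. A tiny sketch of the epoch-day convention they rely on:

```scala
import java.time.LocalDate

// Epoch day 0 is 1970-01-01, so localDateToDays reduces to
// LocalDate.toEpochDay for the dates involved in the test.
val epochDays = LocalDate.of(1970, 1, 2).toEpochDay.toInt
assert(epochDays == 1)
```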
