Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-39259][SQL] Evaluate timestamps consistently in subqueries #36654

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.spark.sql.catalyst.optimizer

import scala.collection.mutable
import java.time.{Instant, LocalDateTime}

import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ}
import org.apache.spark.sql.catalyst.trees.TreePatternBits
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros, localDateToDays}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -72,30 +73,34 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
* Computes the current date and time to make sure we return the same result in a single query.
*/
object ComputeCurrentTime extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val currentDates = mutable.Map.empty[String, Literal]
val timeExpr = CurrentTimestamp()
val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
val currentTime = Literal.create(timestamp, timeExpr.dataType)
def apply(plan: LogicalPlan): LogicalPlan = applyWithTimestamp(plan, Instant.now())
olaky marked this conversation as resolved.
Show resolved Hide resolved

/** Required to build custom rules for commands that do not keep sub-plans as children in Delta */
olaky marked this conversation as resolved.
Show resolved Hide resolved
def applyWithTimestamp(plan: LogicalPlan, instant: Instant): LogicalPlan = {
val currentTimestamp = instantToMicros(instant)
olaky marked this conversation as resolved.
Show resolved Hide resolved
val currentTime = Literal.create(currentTimestamp, TimestampType)
val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
val localTimestamps = mutable.Map.empty[String, Literal]

plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
case currentDate @ CurrentDate(Some(timeZoneId)) =>
currentDates.getOrElseUpdate(timeZoneId, {
Literal.create(currentDate.eval().asInstanceOf[Int], DateType)
})
case CurrentTimestamp() | Now() => currentTime
case CurrentTimeZone() => timezone
case localTimestamp @ LocalTimestamp(Some(timeZoneId)) =>
localTimestamps.getOrElseUpdate(timeZoneId, {
Literal.create(localTimestamp.eval().asInstanceOf[Long], TimestampNTZType)
})
def transformCondition(treePatternbits: TreePatternBits): Boolean = {
treePatternbits.containsPattern(CURRENT_LIKE)
}

plan.transformDownWithSubqueries(transformCondition) {
case subQuery =>
subQuery.transformAllExpressionsWithPruning(transformCondition) {
case cd: CurrentDate =>
Literal.create(
localDateToDays(instant.atZone(cd.zoneId).toLocalDate).asInstanceOf[Int], DateType)
olaky marked this conversation as resolved.
Show resolved Hide resolved
case CurrentTimestamp() | Now() => currentTime
case CurrentTimeZone() => timezone
case localTimestamp: LocalTimestamp =>
val asDateTime = LocalDateTime.ofInstant(instant, localTimestamp.zoneId)
Literal.create(localDateTimeToMicros(asDateTime).asInstanceOf[Long], TimestampNTZType)
MaxGekk marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}


/**
* Replaces the expression of CurrentDatabase with the current database name.
* Replaces the expression of CurrentCatalog with the current catalog name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
* to rewrite the whole plan, include its subqueries, in one go.
*/
def transformWithSubqueries(f: PartialFunction[PlanType, PlanType]): PlanType =
transformDownWithSubqueries(f)
transformDownWithSubqueries(AlwaysProcess.fn, UnknownRuleId)(f)

/**
* Returns a copy of this node where the given partial function has been recursively applied
Expand All @@ -479,21 +479,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
* first to this node, then this node's subqueries and finally this node's children.
* When the partial function does not apply to a given node, it is left unchanged.
*/
def transformDownWithSubqueries(f: PartialFunction[PlanType, PlanType]): PlanType = {
def transformDownWithSubqueries(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of modifying this method's signature, I slightly prefer adding an overload named transformDownWithSubqueriesAndPruning in order to remain consistent with the naming conventions for other transform methods.

Adding a new method also avoids introducing source or binary compatibility issues for third-party code that calls Catalyst APIs. Technically speaking, Catalyst APIs are considered internal to Spark and are subject to change between minor releases (see source), but I think it's nice to try to avoid API breakage when feasible.

I also ran into problems when trying to call transformDownWithSubqueries without supplying any arguments in the first argument group: calling transformDownWithSubqueries() { f } resulted in confusing compilation errors.

As a result, I'm going to submit a followup to this PR to split this into two methods as I've suggested above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's my PR: #36765

cond: TreePatternBits => Boolean = AlwaysProcess.fn, ruleId: RuleId = UnknownRuleId)
(f: PartialFunction[PlanType, PlanType])
: PlanType = {
val g: PartialFunction[PlanType, PlanType] = new PartialFunction[PlanType, PlanType] {
override def isDefinedAt(x: PlanType): Boolean = true

override def apply(plan: PlanType): PlanType = {
val transformed = f.applyOrElse[PlanType, PlanType](plan, identity)
transformed transformExpressionsDown {
case planExpression: PlanExpression[PlanType] =>
val newPlan = planExpression.plan.transformDownWithSubqueries(f)
val newPlan = planExpression.plan.transformDownWithSubqueries(cond, ruleId)(f)
planExpression.withNewPlan(newPlan)
}
}
}

transformDown(g)
transformDownWithPruning(cond, ruleId)(g)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package org.apache.spark.sql.catalyst.optimizer

import java.time.{LocalDateTime, ZoneId}

import scala.collection.JavaConverters.mapAsScalaMap
import scala.concurrent.duration._

import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal, LocalTimestamp}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, InSubquery, ListQuery, Literal, LocalTimestamp, Now}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -41,11 +44,7 @@ class ComputeCurrentTimeSuite extends PlanTest {
val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
val max = (System.currentTimeMillis() + 1) * 1000

val lits = new scala.collection.mutable.ArrayBuffer[Long]
plan.transformAllExpressions { case e: Literal =>
lits += e.value.asInstanceOf[Long]
e
}
val lits = literals[Long](plan)
assert(lits.size == 2)
assert(lits(0) >= min && lits(0) <= max)
assert(lits(1) >= min && lits(1) <= max)
Expand All @@ -59,11 +58,7 @@ class ComputeCurrentTimeSuite extends PlanTest {
val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
val max = DateTimeUtils.currentDate(ZoneId.systemDefault())

val lits = new scala.collection.mutable.ArrayBuffer[Int]
plan.transformAllExpressions { case e: Literal =>
lits += e.value.asInstanceOf[Int]
e
}
val lits = literals[Int](plan)
assert(lits.size == 2)
assert(lits(0) >= min && lits(0) <= max)
assert(lits(1) >= min && lits(1) <= max)
Expand All @@ -73,13 +68,9 @@ class ComputeCurrentTimeSuite extends PlanTest {
test("SPARK-33469: Add current_timezone function") {
val in = Project(Seq(Alias(CurrentTimeZone(), "c")()), LocalRelation())
val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
val lits = new scala.collection.mutable.ArrayBuffer[String]
plan.transformAllExpressions { case e: Literal =>
lits += e.value.asInstanceOf[UTF8String].toString
e
}
val lits = literals[UTF8String](plan)
assert(lits.size == 1)
assert(lits.head == SQLConf.get.sessionLocalTimeZone)
assert(lits.head == UTF8String.fromString(SQLConf.get.sessionLocalTimeZone))
}

test("analyzer should replace localtimestamp with literals") {
Expand All @@ -92,14 +83,66 @@ class ComputeCurrentTimeSuite extends PlanTest {
val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
val max = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))

val lits = new scala.collection.mutable.ArrayBuffer[Long]
plan.transformAllExpressions { case e: Literal =>
lits += e.value.asInstanceOf[Long]
e
}
val lits = literals[Long](plan)
assert(lits.size == 2)
assert(lits(0) >= min && lits(0) <= max)
assert(lits(1) >= min && lits(1) <= max)
assert(lits(0) == lits(1))
}

test("analyzer should use equal timestamps across subqueries") {
val timestampInSubQuery = Project(Seq(Alias(LocalTimestamp(), "timestamp1")()), LocalRelation())
val listSubQuery = ListQuery(timestampInSubQuery)
val valueSearchedInSubQuery = Seq(Alias(LocalTimestamp(), "timestamp2")())
val inFilterWithSubQuery = InSubquery(valueSearchedInSubQuery, listSubQuery)
val input = Project(Nil, Filter(inFilterWithSubQuery, LocalRelation()))

val plan = Optimize.execute(input.analyze).asInstanceOf[Project]

val lits = literals[Long](plan)
assert(lits.size == 3) // transformDownWithSubqueries covers the inner timestamp twice
assert(lits.toSet.size == 1)
}

test("analyzer should use consistent timestamps for different timezones") {
val localTimestamps = mapAsScalaMap(ZoneId.SHORT_IDS)
.map { case (zoneId, _) => Alias(LocalTimestamp(Some(zoneId)), zoneId)() }.toSeq
val input = Project(localTimestamps, LocalRelation())

val plan = Optimize.execute(input).asInstanceOf[Project]

val lits = literals[Long](plan)
assert(lits.size === localTimestamps.size)
// there are timezones with a 30 or 45 minute offset
val offsetsFromQuarterHour = lits.map( _ % Duration(15, MINUTES).toMicros).toSet
assert(offsetsFromQuarterHour.size == 1)
}

test("analyzer should use consistent timestamps for different timestamp functions") {
val differentTimestamps = Seq(
Alias(CurrentTimestamp(), "currentTimestamp")(),
Alias(Now(), "now")(),
Alias(LocalTimestamp(Some("PLT")), "localTimestampWithTimezone")()
)
val input = Project(differentTimestamps, LocalRelation())

val plan = Optimize.execute(input).asInstanceOf[Project]

val lits = literals[Long](plan)
assert(lits.size === differentTimestamps.size)
// there are timezones with a 30 or 45 minute offset
val offsetsFromQuarterHour = lits.map( _ % Duration(15, MINUTES).toMicros).toSet
assert(offsetsFromQuarterHour.size == 1)
}

private def literals[T](plan: LogicalPlan): Seq[T] = {
val literals = new scala.collection.mutable.ArrayBuffer[T]
plan.transformWithSubqueries { case subQuery =>
subQuery.transformAllExpressions { case expression: Literal =>
literals += expression.value.asInstanceOf[T]
expression
}
}
literals.asInstanceOf[Seq[T]]
}
}