From 5f207172cd9536da3b34f659810886f9a7461d2b Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Tue, 24 May 2022 15:56:08 +0200
Subject: [PATCH 01/11] [SPARK-39259] Evaluate timestamps consistently in subqueries

---
 .../catalyst/optimizer/finishAnalysis.scala   | 37 +++++++++----------
 .../optimizer/ComputeCurrentTimeSuite.scala   | 24 +++++++++++-
 2 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index ef9c4b9af40d3..1bdadc50f103c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import scala.collection.mutable
+import java.time.{Instant, LocalDateTime}
 
 import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, currentDate, instantToMicros, localDateTimeToMicros}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -72,30 +72,27 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
  * Computes the current date and time to make sure we return the same result in a single query.
  */
 object ComputeCurrentTime extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    val currentDates = mutable.Map.empty[String, Literal]
-    val timeExpr = CurrentTimestamp()
-    val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
-    val currentTime = Literal.create(timestamp, timeExpr.dataType)
+  def apply(plan: LogicalPlan): LogicalPlan = applyWithTimestamp(plan, Instant.now())
+
+  def applyWithTimestamp(plan: LogicalPlan, instant: Instant): LogicalPlan = {
+    val currentTimestamp = instantToMicros(instant)
+    val currentTime = Literal.create(currentTimestamp, TimestampType)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
-    val localTimestamps = mutable.Map.empty[String, Literal]
 
-    plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
-      case currentDate @ CurrentDate(Some(timeZoneId)) =>
-        currentDates.getOrElseUpdate(timeZoneId, {
-          Literal.create(currentDate.eval().asInstanceOf[Int], DateType)
-        })
-      case CurrentTimestamp() | Now() => currentTime
-      case CurrentTimeZone() => timezone
-      case localTimestamp @ LocalTimestamp(Some(timeZoneId)) =>
-        localTimestamps.getOrElseUpdate(timeZoneId, {
-          Literal.create(localTimestamp.eval().asInstanceOf[Long], TimestampNTZType)
-        })
+    plan.transformDownWithSubqueries {
+      case subQuery =>
+        subQuery.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
+          case cd: CurrentDate => Literal.create(currentDate(cd.zoneId).asInstanceOf[Int], DateType)
+          case CurrentTimestamp() | Now() => currentTime
+          case CurrentTimeZone() => timezone
+          case localTimestamp: LocalTimestamp =>
+            val asDateTime = LocalDateTime.ofInstant(instant, localTimestamp.zoneId)
+            Literal.create(localDateTimeToMicros(asDateTime).asInstanceOf[Long], TimestampNTZType)
+        }
     }
   }
 }
 
-
 /**
  * Replaces the expression of CurrentDatabase with the current database name.
  * Replaces the expression of CurrentCatalog with the current catalog name.
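The substance of PATCH 01: the old rule rewrote current_date/current_timestamp/localtimestamp with transformAllExpressionsWithPruning, which never descends into the plans nested inside subquery expressions, so a timestamp in a subquery could be evaluated at a different moment than one in the main query. The new version captures a single java.time.Instant and derives every replacement literal from it while also traversing subquery plans. A minimal standalone sketch of that idea in plain java.time (illustrative code, not part of the patch):

import java.time.{Instant, LocalDateTime, ZoneId}

object OneInstantManyRenderings {
  def main(args: Array[String]): Unit = {
    // Capture the clock exactly once, as applyWithTimestamp does with Instant.now().
    val instant = Instant.now()

    // CURRENT_TIMESTAMP analogue: microseconds since the epoch (what instantToMicros yields).
    val epochMicros = instant.getEpochSecond * 1000000L + instant.getNano / 1000L

    // LOCALTIMESTAMP analogue: the same instant rendered in different zones.
    val berlin = LocalDateTime.ofInstant(instant, ZoneId.of("Europe/Berlin"))
    val sydney = LocalDateTime.ofInstant(instant, ZoneId.of("Australia/Sydney"))

    // Every value is a pure function of one instant, so no two substituted
    // literals can disagree, however deeply a subquery nests them.
    println(s"$epochMicros / $berlin / $sydney")
  }
}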
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 9b04dcddfb2ce..e168f454f4d5b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark.sql.catalyst.optimizer
 import java.time.{LocalDateTime, ZoneId}
 
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal, LocalTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, InSubquery, ListQuery, Literal, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -102,4 +102,24 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(lits(1) >= min && lits(1) <= max)
     assert(lits(0) == lits(1))
   }
+
+  test("analyzer should use equal timestamps across subqueries") {
+    val timestampInSubQuery = Project(Seq(Alias(LocalTimestamp(), "timestamp1")()), LocalRelation())
+    val listSubQuery = ListQuery(timestampInSubQuery)
+    val valueSearchedInSubQuery = Seq(Alias(LocalTimestamp(), "timestamp2")())
+    val inFilterWithSubQuery = InSubquery(valueSearchedInSubQuery, listSubQuery)
+    val input = Project(Nil, Filter(inFilterWithSubQuery, LocalRelation()))
+
+    val plan = Optimize.execute(input.analyze).asInstanceOf[Project]
+
+    val literals = new scala.collection.mutable.ArrayBuffer[Long]
+    plan.transformDownWithSubqueries { case subQuery =>
+      subQuery.transformAllExpressions { case expression: Literal =>
+        literals += expression.value.asInstanceOf[Long]
+        expression
+      }
+    }
+    assert(literals.size == 3) // transformDownWithSubqueries covers the inner timestamp twice
+    assert(literals.toSet.size == 1)
+  }
 }
From ff68d5b399992fd0ccaaa7f7412d49d2e1222c66 Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Wed, 25 May 2022 09:02:45 +0200
Subject: [PATCH 02/11] wip

---
 .../optimizer/ComputeCurrentTimeSuite.scala   | 21 ++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index e168f454f4d5b..d152a8c475ab5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import java.time.{LocalDateTime, ZoneId}
 
+import scala.collection.JavaConverters.mapAsScalaMap
+import scala.collection.mutable
+
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, InSubquery, ListQuery, Literal, LocalTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimeZone, CurrentTimestamp, InSubquery, ListQuery, Literal, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -122,4 +125,20 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(literals.size == 3) // transformDownWithSubqueries covers the inner timestamp twice
     assert(literals.toSet.size == 1)
   }
+
+  test("analyzer should use consistent timestamps for different timezones") {
+    val localTimestamps = mapAsScalaMap(ZoneId.SHORT_IDS)
+      .map { case (zoneId, _) => Alias(LocalTimestamp(Some(zoneId)), zoneId)() }.toSeq
+    val input = Project(localTimestamps, LocalRelation())
+
+    val plan = Optimize.execute(input).asInstanceOf[Project]
+
+    val literals = new scala.collection.mutable.ArrayBuffer[Long]
+    plan.transformAllExpressions { case e: Literal =>
+      literals += e.value.asInstanceOf[Long]
+      e
+    }
+
+    assert(literals.size === localTimestamps.size)
+  }
 }

From 8075b256958b405ce66e169b967f798a2e07aa2e Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Wed, 25 May 2022 11:20:10 +0200
Subject: [PATCH 03/11] Use Instant for CurrentDate, more test coverage

---
 .../catalyst/optimizer/finishAnalysis.scala   |  8 +-
 .../optimizer/ComputeCurrentTimeSuite.scala   | 80 ++++++++++---------
 2 files changed, 47 insertions(+), 41 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 1bdadc50f103c..bc536d492efa0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.time.{Instant, LocalDateTime}
+import java.time.{Instant, LocalDate, LocalDateTime}
 
 import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, currentDate, instantToMicros, localDateTimeToMicros}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros, localDateToDays}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -82,7 +82,9 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
     plan.transformDownWithSubqueries {
       case subQuery =>
         subQuery.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
-          case cd: CurrentDate => Literal.create(currentDate(cd.zoneId).asInstanceOf[Int], DateType)
+          case cd: CurrentDate =>
+            Literal.create(
+              localDateToDays(LocalDate.ofInstant(instant, cd.zoneId)).asInstanceOf[Int], DateType)
           case CurrentTimestamp() | Now() => currentTime
           case CurrentTimeZone() => timezone
           case localTimestamp: LocalTimestamp =>
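A note on the CurrentDate change above: even with one shared instant, CURRENT_DATE still depends on the expression's zoneId, because the same instant falls on different calendar dates in different zones. A small self-contained illustration (assumed example values, not patch code); LocalDate.ofInstant is a Java 9+ method, which is what PATCH 06 below fixes:

import java.time.{Instant, LocalDate, ZoneId}

object DatePerZone extends App {
  // 23:30 UTC on May 25 is already the next morning in Sydney (UTC+10 in May):
  val instant = Instant.parse("2022-05-25T23:30:00Z")
  println(LocalDate.ofInstant(instant, ZoneId.of("UTC")))              // 2022-05-25
  println(LocalDate.ofInstant(instant, ZoneId.of("Australia/Sydney"))) // 2022-05-26
}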
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index d152a8c475ab5..0555f37f385b1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.catalyst.optimizer
 import java.time.{LocalDateTime, ZoneId}
 
 import scala.collection.JavaConverters.mapAsScalaMap
-import scala.collection.mutable
+import scala.concurrent.duration._
 
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimeZone, CurrentTimestamp, InSubquery, ListQuery, Literal, LocalTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, InSubquery, ListQuery, Literal, LocalTimestamp, Now}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -44,11 +44,7 @@ class ComputeCurrentTimeSuite extends PlanTest {
     val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
 
     val max = (System.currentTimeMillis() + 1) * 1000
-    val lits = new scala.collection.mutable.ArrayBuffer[Long]
-    plan.transformAllExpressions { case e: Literal =>
-      lits += e.value.asInstanceOf[Long]
-      e
-    }
+    val lits = literals[Long](plan)
     assert(lits.size == 2)
     assert(lits(0) >= min && lits(0) <= max)
     assert(lits(1) >= min && lits(1) <= max)
@@ -62,11 +58,7 @@ class ComputeCurrentTimeSuite extends PlanTest {
     val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
 
     val max = DateTimeUtils.currentDate(ZoneId.systemDefault())
-    val lits = new scala.collection.mutable.ArrayBuffer[Int]
-    plan.transformAllExpressions { case e: Literal =>
-      lits += e.value.asInstanceOf[Int]
-      e
-    }
+    val lits = literals[Int](plan)
     assert(lits.size == 2)
     assert(lits(0) >= min && lits(0) <= max)
     assert(lits(1) >= min && lits(1) <= max)
@@ -76,13 +68,9 @@ class ComputeCurrentTimeSuite extends PlanTest {
   test("SPARK-33469: Add current_timezone function") {
     val in = Project(Seq(Alias(CurrentTimeZone(), "c")()), LocalRelation())
     val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
-    val lits = new scala.collection.mutable.ArrayBuffer[String]
-    plan.transformAllExpressions { case e: Literal =>
-      lits += e.value.asInstanceOf[UTF8String].toString
-      e
-    }
+    val lits = literals[UTF8String](plan)
     assert(lits.size == 1)
-    assert(lits.head == SQLConf.get.sessionLocalTimeZone)
+    assert(lits.head == UTF8String.fromString(SQLConf.get.sessionLocalTimeZone))
   }
 
   test("analyzer should replace localtimestamp with literals") {
@@ -95,11 +83,7 @@ class ComputeCurrentTimeSuite extends PlanTest {
     val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
 
     val max = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
-    val lits = new scala.collection.mutable.ArrayBuffer[Long]
-    plan.transformAllExpressions { case e: Literal =>
-      lits += e.value.asInstanceOf[Long]
-      e
-    }
+    val lits = literals[Long](plan)
     assert(lits.size == 2)
     assert(lits(0) >= min && lits(0) <= max)
     assert(lits(1) >= min && lits(1) <= max)
@@ -115,15 +99,9 @@ class ComputeCurrentTimeSuite extends PlanTest {
 
     val plan = Optimize.execute(input.analyze).asInstanceOf[Project]
 
-    val literals = new scala.collection.mutable.ArrayBuffer[Long]
-    plan.transformDownWithSubqueries { case subQuery =>
-      subQuery.transformAllExpressions { case expression: Literal =>
-        literals += expression.value.asInstanceOf[Long]
-        expression
-      }
-    }
-    assert(literals.size == 3) // transformDownWithSubqueries covers the inner timestamp twice
-    assert(literals.toSet.size == 1)
+    val lits = literals[Long](plan)
+    assert(lits.size == 3) // transformDownWithSubqueries covers the inner timestamp twice
+    assert(lits.toSet.size == 1)
   }
 
   test("analyzer should use consistent timestamps for different timezones") {
@@ -133,12 +111,38 @@ class ComputeCurrentTimeSuite extends PlanTest {
 
     val plan = Optimize.execute(input).asInstanceOf[Project]
 
-    val literals = new scala.collection.mutable.ArrayBuffer[Long]
-    plan.transformAllExpressions { case e: Literal =>
-      literals += e.value.asInstanceOf[Long]
-      e
-    }
+    val lits = literals[Long](plan)
+    assert(lits.size === localTimestamps.size)
+    // there are timezones with a 30 or 45 minute offset
+    val offsetsFromQuarterHour = lits.map( _ % Duration(15, MINUTES).toMicros).toSet
+    assert(offsetsFromQuarterHour.size == 1)
+  }
 
-    assert(literals.size === localTimestamps.size)
+  test("analyzer should use consistent timestamps for different timestamp functions") {
+    val differentTimestamps = Seq(
+      Alias(CurrentTimestamp(), "currentTimestamp")(),
+      Alias(Now(), "now")(),
+      Alias(LocalTimestamp(Some("PLT")), "localTimestampWithTimezone")(),
+    )
+    val input = Project(differentTimestamps, LocalRelation())
+
+    val plan = Optimize.execute(input).asInstanceOf[Project]
+
+    val lits = literals[Long](plan)
+    assert(lits.size === differentTimestamps.size)
+    // there are timezones with a 30 or 45 minute offset
+    val offsetsFromQuarterHour = lits.map( _ % Duration(15, MINUTES).toMicros).toSet
+    assert(offsetsFromQuarterHour.size == 1)
+  }
+
+  private def literals[T](plan: LogicalPlan): Seq[T] = {
+    val literals = new scala.collection.mutable.ArrayBuffer[T]
+    plan.transformDownWithSubqueries { case subQuery =>
+      subQuery.transformAllExpressions { case expression: Literal =>
+        literals += expression.value.asInstanceOf[T]
+        expression
+      }
+    }
+    literals
   }
 }
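The modulo check in the two tests added above rests on a time-zone fact: real zone offsets are whole multiples of 15 minutes (for example +05:30 for Asia/Kolkata and +05:45 for Asia/Kathmandu, the "30 or 45 minute offset" zones the comment mentions), so all local renderings of one shared instant agree modulo 15 minutes. A self-contained sketch of that reasoning (illustrative, not the test code):

import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

object QuarterHourInvariant extends App {
  val quarterHourMicros = 15L * 60 * 1000 * 1000
  val instant = Instant.now()

  val remainders = Seq("UTC", "Asia/Kolkata", "Asia/Kathmandu", "Australia/Eucla")
    .map(name => ZoneId.of(name))
    .map { zone =>
      val local = LocalDateTime.ofInstant(instant, zone)
      // Micros of the wall-clock reading = epoch micros + the zone's offset in micros.
      val localMicros = local.toEpochSecond(ZoneOffset.UTC) * 1000000L + local.getNano / 1000L
      localMicros % quarterHourMicros
    }.toSet

  // Offsets are multiples of 15 minutes, so one shared instant leaves one remainder.
  assert(remainders.size == 1)
}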
From 7897342befaa330618e3ed5520e1da4151f5fc11 Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Fri, 27 May 2022 09:55:45 +0200
Subject: [PATCH 04/11] Add pruning for subqueries, fix scalastyle

---
 .../spark/sql/catalyst/optimizer/finishAnalysis.scala  |  9 +++++++--
 .../apache/spark/sql/catalyst/plans/QueryPlan.scala    | 11 +++++++----
 .../catalyst/optimizer/ComputeCurrentTimeSuite.scala   |  4 ++--
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index bc536d492efa0..adcbd458e3075 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
+import org.apache.spark.sql.catalyst.trees.TreePatternBits
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros, localDateToDays}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
@@ -79,9 +80,13 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
     val currentTime = Literal.create(currentTimestamp, TimestampType)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
 
-    plan.transformDownWithSubqueries {
+    def transformCondition(treePatternbits: TreePatternBits): Boolean = {
+      treePatternbits.containsPattern(CURRENT_LIKE)
+    }
+
+    plan.transformDownWithSubqueries(transformCondition) {
       case subQuery =>
-        subQuery.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
+        subQuery.transformAllExpressionsWithPruning(transformCondition) {
           case cd: CurrentDate =>
             Literal.create(
               localDateToDays(LocalDate.ofInstant(instant, cd.zoneId)).asInstanceOf[Int], DateType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 0f8df5df3764a..d0283f4d36720 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -454,7 +454,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
    * to rewrite the whole plan, include its subqueries, in one go.
    */
  def transformWithSubqueries(f: PartialFunction[PlanType, PlanType]): PlanType =
-    transformDownWithSubqueries(f)
+    transformDownWithSubqueries(AlwaysProcess.fn, UnknownRuleId)(f)
 
   /**
    * Returns a copy of this node where the given partial function has been recursively applied
@@ -479,7 +479,10 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
    * first to this node, then this node's subqueries and finally this node's children.
    * When the partial function does not apply to a given node, it is left unchanged.
    */
-  def transformDownWithSubqueries(f: PartialFunction[PlanType, PlanType]): PlanType = {
+  def transformDownWithSubqueries(
+      cond: TreePatternBits => Boolean = AlwaysProcess.fn, ruleId: RuleId = UnknownRuleId)
+      (f: PartialFunction[PlanType, PlanType])
+  : PlanType = {
     val g: PartialFunction[PlanType, PlanType] = new PartialFunction[PlanType, PlanType] {
       override def isDefinedAt(x: PlanType): Boolean = true
 
@@ -487,13 +490,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
         val transformed = f.applyOrElse[PlanType, PlanType](plan, identity)
         transformed transformExpressionsDown {
           case planExpression: PlanExpression[PlanType] =>
-            val newPlan = planExpression.plan.transformDownWithSubqueries(f)
+            val newPlan = planExpression.plan.transformDownWithSubqueries(cond, ruleId)(f)
             planExpression.withNewPlan(newPlan)
         }
       }
     }
 
-    transformDown(g)
+    transformDownWithPruning(cond, ruleId)(g)
   }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 0555f37f385b1..31f881fb60f25 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -122,7 +122,7 @@ class ComputeCurrentTimeSuite extends PlanTest {
     val differentTimestamps = Seq(
       Alias(CurrentTimestamp(), "currentTimestamp")(),
       Alias(Now(), "now")(),
-      Alias(LocalTimestamp(Some("PLT")), "localTimestampWithTimezone")(),
+      Alias(LocalTimestamp(Some("PLT")), "localTimestampWithTimezone")()
     )
     val input = Project(differentTimestamps, LocalRelation())
 
@@ -137,7 +137,7 @@ class ComputeCurrentTimeSuite extends PlanTest {
 
   private def literals[T](plan: LogicalPlan): Seq[T] = {
     val literals = new scala.collection.mutable.ArrayBuffer[T]
-    plan.transformDownWithSubqueries { case subQuery =>
+    plan.transformWithSubqueries { case subQuery =>
       subQuery.transformAllExpressions { case expression: Literal =>
         literals += expression.value.asInstanceOf[T]
         expression
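PATCH 04's QueryPlan change threads a pruning condition and rule id through the subquery-aware traversal, so whole subtrees whose TreePatternBits lack CURRENT_LIKE are skipped, and ComputeCurrentTime can now call plan.transformDownWithSubqueries(transformCondition) { ... } as the finishAnalysis hunk shows. A self-contained toy mirroring that traversal shape (assumed types and names, not the Spark implementation):

object ToySubqueryTraversal extends App {
  sealed trait Plan { def children: Seq[Plan]; def subplans: Seq[Plan] }
  case class Leaf(tag: String) extends Plan { val children = Nil; val subplans = Nil }
  case class Node(tag: String, children: Seq[Plan], subplans: Seq[Plan]) extends Plan

  // Apply f top-down; recurse into nested sub-plans first, then into children,
  // skipping any subtree the condition rules out (the pruning added here).
  def transformDownWithSubplans(p: Plan, cond: Plan => Boolean)(f: Plan => Plan): Plan = {
    if (!cond(p)) return p
    f(p) match {
      case n: Node =>
        n.copy(
          subplans = n.subplans.map(transformDownWithSubplans(_, cond)(f)),
          children = n.children.map(transformDownWithSubplans(_, cond)(f)))
      case leaf => leaf
    }
  }

  val plan = Node("filter", Seq(Leaf("scan")), Seq(Leaf("subquery-scan")))
  val stamped = transformDownWithSubplans(plan, _ => true) {
    case Leaf(tag) => Leaf(tag + "@t0") // the same "timestamp" lands everywhere
    case other => other
  }
  println(stamped) // Node(filter,List(Leaf(scan@t0)),List(Leaf(subquery-scan@t0)))
}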
From a854a3727e74edbaae07e0139e6293396f606b69 Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Fri, 27 May 2022 10:03:44 +0200
Subject: [PATCH 05/11] Comment why there is an extension point not used in Spark

---
 .../org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index adcbd458e3075..f1057657c0000 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -75,6 +75,7 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
 object ComputeCurrentTime extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = applyWithTimestamp(plan, Instant.now())
 
+  /** Required to build custom rules for commands that do not keep sub-plans as children in Delta */
   def applyWithTimestamp(plan: LogicalPlan, instant: Instant): LogicalPlan = {
     val currentTimestamp = instantToMicros(instant)
     val currentTime = Literal.create(currentTimestamp, TimestampType)

From e9e2c69adb3f5ec1f0893ec290522feba2600eef Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Fri, 27 May 2022 11:14:49 +0200
Subject: [PATCH 06/11] Java 8 compliant local date conversion

---
 .../apache/spark/sql/catalyst/optimizer/finishAnalysis.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index f1057657c0000..916b306446853 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.time.{Instant, LocalDate, LocalDateTime}
+import java.time.{Instant, LocalDateTime}
 
 import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.expressions._
@@ -90,7 +90,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
         subQuery.transformAllExpressionsWithPruning(transformCondition) {
           case cd: CurrentDate =>
             Literal.create(
-              localDateToDays(LocalDate.ofInstant(instant, cd.zoneId)).asInstanceOf[Int], DateType)
+              localDateToDays(instant.atZone(cd.zoneId).toLocalDate).asInstanceOf[Int], DateType)
           case CurrentTimestamp() | Now() => currentTime
           case CurrentTimeZone() => timezone
           case localTimestamp: LocalTimestamp =>

From 60b4bcf3294cee80862c1588eca671b852087e1f Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Fri, 27 May 2022 11:49:12 +0200
Subject: [PATCH 07/11] Add explicit cast to fix compilation

---
 .../spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 31f881fb60f25..c034906c09bb6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -143,6 +143,6 @@ class ComputeCurrentTimeSuite extends PlanTest {
         expression
       }
     }
-    literals
+    literals.asInstanceOf[Seq[T]]
   }
 }

From 87e1d9283e786db1b40d5db2f694765e999611e4 Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Fri, 27 May 2022 17:03:58 +0200
Subject: [PATCH 08/11] Use DateTimeUtils.microsToDays

---
 .../spark/sql/catalyst/optimizer/finishAnalysis.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 916b306446853..ae5b09cce0515 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.trees.TreePatternBits
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros, localDateToDays}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
@@ -77,8 +78,8 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
 
   /** Required to build custom rules for commands that do not keep sub-plans as children in Delta */
   def applyWithTimestamp(plan: LogicalPlan, instant: Instant): LogicalPlan = {
-    val currentTimestamp = instantToMicros(instant)
-    val currentTime = Literal.create(currentTimestamp, TimestampType)
+    val currentTimestampMicros = instantToMicros(instant)
+    val currentTime = Literal.create(currentTimestampMicros, TimestampType)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
 
     def transformCondition(treePatternbits: TreePatternBits): Boolean = {
@@ -90,7 +91,8 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
       subQuery.transformAllExpressionsWithPruning(transformCondition) {
         case cd: CurrentDate =>
           Literal.create(
-            localDateToDays(instant.atZone(cd.zoneId).toLocalDate).asInstanceOf[Int], DateType)
+            DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId).asInstanceOf[Int],
+            DateType)
         case CurrentTimestamp() | Now() => currentTime
         case CurrentTimeZone() => timezone
         case localTimestamp: LocalTimestamp =>
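PATCH 08's switch to DateTimeUtils.microsToDays keeps the semantics of the previous two versions: the day number of the local date that the shared instant has in the expression's zone. A plain-java.time rendering of that equivalence (DateTimeUtils is Spark-internal; this sketch only illustrates the arithmetic):

import java.time.{Instant, ZoneId}

object MicrosToDaysIdea extends App {
  // "Days since the epoch of the instant's local date in a zone" is what
  // microsToDays(instantToMicros(instant), zone) computes inside Spark.
  def daysSinceEpoch(instant: Instant, zone: ZoneId): Long =
    instant.atZone(zone).toLocalDate.toEpochDay

  val now = Instant.now()
  println(daysSinceEpoch(now, ZoneId.of("UTC")))
  println(daysSinceEpoch(now, ZoneId.of("Australia/Sydney"))) // may differ by one day
}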
From d70ea4ddd0c67e690f58bd62f92e66fbee2c99ee Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Mon, 30 May 2022 09:27:35 +0200
Subject: [PATCH 09/11] Remove unused import

---
 .../apache/spark/sql/catalyst/optimizer/finishAnalysis.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index ae5b09cce0515..658f27e4f88ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.trees.TreePatternBits
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros, localDateToDays}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

From a242bcf241924aa888e951b0bf3eaa53c2735676 Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Mon, 30 May 2022 14:05:23 +0200
Subject: [PATCH 10/11] Inline applyWithTimestamp

---
 .../spark/sql/catalyst/optimizer/finishAnalysis.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 658f27e4f88ea..84388059bf4cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -74,10 +74,8 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
  * Computes the current date and time to make sure we return the same result in a single query.
  */
 object ComputeCurrentTime extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = applyWithTimestamp(plan, Instant.now())
-
-  /** Required to build custom rules for commands that do not keep sub-plans as children in Delta */
-  def applyWithTimestamp(plan: LogicalPlan, instant: Instant): LogicalPlan = {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    val instant = Instant.now()
     val currentTimestampMicros = instantToMicros(instant)
     val currentTime = Literal.create(currentTimestampMicros, TimestampType)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)

From e48dc4c44654594fa65718af14dce03229116ad0 Mon Sep 17 00:00:00 2001
From: Ole Sasse
Date: Thu, 2 Jun 2022 18:13:37 +0200
Subject: [PATCH 11/11] Remove asInstanceOf casts

---
 .../spark/sql/catalyst/optimizer/finishAnalysis.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 84388059bf4cf..242c799dd226e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -88,14 +88,12 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
       case subQuery =>
         subQuery.transformAllExpressionsWithPruning(transformCondition) {
           case cd: CurrentDate =>
-            Literal.create(
-              DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId).asInstanceOf[Int],
-              DateType)
+            Literal.create(DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType)
           case CurrentTimestamp() | Now() => currentTime
           case CurrentTimeZone() => timezone
           case localTimestamp: LocalTimestamp =>
             val asDateTime = LocalDateTime.ofInstant(instant, localTimestamp.zoneId)
-            Literal.create(localDateTimeToMicros(asDateTime).asInstanceOf[Long], TimestampNTZType)
+            Literal.create(localDateTimeToMicros(asDateTime), TimestampNTZType)
         }
     }
   }
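For reading convenience, the rule as it stands in finishAnalysis.scala once all eleven patches are applied, assembled from the diffs above (an excerpt of the patched file, not a standalone program):

object ComputeCurrentTime extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val instant = Instant.now()
    val currentTimestampMicros = instantToMicros(instant)
    val currentTime = Literal.create(currentTimestampMicros, TimestampType)
    val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)

    def transformCondition(treePatternbits: TreePatternBits): Boolean = {
      treePatternbits.containsPattern(CURRENT_LIKE)
    }

    plan.transformDownWithSubqueries(transformCondition) {
      case subQuery =>
        subQuery.transformAllExpressionsWithPruning(transformCondition) {
          case cd: CurrentDate =>
            Literal.create(DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType)
          case CurrentTimestamp() | Now() => currentTime
          case CurrentTimeZone() => timezone
          case localTimestamp: LocalTimestamp =>
            val asDateTime = LocalDateTime.ofInstant(instant, localTimestamp.zoneId)
            Literal.create(localDateTimeToMicros(asDateTime), TimestampNTZType)
        }
    }
  }
}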