From 43791acd9458c539c7008d1b6aa670f3b9965758 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sat, 21 Apr 2018 16:37:32 -0700 Subject: [PATCH 1/3] Initial commit --- .../spark/sql/catalyst/expressions/predicates.scala | 12 ++++++++++++ .../sql/catalyst/expressions/PredicateSuite.scala | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index e195ec17f3bcf..34a3f50759dd1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -36,6 +36,18 @@ object InterpretedPredicate { case class InterpretedPredicate(expression: Expression) extends BasePredicate { override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean] + + override def initialize(partitionIndex: Int): Unit = { + initExpression(expression, partitionIndex) + } + + private def initExpression(expression: Expression, partitionIndex: Int): Unit = { + expression match { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => + } + expression.children.foreach(initExpression(_, partitionIndex)) + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index 1bfd180ae4393..ac76b17ef4761 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -449,4 +449,10 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), false) checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), false) } + + test("Interpreted Predicate should initialize nondeterministic expressions") { + val interpreted = InterpretedPredicate.create(LessThan(Rand(7), Literal(1.0))) + interpreted.initialize(0) + assert(interpreted.eval(new UnsafeRow())) + } } From 525bc1a475c5007830eb2e35c30f05bafa600cdd Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sun, 22 Apr 2018 19:12:10 -0700 Subject: [PATCH 2/3] Use expression's foreach --- .../apache/spark/sql/catalyst/expressions/predicates.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 34a3f50759dd1..301ef89074547 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -38,15 +38,10 @@ case class InterpretedPredicate(expression: Expression) extends BasePredicate { override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean] override def initialize(partitionIndex: Int): Unit = { - initExpression(expression, partitionIndex) - } - - private def initExpression(expression: Expression, partitionIndex: Int): Unit = { - expression match { + expression.foreach { case n: Nondeterministic => n.initialize(partitionIndex) case _ => } - expression.children.foreach(initExpression(_, partitionIndex)) } } From 46972f7ce2123d7ebda23daad9362650532ca59e Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Tue, 24 Apr 2018 07:35:50 -0700 Subject: [PATCH 3/3] Call base class's initialize as well --- .../org/apache/spark/sql/catalyst/expressions/predicates.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 301ef89074547..f8c6dc4e6adc9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -38,6 +38,7 @@ case class InterpretedPredicate(expression: Expression) extends BasePredicate { override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean] override def initialize(partitionIndex: Int): Unit = { + super.initialize(partitionIndex) expression.foreach { case n: Nondeterministic => n.initialize(partitionIndex) case _ =>