Skip to content

Commit

Permalink
[CARMEL-6734] Fix reorder filter condition issue (#1293)
Browse files Browse the repository at this point in the history
* Update PredicateReorder.scala

* fix

* fix
  • Loading branch information
wangyum authored and GitHub Enterprise committed Apr 14, 2023
1 parent b3036d6 commit 905523e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Expand Up @@ -136,10 +136,10 @@ object PredicateReorder extends Rule[LogicalPlan] with PredicateHelper {
}

def apply(plan: LogicalPlan): LogicalPlan = {
if (conf.predicateReorder) {
if (conf.predicateReorder && !plan.isStreaming) {
plan transform {
case f @ Filter(cond, _) =>
val filterEstimation = if (conf.cboEnabled || conf.planStatsEnabled) {
val filterEstimation = if (plan.stats.attributeStats.nonEmpty) {
Some(FilterEstimation(f))
} else {
None
Expand Down
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet}
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter, LeafNode, LocalRelation}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter, LeafNode, Statistics}
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, TimestampType}

class StreamingJoinHelperSuite extends AnalysisTest {
Expand All @@ -39,6 +39,7 @@ class StreamingJoinHelperSuite extends AnalysisTest {
case class DummyLeafNode() extends LeafNode {
override def output: Seq[Attribute] =
attributesToFindConstraintFor ++ attributesWithWatermark
override def computeStats(): Statistics = Statistics(sizeInBytes = BigInt(0))
}

def watermarkFrom(
Expand Down

0 comments on commit 905523e

Please sign in to comment.