
revert all changes in InjectRuntimeFilterSuite, disable MergeScalarSubqueries rule
peter-toth committed Apr 26, 2022
1 parent bf4484a commit 7ac7981
Showing 1 changed file with 14 additions and 8 deletions.
@@ -19,6 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
+import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
@@ -201,9 +202,16 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
sql("analyze table bf4 compute statistics for columns a4, b4, c4, d4, e4, f4")
sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

+// `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing
+// complicated.
+conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeScalarSubqueries.ruleName)
}

protected override def afterAll(): Unit = try {
+conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
+  SQLConf.OPTIMIZER_EXCLUDED_RULES.defaultValueString)

sql("DROP TABLE IF EXISTS bf1")
sql("DROP TABLE IF EXISTS bf2")
sql("DROP TABLE IF EXISTS bf3")
@@ -264,9 +272,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
}
}

-// `MergeScalarSubqueries` can duplicate subqueries in the optimized plan, but the subqueries will
-// be reused in the physical plan.
-def getNumBloomFilters(plan: LogicalPlan, scalarSubqueryCTEMultiplicator: Int = 1): Integer = {
+def getNumBloomFilters(plan: LogicalPlan): Integer = {
val numBloomFilterAggs = plan.collect {
case Filter(condition, _) => condition.collect {
case subquery: org.apache.spark.sql.catalyst.expressions.ScalarSubquery
@@ -287,7 +293,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
case BloomFilterMightContain(_, _) => 1
}.sum
}.sum
-assert(numBloomFilterAggs == numMightContains * scalarSubqueryCTEMultiplicator)
+assert(numBloomFilterAggs == numMightContains)
numMightContains
}
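
Later hunks show how the suite uses this helper: it captures the optimized plan of the same query with the runtime bloom filter feature disabled and then enabled, and asserts on the difference in bloom filter counts. A rough sketch of that usage inside a test body of this suite is below; the query string is made up for illustration, SQLConf.RUNTIME_BLOOM_FILTER_ENABLED is assumed to be the flag the real tests toggle, and the expected difference of 2 simply mirrors the assertion in the hunk further down.

// Illustrative sketch only: compare injected bloom filter counts with the feature off and on.
val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62" // hypothetical query
var planDisabled: LogicalPlan = null
var planEnabled: LogicalPlan = null

withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
  planDisabled = sql(query).queryExecution.optimizedPlan
}
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
  planEnabled = sql(query).queryExecution.optimizedPlan
}
// With MergeScalarSubqueries excluded, each injected runtime filter keeps its own
// BloomFilterAggregate subquery, so the plain counts can be compared without a multiplier.
assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled) + 2)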

@@ -391,7 +397,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
planEnabled = sql(query).queryExecution.optimizedPlan
checkAnswer(sql(query), expectedAnswer)
}
-assert(getNumBloomFilters(planEnabled, 2) == getNumBloomFilters(planDisabled) + 2)
+assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled) + 2)
}
}

@@ -419,10 +425,10 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
checkAnswer(sql(query), expectedAnswer)
}
if (numFilterThreshold < 3) {
-assert(getNumBloomFilters(planEnabled, numFilterThreshold) ==
-  getNumBloomFilters(planDisabled) + numFilterThreshold)
+assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled)
+  + numFilterThreshold)
} else {
-assert(getNumBloomFilters(planEnabled, 2) == getNumBloomFilters(planDisabled) + 2)
+assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled) + 2)
}
}
}
