Skip to content

Commit

Permalink
[SPARK-33621][SQL] Add a way to inject data source rewrite rules
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR adds a way to inject data source rewrite rules.

### Why are the changes needed?

Right now `SparkSessionExtensions` allow us to inject optimization rules but they are added to operator optimization batch. There are cases when users need to run rules after the operator optimization batch (e.g. cases when a rule relies on the fact that expressions have been optimized). Currently, this is not possible.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

This PR comes with a new test.

Closes #30577 from aokolnychyi/spark-33621-v3.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
aokolnychyi authored and dongjoon-hyun committed Dec 7, 2020
1 parent c0874ba commit 02508b6
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
* <li>Analyzer Rules.</li>
* <li>Check Analysis Rules.</li>
* <li>Optimizer Rules.</li>
* <li>Data Source Rewrite Rules.</li>
* <li>Planning Strategies.</li>
* <li>Customized Parser.</li>
* <li>(External) Catalog listeners.</li>
Expand Down Expand Up @@ -199,6 +200,21 @@ class SparkSessionExtensions {
optimizerRules += builder
}

private[this] val dataSourceRewriteRules = mutable.Buffer.empty[RuleBuilder]

private[sql] def buildDataSourceRewriteRules(session: SparkSession): Seq[Rule[LogicalPlan]] = {
dataSourceRewriteRules.map(_.apply(session)).toSeq
}

/**
* Inject an optimizer `Rule` builder that rewrites data source plans into the [[SparkSession]].
* The injected rules will be executed after the operator optimization batch and before rules
* that depend on stats.
*/
def injectDataSourceRewriteRule(builder: RuleBuilder): Unit = {
dataSourceRewriteRules += builder
}

private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder]

private[sql] def buildPlannerStrategies(session: SparkSession): Seq[Strategy] = {
Expand Down
Expand Up @@ -273,7 +273,9 @@ abstract class BaseSessionStateBuilder(
*
* Note that this may NOT depend on the `optimizer` function.
*/
protected def customDataSourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil
protected def customDataSourceRewriteRules: Seq[Rule[LogicalPlan]] = {
extensions.buildDataSourceRewriteRules(session)
}

/**
* Planner that converts optimized logical plans to physical plans.
Expand Down
Expand Up @@ -88,6 +88,12 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
}
}

test("SPARK-33621: inject data source rewrite rule") {
withSession(Seq(_.injectDataSourceRewriteRule(MyRule))) { session =>
assert(session.sessionState.optimizer.dataSourceRewriteRules.contains(MyRule(session)))
}
}

test("inject spark planner strategy") {
withSession(Seq(_.injectPlannerStrategy(MySparkStrategy))) { session =>
assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session)))
Expand Down

0 comments on commit 02508b6

Please sign in to comment.