Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-20233] [SQL] Apply star-join filter heuristics to dynamic programming join enumeration #17546

Closed
wants to merge 5 commits into from

Conversation

ioana-delaney
Copy link
Contributor

What changes were proposed in this pull request?

Implements star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph:

T1       D1 - T2 - T3
  \     /
    F1
     |
    D2

star-join: {F1, D1, D2}
non-star: {T1, T2, T3}

The following join combinations will be generated:

level 0: (F1), (D1), (D2), (T1), (T2), (T3)
level 1: {F1, D1}, {F1, D2}, {T2, T3}
level 2: {F1, D1, D2}
level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3 }
level 6: {F1, D1, D2, T1, T2, T3}

How was this patch tested?

New test suite StarJOinCostBasedReorderSuite.scala.

// Build plans for next levels until the last level has only one plan. This plan contains
// all items that can be joined, so there's no need to continue.
val topOutputSet = AttributeSet(output)
while (foundPlans.size < items.length && foundPlans.last.size > 1) {
while (foundPlans.size < items.length) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy Condition "foundPlans.last.size > 1" does not apply when filters are used with the join enumeration since not all the plan combinations are generated. Without filters, I think the condition will always be satisfied, so I removed it completely. Let me know if you have a counter example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea we should remove this.

@ioana-delaney
Copy link
Contributor Author

@wzhfy @gatorsmile @cloud-fan I've integrated star-join with join enumeration. Would you please take a look? Thanks.

val join = outer.union(inner)

// Disjoint sets
outer.intersect(inner).isEmpty &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the check in the beginning of buildJoin can guarantee this already.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy @viirya I intentionally added the condition here, in addition to the outside disjointness check, so the star algorithm is self contained. I would prefer to keep it here as well, unless you have strong objections.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok for me.

// Join is a subset of the star-join
join.subsetOf(starJoins) ||
// Star-join is a subset of join
starJoins.subsetOf(join) ||
Copy link
Member

@viirya viirya Apr 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't a star join be reordered later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya The star filter ensures that star-joins will be planned together. It is the cost based optimizer that decide on the best execution plan within a star-join. Let me know if I answer your question.

Copy link
Member

@viirya viirya Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReorderJoin will reorder the star join plans based on heuristics. Doesn't this cost-based join reorder rule breaks the order created by ReorderJoin? Here we only ask this rule doesn't try to reorder part of star join plans and non-star join plans, but it still can reorder the order among star join plans.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this cost-based join reorder rule breaks the order created by ReorderJoin?

This is expected from cost based reordering. ReorderJoin only puts connected items together, the order among these items is not optimized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, with this added filter, CostBasedJoinReorder can also let the star join plans together, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So do we still need ReorderJoin? Looks like we don't need it anymore if we don't care about the order created by it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReorderJoin is done heuristically. It can be useful when cbo is off.

Copy link
Member

@viirya viirya Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. right. forgot that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya The cost-based optimizer will find the best plan for the star-join. The star filter is a heuristic within join enumeration to limit the join sequences evaluated.

@SparkQA
Copy link

SparkQA commented Apr 6, 2017

Test build #75561 has finished for PR 17546 at commit 5eb9b30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with Logging
  • case class JoinReorderDPFilters(conf: SQLConf) extends PredicateHelper
  • case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int])

@wzhfy
Copy link
Contributor

wzhfy commented Apr 6, 2017

So now we have two entry points for star schema, one is in ReorderJoin and the other is in CostBasedJoinReorder?

}

/**
* Helper class that keeps information about the join graph as sets of Int.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sets of Int -> sets of item ids

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy I will correct that. Thank you.

attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
nullCount = 0, avgLen = 4, maxLen = 4),

// T1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comment to indicate it's a table not in a star schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy I will update the comment.


private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
// F1 (fact table)
attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(50),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an integer attribute, if its value range is [1, 50], ndv can't be larger than 50.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for some other attributes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy I will change the max values.

To control the layout of the join plans, I intentionally kept certain stats constant (e.g. size on the non-fact tables) and only varied the rowcount and the number of distinct values.

assertEqualPlans(query, expected)
}

test("Test 6: No RI star") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test for multi-star?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy Star detection algorithm returns the star-join for the largest fact table. I will generalize the algorithm in a follow up PR and add corresponding test cases.

val join = outer.union(inner)

// Disjoint sets
outer.intersect(inner).isEmpty &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

def buildJoinGraphInfo(
items: Seq[LogicalPlan],
conditions: Set[Expression],
planIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems multi-star detection is not supported?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yap, you can see StarSchemaDetection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy I will generalize the star detection algorithm in a follow up PR.

@ioana-delaney
Copy link
Contributor Author

@wzhfy Yes, star-schema is called from both ReorderJoin and CostBasedJoinReorder.

@SparkQA
Copy link

SparkQA commented Apr 6, 2017

Test build #75583 has finished for PR 17546 at commit 9e81154.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -736,6 +736,12 @@ object SQLConf {
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
.createWithDefault(0.7)

val JOIN_REORDER_DP_STAR_FILTER =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any cases we don't want to enable this if cbo is enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Star join plans are expected to have an optimal execution based on their referential integrity constraints among the tables. It is a good heuristic. I expect that once CBO is enabled by default, star joins will also be enabled.

Copy link
Member

@viirya viirya Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can have this as true by default? Or even we don't need this flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Yes, we can enable the feature by default, but I would like to keep the filter under the config option.

(nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))

val expected =
Copy link
Member

@viirya viirya Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have another optimizer without this filter and see the difference between two optimized plans from two optimizers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy We have the results presented in SPARK-17791. I didn't run TPC-DS with CBO enabled, but it is on my to-do list.

// level 1: {f1 d1 }, {d2 f1 }
// level 2: {d2 f1 d1 }
// level 3: {t2 d1 d2 f1 }, {t1 d1 d2 f1 }
// level 4: {f1 t1 t2 d1 d2 }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the order of items in this representation matter? If so, {f1 t1 t2 d1 d2 } is confusing because it looks like f1 will join with t1, t2 first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya No, it doesn't matter. It does not reflect the order in which the tables are joined. It only shows the set of tables that will be joined together. To find out the actual order, you can look at the order of the joins in the "expected" query.

buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
.doc("Applies star-join filter heuristics to cost based join enumeration.")
.booleanConf
.createWithDefault(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logics will be enabled if and only if both conf.cboEnabled and conf.joinReorderEnabled are true. Thus, it is safe to be true by default?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Regardless of the default value, I still want to control the filters with their own knobs. The filters are applied on top of the join enumeration. They need to have their own control.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine to keep this conf. I am just thinking whether we should change the default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile I am also fine with changing the default.
@wzhfy What do you think?

Copy link
Contributor

@ron8hu ron8hu Apr 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Spark 2.2, we introduced a couple of new configuration parameters in optimizer area. In order to play on the safe side, we set the default value to false. I suggest that we can change the default value to true AFTER we are sure that the new optimizer feature does not cause any regression. I think the system regression/integration test suites can help us make a decision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ron8hu Thank you. We will keep the default false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I also think we keep the default false.

@@ -134,7 +132,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
* For cost evaluation, since physical costs for operators are not available currently, we use
* cardinalities and sizes to compute costs.
*/
object JoinReorderDP extends PredicateHelper with Logging {
case class JoinReorderDP(conf: SQLConf) extends PredicateHelper with Logging {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert it back?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile I would like to control the filters on top of the join enumeration. We might have other filters, e.g. left-deep trees only.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, we can add the change if needed. Now, this change is not being used by this PR. We already pass conf in the function call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile I misunderstood your previous comment. Yes, I will revert it.

val result =
// Do reordering if the number of items is appropriate and join conditions exist.
// We also need to check if costs of all items can be evaluated.
if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && conditions.nonEmpty &&
items.forall(_.stats(conf).rowCount.isDefined)) {
JoinReorderDP.search(conf, items, conditions, output)
JoinReorderDP(conf).search(conf, items, conditions, output)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert it back?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted.

@viirya
Copy link
Member

viirya commented Apr 7, 2017

This looks pretty good over all.

@SparkQA
Copy link

SparkQA commented Apr 8, 2017

Test build #75616 has finished for PR 17546 at commit 830255c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ioana-delaney
Copy link
Contributor Author

@cloud-fan Do you have any comments?

*
* Given the outer/inner and the star/non-star sets,
* the following plan combinations are allowed:
* 1) (outer U inner) is a subset of star-join
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what outer and inner refer to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan The outer/inner represents the join plan combinations generated by JoinReorderDP. JoinReorderDP calls them oneJoinPlan/otherJoinPlan. I will rename them to align to join DP.

* d2
*
* star: {d1, f1, d2}
* non-star: {t2, t1, t3}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also have an example about outer and inner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan outer/inner i.e. oneJoinPlan/otherJoinPlan represent all the plan permutations generated by JoinReorderDP. For example, at level 1, join enumeration will combine plans from level 0 e.g. oneJoinPlan = (f1) and otherJoinPlan = (d1). At level 2, it will generate plans from plan combinations at level 0 and level 1 e.g. oneJoinPlan = (d2) and otherJoinPlan = {f1, d1}, and so on. I will clarify the comment with more details.

val isValidJoinCombination =
JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
filters.get)
if (!isValidJoinCombination) return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the logic here? If it's not a star join then it's not a valid join combination?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan The star filter will eliminate joins among star and non-star tables until the star-joins are build. The assumption is that star-join should be planned together. I will add more comments.

@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
val (items, conditions) = extractInnerJoins(plan)
// TODO: Compute the set of star-joins and use them in the join enumeration
// algorithm to prune un-optimal plan choices.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the overall approach? Run star join reorder rule first, and make the DP join reorder rule respect star join and keep the join order generated by star join reorder rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Star-schema detection is first called to compute the set of tables connected by star-schema relationship e.g. {F1, D1, D2} in our code example. This call does not do any join reordering among the tables. It simply computes the set of tables in a star-schema relationship. Then, DP join enumeration generates all possible plan combinations among the entire set of tables in a the join e.g. {F1, D1}, {F1, T1}, {T2, T3}, etc. Star-filter, if called, will eliminate plan combinations among the star and non-star tables until the star join combinations are built. For example, {F1, D1} combination will be retained since it involves tables in a star schema, but {F1, T1} will be eliminated since it mixes star and non-star tables. Star-filter simply decides what combinations to retain but it will not decide on the order of execution of those tables. The order of the joins within a star-join and for the overall plan is decided by the DP join enumeration. Star-filter only ensures that tables in a star-join are planned together.

Copy link
Contributor

@cloud-fan cloud-fan Apr 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, so if users enable the star join reorder and cbo join reorder rules together, stat join will still be overwritten by cbo join reorder rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan That’s correct. If CBO is enabled, it will do the final planning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a plan to completely merge these 2 rules?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Once CBO is enabled by default, I can remove the call from ReorderJoin.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the meaning of "remove the call from ReorderJoin"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Star-schema detection is called from both CostBasedJoinReorder and ReorderJoin. In the latter case, it is called to reorder star-joins based on heuristics if cbo is disabled.

When cost-based optimizer becomes the default optimizer, we don’t need to reorder star-joins in ReorderJoin based on heuristics since the cost-based optimizer will choose the best plan based on cost.

@SparkQA
Copy link

SparkQA commented Apr 10, 2017

Test build #75674 has finished for PR 17546 at commit 7a5d1d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
// 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
val isValidJoinCombination =
JoinReorderDPFilters(conf).starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create a new JoinReorderDPFilters instance every time we try to build a join node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass the conf as a parameter of def buildJoinGraphInfo, then JoinReorderDPFilters can be an object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy I agree with passing conf as a parameter to buildJoinGraphInfo.

class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {

override val conf = new SQLConf().copy(
CASE_SENSITIVE -> true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy Removed.

case _ =>
(None, None)
}.unzip
Some(JoinGraphInfo(starInt.flatten.toSet, nonStarInt.flatten.toSet))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above logic can be simplified:

val itemMap = itemIndex.toMap
Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy This is a very useful code simplification. Thank you!


if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
// Should not join two overlapping item sets.
return None
}

if (conf.joinReorderDPStarFilter && filters.isDefined) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move condition conf.joinReorderDPStarFilter to def buildJoinGraphInfo? We only need to check it once, if it's turned off, just return an empty filter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wzhfy I moved the conf condition inside buildJoinGraphInfo.

@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {

val emptyStarJoinPlan = Seq.empty[LogicalPlan]

if (!conf.starSchemaDetection || input.size < 2) {
if (input.size < 2) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed unnecessary call to conf.starSchemaDetection.


class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {

override val conf = new SQLConf().copy(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed star-filter dependency on the STARSCHEMA_DETECTION conf. When star-schema is called from CBO, it is under the control of JOIN_REORDER_DP_STAR_FILTER. When called from ReorderJoin, it is under the control of the STARSCHEMA_DETECTION conf.

@SparkQA
Copy link

SparkQA commented Apr 11, 2017

Test build #75713 has finished for PR 17546 at commit c9ec7c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
}.toMap)

// Build filters from the join graph to be used by the search algorithm.
val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
Copy link
Contributor

@cloud-fan cloud-fan Apr 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why call it filters? should we name it joinInfo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I call it "filters" since the join graph information is used as filters on top of the DP join enumeration. It suggests the purpose for which the graph info was gathered.

If this is confusing, I can rename. Let me know.

*
* 1) Star-join filters: Plan star-joins together since they are assumed
* to have an optimal execution based on their RI relationship.
* 2) Cartesian products: Defer their planning later in the graph to avoid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have this logic in the dp join reorder algorithm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan DP join enumeration relies on ReorderJoin to pull up the Cartesian products. Instead, I think that Cartesian pull-up should be implemented as another filter on top of DP join enumeration.

* 2) Cartesian products: Defer their planning later in the graph to avoid
* large intermediate results (expanding joins, in general).
* 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
* intermediate results.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it, doesn't left-deep tree materialize intermediate results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Left-deep trees are executed in a pipelined fashion. Given the following join trees:

left-deep tree:

     join
    /     \ 
  join     t3 
 /     \
t1     t2

bushy-tree:

  join
 /     \
t1      join
        /    \
      t2     t3

The bushy-tree plan (right-deep in this case) requires the result of (t2 join t3) to be materialized before joining with t1. The left-deep tree doesn’t have this requirement.

@cloud-fan
Copy link
Contributor

LGTM, merging to master!

@asfgit asfgit closed this in fbe4216 Apr 13, 2017
@ioana-delaney
Copy link
Contributor Author

@cloud-fan Thank you for merging!

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…amming join enumeration

## What changes were proposed in this pull request?

Implements star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph:

```
T1       D1 - T2 - T3
  \     /
    F1
     |
    D2

star-join: {F1, D1, D2}
non-star: {T1, T2, T3}
```
The following join combinations will be generated:
```
level 0: (F1), (D1), (D2), (T1), (T2), (T3)
level 1: {F1, D1}, {F1, D2}, {T2, T3}
level 2: {F1, D1, D2}
level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3 }
level 6: {F1, D1, D2, T1, T2, T3}
```

## How was this patch tested?

New test suite ```StarJOinCostBasedReorderSuite.scala```.

Author: Ioana Delaney <ioanamdelaney@gmail.com>

Closes apache#17546 from ioana-delaney/starSchemaCBOv3.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
7 participants