
[SPARK-25379][SQL] Improve AttributeSet and ColumnPruning performance #22364

Closed · wants to merge 3 commits

Conversation

@mgaido91 (Contributor) commented Sep 8, 2018

What changes were proposed in this pull request?

This PR contains 3 optimizations:

  1. It significantly improves the `--` operation on `AttributeSet`. As a benchmark for the `--` operation, the following code has been run:
test("AttributeSet -- benchmark") {
    val attrSetA = AttributeSet((1 to 100).map { i => AttributeReference(s"c$i", IntegerType)() })
    val attrSetB = AttributeSet(attrSetA.take(80).toSeq)
    val attrSetC = AttributeSet((1 to 100).map { i => AttributeReference(s"c2_$i", IntegerType)() })
    val attrSetD = AttributeSet((attrSetA.take(50) ++ attrSetC.take(50)).toSeq)
    val attrSetE = AttributeSet((attrSetC.take(50) ++ attrSetA.take(50)).toSeq)
    val n_iter = 1000000
    val t0 = System.nanoTime()
    (1 to n_iter) foreach { _ =>
      val r1 = attrSetA -- attrSetB
      val r2 = attrSetA -- attrSetC
      val r3 = attrSetA -- attrSetD
      val r4 = attrSetA -- attrSetE
    }
    val t1 = System.nanoTime()
    val totalTime = t1 - t0
    println(s"Average time: ${totalTime / n_iter} us")
  }

The results are:

```
Before PR - Average time: 67674 ns (100  %)
After PR  - Average time: 28827 ns (42.6 %)
```
  2. In `ColumnPruning`, it replaces the occurrences of `(attributeSet1 -- attributeSet2).nonEmpty` with `!attributeSet1.subsetOf(attributeSet2)`, which is orders of magnitude more efficient, especially when there are many attributes (see the sketch after this list). Running the previous benchmark with `--` replaced by `subsetOf` returns:
```
Average time: 67 ns (0.1 %)
```
  3. It provides a more efficient way of building `AttributeSet`s, which can greatly improve the performance of the `references` and `outputSet` methods of `Expression` and `QueryPlan`. This avoids unneeded work, e.g. the creation of many `AttributeEquals` wrapper objects.
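
As a minimal illustration of the rewrite in (2), here is a plain-Scala sketch (ordinary `Set`s standing in for Spark's `AttributeSet`, which behaves analogously for this purpose): the two predicates are logically equivalent, but the `subsetOf` form allocates nothing and can stop at the first mismatch.

```
// Plain-Scala stand-ins illustrating the rewrite in (2).
def hasAttrsNotIn[A](a: Set[A], b: Set[A]): Boolean =
  (a -- b).nonEmpty      // materializes a brand-new set just to test emptiness

def hasAttrsNotInFast[A](a: Set[A], b: Set[A]): Boolean =
  !a.subsetOf(b)         // logically equivalent, but short-circuits on the
                         // first element of `a` missing from `b`, no allocation
```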

The overall effect of these optimizations has been tested on `ColumnPruning` with the following benchmark:

test("ColumnPruning benchmark") {
    val attrSetA = (1 to 100).map { i => AttributeReference(s"c$i", IntegerType)() }
    val attrSetB = attrSetA.take(80)
    val attrSetC = attrSetA.take(20).map(a => Alias(Add(a, Literal(1)), s"${a.name}_1")())

    val input = LocalRelation(attrSetA)
    val query1 = Project(attrSetB, Project(attrSetA, input)).analyze
    val query2 = Project(attrSetC, Project(attrSetA, input)).analyze
    val query3 = Project(attrSetA, Project(attrSetA, input)).analyze
    val nIter = 100000
    val t0 = System.nanoTime()
    (1 to nIter).foreach { _ =>
      ColumnPruning(query1)
      ColumnPruning(query2)
      ColumnPruning(query3)
    }
    val t1 = System.nanoTime()
    val totalTime = t1 - t0
    println(s"Average time: ${totalTime / nIter} us")
}

The output of the test is:

```
Before PR - Average time: 733471 ns (100  %)
After PR  - Average time: 362455 ns (49.4 %)
```

The performance improvement has also been evaluated on the `SQLQueryTestSuite` queries:

```
(before) org.apache.spark.sql.catalyst.optimizer.ColumnPruning   518413198 / 1377707172   2756 / 15717
(after)  org.apache.spark.sql.catalyst.optimizer.ColumnPruning   415432579 / 1121147950   2756 / 15717
% Running time                                                       80.1% / 81.3%
```

Other rules also benefit, especially from (3), although the impact is lower, e.g.:

```
(before) org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences   307341442 / 623436806   2154 / 16480
(after)  org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences   290511312 / 560962495   2154 / 16480
% Running time                                                                   94.5% / 90.0%
```

The impact on the `SQLQueryTestSuite` queries is lower than in the benchmarks above because the optimizations become more significant as the number of attributes grows; the test queries usually involve very few attributes, so the effect there is smaller.

How was this patch tested?

run benchmarks + existing UTs

@SparkQA commented Sep 8, 2018

Test build #95828 has finished for PR 22364 at commit 14edbe6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor Author) commented Sep 8, 2018

cc @gatorsmile @maropu

Review thread on this hunk:

```
new AttributeSet(
  baseSet
    .flatMap(_.references)
    .map(new AttributeEquals(_)).toSet)
```
Member:

hi, good catch. one question here: is it bad to just simply write `new AttributeSet(baseSet.map(_.references.baseSet).foldLeft(Set.empty[AttributeEquals])(_ ++ _))`?

Contributor Author:

I think this is just duplicating the code in `fromAttributeSets`, right? What would be the benefit of doing that?

Member:

The simple one is better? maropu@cba767e

Contributor Author:

ok, so there are 2 reasons why the current one is better than the proposed:

Member:

ah, ok. can you rename `fromAttributeSets` -> `apply`, and then change the mutable set?

Contributor Author:

I cannot rename it to `apply` because there is already an `apply` taking an `Iterable`, and we cannot have two `apply`s, one with `Iterable[Expression]` and one with `Iterable[AbstractSet]`, because after type erasure they would conflict.
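
(A hedged illustration of that clash with hypothetical stand-in types, not the real Catalyst classes: after erasure both overloads take a plain `Iterable`, so the compiler rejects the pair.)

```
object OverloadDemo {
  def apply(xs: Iterable[String]): Int = xs.size
  // Uncommenting the next line fails to compile with
  // "double definition ... have same type after erasure: (xs: Iterable)Int"
  // def apply(xs: Iterable[Set[String]]): Int = xs.size
}
```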

Member:

The name is ok. Do we still need to use the mutable set instead of `new AttributeSet(sets.foldLeft(Set.empty[AttributeEquals])(_ ++ _.baseSet))`?

Contributor Author:

I have to use a `LinkedHashSet`, because without keeping the insertion order I had some UT failures. But I can use the more compact syntax you suggested here, thanks.
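
(A sketch of the shape being discussed, reusing the thread's names `fromAttributeSets`, `baseSet`, and `AttributeEquals`; the constructor call is an assumption, not the merged code.)

```
import scala.collection.mutable

// Sketch only: a LinkedHashSet iterates in insertion order, which the plain
// Set.empty fold proposed above would lose (hence the UT failures).
def fromAttributeSets(sets: Iterable[AttributeSet]): AttributeSet = {
  val baseSet = new mutable.LinkedHashSet[AttributeEquals]()
  sets.foreach(s => baseSet ++= s.baseSet)
  new AttributeSet(baseSet) // assumes a constructor accepting the ordered set
}
```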

Member:

Thanks!

Contributor Author:

Thank you for your comment and help here!

@SparkQA commented Sep 10, 2018

Test build #95869 has finished for PR 22364 at commit 2afbe9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 11, 2018

Test build #95944 has finished for PR 22364 at commit 6aeda5f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor Author):

retest this please

@SparkQA commented Sep 11, 2018

Test build #95957 has finished for PR 22364 at commit 6aeda5f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor Author):

any more comments @maropu @gatorsmile?

@maropu (Member) commented Sep 14, 2018

Can we replace the syntax `(outputSetA -- outputSetB).nonEmpty` in other places, too? e.g.,

```
if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
```

@mgaido91 (Contributor Author) commented Sep 14, 2018

@maropu yes, that can be done as well, but the main focus of this PR is the `ColumnPruning` rule, so I think it would be great to do that in a separate PR.

In this way, we can also better track and evaluate the performance gain we can obtain with a given change.

What do you think? Thanks.

@maropu (Member) commented Sep 14, 2018

IIUC this PR targets improving `AttributeSet` operations, so wouldn't all the places get the same performance gain as `ColumnPruning`? If so, I think it's ok to fix all the places in this PR. cc: @gatorsmile

@mgaido91 (Contributor Author):

@maropu anyway I checked, and that is the only other place where this pattern occurs, so I am ok with including it here. The point is that the situation there is a bit different, i.e. it is not an `(AttributeSet -- AttributeSet).nonEmpty` case, but an `(AttributeSet -- Seq[Attribute]).nonEmpty` one. So I am not sure the performance gain/difference will be significant in this case.

I'll try to run some benchmarks and I'll make the change if I see a significant difference. Thanks.

@mgaido91 (Contributor Author):

@maropu I have run the following benchmark:

  test("AttributeSet -- benchmark") {
    val attrSetA = AttributeSet((1 to 100).map { i => AttributeReference(s"c$i", IntegerType)() })
    val attrSetB = attrSetA.take(80).toSeq
    val attrSetC = (1 to 100).map { i => AttributeReference(s"c2_$i", IntegerType)() }
    val attrSetD = (attrSetA.take(50) ++ attrSetC.take(50)).toSeq
    val attrSetE = attrSetC.take(50) ++ attrSetA.take(50)
    val n_iter = 1000000
    val t0 = System.nanoTime()
    (1 to n_iter) foreach { _ =>
      val r1 = attrSetA -- attrSetB
      val r2 = attrSetA -- attrSetC
      val r3 = attrSetA -- attrSetD
      val r4 = attrSetA -- attrSetE
    }
    val t1 = System.nanoTime()
    (1 to n_iter) foreach { _ =>
      val r1 = attrSetA subsetOf AttributeSet(attrSetB)
      val r2 = attrSetA subsetOf AttributeSet(attrSetC)
      val r3 = attrSetA subsetOf AttributeSet(attrSetD)
      val r4 = attrSetA subsetOf AttributeSet(attrSetE)
    }
    val t2 = System.nanoTime()
    val totalTime1 = t1 - t0
    val totalTime2 = t2 - t1
    println(s"Average time for --: ${totalTime1 / n_iter} us")
    println(s"Average time for subsetOf: ${totalTime2 / n_iter} us")
  }

And the output is:

```
Average time for --: 25065 ns
Average time for subsetOf: 108638 ns
```

So for the case you mentioned, using `subsetOf` would instead introduce a performance regression. I have also run all the tests in `StarJoinCostBasedReorderSuite` 1000 times, and the perf regression was confirmed:

```
Running StarJoinCostBasedReorderSuite's tests 1000 times takes w/o  change: 68877186927 ns
Running StarJoinCostBasedReorderSuite's tests 1000 times takes with change: 70689955856 ns
```

The point is that there we have a `Seq[Attribute]` instead of an `AttributeSet` as the parameter.
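
Roughly speaking (a simplified view of the two variants timed above, not the exact internals), the cost difference is the per-call conversion of the `Seq` argument:

```
// Simplified view of the two variants timed above:
val slow = attrSetA subsetOf AttributeSet(attrSetB) // wraps every attribute and
                                                    // builds a set on every call
val fast = (attrSetA -- attrSetB).nonEmpty          // subtracts the Seq directly,
                                                    // no up-front conversion
```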

Hope this is clear, let me know otherwise. Thanks.

@maropu (Member) commented Sep 18, 2018

oh, yea, thanks! I wrongly mixed up `(AttributeSet -- Seq[Attribute]).nonEmpty` with this case.

@maropu (Member) commented Sep 18, 2018

Basically, this change looks good to me. I leave this to other reviewers.

@mgaido91 (Contributor Author):

thanks @maropu for your review! @gatorsmile do you have any comments?

@maropu (Member) commented Sep 18, 2018

also, cc: @dongjoon-hyun

@mgaido91 (Contributor Author) commented Sep 24, 2018

I also ran the TPCDS and TPCH benchmarks with 10 runs:

| Rule | Effective After | Effective Before | Total After | Total Before | % Eff | % Total |
|------|----------------:|-----------------:|------------:|-------------:|------:|--------:|
| org.apache.spark.sql.catalyst.optimizer.ColumnPruning (TPCDS) | 8492237639 | 9927405142 | 37554775945 | 44771729889 | 85.5433773 | 83.8805560 |
| org.apache.spark.sql.catalyst.optimizer.ColumnPruning (TPCH) | 214235083 | 292912646 | 933477165 | 1166285066 | 73.1395813 | 80.0385079 |

These numbers confirm a ~20% improvement after the change (with wide-column datasets the improvement is much higher, as shown by the benchmarks in the PR description).

@mgaido91 (Contributor Author):

@cloud-fan @dongjoon-hyun @gatorsmile any luck with this? Thanks.

@cloud-fan (Contributor):

LGTM, merging to master!

@asfgit closed this in 44a7174 on Sep 26, 2018
@maropu (Member) commented Sep 26, 2018

@mgaido91 well done!

@mgaido91 (Contributor Author):

thanks @cloud-fan and @maropu

daspalrahul pushed a commit to daspalrahul/spark that referenced this pull request Sep 29, 2018