Skip to content

Conversation

@davies
Copy link
Contributor

@davies davies commented Jan 5, 2016

This PR manage the memory used by window functions (buffered rows), also enable external spilling.

After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1G.

@SparkQA
Copy link

SparkQA commented Jan 5, 2016

Test build #48787 has finished for PR 10605 at commit 6b98593.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[execution]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are skipping all the records we have already processed here; this shouldn't be to expensive; but it still adds some complexity (O(n * (n - 1) /2) towards O(n * n)). I'd like to see/do a few benchmarks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it doubles the time. If N is large, it's kind of like double of infinity is still infinity :-)

Actually I tried to add a skip(n) API for UnsafeSortIterator to improve this a little bit, but it seems not worth that complexity, so removed.

I think the next step could be have a fast path for those functions that has communitativity, we could reduce the complexity to O(N * 2)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true that :)

This is fine for now; it is not used that much.

@hvanhovell
Copy link
Contributor

@davies this is pretty awesome! I have taken long look at the window code and it looks solid. I am less of an expert on the Memory management front, so maybe someone else should take a look at that.

I do have one small concern: I am absolutely convinced that in the case of large partition sizes this will outperform the current implementation by a margin. However, I am wondering what happens if we consider smaller partition sizes (e.g. n 2-32). We might take a small hit in these cases, because of the added complexity. Have you done some benchmarking on this? If you haven't this is a link to benchmark I used for my initial window prototype: https://issues.apache.org/jira/secure/attachment/12745984/perf_test3.scala

I'd like to finish with something we should not address in this PR (thinking out loud if you will). The child node of a Window operator is allmost always an ExternalSort operator. Wouldn't it be cool if we could eliminate the row buffer of the Window by using the external sorts buffer?

@SparkQA
Copy link

SparkQA commented Jan 5, 2016

Test build #48790 has finished for PR 10605 at commit 2f8705a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor Author

davies commented Jan 6, 2016

@hvanhovell Here is the result after having a fast path for small partition:

+-----+----------------+------+-----------+-----+
| Size|            Name|Master|SPARK-12295| Diff|
+-----+----------------+------+-----------+-----+
|    2|Entire Partition|  2007|       1933|0.963|
|    2|         Growing|  2576|       2193|0.851|
|    2|       Shrinking|  2945|       2829|0.961|
|    2|         Sliding|  2206|       2320|1.052|
|    8|Entire Partition|  2097|       1913|0.912|
|    8|         Growing|  1959|       2235|1.141|
|    8|       Shrinking|  2099|       2359|1.124|
|    8|         Sliding|  2124|       2040|0.960|
|   32|Entire Partition|  2261|       2260|1.000|
|   32|         Growing|  2143|       2218|1.035|
|   32|       Shrinking|  2357|       2401|1.019|
|   32|         Sliding|  2691|       2370|0.881|
|  128|Entire Partition|  2146|       1907|0.889|
|  128|         Growing|  2019|       1928|0.955|
|  128|       Shrinking|  3085|       3725|1.207|
|  128|         Sliding|  2040|       2110|1.034|
|  256|Entire Partition|  1962|       2042|1.041|
|  256|         Growing|  2777|       2335|0.841|
|  256|       Shrinking|  4179|       5027|1.203|
|  256|         Sliding|  2106|       2155|1.023|
| 1024|Entire Partition|  1710|       1734|1.014|
| 1024|         Growing|  1873|       2022|1.080|
| 1024|       Shrinking| 10456|      14588|1.395|
| 1024|         Sliding|  1928|       1917|0.994|
| 4096|Entire Partition|  1769|       1670|0.944|
| 4096|         Growing|  1986|       1844|0.928|
| 4096|       Shrinking| 39184|      67151|1.714|
| 4096|         Sliding|  2080|       2108|1.013|
|16192|Entire Partition|  2835|       1805|0.637|
|16192|         Growing|  2189|       1851|0.846|
|16192|       Shrinking|151624|     337517|2.226|
|16192|         Sliding|  4734|       5615|1.186|
+-----+----------------+------+-----------+-----+

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
asfgit pushed a commit that referenced this pull request Jan 6, 2016
…uct)

Cartesian product use UnsafeExternalSorter without comparator to do spilling, it will NPE if spilling happens.

This bug also hitted by #10605

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #10606 from davies/fix_spilling.
Conflicts:
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
	project/MimaExcludes.scala
@SparkQA
Copy link

SparkQA commented Jan 6, 2016

Test build #48812 has finished for PR 10605 at commit 60edf13.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 6, 2016

Test build #48806 has finished for PR 10605 at commit f464cf6.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 6, 2016

Test build #48817 has finished for PR 10605 at commit 18c7a22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 6, 2016

Test build #48835 has finished for PR 10605 at commit 560ef90.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this configurable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on making this configurable

@hvanhovell
Copy link
Contributor

@davies I left some final comments; feel free to use/ignore them.

The benchmark looks good. The shrinking case was expected to perform horrible, and it does. The other figures are so close to the old figure that I don't think we can safely say that they are different (given the limited scope and duration of the test). I was wondering, how does this perform without the optimized (hybrid) approach?

LGTM (pending review of the ExternalSort* modifications)

@davies
Copy link
Contributor Author

davies commented Jan 6, 2016

@hvanhovell Before the fast path, tiny partitions look horrible:

+-----+----------------+------+----------+-----+
| Size|            Name|Master|This PR(before fast path) | Diff|
+-----+----------------+------+----------+-----+
|    2|Entire Partition|  2007|     12395|6.176|
|    2|         Growing|  2576|     15729|6.106|
|    2|       Shrinking|  2945|     13376|4.542|
|    2|         Sliding|  2206|     13419|6.083|
|    8|Entire Partition|  2097|      4869|2.322|
|    8|         Growing|  1959|      4495|2.295|
|    8|       Shrinking|  2099|      5021|2.392|
|    8|         Sliding|  2124|      4739|2.231|
|   32|Entire Partition|  2261|      2750|1.216|
|   32|         Growing|  2143|      3062|1.429|
|   32|       Shrinking|  2357|      3233|1.372|
|   32|         Sliding|  2691|      3270|1.215|
|  128|Entire Partition|  2146|      1964|0.915|
|  128|         Growing|  2019|      2331|1.155|
|  128|       Shrinking|  3085|      4845|1.571|
|  128|         Sliding|  2040|      2406|1.179|
|  256|Entire Partition|  1962|      2110|1.075|
|  256|         Growing|  2777|      2479|0.893|
|  256|       Shrinking|  4179|      7733|1.850|
|  256|         Sliding|  2106|      2344|1.113|
| 1024|Entire Partition|  1710|      2367|1.384|
| 1024|         Growing|  1873|      2096|1.119|
| 1024|       Shrinking| 10456|     22028|2.107|
| 1024|         Sliding|  1928|      2456|1.274|
| 4096|Entire Partition|  1769|      2062|1.166|
| 4096|         Growing|  1986|      1941|0.977|
| 4096|       Shrinking| 39184|     75981|1.939|
| 4096|         Sliding|  2080|      2376|1.142|
|16192|Entire Partition|  2835|      1630|0.575|
|16192|         Growing|  2189|      1801|0.823|
|16192|       Shrinking|151624|    305837|2.017|
|16192|         Sliding|  4734|      4430|0.936|
+-----+----------------+------+----------+-----+

I think the cost came from UnsafeExternalSorter, once the partition is larger than 4096, the overhead is under 10%. For larger partitions, this PR may be faster. it's also safe that 4k rows are not managed (4M memory if each row is 1k bytes), So I think we don't need a configuration for this, we already have too much configurations.

@davies
Copy link
Contributor Author

davies commented Jan 6, 2016

@JoshRosen Could you help to review the ExternalSort* parts?

@davies
Copy link
Contributor Author

davies commented Jan 7, 2016

I'm going to merge this to avoid conflict, any comments will be addressed by follow-up PR.

@asfgit asfgit closed this in 6a1c864 Jan 7, 2016
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ExternalRowBuffer, can this copy() be saved ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tedyu both buffers implementations are statefull (they contain an iterator). Each frame traverses the partition in a different way, so we cannot share iterators between them.

davies pushed a commit to davies/spark that referenced this pull request Mar 9, 2016
This PR manage the memory used by window functions (buffered rows), also enable external spilling.

After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1G.

Author: Davies Liu <davies@databricks.com>

Closes apache#10605 from davies/unsafe_window.

Conflicts:
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
	sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
davies pushed a commit to davies/spark that referenced this pull request Mar 9, 2016
This PR manage the memory used by window functions (buffered rows), also enable external spilling.

After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1G.

Author: Davies Liu <davies@databricks.com>

Closes apache#10605 from davies/unsafe_window.

import java.util.Comparator;

import org.apache.avro.reflect.Nullable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI this should probably have been import javax.annotation.Nullable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants