
[SPARK-22599][SQL] In-Memory Table Pruning without Extra Reading #19810

Closed
wants to merge 47 commits

Conversation

CodingCat
Contributor

@CodingCat CodingCat commented Nov 24, 2017

What changes were proposed in this pull request?

In the current implementation of Spark, InMemoryTableExec reads all data in a cached table, filters CachedBatches according to stats, and passes the data to the downstream operators. This makes it inefficient to keep a whole table resident in memory to serve various queries against different partitions of the table, which covers a fair portion of our users' scenarios.

design doc: https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing

The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis for the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1.

With the current implementation, even if any one of the data scientists is only interested in one out of three locations, the queries they submit to the Spark cluster still read the full 1TB of data.

The reason behind the extra reading operation is that we implement CachedBatch as

case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

where the stats are part of every CachedBatch, so we can only filter batches for the output of the InMemoryTableExec operator by reading all data in the in-memory table as input. The extra reading becomes even more costly when some of the table's data has been evicted to disk.
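For context, the existing batch-level pruning looks roughly like the following (a simplified sketch assuming the CachedBatch definition above, not the exact InMemoryTableScanExec code): because the stats row travels inside each CachedBatch, a batch can only be skipped after it has already been read from the cache.

import org.apache.spark.sql.catalyst.InternalRow

// Simplified sketch: `partitionFilter` stands for the generated predicate over
// the stats schema. The key point is that `cachedBatchIter` has already been
// materialized from the BlockManager before any batch can be dropped.
def pruneBatches(
    cachedBatchIter: Iterator[CachedBatch],
    partitionFilter: InternalRow => Boolean): Iterator[CachedBatch] = {
  cachedBatchIter.filter(cachedBatch => partitionFilter(cachedBatch.stats))
}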

We propose to introduce a new type of block, the metadata block, for the partitions of the RDD representing the data in the cached table. Every metadata block contains stats for all columns in a partition and is saved to the BlockManager when the compute() method for the partition is executed. To minimize the number of bytes to read, the partition filters are evaluated against these metadata blocks first, so that only the partitions whose stats pass the filters need to have their data read.
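A rough sketch of the proposed flow (hypothetical names such as fetchMetadataBlock, partitionFilter and numPartitions are used for illustration; they are not the PR's actual API):

// Hypothetical sketch: evaluate the partition filters against the stats stored
// in metadata blocks, and only read the CachedBatch blocks of partitions that
// survive pruning.
val survivingPartitions: Seq[Int] = (0 until numPartitions).filter { partitionIndex =>
  fetchMetadataBlock(partitionIndex) match {
    case Some(statsRow) => partitionFilter.eval(statsRow) // decide from stats alone
    case None           => true // stats not materialized yet: keep the partition
  }
}
// Pruned partitions never have their data blocks fetched or recomputed.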

How was this patch tested?

  1. unit tests: added 3 new unit tests

  2. performance test:

Environment: 6 executors, each with 16 cores and 90 GB of memory

dataset: 1 TB of TPC-DS data

queries: tested 4 queries (Q19, Q46, Q34, Q27) in https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala

results: https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing

@CodingCat CodingCat changed the title Partition level pruning 2 [SQL][SPARK-22599] Partition level pruning 2 Nov 24, 2017
@CodingCat CodingCat changed the title [SQL][SPARK-22599] Partition level pruning 2 [SQL][SPARK-22599] In-memory table pruning without extra reading Nov 24, 2017
@CodingCat CodingCat changed the title [SQL][SPARK-22599] In-memory table pruning without extra reading [SQL][SPARK-22599] In-Memory Table Pruning without Extra Reading Nov 24, 2017
@CodingCat CodingCat changed the title [SQL][SPARK-22599] In-Memory Table Pruning without Extra Reading [SPARK-22599][SQL] In-Memory Table Pruning without Extra Reading Nov 24, 2017
@CodingCat
Contributor Author

retest this please

@SparkQA

SparkQA commented Nov 24, 2017

Test build #84178 has finished for PR 19810 at commit a853ce6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 25, 2017

Test build #84181 has finished for PR 19810 at commit 9d450ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@CodingCat
Contributor Author

@dongjoon-hyun
Member

Retest this please.

@cloud-fan
Contributor

Are you trying to optimize the case where the data is too large to fit in memory? Spark's RDD cache doesn't work well for that case.

@CodingCat
Contributor Author

CodingCat commented Nov 28, 2017

Hi @cloud-fan, this PR is not only for the case where the data size is larger than the memory size. Even when all data is in memory, I observed a 10-40% speedup because the implementation here

(1) reads less data

(2) starts fewer tasks

You can think of this PR as implementing the equivalent of Parquet's footer for the in-memory table.

@SparkQA

SparkQA commented Nov 28, 2017

Test build #84240 has finished for PR 19810 at commit 9d450ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

When all data is in memory, what do you mean by reading less data? Starting fewer tasks makes sense.

@CodingCat
Contributor Author

Reading less data is an observation from the input metrics in the Spark UI, which include both local and remote reads in the BlockManagers, as well as the overhead in the BlockManager layer itself (especially when the user chooses to cache in a serialized format).

However, I didn't measure how much it contributes to the speedup (and a small portion of the data is on disk in my perf test).

@CodingCat
Contributor Author

@cloud-fan would you mind continuing the review?

Contributor

@sadikovi sadikovi left a comment


@CodingCat Thanks for working on this - looks great! I left a couple of comments, mainly to understand the code. I would appreciate it if you could have a look. Thanks!

var rowCount = 0
var totalSize = 0L

val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
Contributor


@CodingCat Could you explain to me what singleBatch means here? I can't get my head around it :) Thanks!

Contributor Author


To make getting partition stats easier, we construct only one CachedBatch for each partition when the functionality proposed in this PR is enabled. singleBatch distinguishes between the enabled and disabled scenarios by introducing different while-loop termination conditions, so the rest of the code stays reusable.
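Concretely, the loop condition has roughly the following shape (paraphrasing the diff snippet quoted later in this thread; batchSize and the byte cap are passed in here only to keep the sketch self-contained):

// With singleBatch enabled, only exhaustion of the row iterator stops the
// loop, so each partition produces exactly one CachedBatch whose stats
// describe the whole partition.
def shouldContinue(
    singleBatch: Boolean,
    rowIter: Iterator[InternalRow],
    rowCount: Int,
    totalSize: Long,
    batchSize: Int,      // spark.sql.inMemoryColumnarStorage.batchSize
    maxBatchBytes: Long  // ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE in the diff
  ): Boolean = {
  if (!singleBatch) {
    // default path: cap each CachedBatch by row count and byte size
    rowIter.hasNext && rowCount < batchSize && totalSize < maxBatchBytes
  } else {
    // single-batch path: keep consuming until the partition is exhausted
    rowIter.hasNext
  }
}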

partitionFilters.reduceOption(And).getOrElse(Literal(true)),
partitionStatsSchema)
partitionFilter.initialize(partitionIndex)
if (!partitionFilter.eval(cachedBatch.stats)) {
Contributor


Are there any issues with discarding a partition based on statistics that could be partially computed (e.g. when the total size in bytes of a partition iterator is larger than the configurable batch size) as per https://github.com/apache/spark/pull/19810/files#diff-5fc188468d3066580ea9a766114b8f1dR74?

Would it be beneficial to record such a situation by logging it and still include such a partition when statistics are partially computed and the filters evaluate to false, or to discard all statistics when some of the partitions hit this situation? Thanks!

Contributor Author


I might not understand your proposal well... are you trying to simplify the logic in https://github.com/apache/spark/pull/19810/files#diff-5fc188468d3066580ea9a766114b8f1dR74? It would make the code simpler but degrade the pruning effect here.

Contributor


All good, no need to change. I was trying to understand the code, so my question refers to statistics collection overall, not to the changes in this PR. The link points to a condition (which exists before this PR) that could potentially result in exiting the iterator before exhausting all records in it, so statistics would be partially collected, which might affect any filtering that uses such statistics - though it is quite possibly handled later, or only a theoretical case.

Contributor Author


@sadikovi this while loop is building CachedBatches; it just decides when to seal the current CachedBatch and start the next one... so, either way, you need to go through all records in the partition.

@CodingCat
Contributor Author

@sadikovi thanks for the review, I replied in comments

@cloud-fan
Contributor

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis for the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1.

Reading your use case, it sounds like you are trying to optimize the case where the data is too large to fit in memory. In that case, if a partition is on disk, Spark needs to load the entire partition into memory before filtering blocks.

It sounds like something can be done better in 3rd party data sources, or we need to change the Spark core just for a better table cache, which seems risky.

@CodingCat
Contributor Author

CodingCat commented Dec 6, 2017

@cloud-fan in this case, if the data has been dumped to disk or some non-local tasks are started, I/O is involved in addition to the overhead of starting extra tasks. If all data is in memory, only the task-launching overhead remains.

It sounds like something can be done better in 3rd party data sources, or we need to change the Spark core just for a better table cache, which seems risky.

Yes, some work can be done in 3rd-party data sources, e.g. to avoid the parsing overhead in Parquet.

Regarding the risk: in the current implementation, I directly modify the core to add a new type of block and make it recognizable by the BlockManager. The new RDD and dependency implementations are in the SQL module. An alternative is to implement this new block type in SQL as well (but that needs some small refactoring to open the BlockManager up to code outside of Spark Core).

I personally think it's a good feature to add: it benefits users without any real threat to the existing code.

if (!singleBatch) {
rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
} else {
rowIter.hasNext


doesn't this run the risk of OOM for large partitions?

case (partitionIndex, cachedBatches) =>
if (inMemoryPartitionPruningEnabled) {
cachedBatches.filter { cachedBatch =>
val partitionFilter = newPredicate(


Can this be pulled up out of usePartitionLevelMetadata? It seems like you're constructing the predicate per record.
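A sketch of the hoisted form (a fragment reusing the identifiers from the diff snippets quoted in this thread, not a drop-in patch): the predicate is built and initialized once per partition, outside the per-batch filter.

case (partitionIndex, cachedBatches) =>
  if (inMemoryPartitionPruningEnabled) {
    // Build and initialize the predicate once per partition, not once per batch.
    val partitionFilter = newPredicate(
      partitionFilters.reduceOption(And).getOrElse(Literal(true)),
      partitionStatsSchema)
    partitionFilter.initialize(partitionIndex)
    cachedBatches.filter(cachedBatch => partitionFilter.eval(cachedBatch.stats))
  } else {
    cachedBatches
  }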


private[columnar] object CachedColumnarRDD {

private val rddIdToMetadata = new ConcurrentHashMap[Int, mutable.ArraySeq[Option[InternalRow]]]()


Could these be moved to become a member of the RDD class? It seems like a map of this -> some.property, which in this case could be made an instance member.

override protected def getPartitions: Array[Partition] = dataRDD.partitions

override private[spark] def getOrCompute(split: Partition, context: TaskContext):
Iterator[CachedBatch] = {


Can this be avoided by maintaining two (zipped) RDDs, one of CachedBatches and the other holding only the stats?
Can this approach avoid the need for a specialized block type for managing metadata?
Correct me if I'm wrong, but the first time the stats are accessed, your approach performs a full scan to extract the stats (this happens in the SQL code), so having a second RDD which is something like batches.map(_.stats).persist should give the same behavior, right?
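A sketch of the zipped-RDD alternative described above (hypothetical: it assumes batches: RDD[CachedBatch] is the cached data RDD and partitionFilter is a serializable predicate over the stats row):

import org.apache.spark.storage.StorageLevel

// Keep the stats in a sibling RDD instead of introducing a new block type.
val statsRDD = batches.map(_.stats).persist(StorageLevel.MEMORY_AND_DISK)

// A cheap job over the small stats RDD decides which partitions survive pruning;
// only those partitions then need to be read from the batches RDD.
val survivingPartitions: Array[Int] = statsRDD
  .mapPartitionsWithIndex { (index, statsIter) =>
    if (statsIter.exists(partitionFilter.eval)) Iterator.single(index) else Iterator.empty
  }
  .collect()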

@maropu
Member

maropu commented Sep 19, 2018

@CodingCat Is this still active?

).getOrElse {
val batchIter = superGetOrCompute(split, context)
if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
val cachedBatch = batchIter.next()

@eyalfa eyalfa Sep 19, 2018


Assert the post-condition !batchIter.hasNext; you expect this partition to contain a single batch.
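For illustration, the suggested check could look like this (hypothetical assertion message):

// With partition metadata enabled, the whole partition is built as one batch,
// so anything left in the iterator would indicate a bug.
assert(!batchIter.hasNext, "expected exactly one CachedBatch per partition")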

@eyalfa

eyalfa commented Sep 19, 2018

@maropu, this looks rather cold 😎, but extremely interesting and relevant.

@maropu
Member

maropu commented Sep 19, 2018

If the author is inactive, it's OK for someone else to take this over. But we should first discuss this more to get consensus on the feature. I'm not sure we would get enough performance gain to justify giving up the current simple cache logic.

@CodingCat
Contributor Author

When I contributed this back, the community was looking at something else, so I didn't spend too much time convincing people to review it... but if there is renewed interest now, I am happy to pick it up again.

@SparkQA

SparkQA commented Oct 22, 2018

Test build #97714 has finished for PR 19810 at commit 9d450ad.

  • This patch fails to generate documentation.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 14, 2020
@github-actions github-actions bot closed this Jan 15, 2020