
[Rust] [DataFusion] Implement "coalesce batches" operator #18437

Closed

asfimport opened this issue Dec 29, 2020 · 5 comments

@asfimport

When we have a FilterExec in the plan, it can produce many small batches, and we therefore lose the efficiency of vectorized operations.

We should implement a new CoalesceBatchExec and wrap every FilterExec with one of these so that small batches can be recombined into larger batches to improve the efficiency of upstream operators.
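
As a rough illustration of the buffering such an operator would do, here is a minimal sketch, assuming a recent arrow crate with the `concat_batches` kernel; the `BatchCoalescer` name and `target_batch_size` knob are hypothetical, and a real operator would also need to flush any leftover buffered rows when its input stream ends:

```rust
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Buffers incoming batches and emits one combined batch once enough rows
/// have accumulated. Hypothetical names, not the final operator API.
struct BatchCoalescer {
    schema: SchemaRef,
    target_batch_size: usize,
    buffer: Vec<RecordBatch>,
    buffered_rows: usize,
}

impl BatchCoalescer {
    fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
        Self { schema, target_batch_size, buffer: Vec::new(), buffered_rows: 0 }
    }

    /// Accept one (possibly tiny) input batch; returns a combined batch
    /// once the buffered row count reaches the target.
    fn push(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>, ArrowError> {
        self.buffered_rows += batch.num_rows();
        self.buffer.push(batch);
        if self.buffered_rows < self.target_batch_size {
            return Ok(None); // keep buffering small batches
        }
        // Enough rows buffered: concatenate into a single larger batch.
        let combined = concat_batches(&self.schema, &self.buffer)?;
        self.buffer.clear();
        self.buffered_rows = 0;
        Ok(Some(combined))
    }
}
```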

Reporter: Andy Grove / @andygrove
Assignee: Andy Grove / @andygrove

PRs and other links:

Note: This issue was originally created as ARROW-11058. Please see the migration documentation for further details.

@asfimport

Jorge Leitão / @jorgecarleitao:
This aspect of DataFusion is a bit unclear at the moment: in DataFusion, it seems that we have two types of "buckets", parts and batches, while in Spark there are only parts (via partitioning). In Spark, the partitioning tradeoff is higher parallelism vs. slower exchanges, but I can't find the equivalent tradeoff for the number of batches per part in DataFusion.

Six months ago, my hypothesis was that partitioning would be used for inter-process parallelism, while batches would be used for intra-process parallelism. My idea at the time was: there is a stream of parts, and each part is an iterator of batches; batch execution runs in Rayon, and each part is a future and part of a stream (via Tokio, potentially on another machine). In this design, the two "buckets" represent different kinds of parallelism, thread parallelism and process parallelism (e.g. cross-machine), which on a single machine would be served by two different thread pools.

But since we use a stream of batches and a stream of parts, I can't think of a way to differentiate them. E.g. suppose we implement the "coalesce batches" operator: when is using it expected to improve performance? When should the optimizer add it?

More quantitatively: given N rows, in Spark we can distribute them over P parts, while in DataFusion we can distribute them over P parts and B batches. In Spark, P affects parallelism in very specific ways (higher P => higher parallelism and more tasks). In DataFusion, it is a bit unclear how the tuple (P, B) leads to one or the other, and why we have both P and B in the first place (since they all run on the same thread pool and both are async).

AFAIK B is not necessarily related to vectorization, as vectorization (at least at the CPU level) happens on much smaller chunks (lane size). Nor does B lead to higher parallelism: since batches are part of a stream, there is no way to run two batches from a single part in parallel, as we need to finish executing one before starting the next.

This aspect of parallelism in DataFusion was clear to me six months ago but became unclear when we converted the RecordBatchReader to a stream.
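
To make the claim above concrete, here is a minimal model of the two "buckets", assuming the futures and tokio crates; the `Batch` and `PartitionStream` types are hypothetical stand-ins, not the DataFusion API. Partitions can be polled concurrently (one task each), while the batches inside a partition are strictly sequential:

```rust
use std::pin::Pin;

use futures::stream::{self, Stream, StreamExt};

// Stand-ins for illustration only: a "batch" of values, and a partition
// modeled as a sequential stream of batches.
type Batch = Vec<i32>;
type PartitionStream = Pin<Box<dyn Stream<Item = Batch> + Send>>;

fn make_partitions(p: usize, b: usize) -> Vec<PartitionStream> {
    (0..p)
        .map(|part| {
            let batches = (0..b).map(move |i| vec![(part * b + i) as i32]);
            Box::pin(stream::iter(batches)) as PartitionStream
        })
        .collect()
}

#[tokio::main]
async fn main() {
    // P = 4 partitions run concurrently, one task per partition ...
    let tasks: Vec<_> = make_partitions(4, 8)
        .into_iter()
        .map(|mut partition| {
            tokio::spawn(async move {
                // ... but the B = 8 batches within a partition are pulled
                // one at a time: batch i must finish before batch i + 1.
                while let Some(batch) = partition.next().await {
                    let _ = batch.len(); // process the batch sequentially
                }
            })
        })
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
}
```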

@asfimport

Andy Grove / @andygrove:
@jorgecarleitao

I think PR #9043 will help explain this. Apache Spark actually does something very similar: Spark has partitions, which are the unit of parallelism (either on threads or executors), and each partition is an Iterator[T].

Spark supports row-based (Iterator[Row]) and column-based (Iterator[ColumnarBatch]) operators out of the box, although most of the built-in operators are row-based. Spark inserts transitions as required to convert between row-based and column-based operators.

Because filters can produce empty batches, or batches with only one or a few rows, we lose some efficiency both from SIMD and from per-batch overheads in particular kernels (as we have seen with MutableArrayData).
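
To make that concrete, here is a small example of how a selective predicate shrinks a batch, assuming a recent arrow crate's `filter_record_batch` kernel; this sketches the effect, not FilterExec itself:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // An 8192-row input batch with a single Int32 column "x".
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let values: ArrayRef = Arc::new(Int32Array::from_iter_values(0..8192));
    let batch = RecordBatch::try_new(schema, vec![values])?;

    // A highly selective predicate: keep only the first 3 rows.
    let predicate = BooleanArray::from_iter((0..8192).map(|i| Some(i < 3)));
    let small = filter_record_batch(&batch, &predicate)?;

    // The surviving batch has just 3 rows; a plan full of batches like
    // this pays per-batch overhead in every downstream kernel.
    assert_eq!(small.num_rows(), 3);
    Ok(())
}
```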

Small batches can also be inefficient when writing out to Parquet because we lose the benefits of compression to some degree, so this is another use case where we would want to coalesce them.

Coalescing batches is especially important for GPU, if we ever add support for that, because the cost of an operation on GPU is the same (once data is loaded) regardless of how many items it operates on, so it is beneficial to operate on as much data in parallel as possible.

@asfimport

Jorge Leitão / @jorgecarleitao:
Thank you so much for your explanation, @andygrove. I agree with that.

Maybe this is too obvious and I am just not knowledgeable here, but what is the problem with P = B? I.e., what do we gain from having both a batch size and a number of parts, instead of having just one batch per part?

I am asking because it seems to me that we have a fragmentation problem: we start with a bunch of contiguous blocks of memory, and as we operate on them, we fragment/filter them into smaller and smaller pieces that, at some point, become slow to operate on individually (and we defragment via coalesces to bring them back together), just like in an OS.

With (P, B), we need to deal with fragmentation at both the partition level and the batch level: we need to worry about the partitioning being balanced (in number of rows per part), and also about each part being balanced (in number of rows per batch).

Wouldn't it be simpler if P = B, where we would only need to worry about fragmentation of parts (and coalesce parts)? I suspect that would be too simple, i.e. I am missing the benefit of the extra degree of freedom of (P, B) vs. (P = B).

@asfimport

Andy Grove / @andygrove:
Issue resolved by pull request #9043.

@asfimport

Andy Grove / @andygrove:
I can see your argument, @jorgecarleitao.

I can see how, in some cases, we could parallelize operations across batches within one or more partitions rather than repartitioning to increase parallelism.

If an operator requires its input to be in a specific order, though, we would need to fall back to single-threaded behavior per partition (SortMergeJoin, SortAggregate, etc.).

It is possible I am just too used to how this is normally done in other query engines, but I see partitions as the unit of parallelism, while batches are just there so we can do vectorized processing; we also need to manage batch sizes for efficient processing (though this is more important on GPU than on CPU).

I think that if/when we get to distributed queries in DataFusion, the partition model becomes even more important, especially when reading Parquet files that are partitioned on keys, so that processing of related data stays together on the same partitions/nodes.

I'd be interested to hear what others have to say on all of this, of course.

asfimport added this to the 3.0.0 milestone on Jan 11, 2023.