[GLUTEN-8912][VL] Add Offset support for CollectLimitExec #8914
jinchengchenghh merged 11 commits into apache:main
Conversation
Run Gluten Clickhouse CI on x86
@@ -58,7 +58,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {

    testWithSpecifiedSparkVersion(
testWithSpecifiedSparkVersion -> test
  }

  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) {
  testWithSpecifiedSparkVersion(
ditto, same for the others
    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
  }

  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {
What's the result for Spark 3.3? Is the result also correct but the operator not matched?

If so, please also add the result check for Spark 3.2 and Spark 3.3.

Please add tests to cover more code paths, such as limit(12).

For 3.3 it would fail at compile time, since the offset API is not available on CollectLimitExec in older versions.

Added more tests to cover the above scenario; the Spark UTs should also help.
      partition => {
        val droppedRows = dropLimitedRows(partition, offset)
        val adjustedLimit = Math.max(0, limit - offset)
        collectLimitedRows(droppedRows, adjustedLimit)
Can we enhance collectLimitedRows so that it slices the input RowVector from offset to adjustedLimit?

Yes, however it would not preserve order. The current implementation closely matches Spark's, and slicing could surface unexpected ordering and failures across UTs. Keeping it this way stays close to the Spark implementation and maintains the same ordering. Thanks!
cc @jinchengchenghh, addressed the comments. Could you please take a look? Thanks!
    processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
    processedRDD.mapPartitions(
      partition => {
        val droppedRows = dropLimitedRows(partition, offset)
We can add an offset argument to collectLimitedRows and just handle it in fetchNext; that would make the function much simpler, right?
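The reviewer's suggestion of folding the offset into a single function could look roughly like this (a sketch over plain rows with a fetchNext-style skip loop; the signature and names are illustrative, not the PR's actual code):

```scala
// Single-pass variant: skip `offset` rows lazily inside the iterator,
// then yield at most `limit - offset` rows.
def collectLimitedRows[T](partition: Iterator[T], limit: Int, offset: Int): Iterator[T] = {
  var rowsToSkip = offset
  var rowsToCollect = math.max(0, limit - offset)
  new Iterator[T] {
    // fetchNext-style logic: advance past skipped rows before yielding any.
    private def skip(): Unit =
      while (rowsToSkip > 0 && partition.hasNext) { partition.next(); rowsToSkip -= 1 }
    def hasNext: Boolean = { skip(); rowsToCollect > 0 && partition.hasNext }
    def next(): T = { skip(); rowsToCollect -= 1; partition.next() }
  }
}
```

This keeps one traversal of the partition and removes the separate dropLimitedRows pass.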
          val leftoverAfterSkip = batchSize - startIndex
          rowsToSkip = 0

          val needed = math.min(rowsToCollect, leftoverAfterSkip)
If needed <= remaining we still need this logic, but we may return the whole batch instead of a sliced one:

    if (currentBatchRowCount <= remaining) {
      rowsCollected += currentBatchRowCount
      ColumnarBatches.retain(currentBatch)
      nextBatch = Some(currentBatch)
    } else {
      val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)

In that case startIndex would be 0 and leftoverAfterSkip = batchSize, leading to val prunedBatch = VeloxColumnarBatches.slice(batch, 0, batchSize). Could you give an example of batch size with limit and offset for the above case?

@jinchengchenghh does this address the comment?

So we don't need to do the slice in that case; the sliced batch is the whole batch.

I see, you mean moving this case out so that it is not sliced. Let me do the refactor.
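The agreed refactor, a fast path that returns the whole batch when no pruning is needed, can be sketched with an array-backed stand-in for a columnar batch (`Batch` and `pruneBatch` are hypothetical names; the real code uses ColumnarBatch and VeloxColumnarBatches.slice):

```scala
// Minimal stand-in for a columnar batch.
final case class Batch(rows: Array[Int])

// Return the whole batch untouched when it already fits within `remaining`
// and nothing needs skipping; only slice when pruning is actually required.
def pruneBatch(batch: Batch, startIndex: Int, remaining: Int): Batch = {
  val batchSize = batch.rows.length
  if (startIndex == 0 && batchSize <= remaining) {
    batch  // fast path: no slice; real code would just retain the batch here
  } else {
    val needed = math.min(remaining, batchSize - startIndex)
    // stand-in for VeloxColumnarBatches.slice(batch, startIndex, needed)
    Batch(batch.rows.slice(startIndex, startIndex + needed))
  }
}
```

Skipping the slice on the fast path avoids allocating a pruned copy that would be identical to the input batch.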
|
cc @jinchengchenghh, addressed all comments. Can you please take a look? Thanks!
            ColumnarBatches.retain(batch)
            batch
          } else {
            val sliced = VeloxColumnarBatches.slice(batch, startIndex, needed)
The intermediate val sliced isn't needed.
Do you mean you are incorporating a solution for #9166 in this PR? Could you help me locate the code? Thanks.
I meant allowing the child to be checked for columnar execution and using that in this PR. We can take up the custom rule in the future.
I suggest we figure out the reason for the test failures first. We'd want to make sure the operator outputs exactly the same data whether or not it's offloaded; otherwise it's a mismatch. What did the broken tests look like?
Yes, if we offload with an R2C in between CollectLimit and its child, it changes the number of jobs with Gluten. The operator outputs exactly the same data either way. However, the current implementation only offloads when the child is columnar, to avoid the R2C overhead and the failing UTs. Thanks!
|
This should be fixed with the post-transform rule. Could you please take a look and help re-run the UTs? Thanks! @jinchengchenghh @zhztheplayer
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}

  class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
This is a big test file. Is it enough for us to only add this for the newest Spark version (3.5)? Further maintenance would be easier then.

It seems we need 3.3 as well, since older versions do not support the offset API; the tests differ slightly depending on offset support, which was added in 3.4. Thanks!
zhztheplayer left a comment
Thank you for iterating.