[SPARK-39709][SQL] The result of executeCollect and doExecute of TakeOrderedAndProjectExec should be the same#37118
[SPARK-39709][SQL] The result of executeCollect and doExecute of TakeOrderedAndProjectExec should be the same#37118wangyum wants to merge 1 commit intoapache:masterfrom
Conversation
| val limited = if (orderingSatisfies) { | ||
| child.execute().mapPartitionsInternal(_.map(_.copy()).take(limit)).takeOrdered(limit)(ord) | ||
| } else { | ||
| child.execute().mapPartitionsInternal(_.map(_.copy())).takeOrdered(limit)(ord) |
There was a problem hiding this comment.
hmm, so RDD.takeOrdered does not return ordered records?
There was a problem hiding this comment.
No. The result does not match if it contains duplicate values. For example, order by column a:
+---+---+
| a| b|
+---+---+
| 1| 1|
| 1| 2|
| 2| 3|
| 2| 4|
| 3| 5|
| 3| 6|
+---+---+
JoshRosen
left a comment
There was a problem hiding this comment.
In this example, it looks like the ordering does not result in a total order: there are multiple values for each sort key:
>>> Seq((1, 1), (1, 2), (2, 3), (2, 4), (3, 5), (3, 6), (3, 7)).sortBy(_._1).map(_._2)
res5: Seq[Int] = List(1, 2, 3, 4, 5, 6, 7)
>>> Seq((1, 1), (1, 2), (2, 3), (2, 4), (3, 5), (3, 6), (3, 7)).reverse.sortBy(_._1).map(_._2)
res6: Seq[Int] = List(2, 1, 4, 3, 7, 6, 5)In these cases, the ordering of the inputs to the sort will affect the final outcome.
Spark does not guarantee the order in which shuffle blocks are fetched: doExecute() will shuffle the per-partition results into a single reduce partition and then does a final sorting there, but the order in which that final partition fetches data from mappers is not guaranteed. As a result, the final sort may receive its input in different orders in different executions and this can lead to non-deterministic results.
That effect doesn't show up in the toy examples that use LocalRelation / parallelize, but I think we could demonstrate it with a more realistic example involving multiple input partitions and multiple executors.
As a result, I don't think that this patch's changes are sufficient to guarantee determinism of results when the sorting order does not totally order the records. Therefore I don't think we should make this change: it adds additional performance overheads but I don't think it will actually solve the problem of differing results with non-total sort orders.
|
Thank you @JoshRosen. Make sense. I will close this PR. |
What changes were proposed in this pull request?
This PR makes
TakeOrderedAndProjectExec'sexecuteCollectusedoExecute()'s result.Why are the changes needed?
To make the result of
executeCollectanddoExecuteofTakeOrderedAndProjectExecthe same. For example:.show()will usedoExecuteand the result is:.collect().foreach(println)will useexecuteCollectand the result is:Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test.