
Comet native shuffle in rust doesn't handle empty projection properly #102

Closed
advancedxy opened this issue Feb 24, 2024 · 5 comments · Fixed by #103
Labels
bug Something isn't working


@advancedxy
Contributor

Describe the bug

When testing #100, I noticed that Comet's columnar shuffle doesn't handle empty projection correctly. The shuffle write
thread throws an exception as follows:

Caused by: org.apache.comet.CometNativeException: Arrow error: External error: Arrow error: Invalid argument error: must either specify a row count or at least one column
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:65)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:111)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:126)
	at org.apache.spark.sql.comet.execution.shuffle.CometShuffleWriteProcessor.write(CometShuffleExchangeExec.scala:290)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
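
The Arrow error comes from a rule in Arrow's record batch constructor: a batch's row count is normally taken from its columns, so a batch with zero columns (which is what an empty projection such as `df.select()` produces) must be given an explicit row count. In arrow-rs this is done with `RecordBatch::try_new_with_options` and `RecordBatchOptions::new().with_row_count(Some(n))`. The sketch below is a stdlib-only model of that rule, assuming integer columns; `Batch`, `try_new`, and `project` are hypothetical names, and this is not Comet's code or necessarily the approach taken in #103.

```rust
/// Hypothetical stand-in for an Arrow record batch: equal-length columns
/// plus the number of rows they hold.
#[derive(Debug)]
pub struct Batch {
    pub columns: Vec<Vec<i32>>,
    pub num_rows: usize,
}

impl Batch {
    /// Models the Arrow rule from the stack trace: without an explicit
    /// `row_count`, the row count is taken from the first column, so a
    /// batch with no columns cannot be built.
    pub fn try_new(columns: Vec<Vec<i32>>, row_count: Option<usize>) -> Result<Batch, String> {
        let num_rows = match (row_count, columns.first()) {
            (Some(n), _) => n,
            (None, Some(col)) => col.len(),
            (None, None) => {
                return Err("must either specify a row count or at least one column".to_string())
            }
        };
        if columns.iter().any(|c| c.len() != num_rows) {
            return Err("all columns must have the same length".to_string());
        }
        Ok(Batch { columns, num_rows })
    }

    /// Keeps only the columns at `indices`. An empty slice models an empty
    /// projection: no columns survive, so the row count is passed along
    /// explicitly and a later `count()` still sees every row.
    pub fn project(&self, indices: &[usize]) -> Result<Batch, String> {
        let columns: Vec<Vec<i32>> = indices.iter().map(|&i| self.columns[i].clone()).collect();
        Batch::try_new(columns, Some(self.num_rows))
    }
}
```

Calling `Batch::try_new(vec![], None)` reproduces the error message above, while `project(&[])` on a five-row batch returns a zero-column batch that still reports five rows.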

Steps to reproduce

In org.apache.comet.exec.CometExecSuite, modify the empty projection test case as follows:

  test("empty projection") {
    withParquetDataFrame((0 until 5).map(i => (i, i + 1))) { df =>
      assert(df.where("_1 IS NOT NULL").count() == 5)
      checkSparkAnswerAndOperator(df)
      assert(df.select().limit(2).count() === 2)
    }
  }

Expected behavior

The test case should pass.

Additional context

No response

@advancedxy advancedxy added the bug Something isn't working label Feb 24, 2024
@sunchao
Member

sunchao commented Feb 24, 2024

I think this is Comet native shuffle, not columnar shuffle?

@advancedxy
Contributor Author

> I think this is Comet native shuffle, not columnar shuffle?

I'm not that familiar with the Rust shuffle code yet, so I may have mixed up columnar shuffle and native shuffle. When I said columnar shuffle, I was referring to all of the shuffle code on the Rust side.

It would be helpful if you could elaborate a bit on the difference between Comet native shuffle and columnar shuffle.

@sunchao
Member

sunchao commented Feb 24, 2024

I think for native shuffle, the majority of the shuffle write happens on the native side. The native ShuffleWriterExec takes the input columnar batches from the previous operators and writes them out to different shuffle files based on the output partitions. The feature is limited, however: it only supports hash-based shuffle, not sort-based shuffle, i.e., sorting the shuffle rows by the partitions they belong to and writing data from different partitions into a single sorted file instead of separate files per partition (this happens by default when the number of partitions is > 200). It also only supports hash partitioning, not other partitioning types like range or round-robin.
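
To make the contrast between the two write strategies concrete, here is a minimal sketch, assuming rows are `(key, value)` pairs and each shuffle "file" is an in-memory `Vec`. The partition function (std's `DefaultHasher` modulo the partition count) is an illustrative choice only; Spark's hash partitioning uses Murmur3, and Comet has to match it. All names are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

pub type Row = (i64, String);

/// Assigns a row to a partition by hashing its key (illustrative hash only).
pub fn partition_of(key: i64, num_partitions: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % num_partitions as u64) as usize
}

/// Hash-based shuffle write: one output buffer ("file") per partition.
pub fn hash_shuffle_write(rows: &[Row], num_partitions: usize) -> Vec<Vec<Row>> {
    let mut files: Vec<Vec<Row>> = vec![Vec::new(); num_partitions];
    for row in rows {
        files[partition_of(row.0, num_partitions)].push(row.clone());
    }
    files
}

/// Sort-based shuffle write: sort rows by partition id into a single buffer
/// and record an index of offsets, so partition p is data[offsets[p]..offsets[p + 1]].
pub fn sort_shuffle_write(rows: &[Row], num_partitions: usize) -> (Vec<Row>, Vec<usize>) {
    let mut tagged: Vec<(usize, Row)> = rows
        .iter()
        .map(|r| (partition_of(r.0, num_partitions), r.clone()))
        .collect();
    // `sort_by_key` is stable, so rows keep their input order within a partition.
    tagged.sort_by_key(|(p, _)| *p);
    // Count rows per partition, then prefix-sum the counts into start offsets.
    let mut offsets = vec![0usize; num_partitions + 1];
    for (p, _) in &tagged {
        offsets[p + 1] += 1;
    }
    for p in 0..num_partitions {
        offsets[p + 1] += offsets[p];
    }
    let data: Vec<Row> = tagged.into_iter().map(|(_, r)| r).collect();
    (data, offsets)
}
```

Both functions place the same rows in the same partitions; the sort-based variant trades per-partition files for one file plus an offset index, which is why it scales better to many partitions.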

On the other hand, for the columnar shuffle, the JVM side does the heavy lifting, and only the writing of sorted shuffle files happens on the native side. In addition, several optimizations were made on the JVM side, including an async shuffle mode (the sorting of shuffle data and the writing of sorted data to disk are decoupled). Columnar shuffle supports both hash-based and sort-based shuffle, and also supports all (I think?) partitioning types (hash/range/round-robin). Columnar shuffle can also take a non-native operator as input, unlike native shuffle, whose previous operator must be a native Comet operator.
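
The async mode described here can be pictured as a producer/consumer pipeline. The sketch below is an illustration under stated assumptions, not Comet's JVM implementation: the calling thread sorts each buffered chunk of `(partition_id, value)` records, and a separate writer thread, fed through a bounded `std::sync::mpsc::sync_channel`, "writes" earlier sorted chunks (here it appends them to a `Vec` standing in for spill files). `Record` and `async_sort_and_write` are hypothetical names.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// A record tagged with its target partition id (hypothetical layout).
pub type Record = (usize, u64);

/// Sorts each chunk on the calling thread and hands it to a writer thread,
/// so sorting chunk N+1 can overlap with writing chunk N.
pub fn async_sort_and_write(chunks: Vec<Vec<Record>>) -> Vec<Vec<Record>> {
    // Bounded channel: at most 2 sorted chunks wait in memory for the writer.
    let (tx, rx) = sync_channel::<Vec<Record>>(2);

    let writer = thread::spawn(move || {
        let mut spills = Vec::new();
        // The loop ends once the sender is dropped and the queue is drained.
        for sorted in rx {
            spills.push(sorted); // stand-in for writing one sorted spill file
        }
        spills
    });

    for mut chunk in chunks {
        chunk.sort_by_key(|&(partition, _)| partition);
        tx.send(chunk).expect("writer thread hung up");
    }
    drop(tx); // signal end of input to the writer
    writer.join().expect("writer thread panicked")
}
```

The bounded channel is the design point: it lets sorting run ahead of disk writes without letting sorted-but-unwritten data grow without limit.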

The downside of columnar shuffle, I think, is that it has to do extra row-to-columnar conversion regardless of whether the input is columnar batch (from native Comet operator), or rows (e.g., from Spark operator). Therefore, performance may not be as good as native shuffle. However, I think it is relatively more robust and has better coverage. We are trying to make columnar shuffle the default mode and are prioritizing it for now.
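
The row-to-columnar conversion is essentially a transpose that copies every value once. A minimal sketch, assuming a fixed two-column schema of `(i32, Option<String>)` rows; real converters handle arbitrary schemas and track nulls with validity bitmaps, which this simplifies to a `Vec<bool>`. `Columns` and `rows_to_columns` are hypothetical names.

```rust
/// Columnar layout for a hypothetical two-column schema: one vector per
/// column, plus a validity flag per row for the nullable column.
#[derive(Debug, Default, PartialEq)]
pub struct Columns {
    pub ids: Vec<i32>,
    pub names: Vec<String>,     // empty placeholder where the value is null
    pub names_valid: Vec<bool>, // false marks a null name
}

/// Transposes row tuples into column vectors. Every value is visited and
/// copied into a new buffer: this per-value work is the kind of overhead
/// described above.
pub fn rows_to_columns(rows: &[(i32, Option<String>)]) -> Columns {
    let mut cols = Columns::default();
    for (id, name) in rows {
        cols.ids.push(*id);
        match name {
            Some(n) => {
                cols.names.push(n.clone());
                cols.names_valid.push(true);
            }
            None => {
                cols.names.push(String::new());
                cols.names_valid.push(false);
            }
        }
    }
    cols
}
```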

@advancedxy advancedxy changed the title Columnar Shuffle in rust doesn't handle empty projection properly Comet native shuffle in rust doesn't handle empty projection properly Feb 25, 2024
@advancedxy
Contributor Author

Thanks for the detailed explanation; it helps a lot. I changed the issue title as well.

> The downside of columnar shuffle, I think, is that it has to do extra row-to-columnar conversion regardless of whether the input is columnar batch (from native Comet operator), or rows (e.g., from Spark operator)

It seems there's still room for improvement. Will native shuffle support sort-based shuffle on your long-term roadmap?
I understand that columnar shuffle has better coverage, with the heavy lifting done on the JVM side, and that you are prioritizing it for now. I can imagine it would require significant engineering effort to reach feature parity in a pure native shuffle.

@sunchao
Member

sunchao commented Feb 25, 2024

> Will native shuffle support sort-based shuffle on your long-term roadmap?

Yes, definitely. We will shift our focus to native shuffle once we have a mature columnar shuffle implementation, which will enable us to unlock more native execution. Currently, there are still some issues with columnar shuffle.
