New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-27805][PYTHON] Propagate SparkExceptions during toPandas with arrow enabled #24677
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal | |
|
||
import org.apache.commons.lang3.StringUtils | ||
|
||
import org.apache.spark.TaskContext | ||
import org.apache.spark.{SparkException, TaskContext} | ||
import org.apache.spark.annotation.{DeveloperApi, Evolving, Experimental, Stable, Unstable} | ||
import org.apache.spark.api.java.JavaRDD | ||
import org.apache.spark.api.java.function._ | ||
|
@@ -3313,20 +3313,34 @@ class Dataset[T] private[sql]( | |
} | ||
} | ||
|
||
val arrowBatchRdd = toArrowBatchRdd(plan) | ||
sparkSession.sparkContext.runJob( | ||
arrowBatchRdd, | ||
(it: Iterator[Array[Byte]]) => it.toArray, | ||
handlePartitionBatches) | ||
var sparkException: Option[SparkException] = None | ||
try { | ||
val arrowBatchRdd = toArrowBatchRdd(plan) | ||
sparkSession.sparkContext.runJob( | ||
arrowBatchRdd, | ||
(it: Iterator[Array[Byte]]) => it.toArray, | ||
handlePartitionBatches) | ||
} catch { | ||
case e: SparkException => | ||
sparkException = Some(e) | ||
} | ||
|
||
// After processing all partitions, end the stream and write batch order indices | ||
// After processing all partitions, end the batch stream | ||
batchWriter.end() | ||
out.writeInt(batchOrder.length) | ||
// Sort by (index of partition, batch index in that partition) tuple to get the | ||
// overall_batch_index from 0 to N-1 batches, which can be used to put the | ||
// transferred batches in the correct order | ||
batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) => | ||
out.writeInt(overallBatchIndex) | ||
sparkException match { | ||
case Some(exception) => | ||
// Signal failure and write error message | ||
out.writeInt(-1) | ||
PythonRDD.writeUTF(exception.getMessage, out) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for late response, @BryanCutler Yea, I had the same question #24677 (comment). Thanks for details at #24677 (comment). It would have been better if those are commented since at least two committers raised the same questions :-).
Yea, I don't mind backporting it (don't strongly feel we should do too). |
||
case None => | ||
// Write batch order indices | ||
out.writeInt(batchOrder.length) | ||
// Sort by (index of partition, batch index in that partition) tuple to get the | ||
// overall_batch_index from 0 to N-1 batches, which can be used to put the | ||
// transferred batches in the correct order | ||
batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) => | ||
out.writeInt(overallBatchIndex) | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if this and the code below should be in a finally block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we put it into a finally block but only catch
SparkException
then that would be wrong: If a different exception gets thrown then we would go intocase None
, end the stream as if nothing happened and only get partial, incorrect data on the python side.If we want to put this into a finally block then we should catch all exceptions but I figured I'd do the same as in https://github.com/apache/spark/pull/24070/files#r279589039
It should be fine as is, if any exception that isn't a
SparkException
gets thrown then we will never reach this code. Instead theOutputStream
just gets closed and we get anEofError
on the python side (like we do right now for all Exceptions).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any more thoughts on this @BryanCutler ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think this is fine