[SPARK-48656][CORE] Do a length check and throw COLLECTION_SIZE_LIMIT_EXCEEDED error in CartesianRDD.getPartitions
#47019
Conversation
@timlee0119 also opened a PR for this at #47016. Compared to that other PR, this PR has unit tests and also uses the new structured error framework / error class, so I lean towards taking this PR's approach.
@@ -53,11 +54,16 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
  extends RDD[(T, U)](sc, Nil)
  with Serializable {

val numPartitionsInRdd1 = rdd1.partitions.length
This change means that `rdd1.partitions` will possibly be computed more eagerly than it would have been before, which is potentially an internal behavior change w.r.t. the old behavior. In principle this should not matter, but if we ever wanted to consider backporting this change then I would weakly incline towards keeping the existing call pattern, just for the sake of minimizing the scope of changes.
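The eagerness concern above can be illustrated in isolation. This is a hypothetical sketch (the class names are illustrative, not Spark's): hoisting a computation into a constructor-time `val` forces it when the instance is built, while a `lazy val` preserves on-demand evaluation.

```scala
// Illustrative only: shows why moving a call into a `val` field changes
// *when* the work happens, not just where the code lives.
class EagerHolder(compute: () => Int) {
  val n: Int = compute() // runs at construction time
}

class LazyHolder(compute: () => Int) {
  lazy val n: Int = compute() // runs only on first access to `n`
}
```

A backport-friendly alternative, per the comment, is simply to keep calling `rdd1.partitions.length` at the original call sites.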
@JoshRosen Thank you for your thoughtful advice, I will fix it.
LGTM! I'll wait a day or so to see if anyone else has feedback, otherwise I'll loop back and merge this tomorrow. Thanks for working on this.
val numPartitionsInRdd1 = rdd1.partitions.length
val partitionNum: Long = numPartitionsInRdd1.toLong * numPartitionsInRdd2.toLong
if (partitionNum > Int.MaxValue) {
  throw SparkCoreErrors.cartesianPartitionNumOverflow(numPartitionsInRdd1, numPartitionsInRdd2)
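The guard in the diff above can be sketched standalone. This is a simplified stand-in, not the PR's actual code: multiplying two `Int` partition counts can silently overflow, so the product is computed in `Long` and compared against `Int.MaxValue` before narrowing; a plain exception stands in for `SparkCoreErrors.cartesianPartitionNumOverflow`.

```scala
// Simplified sketch of the overflow check (not Spark's actual code).
def checkedPartitionCount(numPartitions1: Int, numPartitions2: Int): Int = {
  // Widen to Long first: an Int * Int product can wrap around silently.
  val product: Long = numPartitions1.toLong * numPartitions2.toLong
  if (product > Int.MaxValue) {
    // The PR throws a structured COLLECTION_SIZE_LIMIT_EXCEEDED error here.
    throw new IllegalArgumentException(
      s"Cartesian product of $numPartitions1 x $numPartitions2 partitions exceeds Int.MaxValue")
  }
  product.toInt
}
```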
I prefer to reuse COLLECTION_SIZE_LIMIT_EXCEEDED
@yaooqinn Thank you for your suggestion, let me change it.
See org.apache.spark.sql.errors.QueryExecutionErrors.tooManyArrayElementsError
@yaooqinn This method is defined in
+1, LGTM
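For readers unfamiliar with the error-class discussion above, here is a loose sketch of what a structured error with an error class and message parameters looks like. The class and helper names below are illustrative, not Spark's actual API; they only mimic the shape of an error built from an error class plus parameters.

```scala
// Illustrative stand-in for a structured error: an error class identifier
// plus named message parameters, rendered into the exception message.
class StructuredException(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends RuntimeException(
    s"[$errorClass] " +
      messageParameters.map { case (k, v) => s"$k=$v" }.mkString(", "))

// Hypothetical helper in the spirit of the PR's error factory.
def cartesianTooLarge(n1: Int, n2: Int): StructuredException =
  new StructuredException(
    "COLLECTION_SIZE_LIMIT_EXCEEDED.INITIALIZE",
    Map(
      "numberOfElements" -> (n1.toLong * n2.toLong).toString,
      "maxRoundedArrayLength" -> Int.MaxValue.toString))
```

Reusing an existing error class like `COLLECTION_SIZE_LIMIT_EXCEEDED`, as suggested, keeps the error catalog small and the message format consistent.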
Please update the PR description @wayneguow
Suggested title change: the `cartesian` operation of RDDs → `CartesianRDD.getPartitions`
Both title & description updated~
Thank you all. @JoshRosen @LuciferYang @yaooqinn
What changes were proposed in this pull request?
This PR aims to optimize the error message by doing a length check and throwing the `COLLECTION_SIZE_LIMIT_EXCEEDED.INITIALIZE` error in `CartesianRDD.getPartitions`.

Why are the changes needed?
Optimize the error prompts for Spark users.
Does this PR introduce any user-facing change?
Yes, this PR changes user-facing error class and message.
How was this patch tested?
Added a new test case in `RDDSuite`.

Was this patch authored or co-authored using generative AI tooling?
No.