[SPARK-18003][Spark Core] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing #15550
Conversation
```diff
@@ -64,8 +64,14 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)

   override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
-    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
-      (x._1, split.startIndex + x._2)
+    val parentIter = firstParent[T].iterator(split.prev, context)
```
we should add a line of comment saying we don't use Scala's zipWithIndex to avoid overflowing.
```diff
-      (x._1, split.startIndex + x._2)
+    val parentIter = firstParent[T].iterator(split.prev, context)
+    new Iterator[(T, Long)] {
+      var idxAcc: Long = -1L
```
private[this]
also why don't we initialize this to split.startIndex?
I'd also rename this to just "index".
or rather split.startIndex - 1 given the current code for idxAcc.
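For context, a minimal sketch of the pattern under discussion: a hand-rolled `Iterator` that tracks the index in a `Long` instead of relying on Scala's Int-based `zipWithIndex`. This incorporates the review suggestions above and is illustrative, not necessarily the exact merged code:

```scala
override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
  val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
  val parentIter = firstParent[T].iterator(split.prev, context)
  // Note: we don't use Scala's zipWithIndex here because its index is an Int,
  // which overflows when a partition holds more than Int.MaxValue records.
  new Iterator[(T, Long)] {
    // Starts one before the first index so the first next() yields split.startIndex.
    private[this] var index: Long = split.startIndex - 1
    override def hasNext: Boolean = parentIter.hasNext
    override def next(): (T, Long) = {
      index += 1
      (parentIter.next(), index)
    }
  }
}
```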
```diff
@@ -833,6 +833,30 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }

+  test("zipWithIndex with partition size exceeding MaxInt") {
+    val result = sc.parallelize(Seq(1), 1).mapPartitions(
```
doesn't this test case take forever to run? I think the way you want to do this is to create a helper function `def zipWithIndex[T](iterator: Iterator[T], startingOffset: Long): Iterator[T]`, and then just use a large starting offset. Then you just test that helper method without creating an end-to-end test case that loops over 2 billion elements. (See the sketch after this thread.)
yeah, I ran the test here. The CPU loop is very fast; running this test case on my machine takes about 3s. Does it still need to be optimized?
hm that's about 2.99s longer than I'd like for a unit test ...
all right. I'll update the testcase.
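A minimal sketch of the helper approach suggested above, plus a fast test. The helper name and placement are assumptions for illustration, and the return type is written as `Iterator[(T, Long)]` so the index travels with each element:

```scala
// Hypothetical standalone helper: zip an iterator with Long indices from startIndex.
// Factoring it out lets a unit test pass a large offset directly instead of
// iterating over 2 billion elements end-to-end.
def zipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
  new Iterator[(T, Long)] {
    private[this] var index: Long = startIndex - 1
    override def hasNext: Boolean = iterator.hasNext
    override def next(): (T, Long) = {
      index += 1
      (iterator.next(), index)
    }
  }
}

// Fast test: start just below Int.MaxValue and verify the indices cross it without wrapping.
val it = zipWithIndex(Iterator(10, 20, 30), Int.MaxValue.toLong - 1)
assert(it.toSeq == Seq(
  (10, Int.MaxValue.toLong - 1),
  (20, Int.MaxValue.toLong),
  (30, Int.MaxValue.toLong + 1)))
```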
Have you looked for other places in the codebase which would also produce wrong results by using scala's `zipWithIndex`?
Test build #67170 has finished for PR 15550 at commit …
@tejasapatil I checked the code that references scala's `zipWithIndex`.
Force-pushed from 18c3f49 to c942084.
I meant scala's `zipWithIndex`.
The current change LGTM (pending Jenkins). It would be great to check the usage as @tejasapatil said.
Test build #67202 has finished for PR 15550 at commit …
@tejasapatil I checked the other references to scala's `zipWithIndex`.
Cool - LGTM! (I will merge once Jenkins comes back positive)
Test build #67228 has finished for PR 15550 at commit …
Merging in master / branch-2.0.
…Id index value overflowing

## What changes were proposed in this pull request?

- Fix bug of RDD `zipWithIndex` generating wrong result when one partition contains more than 2147483647 records.
- Fix bug of RDD `zipWithUniqueId` generating wrong result when one partition contains more than 2147483647 records.

## How was this patch tested?

Test added.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.

(cherry picked from commit 3975516)
Signed-off-by: Reynold Xin <rxin@databricks.com>
…niqueId index value overflowing apache#15550
…Id index value overflowing

## What changes were proposed in this pull request?

- Fix bug of RDD `zipWithIndex` generating wrong result when one partition contains more than 2147483647 records.
- Fix bug of RDD `zipWithUniqueId` generating wrong result when one partition contains more than 2147483647 records.

## How was this patch tested?

Test added.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes apache#15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.
What changes were proposed in this pull request?

- Fix bug of RDD `zipWithIndex` generating wrong result when one partition contains more than 2147483647 records.
- Fix bug of RDD `zipWithUniqueId` generating wrong result when one partition contains more than 2147483647 records.

How was this patch tested?

Test added.
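To make the root cause concrete (a sketch, not code from the patch): Scala's `Iterator.zipWithIndex` counts with an `Int`, so the per-partition index wraps to a negative number once it passes `Int.MaxValue` (2147483647) elements; tracking it in a `Long`, as this PR does, avoids that:

```scala
object OverflowDemo extends App {
  // Int arithmetic wraps at Int.MaxValue, which is what an Int-based
  // zipWithIndex counter does past 2147483647 elements.
  val i: Int = Int.MaxValue
  println(i + 1) // -2147483648: the index goes negative

  // With a Long counter the index keeps growing correctly.
  val j: Long = Int.MaxValue.toLong
  println(j + 1) // 2147483648
}
```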