
[SPARK-5361]Multiple Java RDD <-> Python RDD conversions not working correctly #4146

Closed
wingchen wants to merge 5 commits into apache:master from wingchen:master

Conversation

wingchen

This was found while reading an RDD with `sc.newAPIHadoopRDD` and writing it back using `rdd.saveAsNewAPIHadoopFile` in pyspark.

It turns out that whenever there are multiple RDD conversions from JavaRDD to PythonRDD and back to JavaRDD, the unpickled batch comes back as a plain `Object[]` array rather than the `java.util.ArrayList` that `SerDeUtil.pythonToJava` expects, and the exception below occurs:

```
15/01/16 10:28:31 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.ArrayList
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:157)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
```

The test case code below reproduces it:

```
from pyspark.rdd import RDD

dl = [
    (u'2', {u'director': u'David Lean'}),
    (u'7', {u'director': u'Andrew Dominik'})
]

dl_rdd = sc.parallelize(dl)
tmp = dl_rdd._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()

tmp = t._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()  # blows up here on the second conversion
```
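
For context, a Hadoop round trip like the sketch below performs both conversions internally: reading converts Java objects to Python, and saving converts them back. This is a minimal sketch using the closely related `sc.newAPIHadoopFile`; the paths and the `Text`/`MapWritable` key/value classes are illustrative assumptions, not taken from the original report:

```python
# Sketch of the round trip that surfaced the bug; the paths and the Hadoop
# key/value classes below are illustrative assumptions.
rdd = sc.newAPIHadoopFile(
    "hdfs:///tmp/movies",  # hypothetical input path
    "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
    keyClass="org.apache.hadoop.io.Text",
    valueClass="org.apache.hadoop.io.MapWritable")

# Reading performed one Java -> Python conversion; saving performs the
# Python -> Java conversion, so records pass through the SerDe twice.
rdd.saveAsNewAPIHadoopFile(
    "hdfs:///tmp/movies-out",  # hypothetical output path
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.Text",
    valueClass="org.apache.hadoop.io.MapWritable")
```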

@JoshRosen
Contributor

Do you mind opening a JIRA issue for this and updating your pull request title to reference that issue (e.g. [SPARK-XXX] My PR title…)?

@AmplabJenkins

Can one of the admins verify this patch?

@wingchen wingchen changed the title add in tuple handling while converting form python RDD back to JavaRDD [SPARK-5361]python tuple not supported while converting PythonRDD back to JavaRDD Jan 21, 2015
@wingchen
Author

@JoshRosen Updated. Thanks

@JoshRosen
Contributor

Jenkins, this is ok to test.

@SparkQA

SparkQA commented Jan 21, 2015

Test build #25925 has started for PR 4146 at commit 9f1a097.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 21, 2015

Test build #25925 has finished for PR 4146 at commit 9f1a097.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25925/
Test FAILed.

@wingchen
Author

checking

@JoshRosen
Contributor

Oh, this looks like a spurious failure due to some Jenkins flakiness (which I'm going to investigate separately).

@wingchen
Author

Yeah, build #25927 is down to only one unrelated failed test case.
I am watching #25928 to see if it goes away by itself.

@JoshRosen
Contributor

Can you add a regression test for this? See python/pyspark/tests.py. Make sure to reference the JIRA number in the test case name.

@wingchen
Author

@JoshRosen Sure thing. I did not see any similar regression test case in this file, though.
Is there a convention to follow? Or should I just create a test class called Regression_5361 or Spark_5361?

Thanks

@JoshRosen
Contributor

If it was me, I'd probably put it as a new case in the SerializationTestCase suite. I think our convention has been to give the tests a descriptive name and just leave a comment next to it linking to the JIRA. For example: https://github.com/apache/spark/blob/master/python/pyspark/tests.py#L216
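
Following that convention, a regression test might look roughly like the sketch below. It assumes the `ReusedPySparkTestCase` base class defined in python/pyspark/tests.py, which provides a shared SparkContext as `self.sc`; the suite name is hypothetical and this is illustrative, not necessarily the exact test that was merged:

```python
from pyspark.rdd import RDD

# Assumes ReusedPySparkTestCase from python/pyspark/tests.py, which
# exposes a shared SparkContext as self.sc.
class SerDeRegressionTest(ReusedPySparkTestCase):  # hypothetical suite name

    def test_multiple_python_java_RDD_conversions(self):
        # Regression test for SPARK-5361
        data = [
            (u'2', {u'director': u'David Lean'}),
            (u'7', {u'director': u'Andrew Dominik'}),
        ]
        data_rdd = self.sc.parallelize(data)

        # First Java -> Python round trip works even without the fix.
        data_java_rdd = data_rdd._to_java_object_rdd()
        data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
        converted_rdd = RDD(data_python_rdd, self.sc)
        self.assertEqual(2, converted_rdd.count())

        # The second round trip used to raise the ClassCastException.
        data_java_rdd = converted_rdd._to_java_object_rdd()
        data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
        converted_rdd = RDD(data_python_rdd, self.sc)
        self.assertEqual(2, converted_rdd.count())
```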

@wingchen
Author

Great! I will follow it then.

@wingchen
Author

Found a good way to reproduce it:

```
from pyspark.rdd import RDD

dl = [
    (u'2', {u'director': u'David Lean'}),
    (u'7', {u'director': u'Andrew Dominik'})
]

dl_rdd = sc.parallelize(dl)
tmp = dl_rdd._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()

tmp = t._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()  # blows up here on the second conversion
```

I am going to make a test case from this example.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25977 has started for PR 4146 at commit 4cf1187.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25977 has finished for PR 4146 at commit 4cf1187.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25977/
Test FAILed.

@wingchen
Author

@JoshRosen Added a test case. Thanks

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25978 has started for PR 4146 at commit 126be6b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25978 has finished for PR 4146 at commit 126be6b.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25978/
Test FAILed.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25981 has started for PR 4146 at commit 5d90a83.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 23, 2015

Test build #25981 has finished for PR 4146 at commit 5d90a83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25981/
Test PASSed.

@wingchen
Author

@JoshRosen Anything else I can do here?

```
@@ -153,7 +153,10 @@ private[spark] object SerDeUtil extends Logging {
     iter.flatMap { row =>
       val obj = unpickle.loads(row)
       if (batched) {
-        obj.asInstanceOf[JArrayList[_]].asScala
+        obj match {
+          case array: Array[Any] => array.toList
```
@JoshRosen
Contributor

I think that array.toList constructs a Scala List, which is basically a linked list. Is there a reason why we can't call toSeq instead, or simply return the array?

@JoshRosen
Contributor

This seems good to me overall; I left one minor comment regarding a small performance optimization, but aside from that it looks fine. Sorry for the delay in review.

@SparkQA

SparkQA commented Jan 26, 2015

Test build #26123 has started for PR 4146 at commit 903df7d.

  • This patch merges cleanly.

@wingchen
Author

@JoshRosen Updated to toSeq; Seq is more consistent with line 161. Thanks

@SparkQA

SparkQA commented Jan 27, 2015

Test build #26123 has finished for PR 4146 at commit 903df7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26123/
Test PASSed.

@wingchen
Author

@JoshRosen Can we have this in the next release? We will have to use our own fork if it's not in. Thanks

@JoshRosen
Contributor

Let me take one final look to see if I can pull this in for 1.2.1 (since we're cutting a new RC tonight). In general, this looks safe, since it only adds new code paths in cases where we'd otherwise throw an exception, as opposed to changing the behavior of existing code paths. If things check out, I'll pull it in for both 1.3.0 and 1.2.1.

@JoshRosen
Contributor

@wingchen Actually, just to be clear here, is this problem related to tuple handling, or is the actual issue related to multiple Java <-> Python conversions not working correctly? If there's nothing tuple-specific about this, do you mind editing the PR title, description, and JIRA to reflect this?

@wingchen
Author

@JoshRosen I found this one while reading an RDD from sc.newAPIHadoopRDD and writing it back using rdd.saveAsNewAPIHadoopFile. It only threw exceptions in the tuple handling, which is why I thought tuples were the cause.

But it seems the real reason for the failure is that multiple Java <-> Python conversions do not work correctly. I am updating the PR title, description, and JIRA accordingly. Thanks

@wingchen wingchen changed the title [SPARK-5361]python tuple not supported while converting PythonRDD back to JavaRDD [SPARK-5361]Multiple Java RDD <-> Python RDD conversions not working correctly Jan 28, 2015
@wingchen
Author

@JoshRosen updated. thanks :)

@JoshRosen
Contributor

@wingchen Thanks for updating this. The new description + code both look good to me, so I'm going to merge this into master for inclusion in 1.3.0 and tag it for a backport into 1.2.2.

@asfgit asfgit closed this in 453d799 Jan 28, 2015
@wingchen
Author

@JoshRosen Thanks a lot for your help.

@JoshRosen
Contributor

I've cherry-picked this into branch-1.2 (1.2.2).

asfgit pushed a commit that referenced this pull request Feb 17, 2015
[SPARK-5361]Multiple Java RDD <-> Python RDD conversions not working correctly

This is found through reading RDD from `sc.newAPIHadoopRDD` and writing it back using `rdd.saveAsNewAPIHadoopFile` in pyspark.

It turns out that whenever there are multiple RDD conversions from JavaRDD to PythonRDD then back to JavaRDD, the exception below happens:

```
15/01/16 10:28:31 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.ArrayList
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:157)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
```

The test case code below reproduces it:

```
from pyspark.rdd import RDD

dl = [
    (u'2', {u'director': u'David Lean'}),
    (u'7', {u'director': u'Andrew Dominik'})
]

dl_rdd = sc.parallelize(dl)
tmp = dl_rdd._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()

tmp = t._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count() # it blows up here during the 2nd time of conversion
```

Author: Winston Chen <wchen@quid.com>

Closes #4146 from wingchen/master and squashes the following commits:

903df7d [Winston Chen] SPARK-5361, update to toSeq based on the PR
5d90a83 [Winston Chen] SPARK-5361, make python pretty, so to pass PEP 8 checks
126be6b [Winston Chen] SPARK-5361, add in test case
4cf1187 [Winston Chen] SPARK-5361, add in test case
9f1a097 [Winston Chen] add in tuple handling while converting form python RDD back to JavaRDD

(cherry picked from commit 453d799)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Feb 24, 2015