From 19b0c099a4375e3baf4af429a52c90e7ef7757b4 Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Mon, 9 Jun 2014 09:57:13 -0700
Subject: [PATCH 1/2] [SPARK-2079] Removing unnecessary wrapping in SchemaRDD.javaToPython

---
 python/pyspark/sql.py                                    | 3 ++-
 .../src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 7 +------
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index b4e9618cc25b5..3cecd1f7864fc 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -16,6 +16,7 @@
 #
 
 from pyspark.rdd import RDD
+from pyspark.serializers import PickleSerializer
 
 from py4j.protocol import Py4JError
 
@@ -346,7 +347,7 @@ def _toPython(self):
         # TODO: This is inefficient, we should construct the Python Row object
         # in Java land in the javaToPython function. May require a custom
         # pickle serializer in Pyrolite
-        return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
+        return RDD(jrdd, self._sc, PickleSerializer()).map(lambda d: Row(d))
 
     # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
     # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 8855c4e876917..1b1c3037b8247 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -343,15 +343,10 @@ class SchemaRDD(
       val pickle = new Pickler
       iter.map { row =>
         val map: JMap[String, Any] = new java.util.HashMap
-        // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict].
-        // Ideally we should be able to pickle an object directly into a Python collection so we
-        // don't have to create an ArrayList every time.
-        val arr: java.util.ArrayList[Any] = new java.util.ArrayList
         row.zip(fieldNames).foreach { case (obj, name) =>
           map.put(name, obj)
         }
-        arr.add(map)
-        pickle.dumps(arr)
+        pickle.dumps(map)
       }
     }
   }

From 2d1915e1c75b7b0405a693b8fceb0ab4c9bf9e52 Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Thu, 12 Jun 2014 17:16:30 -0700
Subject: [PATCH 2/2] [SPARK-2079] Add batching in SchemaRDD.javaToPython

---
 python/pyspark/sql.py                                         | 5 +++--
 sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  | 4 ++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 3cecd1f7864fc..03dc3648aec27 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -16,7 +16,7 @@
 #
 
 from pyspark.rdd import RDD
-from pyspark.serializers import PickleSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 from py4j.protocol import Py4JError
 
@@ -347,7 +347,8 @@ def _toPython(self):
         # TODO: This is inefficient, we should construct the Python Row object
         # in Java land in the javaToPython function. May require a custom
         # pickle serializer in Pyrolite
-        return RDD(jrdd, self._sc, PickleSerializer()).map(lambda d: Row(d))
+        return RDD(jrdd, self._sc, BatchedSerializer(
+            PickleSerializer())).map(lambda d: Row(d))
 
     # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
     # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 1b1c3037b8247..3b02d73d444af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -346,8 +346,8 @@ class SchemaRDD(
         row.zip(fieldNames).foreach { case (obj, name) =>
           map.put(name, obj)
         }
-        pickle.dumps(map)
-      }
+        map
+      }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
 
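
Not part of the patches: a minimal, self-contained sketch of the serialization round trip the two commits set up. The function names fake_java_to_python and fake_batched_unpickle are hypothetical stand-ins, using plain pickle instead of Pyrolite and the JVM, for SchemaRDD.javaToPython and BatchedSerializer(PickleSerializer()). It only illustrates the payload shape both sides rely on: each payload is one pickled list of row dicts, which the Python side flattens back into individual rows before wrapping them in Row objects.

import pickle

def fake_java_to_python(rows, batch_size=10):
    # Stand-in for the Scala side: group rows into batches of 10 and pickle
    # each batch as one list, mirroring grouped(10) + pickle.dumps(batched.toArray).
    for i in range(0, len(rows), batch_size):
        yield pickle.dumps(rows[i:i + batch_size])

def fake_batched_unpickle(payloads):
    # Stand-in for BatchedSerializer(PickleSerializer()): unpickle each payload
    # (a list of dicts) and yield the contained rows one by one.
    for payload in payloads:
        for row in pickle.loads(payload):
            yield row

rows = [{"name": "n%d" % i, "age": i} for i in range(25)]
recovered = list(fake_batched_unpickle(fake_java_to_python(rows)))
assert recovered == rows  # 25 rows travel as 3 pickled payloads of at most 10 each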