From d92942982a7e31272d159c5f35f92177f7ad671e Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Mon, 12 May 2014 22:33:11 -0700
Subject: [PATCH 1/3] [SPARK-1161] Add saveAsObjectFile and
 SparkContext.objectFile in Python

[SPARK-1161] Changing to pickleFile and saveAsPickleFile.

Using PickleSerializer with batch size 1024.
---
 python/pyspark/context.py |  9 +++++++++
 python/pyspark/rdd.py     | 32 +++++++++++++++++++++++++-------
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c9ff82d23b3cf..75757c93f410b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -51,6 +51,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
+    _pickle_file_serializer = BatchedSerializer(PickleSerializer(), 1024)
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@@ -261,6 +262,14 @@ def parallelize(self, c, numSlices=None):
         jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
         return RDD(jrdd, self, serializer)
 
+    def pickleFile(self, name, minPartitions=None):
+        """
+        Load an RDD previously saved using L{RDD.saveAsPickleFile} method.
+        """
+        minPartitions = minPartitions or self.defaultMinPartitions
+        return RDD(self._jsc.objectFile(name, minPartitions), self,
+                   self._pickle_file_serializer)
+
     def textFile(self, name, minPartitions=None):
         """
         Read a text file from HDFS, a local file system (available on all
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 07578b8d937fc..95f32534dd731 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -427,11 +427,14 @@ def intersection(self, other):
             .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
             .keys()
 
-    def _reserialize(self):
-        if self._jrdd_deserializer == self.ctx.serializer:
+    def _reserialize(self, serializer=None):
+        serializer = serializer or self.ctx.serializer
+        if self._jrdd_deserializer == serializer:
             return self
         else:
-            return self.map(lambda x: x, preservesPartitioning=True)
+            converted = self.map(lambda x: x, preservesPartitioning=True)
+            converted._jrdd_deserializer = serializer
+            return converted
 
     def __add__(self, other):
         """
@@ -880,6 +883,22 @@ def first(self):
         """
         return self.take(1)[0]
 
+    def saveAsPickleFile(self, path):
+        """
+        Save this RDD as a SequenceFile of serialized objects. The serializer
+        used is L{pyspark.serializers.PickleSerializer} with batch size 1024.
+
+        >>> tFile = NamedTemporaryFile(delete=True)
+        >>> tFile.close()
+        >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tFile.name)
+        >>> sorted(sc.pickleFile(tFile.name).collect())
+        [1, 2, 'rdd', 'spark']
+        >>> sorted(sc.pickleFile(tFile.name, 10).collect())
+        [1, 2, 'rdd', 'spark']
+        """
+        self._reserialize(
+            self.ctx._pickle_file_serializer)._jrdd.saveAsObjectFile(path)
+
     def saveAsTextFile(self, path):
         """
         Save this RDD as a text file, using string representations of elements.
@@ -1401,10 +1420,9 @@ def _jrdd(self):
         if self._jrdd_val:
             return self._jrdd_val
         if self._bypass_serializer:
-            serializer = NoOpSerializer()
-        else:
-            serializer = self.ctx.serializer
-        command = (self.func, self._prev_jrdd_deserializer, serializer)
+            self._jrdd_deserializer = NoOpSerializer()
+        command = (self.func, self._prev_jrdd_deserializer,
+                   self._jrdd_deserializer)
         pickled_command = CloudPickleSerializer().dumps(command)
         broadcast_vars = ListConverter().convert(
             [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],

From 44e0615be3939445ebb331be00d8340b7c893476 Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Tue, 3 Jun 2014 14:28:59 -0700
Subject: [PATCH 2/3] [SPARK-1161] Adding an optional batchSize with default
 value 10

---
 python/pyspark/context.py |  9 +++++++--
 python/pyspark/rdd.py     | 25 ++++++++++++-------------
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 75757c93f410b..2439bc9e03a84 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -51,7 +51,6 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
-    _pickle_file_serializer = BatchedSerializer(PickleSerializer(), 1024)
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@@ -265,10 +264,16 @@ def parallelize(self, c, numSlices=None):
     def pickleFile(self, name, minPartitions=None):
         """
         Load an RDD previously saved using L{RDD.saveAsPickleFile} method.
+
+        >>> tmpFile = NamedTemporaryFile(delete=True)
+        >>> tmpFile.close()
+        >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
+        >>> sc.pickleFile(tmpFile.name, 3).collect()
+        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
         """
         minPartitions = minPartitions or self.defaultMinPartitions
         return RDD(self._jsc.objectFile(name, minPartitions), self,
-                   self._pickle_file_serializer)
+                   BatchedSerializer(PickleSerializer()))
 
     def textFile(self, name, minPartitions=None):
         """
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 95f32534dd731..ed58ac0c75cfc 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -33,7 +33,8 @@
 from random import Random
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
-    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
+    BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
+    PickleSerializer, pack_long
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
@@ -883,21 +884,19 @@ def first(self):
         """
         return self.take(1)[0]
 
-    def saveAsPickleFile(self, path):
+    def saveAsPickleFile(self, path, batchSize=10):
         """
-        Save this RDD as a SequenceFile of serialized objects. The serializer
-        used is L{pyspark.serializers.PickleSerializer} with batch size 1024.
+        Save this RDD as a SequenceFile of serialized objects. The serializer used is
+        L{pyspark.serializers.PickleSerializer}, default batch size is 10.
 
-        >>> tFile = NamedTemporaryFile(delete=True)
-        >>> tFile.close()
-        >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tFile.name)
-        >>> sorted(sc.pickleFile(tFile.name).collect())
-        [1, 2, 'rdd', 'spark']
-        >>> sorted(sc.pickleFile(tFile.name, 10).collect())
-        [1, 2, 'rdd', 'spark']
+        >>> tmpFile = NamedTemporaryFile(delete=True)
+        >>> tmpFile.close()
+        >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
+        >>> sc.pickleFile(tmpFile.name, 5).collect()
+        [1, 2, 'spark', 'rdd']
         """
-        self._reserialize(
-            self.ctx._pickle_file_serializer)._jrdd.saveAsObjectFile(path)
+        self._reserialize(BatchedSerializer(PickleSerializer(),
+                                            batchSize))._jrdd.saveAsObjectFile(path)
 
     def saveAsTextFile(self, path):
         """

From 24ed8a2932372395929e6268383a516c5b9b3c78 Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Tue, 3 Jun 2014 16:27:50 -0700
Subject: [PATCH 3/3] [SPARK-1161] Fixing doc tests

---
 python/pyspark/context.py | 2 +-
 python/pyspark/rdd.py     | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 2439bc9e03a84..347f2de53efec 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -268,7 +268,7 @@ def pickleFile(self, name, minPartitions=None):
         >>> tmpFile = NamedTemporaryFile(delete=True)
         >>> tmpFile.close()
         >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
-        >>> sc.pickleFile(tmpFile.name, 3).collect()
+        >>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
         [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
         """
         minPartitions = minPartitions or self.defaultMinPartitions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ed58ac0c75cfc..a4f1c0d032a55 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -892,8 +892,8 @@ def saveAsPickleFile(self, path, batchSize=10):
         >>> tmpFile = NamedTemporaryFile(delete=True)
         >>> tmpFile.close()
         >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
-        >>> sc.pickleFile(tmpFile.name, 5).collect()
-        [1, 2, 'spark', 'rdd']
+        >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
+        [1, 2, 'rdd', 'spark']
         """
         self._reserialize(BatchedSerializer(PickleSerializer(),
                                             batchSize))._jrdd.saveAsObjectFile(path)
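
Usage note: with all three patches applied, saveAsPickleFile and pickleFile round-trip an RDD through a SequenceFile of batched, pickled records. A minimal sketch of the resulting API, assuming a running SparkContext named sc (as in the PySpark shell) and a hypothetical writable output path /tmp/demo_pickle:

    >>> # batchSize controls how many Python objects are pickled together into
    >>> # each SequenceFile record; patch 2 lowered the default from 1024 to 10.
    >>> sc.parallelize(range(100)).saveAsPickleFile('/tmp/demo_pickle', batchSize=20)
    >>> sorted(sc.pickleFile('/tmp/demo_pickle', minPartitions=4).collect()) == list(range(100))
    True

collect() may return elements in a different order when the partition count at load time differs from the one used at save time, which is why patch 3 wraps the doctest results in sorted().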