From 0cdc8f87a02c5bf20f4f61a4dbd83d16431a1af9 Mon Sep 17 00:00:00 2001
From: Michael Nazario
Date: Tue, 27 Jan 2015 11:59:32 -0800
Subject: [PATCH 1/2] Add toLocalIterator to PySpark

---
 python/pyspark/rdd.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f4cfe4845dc20..e3bbf1cfb2b23 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2059,6 +2059,20 @@ def countApproxDistinct(self, relativeSD=0.05):
         hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
         return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
 
+    def toLocalIterator(self):
+        """
+        Return an iterator that contains all of the elements in this RDD.
+        The iterator will consume as much memory as the largest partition in this RDD.
+        >>> rdd = sc.parallelize(range(10))
+        >>> [x for x in rdd.toLocalIterator()]
+        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+        """
+        partitions = xrange(self.getNumPartitions())
+        for partition in partitions:
+            rows = self.context.runJob(self, lambda x: x, [partition])
+            for row in rows:
+                yield row
+
 
 class PipelinedRDD(RDD):
 

From 1c585263760d935d14030265f1856088cebff67a Mon Sep 17 00:00:00 2001
From: Michael Nazario
Date: Wed, 28 Jan 2015 11:21:34 -0800
Subject: [PATCH 2/2] Fix documentation off by one error

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e3bbf1cfb2b23..b3979b6722c08 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2065,7 +2065,7 @@ def toLocalIterator(self):
         The iterator will consume as much memory as the largest partition in this RDD.
         >>> rdd = sc.parallelize(range(10))
         >>> [x for x in rdd.toLocalIterator()]
-        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
         """
         partitions = xrange(self.getNumPartitions())
         for partition in partitions: