From 76c9f13798a3e9015c04a7d969cbb293e1f447fd Mon Sep 17 00:00:00 2001
From: Tommy YU
Date: Fri, 22 Jan 2016 10:03:15 +0800
Subject: [PATCH 1/7] add note to rdd.take for review

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9dad7944144d8..762a53c7eac84 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1194,6 +1194,8 @@ abstract class RDD[T: ClassTag](
  * Take the first num elements of the RDD. It works by first scanning one partition, and use the
  * results from that partition to estimate the number of additional partitions needed to satisfy
  * the limit.
+ * Note that this method should only be used if the resulting array is expected to be small, as
+ * the whole thing is loaded into the driver's memory.
  *
  * @note due to complications in the internal implementation, this method will raise
  * an exception if called on an RDD of `Nothing` or `Null`.

From 099765280630adbe37b2cdf410ed72c5ea7da3d7 Mon Sep 17 00:00:00 2001
From: Tommy YU
Date: Mon, 25 Jan 2016 20:53:13 +0800
Subject: [PATCH 2/7] apply to other scala file

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +++-
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 ++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 762a53c7eac84..4521d5ce2a2ce 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -829,6 +829,8 @@ abstract class RDD[T: ClassTag](
 /**
  * Return an array that contains all of the elements in this RDD.
+ * Note that this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  */
 def collect(): Array[T] = withScope {
   val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
@@ -1195,7 +1197,7 @@ abstract class RDD[T: ClassTag](
  * results from that partition to estimate the number of additional partitions needed to satisfy
  * the limit.
  * Note that this method should only be used if the resulting array is expected to be small, as
- * the whole thing is loaded into the driver's memory.
+ * all the data is loaded into the driver's memory.
  *
  * @note due to complications in the internal implementation, this method will raise
  * an exception if called on an RDD of `Nothing` or `Null`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 518f9dcf94a70..348ec2de50e1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1383,6 +1383,8 @@ class DataFrame private[sql](
 /**
  * Returns the first `n` rows.
+ * Note that this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
 * @group action
 * @since 1.3.0
 */

From f177c9a40eadae601882531cc00cd42de8eebf38 Mon Sep 17 00:00:00 2001
From: Tommy YU
Date: Mon, 25 Jan 2016 21:01:40 +0800
Subject: [PATCH 3/7] apply notes to other python function

---
 python/pyspark/rdd.py | 5 +++++
 python/pyspark/sql/dataframe.py | 3 +++
 2 files changed, 8 insertions(+)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index c28594625457a..74956f1d18532 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -766,6 +766,8 @@ def func(it):
    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
+       Note that this method should only be used if the resulting array is expected
+       to be small, as all the data is loaded into the driver's memory.
        """
        with SCCallSiteSync(self.context) as css:
            port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
@@ -1254,6 +1256,9 @@ def take(self, num):
        that partition to estimate the number of additional partitions needed
        to satisfy the limit.
+       Note that this method should only be used if the resulting array is expected
+       to be small, as all the data is loaded into the driver's memory.
+
        Translated from the Scala implementation in RDD#take().
        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 90a6b5d9c0dda..e93083fcdc029 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1330,6 +1330,9 @@ def toDF(self, *cols):
    def toPandas(self):
        """Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
+       Note that this method should only be used if the resulting array is expected
+       to be small, as all the data is loaded into the driver's memory.
+
        This is only available if Pandas is installed and available.
        >>> df.toPandas()  # doctest: +SKIP

From aa9427c235ca5e9fff0dce13a528cc82f91d3f1b Mon Sep 17 00:00:00 2001
From: Tommy YU
Date: Mon, 25 Jan 2016 21:38:29 +0800
Subject: [PATCH 4/7] add note to dataframe.head

---
 python/pyspark/sql/dataframe.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e93083fcdc029..a0c3959734d1a 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -739,6 +739,9 @@ def describe(self, *cols):
    def head(self, n=None):
        """Returns the first ``n`` rows.
+       Note that this method should only be used if the resulting array is expected
+       to be small, as all the data is loaded into the driver's memory.
+
        :param n: int, default 1. Number of rows to return.
        :return: If n is greater than 1, return a list of :class:`Row`.
            If n is 1, return a single Row.
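The four patches above all document the same hazard: collect(), take()/first()/head(), and toPandas() materialize their result on the driver, so the result size has to be bounded before the call is made. A minimal Scala sketch of the distinction, not part of the patches themselves: the RDD contents and sizes are made up, and `sc` is assumed to be an existing SparkContext such as the one a spark-shell session provides.

    val big = sc.parallelize(1 to 1000000)     // data stays partitioned across the executors
    val firstFive = big.take(5)                // bounded: only 5 elements travel to the driver
    val sample = big.takeSample(false, 10)     // bounded: a fixed-size sample on the driver
    // val everything = big.collect()          // unbounded: the entire RDD in driver memory,
    //                                         // exactly the case the added notes warn about

take() and takeSample() cap how much comes back regardless of how large the RDD is, while collect() grows with the dataset, which is why the wording singles out "small" results.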
From a83a995d649818e96ec180734f961123480970e4 Mon Sep 17 00:00:00 2001
From: Tommy YU
Date: Mon, 25 Jan 2016 22:56:23 +0800
Subject: [PATCH 5/7] add more notes for top, takeOrder, takeSample

---
 .../src/main/scala/org/apache/spark/rdd/RDD.scala | 15 +++++++++++++--
 python/pyspark/rdd.py | 9 +++++++++
 .../scala/org/apache/spark/sql/DataFrame.scala | 4 +++-
 3 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4521d5ce2a2ce..ebf13a1879d1d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -474,6 +474,9 @@ abstract class RDD[T: ClassTag](
 /**
  * Return a fixed-size sampled subset of this RDD in an array
  *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
+ *
  * @param withReplacement whether sampling is done with replacement
  * @param num size of the returned sample
  * @param seed seed for the random number generator
@@ -829,7 +832,8 @@ abstract class RDD[T: ClassTag](
 /**
  * Return an array that contains all of the elements in this RDD.
- * Note that this method should only be used if the resulting array is expected to be small, as
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
  * all the data is loaded into the driver's memory.
  */
 def collect(): Array[T] = withScope {
@@ -1196,7 +1200,8 @@ abstract class RDD[T: ClassTag](
  * Take the first num elements of the RDD. It works by first scanning one partition, and use the
  * results from that partition to estimate the number of additional partitions needed to satisfy
  * the limit.
- * Note that this method should only be used if the resulting array is expected to be small, as
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
  * all the data is loaded into the driver's memory.
  *
  * @note due to complications in the internal implementation, this method will raise
  * an exception if called on an RDD of `Nothing` or `Null`.
@@ -1260,6 +1265,9 @@ abstract class RDD[T: ClassTag](
  * // returns Array(6, 5)
  * }}}
  *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
+ *
  * @param num k, the number of top elements to return
  * @param ord the implicit ordering for T
  * @return an array of top elements
@@ -1280,6 +1288,9 @@ abstract class RDD[T: ClassTag](
  * // returns Array(2, 3)
  * }}}
  *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
+ *
  * @param num k, the number of elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 74956f1d18532..11e7ce160df7e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -426,6 +426,9 @@ def takeSample(self, withReplacement, num, seed=None):
        """
        Return a fixed-size sampled subset of this RDD.
+       Note that this method should only be used if the resulting array is expected
+       to be small, as all the data is loaded into the driver's memory.
+
        >>> rdd = sc.parallelize(range(0, 10))
        >>> len(rdd.takeSample(True, 20, 1))
        20
@@ -1215,6 +1218,9 @@ def top(self, num, key=None):
        """
        Get the top N elements from a RDD.
+       Note that this method should only be used if the resulting array is expected
+       to be small, as all the data is loaded into the driver's memory.
+
        Note: It returns the list sorted in descending order.
        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
@@ -1237,6 +1243,9 @@ def takeOrdered(self, num, key=None):
        Get the N elements from a RDD ordered in ascending order or as specified
        by the optional key function.
+       Note that this method should only be used if the resulting array is expected
+       to be small, as all the data is loaded into the driver's memory.
+
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 348ec2de50e1b..ce38d1c102043 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1383,8 +1383,10 @@ class DataFrame private[sql](
 /**
  * Returns the first `n` rows.
- * Note that this method should only be used if the resulting array is expected to be small, as
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
  * all the data is loaded into the driver's memory.
+ *
  * @group action
  * @since 1.3.0
  */

From ee1781fbffaf51617ad034a4b2bc0dab17ae1aba Mon Sep 17 00:00:00 2001
From: Tommy YU
Date: Tue, 26 Jan 2016 12:50:07 +0800
Subject: [PATCH 6/7] add notes for java api, and minor change for python pandas

---
 .../apache/spark/api/java/JavaPairRDD.scala | 3 +++
 .../apache/spark/api/java/JavaRDDLike.scala | 24 +++++++++++++++++++
 python/pyspark/sql/dataframe.py | 2 +-
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index fb04472ee73fd..94d103588b696 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -636,6 +636,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 /**
  * Return the key-value pairs in this RDD to the master as a Map.
+ *
+ * @note this method should only be used if the resulting data is expected to be small, as
+ * all the data is loaded into the driver's memory.
  */
 def collectAsMap(): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.collectAsMap())
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0f8d13cf5cc2f..1b897ac1330b8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -327,6 +327,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
  * Return an array that contains all of the elements in this RDD.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  */
 def collect(): JList[T] = rdd.collect().toSeq.asJava
@@ -465,6 +468,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
  * it will be slow if a lot of partitions are required. In that case, use collect() to get the
  * whole RDD instead.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  */
 def take(num: Int): JList[T] = rdd.take(num).toSeq.asJava
@@ -548,6 +554,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
  * Returns the top k (largest) elements from this RDD as defined by
  * the specified Comparator[T] and maintains the order.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  * @param num k, the number of top elements to return
  * @param comp the comparator that defines the order
  * @return an array of top elements
@@ -559,6 +568,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
  * Returns the top k (largest) elements from this RDD using the
  * natural ordering for T and maintains the order.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  * @param num k, the number of top elements to return
  * @return an array of top elements
  */
@@ -570,6 +582,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
  * Returns the first k (smallest) elements from this RDD as defined by
  * the specified Comparator[T] and maintains the order.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  * @param num k, the number of elements to return
  * @param comp the comparator that defines the order
  * @return an array of top elements
@@ -601,6 +616,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
  * Returns the first k (smallest) elements from this RDD using the
  * natural ordering for T while maintain the order.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  * @param num k, the number of top elements to return
  * @return an array of top elements
  */
@@ -634,6 +652,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
  * The asynchronous version of `collect`, which returns a future for
  * retrieving an array containing all of the elements in this RDD.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  */
 def collectAsync(): JavaFutureAction[JList[T]] = {
   new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
@@ -642,6 +663,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 /**
  * The asynchronous version of the `take` action, which returns a
  * future for retrieving the first `num` elements of this RDD.
+ *
+ * @note this method should only be used if the resulting array is expected to be small, as
+ * all the data is loaded into the driver's memory.
  */
 def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
   new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a0c3959734d1a..3a8c8305ee3d8 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1333,7 +1333,7 @@ def toDF(self, *cols):
    def toPandas(self):
        """Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
-       Note that this method should only be used if the resulting array is expected
+       Note that this method should only be used if the resulting Pandas DataFrame is expected
        to be small, as all the data is loaded into the driver's memory.
        This is only available if Pandas is installed and available.
        >>> df.toPandas()  # doctest: +SKIP

From c34225fca5724f3c0d90f164fc3281eea74c2967 Mon Sep 17 00:00:00 2001
From: Tommy YU
Date: Tue, 26 Jan 2016 21:16:01 +0800
Subject: [PATCH 7/7] add notes for collectAsMap for scala,python api

---
 .../src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 3 +++
 python/pyspark/rdd.py | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 33f2f0b44f773..61905a8421124 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -726,6 +726,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
  *
  * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
  * one value per key is preserved in the map returned)
+ *
+ * @note this method should only be used if the resulting data is expected to be small, as
+ * all the data is loaded into the driver's memory.
  */
 def collectAsMap(): Map[K, V] = self.withScope {
   val data = self.collect()
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 11e7ce160df7e..fe2264a63cf30 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1525,6 +1525,9 @@ def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.
+       Note that this method should only be used if the resulting data is expected
+       to be small, as all the data is loaded into the driver's memory.
+
        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
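A small illustration of the collectAsMap() note added in the last patch, again a sketch rather than anything in the patches: the data is made up and `sc` is assumed to be the SparkContext of a spark-shell session. collectAsMap() builds one in-driver Map holding every key, so it is only appropriate when the key space is known to be small; asking for a single key keeps the driver footprint bounded.

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3)))

    // Small, known key space: bringing the whole thing back as a Map is fine.
    // As the Scaladoc warning above says, a duplicate key keeps only one of its values.
    val asMap = pairs.collectAsMap()

    // For a large pair RDD, fetch only what is needed instead of the whole map.
    val bValues = pairs.lookup("b")   // Seq(2, 3), computed without collecting everything

lookup() returns just the values for one key, so its result stays small even when the RDD does not, which is the same principle the notes apply to collect(), take(), and friends.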