[SPARK-5865][API DOC] Add doc warnings for methods that return local data structures

rxin srowen
I worked out the note message for the rdd.take function; please help to review.

If it looks fine, I can apply it to all the other functions later.

Author: Tommy YU <tummyyu@163.com>

Closes #10874 from Wenpei/spark-5865-add-warning-for-localdatastructure.
Wenpei authored and srowen committed Feb 6, 2016
1 parent 4f28291 commit 81da3be
Showing 7 changed files with 72 additions and 0 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -636,6 +636,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* @note this method should only be used if the resulting data is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsMap(): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.collectAsMap())
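
For illustration, a minimal sketch of the behavior this note documents, driving the Java wrapper from Scala (assumes a spark-shell session providing `sc`; the sample data is invented):

    import org.apache.spark.api.java.JavaSparkContext
    import java.util.Arrays

    val jsc = new JavaSparkContext(sc)  // wrap the shell's SparkContext

    // Every key-value pair is shipped to the driver and materialized as
    // a single java.util.Map, so this is only safe for small results.
    val pairs = jsc.parallelizePairs(Arrays.asList(("a", 1), ("b", 2)))
    val localMap = pairs.collectAsMap()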

24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -327,6 +327,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

/**
* Return an array that contains all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): JList[T] =
rdd.collect().toSeq.asJava
@@ -465,6 +468,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def take(num: Int): JList[T] =
rdd.take(num).toSeq.asJava
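
The scan-partitions-one-by-one behavior described above is easiest to see from the Scala RDD that this wrapper delegates to; a spark-shell sketch (counts are illustrative):

    // 100 elements spread over 10 partitions.
    val rdd = sc.parallelize(1 to 100, numSlices = 10)

    // Starts with one partition and only scans more if needed:
    // cheap when num is much smaller than the dataset.
    rdd.take(5)    // Array(1, 2, 3, 4, 5)

    // Requesting most of the data walks nearly every partition, and the
    // result must still fit on the driver -- collect() territory.
    rdd.take(95)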
@@ -548,6 +554,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the top k (largest) elements from this RDD as defined by
* the specified Comparator[T] and maintains the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of top elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
@@ -559,6 +568,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the top k (largest) elements from this RDD using the
* natural ordering for T and maintains the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of top elements to return
* @return an array of top elements
*/
@@ -570,6 +582,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the first k (smallest) elements from this RDD as defined by
* the specified Comparator[T] and maintains the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
@@ -601,6 +616,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the first k (smallest) elements from this RDD using the
* natural ordering for T while maintaining the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of top elements to return
* @return an array of top elements
*/
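
A quick Scala-side sketch of the two orderings (the Java overloads above differ only in taking an explicit Comparator; data is invented):

    val nums = sc.parallelize(Seq(10, 4, 2, 12, 3))

    // Both build the full result on the driver, hence the @note.
    nums.top(2)          // Array(12, 10) -- largest, descending
    nums.takeOrdered(2)  // Array(2, 3)   -- smallest, ascending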
@@ -634,6 +652,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* The asynchronous version of `collect`, which returns a future for
* retrieving an array containing all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsync(): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
@@ -642,6 +663,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* The asynchronous version of the `take` action, which returns a
* future for retrieving the first `num` elements of this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
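
For illustration, the equivalent asynchronous calls on the Scala side (the Java wrapper returns a JavaFutureAction rather than a Scala FutureAction); a sketch assuming a spark-shell session:

    import scala.concurrent.ExecutionContext.Implicits.global

    val rdd = sc.parallelize(1 to 1000, 8)

    // The job runs in the background, but the completed future still
    // hands the driver a fully materialized local collection.
    val firstTen = rdd.takeAsync(10)
    firstTen.foreach(xs => println(xs.mkString(", ")))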
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -726,6 +726,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
* one value per key is preserved in the map returned)
*
* @note this method should only be used if the resulting data is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsMap(): Map[K, V] = self.withScope {
val data = self.collect()
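
The multimap caveat is easy to trip over; a small spark-shell sketch (which duplicate survives depends on evaluation order, so the exact value shown is illustrative):

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // Not a multimap: one of the two values for "a" is silently dropped.
    val m = pairs.collectAsMap()   // e.g. Map(b -> 3, a -> 2)
    m.size                         // 2, not 3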
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -481,6 +481,9 @@ abstract class RDD[T: ClassTag](
/**
* Return a fixed-size sampled subset of this RDD in an array
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @param withReplacement whether sampling is done with replacement
* @param num size of the returned sample
* @param seed seed for the random number generator
@@ -836,6 +839,9 @@ abstract class RDD[T: ClassTag](

/**
* Return an array that contains all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
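
A sketch of the pattern the @note points toward: shrink the data on the executors before collecting (the input path is hypothetical):

    // Stands in for any dataset too large for one machine.
    val logs = sc.textFile("hdfs:///logs/app.log")

    // Only the filtered survivors travel to the driver; collect() on
    // the raw RDD would try to materialize the whole file locally.
    val errors = logs.filter(_.contains("ERROR")).collect()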
@@ -1202,6 +1208,9 @@ abstract class RDD[T: ClassTag](
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @note due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of `Nothing` or `Null`.
*/
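
The second @note can be reproduced directly in spark-shell: an untyped empty Seq infers Seq[Nothing], and take on the resulting RDD[Nothing] fails even though the RDD is otherwise usable (a sketch; the exact exception type is an implementation detail):

    val empty = sc.parallelize(Seq())   // inferred as RDD[Nothing]

    empty.count()   // 0 -- fine, nothing is materialized on the driver
    empty.take(1)   // throws at runtime, per the @note above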
@@ -1263,6 +1272,9 @@ abstract class RDD[T: ClassTag](
* // returns Array(6, 5)
* }}}
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @param num k, the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
@@ -1283,6 +1295,9 @@ abstract class RDD[T: ClassTag](
* // returns Array(2, 3)
* }}}
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @param num k, the number of elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
17 changes: 17 additions & 0 deletions python/pyspark/rdd.py
@@ -426,6 +426,9 @@ def takeSample(self, withReplacement, num, seed=None):
"""
Return a fixed-size sampled subset of this RDD.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
@@ -766,6 +769,8 @@ def func(it):
def collect(self):
"""
Return a list that contains all of the elements in this RDD.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
"""
with SCCallSiteSync(self.context) as css:
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
@@ -1213,6 +1218,9 @@ def top(self, num, key=None):
"""
Get the top N elements from an RDD.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
Note: It returns the list sorted in descending order.
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
@@ -1235,6 +1243,9 @@ def takeOrdered(self, num, key=None):
Get the N elements from an RDD ordered in ascending order or as
specified by the optional key function.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
@@ -1254,6 +1265,9 @@ def take(self, num):
that partition to estimate the number of additional partitions needed
to satisfy the limit.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
Translated from the Scala implementation in RDD#take().
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
@@ -1511,6 +1525,9 @@ def collectAsMap(self):
"""
Return the key-value pairs in this RDD to the master as a dictionary.
Note that this method should only be used if the resulting data is expected
to be small, as all the data is loaded into the driver's memory.
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2
6 changes: 6 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -739,6 +739,9 @@ def describe(self, *cols):
def head(self, n=None):
"""Returns the first ``n`` rows.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
:param n: int, default 1. Number of rows to return.
:return: If n is greater than 1, return a list of :class:`Row`.
If n is 1, return a single Row.
@@ -1330,6 +1333,9 @@ def toDF(self, *cols):
def toPandas(self):
"""Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
Note that this method should only be used if the resulting Pandas's DataFrame is expected
to be small, as all the data is loaded into the driver's memory.
This is only available if Pandas is installed and available.
>>> df.toPandas() # doctest: +SKIP
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1384,6 +1384,10 @@ class DataFrame private[sql](

/**
* Returns the first `n` rows.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @group action
* @since 1.3.0
*/
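
For illustration, a spark-shell sketch against the 1.6-era API (`sqlContext` is provided by the shell; sizes and the output path are invented):

    val df = sqlContext.range(0, 1000000)

    // An action: the n rows come back to the driver as a local
    // Array[Row], so keep n small.
    val firstFive = df.head(5)

    // For a large prefix, keep the work distributed instead of pulling
    // rows into the driver.
    df.limit(100000).write.parquet("hdfs:///tmp/prefix")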
