From bc761ff69105035c5b31e14178db974e457995bc Mon Sep 17 00:00:00 2001 From: Nong Li Date: Thu, 5 Nov 2015 10:48:21 -0800 Subject: [PATCH 1/3] [SPARK-11410] [SQL] [PYSPARK] Add python bindings for repartition and sortWithinPartitions. --- python/pyspark/sql/dataframe.py | 102 +++++++++++++++++++++++++++----- 1 file changed, 86 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 3baff8147753d..a60a9d3983b53 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -422,6 +422,49 @@ def repartition(self, numPartitions): """ return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) + @since(1.6) + def repartitionBy(self, *cols, **kwargs): + """ + Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The + resulting DataFrame is hash partitioned. + + :param numPartitions: int + If specified, the number of partitions to partition into. If not specified, preserves + the current number of partitions. + + >>> data = df.unionAll(df).repartitionBy("age") + >>> data.show() + +---+-----+ + |age| name| + +---+-----+ + | 2|Alice| + | 2|Alice| + | 5| Bob| + | 5| Bob| + +---+-----+ + >>> data.rdd.getNumPartitions() + 200 + >>> data = data.repartitionBy("age", numPartitions=7) + >>> data.show() + +---+-----+ + |age| name| + +---+-----+ + | 5| Bob| + | 5| Bob| + | 2|Alice| + | 2|Alice| + +---+-----+ + >>> data.rdd.getNumPartitions() + 7 + """ + numPartitions = kwargs.get('numPartitions', -1) + if not isinstance(numPartitions, int): + raise TypeError("numPartitions can only be int, but got %s" % type(numPartitions)) + if numPartitions == -1: + return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx) + else: + return DataFrame(self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx) + @since(1.3) def distinct(self): """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`. @@ -589,6 +632,29 @@ def join(self, other, on=None, how=None): jdf = self._jdf.join(other._jdf, on._jc, how) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix + @since(1.6) + def sortWithinPartitions(self, *cols, **kwargs): + """Returns a new :class:`DataFrame` with each partition sorted by the specified column(s). + + :param cols: list of :class:`Column` or column names to sort by. + :param ascending: boolean or list of boolean (default True). + Sort ascending vs. descending. Specify list for multiple sort orders. + If a list is specified, length of the list must equal length of the `cols`. + + >>> df.rdd.getNumPartitions() + 4 + >>> df.sortWithinPartitions("age", ascending=False).show() + +---+-----+ + |age| name| + +---+-----+ + | 2|Alice| + | 5| Bob| + +---+-----+ + """ + jdf = self._jdf.sortWithinPartitions(self._sort_cols(cols, kwargs)) + return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix @since(1.3) def sort(self, *cols, **kwargs): @@ -613,22 +679,7 @@ def sort(self, *cols, **kwargs): >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] """ - if not cols: - raise ValueError("should sort by at least one column") - if len(cols) == 1 and isinstance(cols[0], list): - cols = cols[0] - jcols = [_to_java_column(c) for c in cols] - ascending = kwargs.get('ascending', True) - if isinstance(ascending, (bool, int)): - if not ascending: - jcols = [jc.desc() for jc in jcols] - elif isinstance(ascending, list): - jcols = [jc if asc else jc.desc() - for asc, jc in zip(ascending, jcols)] - else: - raise TypeError("ascending can only be boolean or list, but got %s" % type(ascending)) - - jdf = self._jdf.sort(self._jseq(jcols)) + jdf = self._jdf.sort(self._sort_cols(cols, kwargs)) return DataFrame(jdf, self.sql_ctx) orderBy = sort @@ -650,6 +701,25 @@ def _jcols(self, *cols): cols = cols[0] return self._jseq(cols, _to_java_column) + def _sort_cols(self, cols, kwargs): + """ Return a JVM Seq of Columns that describes the sort order + """ + if not cols: + raise ValueError("should sort by at least one column") + if len(cols) == 1 and isinstance(cols[0], list): + cols = cols[0] + jcols = [_to_java_column(c) for c in cols] + ascending = kwargs.get('ascending', True) + if isinstance(ascending, (bool, int)): + if not ascending: + jcols = [jc.desc() for jc in jcols] + elif isinstance(ascending, list): + jcols = [jc if asc else jc.desc() + for asc, jc in zip(ascending, jcols)] + else: + raise TypeError("ascending can only be boolean or list, but got %s" % type(ascending)) + return self._jseq(jcols) + @since("1.3.1") def describe(self, *cols): """Computes statistics for numeric columns. From a63f552ff2b5286c968307be954557a718ce373c Mon Sep 17 00:00:00 2001 From: Nong Li Date: Fri, 6 Nov 2015 11:30:45 -0800 Subject: [PATCH 2/3] Code review fixes. --- python/pyspark/sql/dataframe.py | 45 ++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a60a9d3983b53..9854643cb2df2 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -423,16 +423,18 @@ def repartition(self, numPartitions): return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) @since(1.6) - def repartitionBy(self, *cols, **kwargs): + def repartition(self, numPartitions, *cols): """ Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned. - :param numPartitions: int - If specified, the number of partitions to partition into. If not specified, preserves - the current number of partitions. + ``numPartitions`` can be an int to specify the target number of partitions or a Column. + If it is a Column, it will be used as the first partitioning column. If not specified, + the default number of partitions is used. - >>> data = df.unionAll(df).repartitionBy("age") + >>> df.repartition(10).rdd.getNumPartitions() + 10 + >>> data = df.unionAll(df).repartition("age") >>> data.show() +---+-----+ |age| name| @@ -442,9 +444,7 @@ def repartitionBy(self, *cols, **kwargs): | 5| Bob| | 5| Bob| +---+-----+ - >>> data.rdd.getNumPartitions() - 200 - >>> data = data.repartitionBy("age", numPartitions=7) + >>> data = data.repartition(7, "age") >>> data.show() +---+-----+ |age| name| @@ -456,14 +456,28 @@ def repartitionBy(self, *cols, **kwargs): +---+-----+ >>> data.rdd.getNumPartitions() 7 + >>> data = data.repartition("name", "age") + >>> data.show() + +---+-----+ + |age| name| + +---+-----+ + | 5| Bob| + | 5| Bob| + | 2|Alice| + | 2|Alice| + +---+-----+ """ - numPartitions = kwargs.get('numPartitions', -1) - if not isinstance(numPartitions, int): - raise TypeError("numPartitions can only be int, but got %s" % type(numPartitions)) - if numPartitions == -1: - return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx) + if isinstance(numPartitions, int): + if len(cols) == 0: + return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) + else: + return DataFrame( + self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx) + elif isinstance(numPartitions, (basestring, Column)): + cols = (numPartitions, ) + cols + return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx) else: - return DataFrame(self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx) + raise TypeError("numPartitions should be an int or Column") @since(1.3) def distinct(self): @@ -632,7 +646,6 @@ def join(self, other, on=None, how=None): jdf = self._jdf.join(other._jdf, on._jc, how) return DataFrame(jdf, self.sql_ctx) - @ignore_unicode_prefix @since(1.6) def sortWithinPartitions(self, *cols, **kwargs): """Returns a new :class:`DataFrame` with each partition sorted by the specified column(s). @@ -642,8 +655,6 @@ def sortWithinPartitions(self, *cols, **kwargs): Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the `cols`. - >>> df.rdd.getNumPartitions() - 4 >>> df.sortWithinPartitions("age", ascending=False).show() +---+-----+ |age| name| From 9067172502c0bd253854448b681008e34d6ce188 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Fri, 6 Nov 2015 12:48:17 -0800 Subject: [PATCH 3/3] Update doc with versionchanged. --- python/pyspark/sql/dataframe.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 9854643cb2df2..09d53034c683b 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -422,7 +422,7 @@ def repartition(self, numPartitions): """ return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) - @since(1.6) + @since(1.3) def repartition(self, numPartitions, *cols): """ Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The @@ -432,6 +432,10 @@ def repartition(self, numPartitions, *cols): If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used. + .. versionchanged:: 1.6 + Added optional arguments to specify the partitioning columns. Also made numPartitions + optional if partitioning columns are specified. + >>> df.repartition(10).rdd.getNumPartitions() 10 >>> data = df.unionAll(df).repartition("age")