From 0690596029d1c061c16564f12416311e7624e5b2 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Thu, 25 Jan 2018 13:50:21 -0800
Subject: [PATCH 1/7] [SPARK-23084][PYTHON] Add unboundedPreceding(),
 unboundedFollowing() and currentRow() to PySpark

---
 python/pyspark/sql/functions.py | 30 ++++++++++++++++++++++++++++++
 python/pyspark/sql/window.py    | 30 ++++++++++++++++++------------
 2 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a291c9b71913f..218b495a1e588 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -809,6 +809,36 @@ def ntile(n):
     return Column(sc._jvm.functions.ntile(int(n)))
 
 
+@since(2.3)
+def unboundedPreceding():
+    """
+    Window function: returns the special frame boundary that represents the first row
+    in the window partition.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.unboundedPreceding())
+
+
+@since(2.3)
+def unboundedFollowing():
+    """
+    Window function: returns the special frame boundary that represents the last row
+    in the window partition.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.unboundedFollowing())
+
+
+@since(2.3)
+def currentRow():
+    """
+    Window function: returns the special frame boundary that represents the current row
+    in the window partition.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.currentRow())
+
+
 # ---------------------- Date/Timestamp functions ------------------------------
 
 @since(1.5)
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 7ce27f9b102c0..8378d3ceca6f6 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -124,16 +124,19 @@ def rangeBetween(start, end):
         values directly.
 
         :param start: boundary start, inclusive.
-            The frame is unbounded if this is ``Window.unboundedPreceding``, or
+            The frame is unbounded if this is ``Window.unboundedPreceding``,
+            ``org.apache.spark.sql.catalyst.expressions.UnboundedPreceding``, or
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-            The frame is unbounded if this is ``Window.unboundedFollowing``, or
+            The frame is unbounded if this is ``Window.unboundedFollowing``,
+            org.apache.spark.sql.catalyst.expressions.UnboundedPFollowing, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
+        if isinstance(start, long) and isinstance(end, long):
+            if start <= Window._PRECEDING_THRESHOLD:
+                start = Window.unboundedPreceding
+            if end >= Window._FOLLOWING_THRESHOLD:
+                end = Window.unboundedFollowing
         sc = SparkContext._active_spark_context
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
         return WindowSpec(jspec)
@@ -212,16 +215,19 @@ def rangeBetween(self, start, end):
         values directly.
 
         :param start: boundary start, inclusive.
-            The frame is unbounded if this is ``Window.unboundedPreceding``, or
+            The frame is unbounded if this is ``Window.unboundedPreceding``,
+            ``org.apache.spark.sql.catalyst.expressions.UnboundedPreceding``, or
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-            The frame is unbounded if this is ``Window.unboundedFollowing``, or
+            The frame is unbounded if this is ``Window.unboundedFollowing``,
+            ``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
+        if isinstance(start, long) and isinstance(end, long):
+            if start <= Window._PRECEDING_THRESHOLD:
+                start = Window.unboundedPreceding
+            if end >= Window._FOLLOWING_THRESHOLD:
+                end = Window.unboundedFollowing
         return WindowSpec(self._jspec.rangeBetween(start, end))
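Note: the three functions added above wrap frame boundaries in a Column. The frames
themselves are not new; the long-standing integral constants on ``Window`` express the
same thing. A minimal sketch of the kind of query both spellings describe, written
against the pre-existing constant-based API (data and names here are illustrative only):

    # Running total from the first row of each partition up to the current row.
    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, "a"), (3, "b")], ["id", "category"])

    w = (Window.partitionBy("category")
         .orderBy("id")
         .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    df.withColumn("running_sum", F.sum("id").over(w)).show()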
From 2c661561df80b551559d0685602e37fa62bf5d1e Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Thu, 25 Jan 2018 14:40:41 -0800
Subject: [PATCH 2/7] fix pycodestyle problem

---
 python/pyspark/sql/window.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 8378d3ceca6f6..206c49db061e9 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -129,7 +129,7 @@ def rangeBetween(start, end):
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
             The frame is unbounded if this is ``Window.unboundedFollowing``,
-            org.apache.spark.sql.catalyst.expressions.UnboundedPFollowing, or
+            ``org.apache.spark.sql.catalyst.expressions.UnboundedPFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
         if isinstance(start, long) and isinstance(end, long):
""" - if isinstance(start, long) and isinstance(end, long): + if isinstance(start, int) and isinstance(end, int): if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding if end >= Window._FOLLOWING_THRESHOLD: From bbf8778a963a5e0b8de1b5ab1fddf4cafe13c180 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 29 Jan 2018 21:33:08 -0800 Subject: [PATCH 4/7] address comments --- python/pyspark/sql/functions.py | 9 +++++++++ python/pyspark/sql/window.py | 10 +++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 218b495a1e588..20758aef085a8 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -814,6 +814,9 @@ def unboundedPreceding(): """ Window function: returns the special frame boundary that represents the first row in the window partition. + >>> df = spark.createDataFrame([(5,)]) + >>> df.select(unboundedPreceding()).show + """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.unboundedPreceding()) @@ -824,6 +827,9 @@ def unboundedFollowing(): """ Window function: returns the special frame boundary that represents the last row in the window partition. + >>> df = spark.createDataFrame([(5,)]) + >>> df.select(unboundedFollowing()).show + """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.unboundedFollowing()) @@ -834,6 +840,9 @@ def currentRow(): """ Window function: returns the special frame boundary that represents the current row in the window partition. + >>> df = spark.createDataFrame([(5,)]) + >>> df.select(currentRow()).show + """ sc = SparkContext._active_spark_context return Column(sc._jvm.functions.currentRow()) diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py index fc174c9476bfc..14e8650dcc695 100644 --- a/python/pyspark/sql/window.py +++ b/python/pyspark/sql/window.py @@ -16,6 +16,8 @@ # import sys +if sys.version >= '3': + long = int from pyspark import since, SparkContext from pyspark.sql.column import _to_seq, _to_java_column @@ -129,10 +131,11 @@ def rangeBetween(start, end): any value less than or equal to max(-sys.maxsize, -9223372036854775808). :param end: boundary end, inclusive. The frame is unbounded if this is ``Window.unboundedFollowing``, - ``org.apache.spark.sql.catalyst.expressions.UnboundedPFollowing``, or + ``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or any value greater than or equal to min(sys.maxsize, 9223372036854775807). + any value greater than or equal to 9223372036854775807. """ - if isinstance(start, int) and isinstance(end, int): + if isinstance(start, (int, long)) and isinstance(end, (int, long)): if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding if end >= Window._FOLLOWING_THRESHOLD: @@ -222,8 +225,9 @@ def rangeBetween(self, start, end): The frame is unbounded if this is ``Window.unboundedFollowing``, ``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or any value greater than or equal to min(sys.maxsize, 9223372036854775807). 
From bbf8778a963a5e0b8de1b5ab1fddf4cafe13c180 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 29 Jan 2018 21:33:08 -0800
Subject: [PATCH 4/7] address comments

---
 python/pyspark/sql/functions.py |  9 +++++++++
 python/pyspark/sql/window.py    | 10 +++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 218b495a1e588..20758aef085a8 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -814,6 +814,9 @@ def unboundedPreceding():
     """
     Window function: returns the special frame boundary that represents the first row
     in the window partition.
+    >>> df = spark.createDataFrame([(5,)])
+    >>> df.select(unboundedPreceding()).show
+
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.unboundedPreceding())
@@ -824,6 +827,9 @@ def unboundedFollowing():
     """
     Window function: returns the special frame boundary that represents the last row
     in the window partition.
+    >>> df = spark.createDataFrame([(5,)])
+    >>> df.select(unboundedFollowing()).show
+
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.unboundedFollowing())
@@ -834,6 +840,9 @@ def currentRow():
     """
     Window function: returns the special frame boundary that represents the current row
     in the window partition.
+    >>> df = spark.createDataFrame([(5,)])
+    >>> df.select(currentRow()).show
+
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.currentRow())
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index fc174c9476bfc..14e8650dcc695 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -16,6 +16,8 @@
 #
 
 import sys
+if sys.version >= '3':
+    long = int
 
 from pyspark import since, SparkContext
 from pyspark.sql.column import _to_seq, _to_java_column
@@ -129,10 +131,11 @@ def rangeBetween(start, end):
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
             The frame is unbounded if this is ``Window.unboundedFollowing``,
-            ``org.apache.spark.sql.catalyst.expressions.UnboundedPFollowing``, or
+            ``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
+            any value greater than or equal to 9223372036854775807.
         """
-        if isinstance(start, int) and isinstance(end, int):
+        if isinstance(start, (int, long)) and isinstance(end, (int, long)):
             if start <= Window._PRECEDING_THRESHOLD:
                 start = Window.unboundedPreceding
             if end >= Window._FOLLOWING_THRESHOLD:
                 end = Window.unboundedFollowing
@@ -222,8 +225,9 @@ def rangeBetween(self, start, end):
             The frame is unbounded if this is ``Window.unboundedFollowing``,
             ``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
+
         """
-        if isinstance(start, int) and isinstance(end, int):
+        if isinstance(start, (int, long)) and isinstance(end, (int, long)):
             if start <= Window._PRECEDING_THRESHOLD:
                 start = Window.unboundedPreceding
             if end >= Window._FOLLOWING_THRESHOLD:
                 end = Window.unboundedFollowing
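Note: the doctests added above reference ``.show`` without calling it, so each line
evaluates to the bound method and doctest reports its repr as unexpected output; patch 5
repairs this by asserting a stable column name instead. A short illustration of the
difference (assumes a local SparkSession; the DataFrame is illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(5,)])

    print(df.show)        # repr of a bound method, not a table
    df.show()             # the call is what actually prints the table
    print(df.columns[0])  # '_1': a plain string a doctest can match reliably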
From 45545d65ce9ecc077bf842602ecc465ceeeda061 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Wed, 31 Jan 2018 16:27:35 -0800
Subject: [PATCH 5/7] address comments(2)

---
 python/pyspark/sql/functions.py | 15 +++++++++------
 python/pyspark/sql/window.py    | 40 ++++++++++++++++++++++++----------
 2 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 20758aef085a8..b9e7893224281 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -814,9 +814,10 @@ def unboundedPreceding():
     """
     Window function: returns the special frame boundary that represents the first row
     in the window partition.
+
     >>> df = spark.createDataFrame([(5,)])
-    >>> df.select(unboundedPreceding()).show
-
+    >>> df.select(unboundedPreceding()).columns[0]
+    'UNBOUNDED PRECEDING'
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.unboundedPreceding())
@@ -827,9 +828,10 @@ def unboundedFollowing():
     """
     Window function: returns the special frame boundary that represents the last row
     in the window partition.
+
     >>> df = spark.createDataFrame([(5,)])
-    >>> df.select(unboundedFollowing()).show
-
+    >>> df.select(unboundedFollowing()).columns[0]
+    'UNBOUNDED FOLLOWING'
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.unboundedFollowing())
@@ -840,9 +842,10 @@ def currentRow():
     """
     Window function: returns the special frame boundary that represents the current row
     in the window partition.
+
    >>> df = spark.createDataFrame([(5,)])
-    >>> df.select(currentRow()).show
-
+    >>> df.select(currentRow()).columns[0]
+    'CURRENT ROW'
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.currentRow())
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 14e8650dcc695..8e4b0db13912f 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -20,7 +20,7 @@
     long = int
 
 from pyspark import since, SparkContext
-from pyspark.sql.column import _to_seq, _to_java_column
+from pyspark.sql.column import Column, _to_seq, _to_java_column
 
 __all__ = ["Window", "WindowSpec"]
 
@@ -126,20 +126,39 @@ def rangeBetween(start, end):
         values directly.
 
         :param start: boundary start, inclusive.
-            The frame is unbounded if this is ``Window.unboundedPreceding``,
-            ``org.apache.spark.sql.catalyst.expressions.UnboundedPreceding``, or
+            The frame is unbounded if this is ``Window.unboundedPreceding``, or
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-            The frame is unbounded if this is ``Window.unboundedFollowing``,
-            ``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or
+            The frame is unbounded if this is ``Window.unboundedFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
+
+        >>> from pyspark.sql import functions as F, SparkSession, Window
+        >>> spark = SparkSession.builder.getOrCreate()
+        >>> df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"),
+        ...                             (3, "b")], ["id", "category"])
+        >>> window = Window.orderBy("id").partitionBy("category").rangeBetween(F.currentRow(),
+        ...                                                                    F.lit(1))
+        >>> df.withColumn("sum", F.sum("id").over(window)).show()
+        +---+--------+---+
+        | id|category|sum|
+        +---+--------+---+
+        |  1|       b|  3|
+        |  2|       b|  5|
+        |  3|       b|  3|
+        |  1|       a|  4|
+        |  1|       a|  4|
+        |  2|       a|  2|
+        +---+--------+---+
+
         """
         if isinstance(start, (int, long)) and isinstance(end, (int, long)):
             if start <= Window._PRECEDING_THRESHOLD:
                 start = Window.unboundedPreceding
             if end >= Window._FOLLOWING_THRESHOLD:
                 end = Window.unboundedFollowing
+        elif isinstance(start, Column) and isinstance(end, Column):
+            start = start._jc
+            end = end._jc
         sc = SparkContext._active_spark_context
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
         return WindowSpec(jspec)
@@ -218,12 +237,10 @@ def rangeBetween(self, start, end):
         values directly.
 
         :param start: boundary start, inclusive.
-            The frame is unbounded if this is ``Window.unboundedPreceding``,
-            ``org.apache.spark.sql.catalyst.expressions.UnboundedPreceding``, or
+            The frame is unbounded if this is ``Window.unboundedPreceding``, or
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-            The frame is unbounded if this is ``Window.unboundedFollowing``,
-            ``org.apache.spark.sql.catalyst.expressions.UnboundedFollowing``, or
+            The frame is unbounded if this is ``Window.unboundedFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
 
         """
         if isinstance(start, (int, long)) and isinstance(end, (int, long)):
             if start <= Window._PRECEDING_THRESHOLD:
                 start = Window.unboundedPreceding
             if end >= Window._FOLLOWING_THRESHOLD:
                 end = Window.unboundedFollowing
+        elif isinstance(start, Column) and isinstance(end, Column):
+            start = start._jc
+            end = end._jc
         return WindowSpec(self._jspec.rangeBetween(start, end))
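Note: the core of the change above is the boundary dispatch in ``rangeBetween``: integral
boundaries are still clamped against the ``sys.maxsize`` thresholds, while Column
boundaries are unwrapped to their JVM handles (``_jc``) so the JVM-side overload can be
called. A standalone sketch of that rule, with the thresholds inlined and the unbounded
markers passed in (the real code reads both off ``Window``):

    import sys

    _PRECEDING_THRESHOLD = max(-sys.maxsize, -9223372036854775808)
    _FOLLOWING_THRESHOLD = min(sys.maxsize, 9223372036854775807)

    def normalize_boundary(value, unbounded_preceding, unbounded_following):
        if isinstance(value, int):
            # Integral boundary: clamp extreme values to the unbounded markers.
            if value <= _PRECEDING_THRESHOLD:
                return unbounded_preceding
            if value >= _FOLLOWING_THRESHOLD:
                return unbounded_following
            return value
        # Column boundary: hand the underlying JVM column to the JVM overload.
        return value._jc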
From 25fee3901cfba3599330da394e437c91a9783368 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Wed, 31 Jan 2018 23:37:34 -0800
Subject: [PATCH 6/7] address comments(3)

---
 python/pyspark/sql/functions.py | 12 ------------
 python/pyspark/sql/window.py    | 31 ++++++++++++++++++-------------
 2 files changed, 18 insertions(+), 25 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b9e7893224281..218b495a1e588 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -814,10 +814,6 @@ def unboundedPreceding():
     """
     Window function: returns the special frame boundary that represents the first row
     in the window partition.
-
-    >>> df = spark.createDataFrame([(5,)])
-    >>> df.select(unboundedPreceding()).columns[0]
-    'UNBOUNDED PRECEDING'
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.unboundedPreceding())
@@ -828,10 +824,6 @@ def unboundedFollowing():
     """
     Window function: returns the special frame boundary that represents the last row
     in the window partition.
-
-    >>> df = spark.createDataFrame([(5,)])
-    >>> df.select(unboundedFollowing()).columns[0]
-    'UNBOUNDED FOLLOWING'
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.unboundedFollowing())
@@ -842,10 +834,6 @@ def currentRow():
     """
     Window function: returns the special frame boundary that represents the current row
     in the window partition.
-
-    >>> df = spark.createDataFrame([(5,)])
-    >>> df.select(currentRow()).columns[0]
-    'CURRENT ROW'
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.currentRow())
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 8e4b0db13912f..da7d0f36e7f9a 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -122,22 +122,25 @@ def rangeBetween(start, end):
         and "5" means the five off after the current row.
 
         We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
-        and ``Window.currentRow`` to specify special boundary values, rather than using integral
-        values directly.
+        ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+        ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+        to specify special boundary values, rather than using integral values directly.
 
         :param start: boundary start, inclusive.
-            The frame is unbounded if this is ``Window.unboundedPreceding``, or
+            The frame is unbounded if this is ``Window.unboundedPreceding``,
+            a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-            The frame is unbounded if this is ``Window.unboundedFollowing``, or
+            The frame is unbounded if this is ``Window.unboundedFollowing``,
+            a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
 
         >>> from pyspark.sql import functions as F, SparkSession, Window
         >>> spark = SparkSession.builder.getOrCreate()
-        >>> df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"),
-        ...                             (3, "b")], ["id", "category"])
-        >>> window = Window.orderBy("id").partitionBy("category").rangeBetween(F.currentRow(),
-        ...                                                                    F.lit(1))
+        >>> df = spark.createDataFrame(
+        ...     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
+        >>> window = Window.orderBy("id").partitionBy("category").rangeBetween(
+        ...     F.currentRow(), F.lit(1))
         >>> df.withColumn("sum", F.sum("id").over(window)).show()
         +---+--------+---+
         | id|category|sum|
@@ -233,16 +236,18 @@ def rangeBetween(self, start, end):
         and "5" means the five off after the current row.
 
         We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
-        and ``Window.currentRow`` to specify special boundary values, rather than using integral
-        values directly.
+        ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+        ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+        to specify special boundary values, rather than using integral values directly.
 
         :param start: boundary start, inclusive.
-            The frame is unbounded if this is ``Window.unboundedPreceding``, or
+            The frame is unbounded if this is ``Window.unboundedPreceding``,
+            a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
             any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-            The frame is unbounded if this is ``Window.unboundedFollowing``, or
+            The frame is unbounded if this is ``Window.unboundedFollowing``,
+            a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
             any value greater than or equal to min(sys.maxsize, 9223372036854775807).
+
         """
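Note: the doctest reflow above relies on doctest's continuation prompt: after the first
``>>>`` line, every further physical line of the same statement carries a ``...`` prefix,
and the expected output follows unprefixed. A self-contained illustration, runnable with
``python -m doctest``:

    def add3(a, b, c):
        """
        >>> add3(1,
        ...      2,
        ...      3)
        6
        """
        return a + b + c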
- """ if isinstance(start, (int, long)) and isinstance(end, (int, long)): if start <= Window._PRECEDING_THRESHOLD: From f82c7d11d12611eed5ac5bd9f4b8a4e6516fdf84 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 4 Feb 2018 23:02:09 -0800 Subject: [PATCH 7/7] address comments(4) --- python/pyspark/sql/functions.py | 6 +++--- python/pyspark/sql/window.py | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 218b495a1e588..870faa1b9669c 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -809,7 +809,7 @@ def ntile(n): return Column(sc._jvm.functions.ntile(int(n))) -@since(2.3) +@since(2.4) def unboundedPreceding(): """ Window function: returns the special frame boundary that represents the first row @@ -819,7 +819,7 @@ def unboundedPreceding(): return Column(sc._jvm.functions.unboundedPreceding()) -@since(2.3) +@since(2.4) def unboundedFollowing(): """ Window function: returns the special frame boundary that represents the last row @@ -829,7 +829,7 @@ def unboundedFollowing(): return Column(sc._jvm.functions.unboundedFollowing()) -@since(2.3) +@since(2.4) def currentRow(): """ Window function: returns the special frame boundary that represents the current row diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py index da7d0f36e7f9a..bb841a9b9ff7c 100644 --- a/python/pyspark/sql/window.py +++ b/python/pyspark/sql/window.py @@ -152,7 +152,6 @@ def rangeBetween(start, end): | 1| a| 4| | 2| a| 2| +---+--------+---+ - """ if isinstance(start, (int, long)) and isinstance(end, (int, long)): if start <= Window._PRECEDING_THRESHOLD: @@ -263,7 +262,7 @@ def rangeBetween(self, start, end): def _test(): import doctest SparkContext('local[4]', 'PythonTest') - (failure_count, test_count) = doctest.testmod() + (failure_count, test_count) = doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE) if failure_count: exit(-1)