From 305d1623eece72f26829b5f24f9d0adbcd8d8d72 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 24 May 2023 19:38:26 +0800 Subject: [PATCH 1/5] [SPARK-43773][PYTHON] Implement 'levenshtein(str1, str2[, threshold])' functions in python client --- python/pyspark/sql/functions.py | 18 ++++++++++++++++-- python/pyspark/sql/tests/test_functions.py | 7 +++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index e9b71f7d617db..b82bbfa43c430 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -6594,7 +6594,9 @@ def substring_index(str: "ColumnOrName", delim: str, count: int) -> Column: @try_remote_functions -def levenshtein(left: "ColumnOrName", right: "ColumnOrName") -> Column: +def levenshtein( + left: "ColumnOrName", right: "ColumnOrName", threshold: Optional[int] = None +) -> Column: """Computes the Levenshtein distance of the two given strings. .. versionadded:: 1.5.0 @@ -6602,12 +6604,18 @@ def levenshtein(left: "ColumnOrName", right: "ColumnOrName") -> Column: .. versionchanged:: 3.4.0 Supports Spark Connect. + .. versionchanged:: 3.5.0 + Supports Spark Connect. + Parameters ---------- left : :class:`~pyspark.sql.Column` or str first column value. right : :class:`~pyspark.sql.Column` or str second column value. 
+ threshold : int, optional + if set when the levenshtein distance of the two given strings + less than or equal to a given threshold then return result distance, or -1 Returns ------- @@ -6619,8 +6627,14 @@ def levenshtein(left: "ColumnOrName", right: "ColumnOrName") -> Column: >>> df0 = spark.createDataFrame([('kitten', 'sitting',)], ['l', 'r']) >>> df0.select(levenshtein('l', 'r').alias('d')).collect() [Row(d=3)] + >>> df0.select(levenshtein('l', 'r', 2).alias('d')).collect() + [Row(d=-1)] """ - return _invoke_function_over_columns("levenshtein", left, right) + if threshold is None: + return _invoke_function_over_columns("levenshtein", left, right) + else: + return _invoke_function( + "levenshtein", _to_java_column(left), _to_java_column(right), threshold) @try_remote_functions diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 9067de3463357..352accca86961 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -377,6 +377,13 @@ def test_array_contains_function(self): actual = df.select(F.array_contains(df.data, "1").alias("b")).collect() self.assertEqual([Row(b=True), Row(b=False)], actual) + def test_levenshtein_function(self): + df = self.spark.createDataFrame([('kitten', 'sitting')], ['l', 'r']) + actual1 = df.select(F.levenshtein(df.l, df.r).alias('b')).collect() + self.assertEqual([Row(b=3)], actual1) + actual2 = df.select(F.levenshtein(df.l, df.r, 2).alias('b')).collect() + self.assertEqual([Row(b=-1)], actual2) + def test_between_function(self): df = self.spark.createDataFrame( [Row(a=1, b=2, c=3), Row(a=2, b=1, c=3), Row(a=4, b=1, c=4)] From bdf0a9ec81b63b4d99176e781ed6cdacd7c4d595 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 25 May 2023 09:16:44 +0800 Subject: [PATCH 2/5] [SPARK-43773][PYTHON] Implement 'levenshtein(str1, str2[, threshold])' functions in python client --- python/pyspark/sql/connect/functions.py | 9 +++++++-- 
.../pyspark/sql/tests/connect/test_connect_function.py | 6 ++++++ python/pyspark/sql/tests/test_functions.py | 8 ++++---- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index b7d7bc937cf8e..d3a05d6a1c608 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -1878,8 +1878,13 @@ def substring_index(str: "ColumnOrName", delim: str, count: int) -> Column: substring_index.__doc__ = pysparkfuncs.substring_index.__doc__ -def levenshtein(left: "ColumnOrName", right: "ColumnOrName") -> Column: - return _invoke_function_over_columns("levenshtein", left, right) +def levenshtein( + left: "ColumnOrName", right: "ColumnOrName", threshold: Optional[int] = None +) -> Column: + if threshold is None: + return _invoke_function_over_columns("levenshtein", left, right) + else: + return _invoke_function("levenshtein", _to_col(left), _to_col(right), lit(threshold)) levenshtein.__doc__ = pysparkfuncs.levenshtein.__doc__ diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py index e274635d3c627..c5a27270476a2 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -1920,10 +1920,16 @@ def test_string_functions_multi_args(self): cdf.select(CF.substring_index(cdf.e, ".", 2)).toPandas(), sdf.select(SF.substring_index(sdf.e, ".", 2)).toPandas(), ) + self.assert_eq( cdf.select(CF.levenshtein(cdf.b, cdf.c)).toPandas(), sdf.select(SF.levenshtein(sdf.b, sdf.c)).toPandas(), ) + self.assert_eq( + cdf.select(CF.levenshtein(cdf.b, cdf.c, 1)).toPandas(), + sdf.select(SF.levenshtein(sdf.b, sdf.c, 1)).toPandas(), + ) + self.assert_eq( cdf.select(CF.locate("e", cdf.b)).toPandas(), sdf.select(SF.locate("e", sdf.b)).toPandas(), diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py index 352accca86961..2af92ded2a495 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -379,10 +379,10 @@ def test_array_contains_function(self): def test_levenshtein_function(self): df = self.spark.createDataFrame([('kitten', 'sitting')], ['l', 'r']) - actual1 = df.select(F.levenshtein(df.l, df.r).alias('b')).collect() - self.assertEqual([Row(b=3)], actual1) - actual2 = df.select(F.levenshtein(df.l, df.r, 2).alias('b')).collect() - self.assertEqual([Row(b=-1)], actual2) + actual_without_threshold = df.select(F.levenshtein(df.l, df.r).alias('b')).collect() + self.assertEqual([Row(b=3)], actual_without_threshold) + actual_with_threshold = df.select(F.levenshtein(df.l, df.r, 2).alias('b')).collect() + self.assertEqual([Row(b=-1)], actual_with_threshold) def test_between_function(self): df = self.spark.createDataFrame( From e426a9ed7077ae35d8726313ce566b7e68a93d7e Mon Sep 17 00:00:00 2001 From: panbingkun <84731559@qq.com> Date: Thu, 25 May 2023 11:37:01 +0800 Subject: [PATCH 3/5] Update python/pyspark/sql/tests/connect/test_connect_function.py Co-authored-by: Ruifeng Zheng --- python/pyspark/sql/tests/connect/test_connect_function.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py index c5a27270476a2..9e5e9319119c4 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -1920,7 +1920,6 @@ def test_string_functions_multi_args(self): cdf.select(CF.substring_index(cdf.e, ".", 2)).toPandas(), sdf.select(SF.substring_index(sdf.e, ".", 2)).toPandas(), ) - self.assert_eq( cdf.select(CF.levenshtein(cdf.b, cdf.c)).toPandas(), sdf.select(SF.levenshtein(sdf.b, sdf.c)).toPandas(), From 5dd507b74bf78a689fb0a755fe426c5ec9aede7c Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 25 
May 2023 13:35:10 +0800 Subject: [PATCH 4/5] [SPARK-43773][PYTHON] Implement 'levenshtein(str1, str2[, threshold])' functions in python client --- python/pyspark/sql/functions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index b82bbfa43c430..4215b6f9b217a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -6604,9 +6604,6 @@ def levenshtein( .. versionchanged:: 3.4.0 Supports Spark Connect. - .. versionchanged:: 3.5.0 - Supports Spark Connect. - Parameters ---------- left : :class:`~pyspark.sql.Column` or str @@ -6617,6 +6614,9 @@ def levenshtein( if set when the levenshtein distance of the two given strings less than or equal to a given threshold then return result distance, or -1 + .. versionchanged:: 3.5.0 + Added ``threshold`` argument. + Returns ------- :class:`~pyspark.sql.Column` From f04af9f347e3056da3f9084740f94b5eb9158c77 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 26 May 2023 10:25:49 +0800 Subject: [PATCH 5/5] [SPARK-43773][PYTHON] Implement 'levenshtein(str1, str2[, threshold])' functions in python client --- python/pyspark/sql/functions.py | 3 ++- python/pyspark/sql/tests/connect/test_connect_function.py | 2 +- python/pyspark/sql/tests/test_functions.py | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 4215b6f9b217a..fe35f12c40215 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -6634,7 +6634,8 @@ def levenshtein( return _invoke_function_over_columns("levenshtein", left, right) else: return _invoke_function( - "levenshtein", _to_java_column(left), _to_java_column(right), threshold) + "levenshtein", _to_java_column(left), _to_java_column(right), threshold + ) @try_remote_functions diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py 
b/python/pyspark/sql/tests/connect/test_connect_function.py index 9e5e9319119c4..3e3b4dd5b1654 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -1928,7 +1928,7 @@ def test_string_functions_multi_args(self): cdf.select(CF.levenshtein(cdf.b, cdf.c, 1)).toPandas(), sdf.select(SF.levenshtein(sdf.b, sdf.c, 1)).toPandas(), ) - + self.assert_eq( cdf.select(CF.locate("e", cdf.b)).toPandas(), sdf.select(SF.locate("e", sdf.b)).toPandas(), diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 2af92ded2a495..72c6c365b804b 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -378,10 +378,10 @@ def test_array_contains_function(self): self.assertEqual([Row(b=True), Row(b=False)], actual) def test_levenshtein_function(self): - df = self.spark.createDataFrame([('kitten', 'sitting')], ['l', 'r']) - actual_without_threshold = df.select(F.levenshtein(df.l, df.r).alias('b')).collect() + df = self.spark.createDataFrame([("kitten", "sitting")], ["l", "r"]) + actual_without_threshold = df.select(F.levenshtein(df.l, df.r).alias("b")).collect() self.assertEqual([Row(b=3)], actual_without_threshold) - actual_with_threshold = df.select(F.levenshtein(df.l, df.r, 2).alias('b')).collect() + actual_with_threshold = df.select(F.levenshtein(df.l, df.r, 2).alias("b")).collect() self.assertEqual([Row(b=-1)], actual_with_threshold) def test_between_function(self):