
[SPARK-43942][CONNECT][PYTHON] Add string functions to Scala and Python - part 1 #41561

Closed
wants to merge 18 commits

Conversation

panbingkun (Contributor)

@panbingkun panbingkun commented Jun 13, 2023

What changes were proposed in this pull request?

Add following functions:

  • char
  • btrim
  • char_length
  • character_length
  • chr
  • contains
  • elt
  • find_in_set
  • like
  • ilike
  • lcase
  • ucase
  • len: skipped, because it conflicts with the Python built-in len, and we already have length
  • left
  • right

to:

  • Scala API
  • Python API
  • Spark Connect Scala Client
  • Spark Connect Python Client

Why are the changes needed?

For parity with the SQL built-in string functions.

Does this PR introduce any user-facing change?

Yes, new functions.

How was this patch tested?

  • Added new unit tests.

 * @since 3.5.0
 */
def contains(left: Column, right: Column): Column = withExpr {
  ContainsExpressionBuilder.build("contains", Seq(left.expr, right.expr))
}
Contributor:

why not Contains(left, right)?

panbingkun (Contributor Author), Jun 15, 2023:

Because Contains(left, right) only supports StringType; it does not cover BinaryType.

object ContainsExpressionBuilder extends StringBinaryPredicateExpressionBuilderBase {
  override protected def createStringPredicate(left: Expression, right: Expression): Expression = {
    Contains(left, right)
  }
}

trait StringBinaryPredicateExpressionBuilderBase extends ExpressionBuilder {
  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
    val numArgs = expressions.length
    if (numArgs == 2) {
      if (expressions(0).dataType == BinaryType && expressions(1).dataType == BinaryType) {
        BinaryPredicate(funcName, expressions(0), expressions(1))
      } else {
        createStringPredicate(expressions(0), expressions(1))
      }
    } else {
      throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(2), numArgs)
    }
  }

  protected def createStringPredicate(left: Expression, right: Expression): Expression
}
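The builder's type-based dispatch can be sketched in plain Python (a hypothetical model for illustration only — the real builder works on resolved Catalyst Expressions and their dataType, not Python values):

```python
# Hypothetical sketch of StringBinaryPredicateExpressionBuilderBase's dispatch:
# with exactly two arguments, route to a generic binary predicate when both
# inputs are binary, otherwise fall back to the string-only predicate.
def build_predicate(func_name, args):
    if len(args) != 2:
        # Mirrors QueryCompilationErrors.wrongNumArgsError(funcName, Seq(2), numArgs)
        raise ValueError(f"{func_name} expects 2 arguments, got {len(args)}")
    left, right = args
    # In Catalyst this checks Expression.dataType == BinaryType on both sides;
    # here we approximate resolved binary inputs with Python bytes.
    if isinstance(left, bytes) and isinstance(right, bytes):
        return ("BinaryPredicate", func_name, left, right)
    return ("StringPredicate", func_name, left, right)
```

This makes the limitation discussed below concrete: the dispatch needs the inputs' data types, which are only known once the expressions are resolved.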

Contributor:

I see. Then I have another question:
does it work with df.select(contains(col("a"), col("b")))?
In this case the underlying expressions are not resolved, so I guess it cannot get the data type.

panbingkun (Contributor Author):

Yes, that's right. So should we only support StringType here?

Contributor:

I think we should only support StringType here, and document this limitation (contains in SQL supports both StringType and BinaryType).

also cc @HyukjinKwon @beliefer @cloud-fan

panbingkun (Contributor Author):

Ok

Contributor:

Yeah, I see the limitation. Because the Scala API wraps columns in unresolved expressions, it seems we can't reuse the *ExpressionBuilder here.

@@ -8159,6 +8159,382 @@ def to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column:
return _invoke_function_over_columns("to_number", col, format)


@try_remote_functions
def char(n: "ColumnOrName") -> Column:
Contributor:

I suggest we use col: "ColumnOrName" for similar cases in Python.

Contributor:

e.g.

@try_remote_functions
def upper(col: "ColumnOrName") -> Column:
    """
    Converts a string expression to upper case.

    .. versionadded:: 1.5.0

    .. versionchanged:: 3.4.0
        Supports Spark Connect.

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        upper case values.

    Examples
    --------
    >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING")
    >>> df.select(upper("value")).show()
    +------------+
    |upper(value)|
    +------------+
    |       SPARK|
    |     PYSPARK|
    |  PANDAS API|
    +------------+
    """
    return _invoke_function_over_columns("upper", col)


@try_remote_functions
def lower(col: "ColumnOrName") -> Column:
    """
    Converts a string expression to lower case.

    .. versionadded:: 1.5.0

    .. versionchanged:: 3.4.0
        Supports Spark Connect.

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target column to work on.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        lower case values.

    Examples
    --------
    >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING")
    >>> df.select(lower("value")).show()
    +------------+
    |lower(value)|
    +------------+
    |       spark|
    |     pyspark|
    |  pandas api|
    +------------+
    """
    return _invoke_function_over_columns("lower", col)


@try_remote_functions
def ascii(col: "ColumnOrName") -> Column:

Contributor:

+1

panbingkun (Contributor Author):

OK

zhengruifeng (Contributor):

also cc @beliefer

beliefer (Contributor) left a comment:

Please update python/docs/source/reference/pyspark.sql/functions.rst too.

* @group string_funcs
* @since 3.5.0
*/
def like(str: Column, pattern: Column): Column = Column.fn("like", str, pattern)
Contributor:

Please add def like(str: Column, pattern: Column, escapeChar: Column): Column.

panbingkun (Contributor Author), Jun 15, 2023:

@beliefer @zhengruifeng
I don't think it's necessary to add this function, for the following reasons:

[screenshot]

It only has the two-parameter usage form. If a third parameter is needed, its usage form is:
[screenshot]
rather than like(str, pattern, escapeChar).

  • In my first local version I actually implemented this method, and it reported an error when executing 'SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "connect/testOnly org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite"':
[screenshots]

It triggers the following detection logic:

val params = Seq.fill(expressions.size)(classOf[Expression])
val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse {
  val validParametersCount = constructors
    .filter(_.getParameterTypes.forall(_ == classOf[Expression]))
    .map(_.getParameterCount).distinct.sorted
  throw QueryCompilationErrors.wrongNumArgsError(
    name, validParametersCount, params.length)
}
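That detection logic can be sketched in Python (a hypothetical model: each constructor is represented as a list of parameter types; names are illustrative). It shows why a 3-argument like(str, pattern, escapeChar) call fails here: the 3-parameter constructor's third parameter is a Char, not an Expression, so no all-Expression constructor of arity 3 exists.

```python
class Expression:  # stand-in for Catalyst's Expression type
    pass

def find_constructor(name, constructors, expressions):
    """Find a constructor whose parameter types are all Expression and match
    the argument count; otherwise raise, reporting the valid arities
    (mirrors wrongNumArgsError)."""
    params = [Expression] * len(expressions)
    for ctor in constructors:
        if ctor == params:
            return ctor
    valid_counts = sorted({len(c) for c in constructors
                           if all(t is Expression for t in c)})
    raise TypeError(f"{name} expects {valid_counts} args, got {len(params)}")
```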

Contributor:

Yeah. We could still support it. Please refer to nth_value. You must treat it in a special way on the server side.

Contributor:

If escapeChar were not supported in SQL syntax, I think we could also hold off on supporting it in functions.

If we want to support it now, on the Connect side we need to add it in SparkConnectPlanner#transformUnregisteredFunction.

panbingkun (Contributor Author):

Yes, I have already added it.

Contributor:

@zhengruifeng SQL syntax already supports escapeChar.

Contributor:

Oh, I see. So let's support it in functions too.

* @group string_funcs
* @since 3.5.0
*/
def ilike(str: Column, pattern: Column): Column = Column.fn("ilike", str, pattern)
Contributor:

ditto.

fn.btrim(fn.col("g"))
}

functionTest("btrim with trim") {
Contributor:

btrim with specified trim string

fn.find_in_set(fn.col("g"), fn.col("g"))
}

functionTest("like") {
Contributor:

Please add the test cases with escape character.

fn.like(fn.col("g"), fn.col("g"))
}

functionTest("ilike") {
Contributor:

ditto.

@@ -8159,6 +8159,382 @@ def to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column:
return _invoke_function_over_columns("to_number", col, format)


@try_remote_functions
def char(n: "ColumnOrName") -> Column:
Contributor:

+1

Parameters
----------
left : :class:`~pyspark.sql.Column` or str
Input column or strings.
Contributor:

Shall we describe it in more detail?

left : :class:`~pyspark.sql.Column` or str
Input column or strings.
right : :class:`~pyspark.sql.Column` or str
Input column or strings.
Contributor:

ditto.

Parameters
----------
inputs : :class:`~pyspark.sql.Column` or str
Input column or strings.
Contributor:

Input columns or strings.

ucase.__doc__ = pysparkfuncs.ucase.__doc__


def py_len(str: "ColumnOrName") -> Column:
panbingkun (Contributor Author):

This name conflicts with the Python built-in len.

Contributor:

ok, let us also skip this one, since we have length

panbingkun (Contributor Author):

ok

// TODO
val df = Seq(("Spark SQL", "Spark")).toDF("a", "b")
checkAnswer(df.selectExpr("contains(a, b)"), Seq(Row(true)))
checkAnswer(df.select(contains(lit("Spark SQL"), lit("Spark"))), Seq(Row(true)))
panbingkun (Contributor Author):

Why not df.select(contains(col("a"), col("b"))), but contains(lit("Spark SQL"), lit("Spark"))? Because with column references it can't get the data type.

beliefer (Contributor) left a comment:

LGTM except three comments.

"""
Returns a boolean. The value is True if right is found inside left.
Returns NULL if either input expression is NULL. Otherwise, returns False.
Both left or right must be of STRING or BINARY type.
Contributor:

We should update the comments too.

Contributor:

+1, let's also add Notes here, you can refer to

Notes
-----
The position is not zero based, but 1 based index. Returns 0 if the given
value could not be found in the array.
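The 1-based-position convention referenced in that Notes block can be illustrated with a small pure-Python model of find_in_set (an illustrative sketch, not the PySpark API; it follows the usual SQL find_in_set semantics):

```python
def find_in_set_py(s, str_list):
    """Model of SQL find_in_set: return the 1-based position of s in a
    comma-separated list, 0 if not found, and 0 if s itself contains a
    comma (the SQL function does not work for such inputs)."""
    if "," in s:
        return 0
    parts = str_list.split(",")
    return parts.index(s) + 1 if s in parts else 0
```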


@try_remote_functions
def like(
str: "ColumnOrName", pattern: "ColumnOrName", escapeChar: Optional["str"] = None
Contributor:

escapeChar: Optional["ColumnOrName"] ?

Contributor:

In this case, since escapeChar is a Char in the expression constructors, I feel it is a bit confusing if we treat a str input as a column name.

Shall we follow the Char type in the expressions, or directly use Column for both Scala and Python?

also cc @HyukjinKwon

Contributor:

Two weeks ago, I thought escapeChar: Optional["str"] = None was OK. But I'm not sure now.


@try_remote_functions
def ilike(
str: "ColumnOrName", pattern: "ColumnOrName", escapeChar: Optional["str"] = None
Contributor:

ditto.

@@ -67,7 +67,7 @@ def test_function_parity(self):

# For functions that are named differently in pyspark this is the mapping of their
# python name to the JVM equivalent
-py_equiv_jvm = {"create_map": "map"}
+py_equiv_jvm = {"create_map": "map", "py_len": "len"}
Contributor:

I think we don't need this change any more

Contributor:

+1

* @group string_funcs
* @since 3.5.0
*/
def ilike(str: Column, pattern: Column, escapeChar: Char): Column = withExpr {
Contributor:

escapeChar: Column

* @group string_funcs
* @since 3.5.0
*/
def like(str: Column, pattern: Column, escapeChar: Char): Column = withExpr {
Contributor:

escapeChar: Column

panbingkun (Contributor Author):

case class Like(left: Expression, right: Expression, escapeChar: Char)

Do we need to implement escapeChar as a Column here? @zhengruifeng

Comment on lines 3889 to 3891
* @note
* This is different from the `contains` method in SQL that supports both STRING and BINARY
* type.
Contributor:

Suggested change
* @note
* This is different from the `contains` method in SQL that supports both STRING and BINARY
* type.
* @note Only STRING type is supported in this function, while `contains` in SQL supports both STRING and BINARY.

"""
Returns a boolean. The value is True if right is found inside left.
Returns NULL if either input expression is NULL. Otherwise, returns False.
Both left or right must be of STRING or BINARY type.
Contributor:

+1, let's also add Notes here, you can refer to

Notes
-----
The position is not zero based, but 1 based index. Returns 0 if the given
value could not be found in the array.

panbingkun (Contributor Author):

Updated all, except:
1. On the Scala side: escapeChar: Char -> escapeChar: Column
2. On the Python side: escapeChar: Optional["str"] -> Optional["ColumnOrName"]

beliefer (Contributor):

Update all, exclude:
1.In Scala side: escapeChar: Char -> escapeChar: Column
2.In Python side: escapeChar: Optional["str"] -> Optional["ColumnOrName"]

OK. Please wait for @HyukjinKwon 's opinion.

panbingkun (Contributor Author) commented Jun 17, 2023:

@beliefer There is a small bug in PR SPARK-43938:
[screenshot]
Maybe we need to exclude 'getbit' in DataFrameFunctionsSuite:
[screenshot]

beliefer (Contributor):

@panbingkun Please wait #41640

zhengruifeng (Contributor):

Update all, exclude: 1.In Scala side: escapeChar: Char -> escapeChar: Column 2.In Python side: escapeChar: Optional["str"] -> Optional["ColumnOrName"]

Thanks for the update. My last suggestion would be:

1. On the Scala side: escapeChar: Char -> escapeChar: Column
2. On the Python side: escapeChar: Optional["str"] -> Optional["Column"]

I still feel it would be ambiguous to treat a str as the column name here, so I suggest using Optional["Column"] and then supporting other types later (if we support a type now, it is nearly impossible to remove it).

@@ -125,6 +125,13 @@ abstract class StringRegexExpression extends BinaryExpression
case class Like(left: Expression, right: Expression, escapeChar: Char)
extends StringRegexExpression {

panbingkun (Contributor Author):

To support Column, this constructor has to be added.

Contributor:

I think we can use escapeChar.eval() on the functions side.

beliefer (Contributor), Jun 18, 2023:

Because escapeChar is a feature I contributed myself, I know some of the background.
The escape char must be a constant, so we didn't support it as an expression.
I agree with the comment written by @zhengruifeng. Please check escapeChar in the functions API.


 * @since 3.5.0
 */
def like(str: Column, pattern: Column, escapeChar: Column): Column = withExpr {
  new Like(str.expr, pattern.expr, escapeChar.expr)
}
Contributor:

We can check escapeChar here.

panbingkun (Contributor Author):

All updates done.

@@ -395,6 +395,11 @@
      "Input to <functionName> should all be the same type, but it's <dataType>."
    ]
  },
  "ESCAPE_CHAR_WRONG_TYPE" : {
    "message" : [
      "EscapeChar should be a string literal."
Contributor:

'EscapeChar' should be a string literal of length one.

Comment on lines 4195 to 4196
case escape @ Literal(_, StringType) =>
Like(str.expr, pattern.expr, escape.eval().toString.charAt(0))
Contributor:

Suggested change
case escape @ Literal(_, StringType) =>
Like(str.expr, pattern.expr, escape.eval().toString.charAt(0))
case StringLiteral(v) if v.size == 1 =>
Like(str.expr, pattern.expr, v.charAt(0))
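The validation being suggested there — accept only a one-character string literal as the escape character — can be sketched in Python (a hypothetical helper for illustration; the real check is the Scala StringLiteral(v) if v.size == 1 pattern shown above):

```python
def resolve_escape_char(escape):
    """Accept only a length-1 string constant as the LIKE escape character,
    mirroring the `case StringLiteral(v) if v.size == 1` guard; otherwise
    raise the equivalent of the ESCAPE_CHAR_WRONG_TYPE / INVALID_ESCAPE_CHAR
    error discussed below."""
    if not isinstance(escape, str) or len(escape) != 1:
        raise ValueError("'escapeChar' should be a string literal of length one.")
    return escape
```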

beliefer (Contributor) left a comment:

LGTM except one comment.

@@ -395,6 +395,11 @@
      "Input to <functionName> should all be the same type, but it's <dataType>."
    ]
  },
  "ESCAPE_CHAR_WRONG_TYPE" : {
Contributor:

After a new thought, we can simplify to INVALID_ESCAPE_CHAR.

zhengruifeng (Contributor):

I am going to merge this PR after all tests pass. I just noticed that we can support BINARY via call_udf (see #41659); please help check it and send a follow-up PR if it works.

panbingkun (Contributor Author):

I am going to merge this PR after all tests pass, and I just notice that we can support BINARY via call_udf (see #41659), please help check it and send a follow-up PR if it works.

👌🏻, let me check it.

zhengruifeng (Contributor):

@panbingkun @beliefer thank you guys!

merged to master
