[SPARK-41666][PYTHON] Support parameterized SQL by sql()
### What changes were proposed in this pull request?
In this PR, I propose to extend the `sql()` method in PySpark to support parameterized SQL queries (see #38864) and to add a new parameter, `args`, of type `Dict[str, str]`. This parameter maps named parameters that can occur in the input SQL query to SQL literals such as 1, INTERVAL '1-1' YEAR TO MONTH, and DATE'2022-12-22' (see [the docs](https://spark.apache.org/docs/latest/sql-ref-literals.html) for supported literals).

For example:
```python
>>> spark.sql("SELECT * FROM range(10) WHERE id > :minId", args={"minId": "7"}).show()
+---+
| id|
+---+
|  8|
|  9|
+---+
```
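
Any supported SQL literal can be bound this way, not just integers. For instance (a sketch assuming an active session and a hypothetical table `events` with a DATE column `day`):

```python
# The value of an entry in `args` is the literal's SQL text, so a date
# is passed as DATE'2022-12-22', not as a Python datetime object.
spark.sql(
    "SELECT * FROM events WHERE day >= :startDay",
    args={"startDay": "DATE'2022-12-22'"},
).show()
```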

Closes #39159

### Why are the changes needed?
To achieve feature parity with the Scala/Java API and provide PySpark users with the same feature.

### Does this PR introduce _any_ user-facing change?
No, it shouldn't.

### How was this patch tested?
Checked the examples locally, and ran the tests:
```
$ python/run-tests --modules=pyspark-sql --parallelism=1
```

Closes #39183 from MaxGekk/parameterized-sql-pyspark-dict.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
MaxGekk committed Dec 23, 2022
1 parent d0ec53c commit a1c727f
Showing 4 changed files with 42 additions and 9 deletions.
core/src/main/resources/error/error-classes.json (3 additions, 3 deletions)

```diff
@@ -813,7 +813,7 @@
   },
   "INVALID_SQL_ARG" : {
     "message" : [
-      "The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal statement."
+      "The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal."
     ]
   },
   "INVALID_SQL_SYNTAX" : {
@@ -1164,7 +1164,7 @@
   },
   "UNBOUND_SQL_PARAMETER" : {
     "message" : [
-      "Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal statement."
+      "Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal."
     ]
   },
   "UNCLOSED_BRACKETED_COMMENT" : {
@@ -5225,4 +5225,4 @@
       "grouping() can only be used with GroupingSets/Cube/Rollup"
     ]
   }
-}
+}
```
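
The updated `UNBOUND_SQL_PARAMETER` message is raised when a query references a named parameter that `args` does not bind. A minimal sketch of triggering it (assuming Spark 3.4; the parameter names `lo` and `hi` are illustrative):

```python
# :lo is bound, :hi is not, so analysis fails with an
# UNBOUND_SQL_PARAMETER error for `hi` (exact wording per the JSON above).
try:
    spark.sql(
        "SELECT * FROM range(10) WHERE id BETWEEN :lo AND :hi",
        args={"lo": "3"},
    ).show()
except Exception as e:
    print(e)
```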
python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst (2 additions, 0 deletions)

```diff
@@ -39,3 +39,5 @@ Upgrading from PySpark 3.3 to 3.4
 * In Spark 3.4, the ``Series.concat`` sort parameter will be respected to follow pandas 1.4 behaviors.
 
 * In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace pre-existing arrays, which will NOT be over-written to follow pandas 1.4 behaviors.
+
+* In Spark 3.4, ``SparkSession.sql`` and the Pandas-on-Spark API ``sql`` have a new parameter ``args``, which binds named parameters to their SQL literals.
```
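
As a quick illustration of the new parameter on the pandas-on-Spark side (a sketch mirroring the doctest added below; it assumes an active Spark session, which `ps.sql` creates on demand):

```python
import pyspark.pandas as ps

# Bind the named parameter :bound1 to the SQL literal 7.
psdf = ps.sql("SELECT * FROM range(10) WHERE id > :bound1", args={"bound1": "7"})
print(psdf)  # rows with id 8 and 9
```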
python/pyspark/pandas/sql_formatter.py (18 additions, 2 deletions)

```diff
@@ -17,7 +17,7 @@
 
 import os
 import string
-from typing import Any, Optional, Union, List, Sequence, Mapping, Tuple
+from typing import Any, Dict, Optional, Union, List, Sequence, Mapping, Tuple
 import uuid
 import warnings
 
@@ -43,6 +43,7 @@
 def sql(
     query: str,
     index_col: Optional[Union[str, List[str]]] = None,
+    args: Dict[str, str] = {},
     **kwargs: Any,
 ) -> DataFrame:
     """
@@ -57,6 +58,8 @@ def sql(
     * pandas Series
     * string
 
+    Also the method can bind named parameters to SQL literals from `args`.
+
     Parameters
     ----------
     query : str
@@ -99,6 +102,12 @@ def sql(
     e  f  3  6
 
     Also note that the index name(s) should be matched to the existing name.
 
+    args : dict
+        A dictionary of named parameters that begin with the `:` marker and
+        their SQL literals for substituting.
+
+        .. versionadded:: 3.4.0
+
     kwargs
         other variables that the user wants to set that can be referenced in the query
@@ -152,6 +161,13 @@ def sql(
     0  1
     1  2
     2  3
 
+    And substitute named parameters with the `:` prefix by SQL literals.
+
+    >>> ps.sql("SELECT * FROM range(10) WHERE id > :bound1", args={"bound1": "7"})
+       id
+    0   8
+    1   9
     """
     if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1":
         from pyspark.pandas import sql_processor
@@ -166,7 +182,7 @@ def sql(
     session = default_session()
     formatter = PandasSQLStringFormatter(session)
     try:
-        sdf = session.sql(formatter.format(query, **kwargs))
+        sdf = session.sql(formatter.format(query, **kwargs), args)
     finally:
         formatter.clear()
```
python/pyspark/sql/session.py (19 additions, 4 deletions)

```diff
@@ -1293,20 +1293,26 @@ def prepare(obj: Any) -> Any:
             df._schema = struct
             return df
 
-    def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
+    def sql(self, sqlQuery: str, args: Dict[str, str] = {}, **kwargs: Any) -> DataFrame:
         """Returns a :class:`DataFrame` representing the result of the given query.
         When ``kwargs`` is specified, this method formats the given string by using the Python
-        standard formatter.
+        standard formatter. The method binds named parameters to SQL literals from `args`.
 
         .. versionadded:: 2.0.0
 
         .. versionchanged:: 3.4.0
-            Support Spark Connect.
+            Support Spark Connect and parameterized SQL.
 
         Parameters
         ----------
         sqlQuery : str
             SQL query string.
+        args : dict
+            A dictionary of named parameters that begin with the `:` marker and
+            their SQL literals for substituting.
+
+            .. versionadded:: 3.4.0
+
         kwargs : dict
             Other variables that the user wants to set that can be referenced in the query
@@ -1380,13 +1386,22 @@ def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
         | 2|  4|
         | 3|  6|
         +---+---+
 
+        And substitute named parameters with the `:` prefix by SQL literals.
+
+        >>> spark.sql("SELECT * FROM {df} WHERE {df[B]} > :minB", {"minB" : "5"}, df=mydf).show()
+        +---+---+
+        |  A|  B|
+        +---+---+
+        |  3|  6|
+        +---+---+
         """
 
         formatter = SQLStringFormatter(self)
         if len(kwargs) > 0:
             sqlQuery = formatter.format(sqlQuery, **kwargs)
         try:
-            return DataFrame(self._jsparkSession.sql(sqlQuery), self)
+            return DataFrame(self._jsparkSession.sql(sqlQuery, args), self)
         finally:
             if len(kwargs) > 0:
                 formatter.clear()
```
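
The order of operations visible in the new code is worth noting: Python-side `{...}` template formatting runs first on the client, and `:name` parameters are then bound to literals by the JVM when `sql()` executes. A sketch combining both, based on the doctest above (the session setup and the small test DataFrame are assumptions for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
mydf = spark.createDataFrame([(1, 4), (2, 4), (3, 6)], ["A", "B"])

# Step 1 (client): {df} and {df[B]} are expanded by SQLStringFormatter.
# Step 2 (JVM): :minB is bound to the SQL literal 5.
spark.sql(
    "SELECT * FROM {df} WHERE {df[B]} > :minB",
    args={"minB": "5"},
    df=mydf,
).show()
```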
