
[SPARK-23261][PYSPARK][BACKPORT-2.3] Rename Pandas UDFs

This PR backports #20428 to Spark 2.3, without the changes regarding `GROUPED AGG PANDAS UDF`.

---

## What changes were proposed in this pull request?
Rename the public APIs and names of Pandas UDFs, as sketched below:

- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
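
For illustration, the rename in user code (a minimal before/after sketch, not part of this diff; assumes pyspark 2.3):

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Before this change:
#   @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
# After this change:
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def passthrough(pdf):
    # pdf is the pandas.DataFrame for one group
    return pdf
```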

## How was this patch tested?
The existing tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20439 from gatorsmile/backport2.3.
gatorsmile committed Jan 31, 2018
1 parent f4802dc commit 7b9fe08658b529901a5f22bf81ffdd4410180809
@@ -37,14 +37,14 @@ private[spark] object PythonEvalType {

val SQL_BATCHED_UDF = 100

-val SQL_PANDAS_SCALAR_UDF = 200
-val SQL_PANDAS_GROUP_MAP_UDF = 201
+val SQL_SCALAR_PANDAS_UDF = 200
+val SQL_GROUPED_MAP_PANDAS_UDF = 201

def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
-case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF"
-case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF"
+case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
+case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
}
}
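
These integer codes are shared across the JVM/Python boundary, so the rename has to land on both sides in lockstep (the pyspark `PythonEvalType` hunk further below mirrors this one). A quick consistency check, as a sketch (assumes pyspark 2.3, where `PythonEvalType` lives in `pyspark/rdd.py`):

```python
from pyspark.rdd import PythonEvalType

# The Python-side constants must carry the same codes as the Scala object above.
assert PythonEvalType.SQL_BATCHED_UDF == 100
assert PythonEvalType.SQL_SCALAR_PANDAS_UDF == 200
assert PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF == 201
```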

@@ -1684,7 +1684,7 @@ Spark will fall back to create the DataFrame without Arrow.
Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
or to wrap the function; no additional configuration is required. Currently, there are two types of
-Pandas UDF: Scalar and Group Map.
+Pandas UDF: Scalar and Grouped Map.
### Scalar
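
The scalar example referenced in the next hunk multiplies two columns; a self-contained sketch (assumes a running `SparkSession` named `spark` with Arrow available):

```python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def multiply_func(a, b):
    # a and b arrive as pandas.Series; return a Series of the same length
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))
df.select(multiply(col("x"), col("x"))).show()
```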
@@ -1702,8 +1702,8 @@ The following example shows how to create a scalar Pandas UDF that computes the
</div>
</div>
-### Group Map
-Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
+### Grouped Map
+Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
Split-apply-combine consists of three steps:
* Split the data into groups by using `DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
@@ -1723,7 +1723,7 @@ The following example shows how to use `groupby().apply()` to subtract the mean
<div class="codetabs">
<div data-lang="python" markdown="1">
-{% include_example group_map_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
</div>
</div>
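
For intuition, the same split-apply-combine pattern in plain pandas, with no Spark involved (a local sketch, not part of this commit):

```python
import pandas as pd

pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

def subtract_mean(g):
    # g is the pandas.DataFrame for one group; a grouped map
    # Pandas UDF has the same DataFrame-in, DataFrame-out contract.
    return g.assign(v=g.v - g.v.mean())

print(pdf.groupby("id", group_keys=False).apply(subtract_mean))
```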
@@ -86,15 +86,15 @@ def multiply_func(a, b):
# $example off:scalar_pandas_udf$


-def group_map_pandas_udf_example(spark):
-    # $example on:group_map_pandas_udf$
+def grouped_map_pandas_udf_example(spark):
+    # $example on:grouped_map_pandas_udf$
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
@@ -110,7 +110,7 @@ def substract_mean(pdf):
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
-# $example off:group_map_pandas_udf$
+# $example off:grouped_map_pandas_udf$


if __name__ == "__main__":
@@ -123,7 +123,7 @@ def substract_mean(pdf):
dataframe_with_arrow_example(spark)
print("Running pandas_udf scalar example")
scalar_pandas_udf_example(spark)
print("Running pandas_udf group map example")
group_map_pandas_udf_example(spark)
print("Running pandas_udf grouped map example")
grouped_map_pandas_udf_example(spark)

spark.stop()
@@ -68,8 +68,8 @@ class PythonEvalType(object):

SQL_BATCHED_UDF = 100

-SQL_PANDAS_SCALAR_UDF = 200
-SQL_PANDAS_GROUP_MAP_UDF = 201
+SQL_SCALAR_PANDAS_UDF = 200
+SQL_GROUPED_MAP_PANDAS_UDF = 201


def portable_hash(x):
@@ -1737,8 +1737,8 @@ def translate(srcCol, matching, replace):
def create_map(*cols):
"""Creates a new map column.
-:param cols: list of column names (string) or list of :class:`Column` expressions that grouped
-    as key-value pairs, e.g. (key1, value1, key2, value2, ...).
+:param cols: list of column names (string) or list of :class:`Column` expressions that are
+    grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).
>>> df.select(create_map('name', 'age').alias("map")).collect()
[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
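
For reference, the keys and values alternate in the argument list; a short sketch of the multi-pair form described by the corrected docstring (assumes a `spark` session; the literal key names are illustrative):

```python
from pyspark.sql.functions import create_map, lit

df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
# Arguments alternate: key1, value1, key2, value2, ...
df.select(create_map(lit("n"), "name", lit("a"), "age").alias("m")).show(truncate=False)
```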
@@ -2085,9 +2085,9 @@ def map_values(col):
class PandasUDFType(object):
"""Pandas UDF Types. See :meth:`pyspark.sql.functions.pandas_udf`.
"""
-SCALAR = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF

-GROUP_MAP = PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF
+GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF


@since(1.3)
@@ -2191,20 +2191,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
Therefore, this can be used, for example, to ensure the length of each returned
`pandas.Series`, and can not be used as the column length.
-2. GROUP_MAP
+2. GROUPED_MAP
-A group map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
+A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.
The length of the returned `pandas.DataFrame` can be arbitrary.
-Group map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
+Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v")) # doctest: +SKIP
->>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP) # doctest: +SKIP
+>>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
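
The doctest continues by routing the UDF through `groupby().apply()`; a sketch of that usage (output omitted; each group ends up with zero mean and unit standard deviation):

```python
df.groupby("id").apply(normalize).show()  # doctest: +SKIP
```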
@@ -2254,20 +2254,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
eval_type = returnType
else:
# @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
-eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
else:
return_type = returnType

if functionType is not None:
eval_type = functionType
else:
-eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF

if return_type is None:
raise ValueError("Invalid returnType: returnType can not be None")

-if eval_type not in [PythonEvalType.SQL_PANDAS_SCALAR_UDF,
-                     PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF]:
+if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                     PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF]:
raise ValueError("Invalid functionType: "
"functionType must be one the values from PandasUDFType")

@@ -212,14 +212,14 @@ def apply(self, udf):
This function does not support partial aggregation, and requires shuffling all the data in
the :class:`DataFrame`.
-:param udf: a group map user-defined function returned by
-    :meth:`pyspark.sql.functions.pandas_udf`.
+:param udf: a grouped map user-defined function returned by
+    :func:`pyspark.sql.functions.pandas_udf`.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
->>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP) # doctest: +SKIP
+>>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
@@ -239,9 +239,9 @@ def apply(self, udf):
"""
# Columns are special because hasattr always returns True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
-or udf.evalType != PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
"GROUP_MAP.")
"GROUPED_MAP.")
df = self._df
udf_column = udf(*[df[col] for col in df.columns])
jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
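
A consequence of the check above: handing a non grouped-map UDF to `apply` fails fast with the renamed message. Sketch (assumes the `df` from the doctest above):

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1

try:
    df.groupby("id").apply(plus_one)
except ValueError as e:
    print(e)  # Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP.
```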
