Skip to content

Commit

Permalink
[SPARK-22274][PYTHON][SQL] User-defined aggregation functions with pa…
Browse files Browse the repository at this point in the history
…ndas udf (full shuffle)

## What changes were proposed in this pull request?

Add support for using pandas UDFs with groupby().agg().

This PR introduces a new type of pandas UDF - group aggregate pandas UDF. This type of UDF defines a transformation of multiple pandas Series -> a scalar value. Group aggregate pandas UDFs can be used with groupby().agg(). Note group aggregate pandas UDF doesn't support partial aggregation, i.e., a full shuffle is required.

This PR doesn't support group aggregate pandas UDFs that return ArrayType, StructType or MapType. Support for these types is left for future PR.

## How was this patch tested?

GroupbyAggPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #19872 from icexelloss/SPARK-22274-groupby-agg.
  • Loading branch information
icexelloss authored and ueshin committed Jan 23, 2018
1 parent 51eb750 commit b2ce17b
Show file tree
Hide file tree
Showing 15 changed files with 792 additions and 61 deletions.
Expand Up @@ -39,12 +39,14 @@ private[spark] object PythonEvalType {

val SQL_PANDAS_SCALAR_UDF = 200
val SQL_PANDAS_GROUP_MAP_UDF = 201
val SQL_PANDAS_GROUP_AGG_UDF = 202

def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF"
case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF"
case SQL_PANDAS_GROUP_AGG_UDF => "SQL_PANDAS_GROUP_AGG_UDF"
}
}

Expand Down
1 change: 1 addition & 0 deletions python/pyspark/rdd.py
Expand Up @@ -70,6 +70,7 @@ class PythonEvalType(object):

SQL_PANDAS_SCALAR_UDF = 200
SQL_PANDAS_GROUP_MAP_UDF = 201
SQL_PANDAS_GROUP_AGG_UDF = 202


def portable_hash(x):
Expand Down
36 changes: 34 additions & 2 deletions python/pyspark/sql/functions.py
Expand Up @@ -2089,6 +2089,8 @@ class PandasUDFType(object):

GROUP_MAP = PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF

GROUP_AGG = PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF


@since(1.3)
def udf(f=None, returnType=StringType()):
Expand Down Expand Up @@ -2159,7 +2161,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
1. SCALAR
A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`.
The returnType should be a primitive data type, e.g., `DoubleType()`.
The returnType should be a primitive data type, e.g., :class:`DoubleType`.
The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
Expand Down Expand Up @@ -2221,6 +2223,35 @@ def pandas_udf(f=None, returnType=None, functionType=None):
.. seealso:: :meth:`pyspark.sql.GroupedData.apply`
3. GROUP_AGG
A group aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
The `returnType` should be a primitive data type, e.g., :class:`DoubleType`.
The returned scalar can be either a python primitive type, e.g., `int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
:class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as
output types.
Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg`
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf("double", PandasUDFType.GROUP_AGG) # doctest: +SKIP
... def mean_udf(v):
... return v.mean()
>>> df.groupby("id").agg(mean_udf(df['v'])).show() # doctest: +SKIP
+---+-----------+
| id|mean_udf(v)|
+---+-----------+
| 1| 1.5|
| 2| 6.0|
+---+-----------+
.. seealso:: :meth:`pyspark.sql.GroupedData.agg`
.. note:: The user-defined functions are considered deterministic by default. Due to
optimization, duplicate invocations may be eliminated or the function may even be invoked
more times than it is present in the query. If your function is not deterministic, call
Expand Down Expand Up @@ -2267,7 +2298,8 @@ def pandas_udf(f=None, returnType=None, functionType=None):
raise ValueError("Invalid returnType: returnType can not be None")

if eval_type not in [PythonEvalType.SQL_PANDAS_SCALAR_UDF,
PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF]:
PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF]:
raise ValueError("Invalid functionType: "
"functionType must be one the values from PandasUDFType")

Expand Down
33 changes: 28 additions & 5 deletions python/pyspark/sql/group.py
Expand Up @@ -65,13 +65,27 @@ def __init__(self, jgd, df):
def agg(self, *exprs):
"""Compute aggregates and returns the result as a :class:`DataFrame`.
The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.
The available aggregate functions can be:
1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
.. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
a full shuffle is required. Also, all the data of a group will be loaded into
memory, so the user should be aware of the potential OOM risk if data is skewed
and certain groups are too large to fit in memory.
.. seealso:: :func:`pyspark.sql.functions.pandas_udf`
If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
is the column to perform aggregation on, and the value is the aggregate function.
Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
.. note:: Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
in a single call to this function.
:param exprs: a dict mapping from column name (string) to aggregate functions (string),
or a list of :class:`Column`.
Expand All @@ -82,6 +96,13 @@ def agg(self, *exprs):
>>> from pyspark.sql import functions as F
>>> sorted(gdf.agg(F.min(df.age)).collect())
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> @pandas_udf('int', PandasUDFType.GROUP_AGG) # doctest: +SKIP
... def min_udf(v):
... return v.min()
>>> sorted(gdf.agg(min_udf(df.age)).collect()) # doctest: +SKIP
[Row(name=u'Alice', min_udf(age)=2), Row(name=u'Bob', min_udf(age)=5)]
"""
assert exprs, "exprs should not be empty"
if len(exprs) == 1 and isinstance(exprs[0], dict):
Expand Down Expand Up @@ -204,16 +225,18 @@ def apply(self, udf):
The user-defined function should take a `pandas.DataFrame` and return another
`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
to the user-function and the returned `pandas.DataFrame`s are combined as a
to the user-function and the returned `pandas.DataFrame` are combined as a
:class:`DataFrame`.
The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
returnType of the pandas udf.
This function does not support partial aggregation, and requires shuffling all the data in
the :class:`DataFrame`.
.. note:: This function requires a full shuffle. all the data of a group will be loaded
into memory, so the user should be aware of the potential OOM risk if data is skewed
and certain groups are too large to fit in memory.
:param udf: a group map user-defined function returned by
:meth:`pyspark.sql.functions.pandas_udf`.
:func:`pyspark.sql.functions.pandas_udf`.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
Expand Down

0 comments on commit b2ce17b

Please sign in to comment.