Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Fix UDF bugs in the pandas backend #1637

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
134 changes: 92 additions & 42 deletions docs/source/udf.rst
Expand Up @@ -10,51 +10,96 @@ topic.
This section of the documentation will discuss some of the backend specific
details of user defined functions.

API
---
.. warning::

.. _udf.api:
The UDF API is provisional and subject to change.

.. warning::
.. _udf.pandas:

Pandas
------
Ibis supports defining three kinds of user-defined functions for operations on
expressions targeting the pandas backend: **element-wise**, **reduction**, and
**analytic**.

.. _udf.elementwise:

The UDF/UDAF API is experimental. It is provisional and subject to change.
Element-wise Functions
~~~~~~~~~~~~~~~~~~~~~~
An **element-wise** function is a function that takes N rows as input and
produces N rows of output. ``log``, ``exp``, and ``floor`` are examples of
element-wise functions.

The API for user defined *scalar* functions will look like this:
Here's how to define an element-wise function:

.. code-block:: python

@udf(input_type=[double], output_type=double)
import ibis.expr.datatypes as dt
from ibis.pandas import udf

@udf.elementwise(input_type=[dt.int64], output_type=.dtdouble)
def add_one(x):
return x + 1.0

.. _udf.reduction:

User defined *aggregate* functions are nearly identical, with the exception
of using the ``@udaf`` decorator instead of the ``@udf`` decorator.
Reduction Functions
~~~~~~~~~~~~~~~~~~~
A **reduction** is a function that takes N rows as input and produces 1 row
as output. ``sum``, ``mean`` and ``count`` are examples of reductions. In
the context of a ``GROUP BY``, reductions produce 1 row of output *per
group*.

Impala
------
Here's how to define a reduction function:

.. _udf.impala:
.. code-block:: python

TODO
import ibis.expr.datatypes as dt
from ibis.pandas import udf

Pandas
------
@udf.reduction(input_type=[dt.double], output_type=.dtdouble)
def double_mean(series):
return 2 * series.mean()

.. _udf.pandas:
.. _udf.analytic:

Analytic Functions
~~~~~~~~~~~~~~~~~~
An **analytic** function is like an **element-wise** function in that it
takes N rows as input and produces N rows of output. The key difference is
that analytic functions can be applied *per group* using window functions.
Z-score is an example of an analytic function.

Pandas supports defining both UDFs and UDAFs.
Here's how to define an analytic function:

When you define a UDF you automatically get support for applying that UDF in a
scalar context, as well as in any group by operation.
.. code-block:: python

When you define a UDAF you automatically get support for standard scalar
aggregations, group bys, *as well as* any supported windowing operation.
import ibis.expr.datatypes as dt
from ibis.pandas import udf

@udf.analytic(input_type=[dt.double], output_type=.dtdouble)
def zscore(series):
return (series - series.mean()) / series.std()

Details of Pandas UDFs
~~~~~~~~~~~~~~~~~~~~~~
- :ref:`Element-wise functions <udf.element>` automatically provide support for
applying your UDF to any combination of scalar values and columns.
- :ref:`Reduction functions <udf.reduction>` automatically provide support for
whole column aggregations, grouped aggregations, and application of your
function over a window.
- :ref:`Analytic functions <udf.analytic>` work in both grouped and non-grouped
settings
- The objects you receive as input arguments are either ``pandas.Series`` or
Python/NumPy scalars.

The API for these functions is the same as described above.
.. note::

The objects you receive as input arguments are either ``pandas.Series`` or
python or numpy scalars depending on the operation.
Any keyword arguments must be given a default value or the function **will
not work**.

A common Python convention is to set the default value to ``None`` and
handle setting it to something not ``None`` in the body of the function.

Using ``add_one`` from above as an example, the following call will receive a
``pandas.Series`` for the ``x`` argument:
Expand All @@ -74,32 +119,34 @@ And this will receive the ``int`` 1:

>>> expr = add_one(1)

Finally, since the pandas backend passes around ``**kwargs`` you can accept
``**kwargs`` in your function:
Since the pandas backend passes around ``**kwargs`` you can accept ``**kwargs``
in your function:

.. code-block:: python

@udf([double], double)
def add_one(x, **kwargs):
return x + 1.0
import ibis.expr.datatypes as dt
from ibis.pandas import udf

Or you can leave them out as we did in the example above. You can also
optionally accept *specific* keyword arguments. This requires knowledge of how
the pandas backend works for it to be useful:
@udf.elementwise([dt.int64], dt.double)
def add_two(x, **kwargs):
# do stuff with kwargs
return x + 2.0

.. note::

Any keyword arguments (other than ``**kwargs``) must be given a default
value or the UDF/UDAF **will not work**. A standard Python convention is to
set the default value to ``None``.
Or you can leave them out as we did in the example above. You can also
optionally accept specific keyword arguments.

For example:

.. code-block:: python

@udf([double], double)
def add_one(x, scope=None):
return x + 1.0
import ibis.expr.datatypes as dt
from ibis.pandas import udf

@udf.elementwise([dt.int64], dt.double)
def add_two_with_none(x, y=None):
if y is None:
y = 2.0
return x + y

BigQuery
--------
Expand All @@ -108,7 +155,7 @@ BigQuery

.. note::

BigQuery only supports scalar UDFs at this time.
BigQuery only supports element-wise UDFs at this time.

BigQuery supports UDFs through JavaScript. Ibis provides support for this by
turning Python code into JavaScript.
Expand All @@ -117,7 +164,10 @@ The interface is very similar to the pandas UDF API:

.. code-block:: python

@udf([double], double)
import ibis.expr.datatypes as dt
from ibis.bigquery import udf

@udf([dt.double], dt.double)
def my_bigquery_add_one(x):
return x + 1.0

Expand Down
5 changes: 5 additions & 0 deletions ibis/bigquery/__init__.py
@@ -0,0 +1,5 @@
from ibis.compat import PY2


if not PY2:
from ibis.bigquery.udf.api import udf # noqa: F401
10 changes: 10 additions & 0 deletions ibis/bigquery/api.py
@@ -1,3 +1,5 @@
"""BigQuery public API"""

import google.cloud.bigquery # noqa: F401 fail early if bigquery is missing
import ibis.common as com

Expand All @@ -11,6 +13,14 @@
pass


__all__ = (
'compile',
'connect',
'verify',
'udf',
)


def compile(expr, params=None):
"""
Force compilation of expression as though it were an expression depending
Expand Down
1 change: 0 additions & 1 deletion ibis/bigquery/udf/__init__.py
@@ -1 +0,0 @@
from ibis.bigquery.udf.api import udf # noqa: F401
2 changes: 1 addition & 1 deletion ibis/bigquery/udf/api.py
Expand Up @@ -68,7 +68,7 @@ def udf(input_type, output_type, strict=True, libraries=None):

Examples
--------
>>> from ibis.bigquery.api import udf
>>> from ibis.bigquery import udf
>>> import ibis.expr.datatypes as dt
>>> @udf(input_type=[dt.double], output_type=dt.double)
... def add_one(x):
Expand Down
2 changes: 1 addition & 1 deletion ibis/bigquery/udf/tests/test_udf_execute.py
Expand Up @@ -12,7 +12,7 @@

pytestmark = pytest.mark.bigquery

from ibis.bigquery.api import udf # noqa: E402
from ibis.bigquery import udf # noqa: E402

PROJECT_ID = os.environ.get('GOOGLE_BIGQUERY_PROJECT_ID', 'ibis-gbq')
DATASET_ID = 'testing'
Expand Down
8 changes: 2 additions & 6 deletions ibis/compat.py
Expand Up @@ -15,17 +15,14 @@
if not PY2:
unicode_type = str

def lzip(*x):
return list(zip(*x))

zip = zip
zip_longest = itertools.zip_longest

def viewkeys(x):
return x.keys()

from decimal import Decimal
from inspect import signature, Parameter, _empty
from inspect import signature, Parameter
import unittest.mock as mock
range = range
map = map
Expand All @@ -40,10 +37,9 @@ def viewkeys(x):
except ImportError:
from decimal import Decimal # noqa: F401

from funcsigs import signature, Parameter, _empty # noqa: F401
from funcsigs import signature, Parameter # noqa: F401

unicode_type = unicode # noqa: F821
lzip = zip
zip = itertools.izip
zip_longest = itertools.izip_longest
map = itertools.imap
Expand Down
17 changes: 8 additions & 9 deletions ibis/impala/client.py
@@ -1,9 +1,11 @@
import operator
import re
import six
import threading
import time
import weakref
import traceback
import threading
import weakref

import six

from posixpath import join as pjoin
from collections import deque
Expand All @@ -21,7 +23,7 @@

from ibis.config import options
from ibis.client import Query, Database, DatabaseEntity, SQLClient
from ibis.compat import lzip, parse_version
from ibis.compat import parse_version, zip
from ibis.filesystems import HDFS, WebHDFS
from ibis.impala import udf, ddl
from ibis.impala.compat import impyla, ImpylaError, HS2Error
Expand Down Expand Up @@ -839,10 +841,7 @@ def list_tables(self, like=None, database=None):

def _get_list(self, cur):
tuples = cur.fetchall()
if len(tuples) > 0:
return list(lzip(*tuples)[0])
else:
return []
return list(map(operator.itemgetter(0), tuples))

def set_database(self, name):
"""
Expand All @@ -863,7 +862,7 @@ def exists_database(self, name):
-------
if_exists : boolean
"""
return len(self.list_databases(like=name)) > 0
return bool(self.list_databases(like=name))

def create_database(self, name, path=None, force=False):
"""
Expand Down
2 changes: 1 addition & 1 deletion ibis/pandas/aggcontext.py
Expand Up @@ -327,7 +327,7 @@ def agg(self, grouped_data, function, *args, **kwargs):
if callable(function):
method = operator.methodcaller(
'apply',
_apply(function, args, kwargs)
_apply(function, args, kwargs),
)
else:
assert isinstance(function, six.string_types)
Expand Down
10 changes: 8 additions & 2 deletions ibis/pandas/api.py
Expand Up @@ -4,10 +4,16 @@
import toolz

from ibis.pandas.client import PandasClient
from ibis.pandas.execution import execute, execute_node # noqa: F401
from ibis.pandas.execution import execute, execute_node
from ibis.pandas.udf import udf


__all__ = 'connect', 'execute', 'dialect'
__all__ = (
'connect',
'dialect',
'execute',
'udf',
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
)


def connect(dictionary):
Expand Down