From 730df430329cbcae5db2771235f8871f5fc02fa0 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Wed, 19 Jun 2019 12:17:08 -0400 Subject: [PATCH] BUG: Partially revert #1758 This caused a few failures with downstream code, so I've reverted the changes to ibis/pandas/core.py. I've left other fixes from #1758 unchanged. I'll revisit the reimplementation in the near future. Author: Phillip Cloud Closes #1837 from cpcloud/revert-topo and squashes the following commits: c9662ac [Phillip Cloud] Just pass kwargs through 5d8ec82 [Phillip Cloud] Make zero argument UDFs work again 9fa4481 [Phillip Cloud] Revert toposort change d9c00ef [Phillip Cloud] Remove unnecessary code deed6aa [Phillip Cloud] Revert select parts --- ibis/file/client.py | 14 +- ibis/file/tests/test_csv.py | 3 + ibis/file/tests/test_hdf5.py | 3 + ibis/file/tests/test_parquet.py | 4 +- ibis/pandas/client.py | 7 +- ibis/pandas/core.py | 431 ++++++++++++--------- ibis/pandas/dispatch.py | 48 +-- ibis/pandas/execution/tests/test_window.py | 2 +- ibis/pandas/execution/window.py | 12 - 9 files changed, 258 insertions(+), 266 deletions(-) diff --git a/ibis/file/client.py b/ibis/file/client.py index 9764bee0e380..fb8b42b44ffa 100644 --- a/ibis/file/client.py +++ b/ibis/file/client.py @@ -2,8 +2,7 @@ import ibis import ibis.expr.types as ir -from ibis.pandas.core import execute -from ibis.pandas.dispatch import execute_last +from ibis.pandas.core import execute_and_reset class FileClient(ibis.client.Client): @@ -35,15 +34,8 @@ def database(self, name=None, path=None): return FileDatabase(name, self, path=path) def execute(self, expr, params=None, **kwargs): # noqa - assert isinstance(expr, ir.Expr), "Expected ir.Expr, got {}".format( - type(expr) - ) - return execute_last( - expr.op(), - execute(expr, params=params, **kwargs), - params=params, - **kwargs, - ) + assert isinstance(expr, ir.Expr) + return execute_and_reset(expr, params=params, **kwargs) def list_tables(self, path=None): raise NotImplementedError diff --git a/ibis/file/tests/test_csv.py b/ibis/file/tests/test_csv.py index f804cc5de2ff..f43159584e67 100644 --- a/ibis/file/tests/test_csv.py +++ b/ibis/file/tests/test_csv.py @@ -74,6 +74,9 @@ def test_read(csv, data): expected['time'] = expected['time'].astype(str) tm.assert_frame_equal(result, expected) + result = closes.execute() + tm.assert_frame_equal(result, expected) + def test_read_with_projection(csv2, data): diff --git a/ibis/file/tests/test_hdf5.py b/ibis/file/tests/test_hdf5.py index 0af8ac1ab381..988d57d4e4d0 100644 --- a/ibis/file/tests/test_hdf5.py +++ b/ibis/file/tests/test_hdf5.py @@ -90,6 +90,9 @@ def test_read(hdf, data): expected = data['close'] tm.assert_frame_equal(result, expected) + result = closes.execute() + tm.assert_frame_equal(result, expected) + def test_insert(transformed, tmpdir): diff --git a/ibis/file/tests/test_parquet.py b/ibis/file/tests/test_parquet.py index e53aea229f3c..8b0ad2110da0 100644 --- a/ibis/file/tests/test_parquet.py +++ b/ibis/file/tests/test_parquet.py @@ -4,7 +4,6 @@ from pandas.util import testing as tm import ibis - from ibis.file.client import FileDatabase pa = pytest.importorskip('pyarrow') # isort:skip @@ -87,6 +86,9 @@ def test_read(parquet, data): expected = data['close'] tm.assert_frame_equal(result, expected) + result = closes.execute() + tm.assert_frame_equal(result, expected) + def test_write(transformed, tmpdir): t = transformed diff --git a/ibis/pandas/client.py b/ibis/pandas/client.py index 6e1a6a73f133..cb13cc65710e 100644 --- a/ibis/pandas/client.py +++ b/ibis/pandas/client.py @@ -20,8 +20,7 @@ import ibis.expr.schema as sch import ibis.expr.types as ir from ibis.compat import CategoricalDtype, DatetimeTZDtype -from ibis.pandas.core import execute -from ibis.pandas.dispatch import execute_last +from ibis.pandas.core import execute_and_reset try: infer_pandas_dtype = pd.api.types.infer_dtype @@ -370,9 +369,7 @@ def execute(self, query, params=None, limit='default', **kwargs): type(query).__name__ ) ) - result = execute(query, params=params, **kwargs) - query_op = query.op() - return execute_last(query_op, result, params=params, **kwargs) + return execute_and_reset(query, params=params, **kwargs) def compile(self, expr, *args, **kwargs): """Compile `expr`. diff --git a/ibis/pandas/core.py b/ibis/pandas/core.py index 8b1ae0f83711..c66a01890c89 100644 --- a/ibis/pandas/core.py +++ b/ibis/pandas/core.py @@ -1,6 +1,6 @@ """The pandas backend is a departure from the typical ibis backend in that it doesn't compile to anything, and the execution of the ibis expression -is under the purview of ibis itself rather than executing SQL against a server. +is under the purview of ibis itself rather than executing SQL on a server. Design ------ @@ -74,30 +74,28 @@ :class:`~ibis.expr.operations.ScalarParameter`, in which case the ``scope`` passed to ``post_execute`` would be the bound values passed in at the time the ``execute`` method was called. - """ -import collections +from __future__ import absolute_import + import datetime import functools import numbers -from typing import Any, Mapping, Optional, Sequence import numpy as np import pandas as pd import toolz +from multipledispatch import Dispatcher import ibis +import ibis.common as com import ibis.expr.datatypes as dt import ibis.expr.operations as ops import ibis.expr.types as ir import ibis.expr.window as win import ibis.pandas.aggcontext as agg_ctx -import ibis.util from ibis.client import find_backends from ibis.pandas.dispatch import ( - execute, - execute_first, execute_literal, execute_node, post_execute, @@ -143,227 +141,282 @@ def is_computable_input_arg(arg): ) -def get_node(obj): - """Attempt to get the underlying :class:`Node` instance from `obj`.""" - try: - return obj.op() - except AttributeError: - return obj - - -def dependencies(expr: ir.Expr): - """Compute the dependencies of an expression. +def execute_with_scope(expr, scope, aggcontext=None, clients=None, **kwargs): + """Execute an expression `expr`, with data provided in `scope`. Parameters ---------- - expr - An ibis expression + expr : ibis.expr.types.Expr + The expression to execute. + scope : collections.Mapping + A dictionary mapping :class:`~ibis.expr.operations.Node` subclass + instances to concrete data such as a pandas DataFrame. + aggcontext : Optional[ibis.pandas.aggcontext.AggregationContext] Returns ------- - dict - Mapping from hashable objects to ibis expression inputs. + result : scalar, pd.Series, pd.DataFrame + """ + op = expr.op() - See Also - -------- - is_computable_input - dependents + # Call pre_execute, to allow clients to intercept the expression before + # computing anything *and* before associating leaf nodes with data. This + # allows clients to provide their own data for each leaf. + if clients is None: + clients = list(find_backends(expr)) - """ - stack = [expr] - dependencies = collections.defaultdict(list) - - while stack: - expr = stack.pop() - node = get_node(expr) - if isinstance(node, collections.abc.Hashable): - if not isinstance(node, ops.Node): - dependencies[node] = [] - if node not in dependencies: - computable_inputs = [ - arg for arg in node.inputs if is_computable_input(arg) - ] - stack.extend(computable_inputs) - dependencies[node].extend(computable_inputs) - return dict(dependencies) - - -def dependents(dependencies): - """Get dependents from dependencies. + if aggcontext is None: + aggcontext = agg_ctx.Summarize() - Parameters - ---------- - dependencies - A mapping from hashable objects to ibis expression inputs. + pre_executed_scope = pre_execute( + op, *clients, scope=scope, aggcontext=aggcontext, **kwargs + ) + new_scope = toolz.merge(scope, pre_executed_scope) + result = execute_until_in_scope( + expr, + new_scope, + aggcontext=aggcontext, + clients=clients, + # XXX: we *explicitly* pass in scope and not new_scope here so that + # post_execute sees the scope of execute_with_scope, not the scope of + # execute_until_in_scope + post_execute_=functools.partial( + post_execute, + scope=scope, + aggcontext=aggcontext, + clients=clients, + **kwargs, + ), + **kwargs, + ) - Returns - ------- - dict - A mapping from hashable objects to expressions that depend on the keys. + return result - See Also - -------- - dependencies - """ - dependents = collections.defaultdict(list) - for node in dependencies.keys(): - dependents[node] = [] +def execute_until_in_scope( + expr, scope, aggcontext=None, clients=None, post_execute_=None, **kwargs +): + """Execute until our op is in `scope`. - for node, deps in dependencies.items(): - for dep in deps: - dependents[get_node(dep)].append(node.to_expr()) - return dict(dependents) + Parameters + ---------- + expr : ibis.expr.types.Expr + scope : Mapping + aggcontext : Optional[AggregationContext] + clients : List[ibis.client.Client] + kwargs : Mapping + """ + # these should never be None + assert aggcontext is not None, 'aggcontext is None' + assert clients is not None, 'clients is None' + assert post_execute_ is not None, 'post_execute_ is None' + + # base case: our op has been computed (or is a leaf data node), so + # return the corresponding value + op = expr.op() + if op in scope: + return scope[op] + + new_scope = execute_bottom_up( + expr, + scope, + aggcontext=aggcontext, + post_execute_=post_execute_, + clients=clients, + **kwargs, + ) + new_scope = toolz.merge( + new_scope, pre_execute(op, *clients, scope=scope, **kwargs) + ) + return execute_until_in_scope( + expr, + new_scope, + aggcontext=aggcontext, + clients=clients, + post_execute_=post_execute_, + **kwargs, + ) -def toposort(expr: ir.Expr): - """Topologically sort the nodes that underly `expr`. +def execute_bottom_up( + expr, scope, aggcontext=None, post_execute_=None, clients=None, **kwargs +): + """Execute `expr` bottom-up. Parameters ---------- - expr - An ibis expression. + expr : ibis.expr.types.Expr + scope : Mapping[ibis.expr.operations.Node, object] + aggcontext : Optional[ibis.pandas.aggcontext.AggregationContext] + kwargs : Dict[str, object] Returns ------- - Tuple - A tuple whose first element is the topologically sorted values required - to compute `expr` and whose second element is the dependencies of - `expr`. - + result : Mapping[ + ibis.expr.operations.Node, + Union[pandas.Series, pandas.DataFrame, scalar_types] + ] + A mapping from node to the computed result of that Node """ - # compute dependencies and dependents - parents = dependencies(expr) - children = dependents(parents) - - # count the number of dependencies each node has - indegree = toolz.valmap(len, parents) + assert post_execute_ is not None, 'post_execute_ is None' + op = expr.op() + + # if we're in scope then return the scope, this will then be passed back + # into execute_bottom_up, which will then terminate + if op in scope: + return scope + elif isinstance(op, ops.Literal): + # special case literals to avoid the overhead of dispatching + # execute_node + return { + op: execute_literal( + op, op.value, expr.type(), aggcontext=aggcontext, **kwargs + ) + } + + # figure out what arguments we're able to compute on based on the + # expressions inputs. things like expressions, None, and scalar types are + # computable whereas ``list``s are not + computable_args = [arg for arg in op.inputs if is_computable_input(arg)] + + # recursively compute each node's arguments until we've changed type + scopes = [ + execute_bottom_up( + arg, + scope, + aggcontext=aggcontext, + post_execute_=post_execute_, + clients=clients, + **kwargs, + ) + if hasattr(arg, 'op') + else {arg: arg} + for arg in computable_args + ] + + # if we're unable to find data then raise an exception + if not scopes and computable_args: + raise com.UnboundExpressionError( + 'Unable to find data for expression:\n{}'.format(repr(expr)) + ) - # queue up the nodes with no dependencies - queue = collections.deque( - node for node, count in indegree.items() if not count + # there should be exactly one dictionary per computable argument + assert len(computable_args) == len(scopes) + + new_scope = toolz.merge(scopes) + + # pass our computed arguments to this node's execute_node implementation + data = [ + new_scope[arg.op()] if hasattr(arg, 'op') else arg + for arg in computable_args + ] + result = execute_node( + op, + *data, + scope=scope, + aggcontext=aggcontext, + clients=clients, + **kwargs, ) + computed = post_execute_(op, result) + return {op: computed} - toposorted = [] - - while queue: - node = queue.popleft() - - # invariant: every element of the queue has indegree 0, i.e., no - # dependencies - assert not indegree[node] - toposorted.append(node) - - # remove the node -> child edge for every child of node - for child in map(get_node, children[node]): - indegree[child] -= 1 - - # if we removed the last edge, enqueue the child - if not indegree[child]: - queue.append(child) - return toposorted, parents +execute = Dispatcher('execute') @execute.register(ir.Expr) -def main_execute( - expr: ir.Expr, - scope: Optional[Mapping] = None, - aggcontext: Optional[agg_ctx.AggregationContext] = None, - clients: Sequence[ibis.client.Client] = (), - params: Optional[Mapping] = None, - **kwargs: Any -): - """Execute an ibis expression against the pandas backend. +def main_execute(expr, params=None, scope=None, aggcontext=None, **kwargs): + """Execute an expression against data that are bound to it. If no data + are bound, raise an Exception. Parameters ---------- - expr - scope - aggcontext - clients - params + expr : ibis.expr.types.Expr + The expression to execute + params : Mapping[ibis.expr.types.Expr, object] + The data that an unbound parameter in `expr` maps to + scope : Mapping[ibis.expr.operations.Node, object] + Additional scope, mapping ibis operations to data + aggcontext : Optional[ibis.pandas.aggcontext.AggregationContext] + An object indicating how to compute aggregations. For example, + a rolling mean needs to be computed differently than the mean of a + column. + kwargs : Dict[str, object] + Additional arguments that can potentially be used by individual node + execution + Returns + ------- + result : Union[ + pandas.Series, pandas.DataFrame, ibis.pandas.core.simple_types + ] + + Raises + ------ + ValueError + * If no data are bound to the input expression """ - toposorted, dependencies = toposort(expr) - params = toolz.keymap(get_node, params if params is not None else {}) - - # Add to scope the objects that have no dependencies and are not ibis - # nodes. We have to filter out nodes for cases--such as zero argument - # UDFs--that do not have any dependencies yet still need to be evaluated. - full_scope = toolz.merge( - scope if scope is not None else {}, - { - key: key - for key, parents in dependencies.items() - if not parents and not isinstance(key, ops.Node) - }, - params, - ) + if scope is None: + scope = {} - if not clients: - clients = list(find_backends(expr)) + if params is None: + params = {} - if aggcontext is None: - aggcontext = agg_ctx.Summarize() + # TODO: make expresions hashable so that we can get rid of these .op() + # calls everywhere + params = {k.op() if hasattr(k, 'op') else k: v for k, v in params.items()} - # give backends a chance to inject scope if needed - execute_first_scope = execute_first( - expr.op(), *clients, scope=full_scope, aggcontext=aggcontext, **kwargs - ) - full_scope = toolz.merge(full_scope, execute_first_scope) + new_scope = toolz.merge(scope, params) + return execute_with_scope(expr, new_scope, aggcontext=aggcontext, **kwargs) - nodes = [node for node in toposorted if node not in full_scope] - # compute the nodes that are not currently in scope - for node in nodes: - # allow clients to pre compute nodes as they like - pre_executed_scope = pre_execute( - node, *clients, scope=full_scope, aggcontext=aggcontext, **kwargs - ) - # merge the existing scope with whatever was returned from pre_execute - execute_scope = toolz.merge(full_scope, pre_executed_scope) - - # if after pre_execute our node is in scope, then there's nothing to do - # in this iteration - if node in execute_scope: - full_scope = execute_scope - else: - # If we're evaluating a literal then we can be a bit quicker about - # evaluating the dispatch graph - if isinstance(node, ops.Literal): - executor = execute_literal - else: - executor = execute_node - - # Gather the inputs we've already computed that the current node - # depends on - execute_args = [ - full_scope[get_node(arg)] for arg in dependencies[node] - ] - - # execute the node with its inputs - execute_node_result = executor( - node, - *execute_args, - aggcontext=aggcontext, - scope=execute_scope, - clients=clients, - **kwargs, - ) +def execute_and_reset( + expr, params=None, scope=None, aggcontext=None, **kwargs +): + """Execute an expression against data that are bound to it. If no data + are bound, raise an Exception. - # last change to perform any additional computation on the result - # before it gets added to scope for the next node - full_scope[node] = post_execute( - node, - execute_node_result, - clients=clients, - aggcontext=aggcontext, - scope=full_scope, - ) + Notes + ----- + The difference between this function and :func:`~ibis.pandas.core.execute` + is that this function resets the index of the result, if the result has + an index. - # the last node in the toposorted graph is the root and maps to the desired - # result in scope - last_node = toposorted[-1] - result = full_scope[last_node] + Parameters + ---------- + expr : ibis.expr.types.Expr + The expression to execute + params : Mapping[ibis.expr.types.Expr, object] + The data that an unbound parameter in `expr` maps to + scope : Mapping[ibis.expr.operations.Node, object] + Additional scope, mapping ibis operations to data + aggcontext : Optional[ibis.pandas.aggcontext.AggregationContext] + An object indicating how to compute aggregations. For example, + a rolling mean needs to be computed differently than the mean of a + column. + kwargs : Dict[str, object] + Additional arguments that can potentially be used by individual node + execution + + Returns + ------- + result : Union[ + pandas.Series, pandas.DataFrame, ibis.pandas.core.simple_types + ] + + Raises + ------ + ValueError + * If no data are bound to the input expression + """ + result = execute( + expr, params=params, scope=scope, aggcontext=aggcontext, **kwargs + ) + if isinstance(result, pd.DataFrame): + schema = expr.schema() + df = result.reset_index() + return df.loc[:, schema.names] + elif isinstance(result, pd.Series): + return result.reset_index(drop=True) return result diff --git a/ibis/pandas/dispatch.py b/ibis/pandas/dispatch.py index c219fea5ccb7..70bcd7e09a18 100644 --- a/ibis/pandas/dispatch.py +++ b/ibis/pandas/dispatch.py @@ -2,7 +2,6 @@ from functools import partial -import pandas as pd import toolz from multipledispatch import Dispatcher @@ -34,8 +33,7 @@ def execute_node_without_scope(node, **kwargs): pre_execute = Dispatcher( 'pre_execute', doc="""\ -Given a node and zero or more clients, compute a partial scope prior to -execution. +Given a node, compute a (possibly partial) scope prior to standard execution. Notes ----- @@ -61,25 +59,6 @@ def pre_execute_multiple_clients(node, *clients, scope=None, **kwargs): ) -execute_first = Dispatcher( - "execute_first", doc="Execute code before any nodes have been evaluated." -) - - -@execute_first.register(ops.Node) -@execute_first.register(ops.Node, ibis.client.Client) -def execute_first_default(node, *clients, **kwargs): - return {} - - -@execute_first.register(ops.Node, [ibis.client.Client]) -def execute_first_multiple_clients(node, *clients, scope=None, **kwargs): - return toolz.merge( - scope, - *map(partial(execute_first, node, scope=scope, **kwargs), clients), - ) - - execute_literal = Dispatcher( 'execute_literal', doc="""\ @@ -119,29 +98,4 @@ def post_execute_default(op, data, **kwargs): return data -execute_last = Dispatcher( - "execute_last", doc="Execute code after all nodes have been evaluated." -) - - -@execute_last.register(ops.Node, object) -def execute_last_default(_, result, **kwargs): - """Return the input result.""" - return result - - -@execute_last.register(ops.Node, pd.DataFrame) -def execute_last_dataframe(op, result, **kwargs): - """Reset the `result` :class:`~pandas.DataFrame`.""" - schema = op.to_expr().schema() - df = result.reset_index() - return df.loc[:, schema.names] - - -@execute_last.register(ops.Node, pd.Series) -def execute_last_series(_, result, **kwargs): - """Reset the `result` :class:`~pandas.Series`.""" - return result.reset_index(drop=True) - - execute = Dispatcher("execute") diff --git a/ibis/pandas/execution/tests/test_window.py b/ibis/pandas/execution/tests/test_window.py index 4644768fbab1..b76bbfbfe17c 100644 --- a/ibis/pandas/execution/tests/test_window.py +++ b/ibis/pandas/execution/tests/test_window.py @@ -503,4 +503,4 @@ def test_pre_execute(op, client, **kwargs): # once in window op at the top to pickup any scope changes before computing # twice in window op when calling execute on the ops.Lag node at the # beginning of execute and once before the actual computation - assert called[0] == 2 + assert called[0] == 3 diff --git a/ibis/pandas/execution/window.py b/ibis/pandas/execution/window.py index 4362c6f61736..f592a5b8f44a 100644 --- a/ibis/pandas/execution/window.py +++ b/ibis/pandas/execution/window.py @@ -147,18 +147,6 @@ def execute_window_op( factory=OrderedDict, ) - # operand inputs are coming in computed, but we need to recompute them in - # the case of a group by - if group_by: - operand_inputs = { - arg.op() for arg in operand.op().inputs if hasattr(arg, "op") - } - new_scope = OrderedDict( - (node, value) - for node, value in new_scope.items() - if node not in operand_inputs - ) - # figure out what the dtype of the operand is operand_type = operand.type() operand_dtype = operand_type.to_pandas()