refactor(result-handling): remove result_handler in favor of expression specific methods
cpcloud authored and kszucs committed Jul 25, 2023
1 parent 4edaab5 commit 3dc7143
Showing 30 changed files with 401 additions and 429 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ibis-backends.yml
@@ -420,7 +420,7 @@ jobs:
             - "psycopg2@2.8.4"
             - "GeoAlchemy2@0.6.3"
             - "geopandas@0.6"
-            - "Shapely@1.6"
+            - "Shapely@2"
           services:
             - postgres
           extras:
@@ -435,7 +435,7 @@ jobs:
             - "psycopg2@2.8.4"
             - "GeoAlchemy2@0.6.3"
             - "geopandas@0.6"
-            - "Shapely@1.6"
+            - "Shapely@2"
           services:
             - postgres
           extras:
@@ -449,7 +449,7 @@ jobs:
             - "psycopg2@2.8.4"
             - "GeoAlchemy2@0.6.3"
             - "geopandas@0.6"
-            - "Shapely@1.6"
+            - "Shapely@2"
           services:
             - postgres
           extras:
28 changes: 11 additions & 17 deletions ibis/backends/base/__init__.py
@@ -270,29 +270,23 @@ def to_pyarrow(
         """
         pa = self._import_pyarrow()
         self._run_pre_execute_hooks(expr)
+        table_expr = expr.as_table()
+        arrow_schema = table_expr.schema().to_pyarrow()
         try:
-            # Can't construct an array from record batches
-            # so construct at one column table (if applicable)
-            # then return the column _from_ the table
             with self.to_pyarrow_batches(
-                expr, params=params, limit=limit, **kwargs
+                table_expr, params=params, limit=limit, **kwargs
             ) as reader:
-                table = pa.Table.from_batches(reader)
+                table = (
+                    pa.Table.from_batches(reader)
+                    .rename_columns(table_expr.columns)
+                    .cast(arrow_schema)
+                )
         except pa.lib.ArrowInvalid:
             raise
         except ValueError:
             # The pyarrow batches iterator is empty so pass in an empty
             # iterator and a pyarrow schema
-            table = expr.as_table().schema().to_pyarrow().empty_table()
+            table = arrow_schema.empty_table()
 
-        if isinstance(expr, ir.Table):
-            return table
-        elif isinstance(expr, ir.Column):
-            return table[0]
-        elif isinstance(expr, ir.Scalar):
-            return table[0][0]
-        else:
-            raise ValueError
+        return expr.__pyarrow_result__(table)
 
     @util.experimental
     def to_pyarrow_batches(
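The isinstance ladder deleted above is the pattern this commit removes from every backend: each one decided at result-return time whether the caller asked for a Table, Column, or Scalar. That dispatch now lives once, on the expression types, and backends end with a single `return expr.__pyarrow_result__(table)`. A minimal sketch of what such methods could look like, reconstructed from the deleted dispatch (the method name comes from the diff; the class bodies are illustrative assumptions, not ibis's exact implementation):

    import pyarrow as pa

    class Table:
        def __pyarrow_result__(self, table: pa.Table) -> pa.Table:
            # A table expression maps to the arrow table unchanged.
            return table

    class Column:
        def __pyarrow_result__(self, table: pa.Table) -> pa.ChunkedArray:
            # A column executes as a one-column table; unwrap that column.
            return table[0]

    class Scalar:
        def __pyarrow_result__(self, table: pa.Table) -> pa.Scalar:
            # A scalar executes as a one-row, one-column table;
            # unwrap the single value.
            return table[0][0]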
5 changes: 1 addition & 4 deletions ibis/backends/base/sql/__init__.py
@@ -262,10 +262,7 @@ def execute(
         with self._safe_raw_sql(sql, **kwargs) as cursor:
             result = self.fetch_from_cursor(cursor, schema)
 
-        if hasattr(getattr(query_ast, 'dml', query_ast), 'result_handler'):
-            result = query_ast.dml.result_handler(result)
-
-        return result
+        return expr.__pandas_result__(result)
 
     def _register_in_memory_table(self, _: ops.InMemoryTable) -> None:
         raise NotImplementedError(self.name)
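`__pandas_result__` is the DataFrame-side counterpart used here. A sketch under the same caveat, inferred from the per-backend unwrapping removed elsewhere in this commit (see the ClickHouse diff below: `iat[0, 0]` for scalars, `iloc[:, 0]` for columns):

    import pandas as pd

    class Table:
        def __pandas_result__(self, df: pd.DataFrame) -> pd.DataFrame:
            return df

    class Column:
        def __pandas_result__(self, df: pd.DataFrame) -> pd.Series:
            # One-column result frame: hand back that column as a Series.
            return df.iloc[:, 0]

    class Scalar:
        def __pandas_result__(self, df: pd.DataFrame):
            # One-row, one-column result frame: hand back the bare value.
            return df.iat[0, 0]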
7 changes: 3 additions & 4 deletions ibis/backends/base/sql/alchemy/__init__.py
@@ -185,10 +185,9 @@ def _to_geodataframe(df, schema):
         geom_col = None
         for name, dtype in schema.items():
             if dtype.is_geospatial():
-                geom_col = geom_col or name
-                df[name] = df[name].map(
-                    lambda row: None if row is None else shape.to_shape(row)
-                )
+                if not geom_col:
+                    geom_col = name
+                df[name] = df[name].map(shape.to_shape, na_action="ignore")
         if geom_col:
             df[geom_col] = gpd.array.GeometryArray(df[geom_col].values)
             df = gpd.GeoDataFrame(df, geometry=geom_col)
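The new `na_action="ignore"` makes `Series.map` skip missing values, which is exactly what the deleted `None`-checking lambda did by hand. A quick self-contained illustration (plain strings stand in for the WKB geometries):

    import pandas as pd

    s = pd.Series(["point", None, "polygon"])

    # na_action="ignore" leaves missing values untouched instead of
    # passing them to the mapping function, so both spellings agree.
    manual = s.map(lambda v: None if v is None else v.upper())
    flagged = s.map(str.upper, na_action="ignore")
    assert manual.equals(flagged)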
7 changes: 7 additions & 0 deletions ibis/backends/base/sql/alchemy/query_builder.py
@@ -121,6 +121,13 @@ def _format_table(self, op):
             backend._create_temp_view(view=result, definition=definition)
         elif isinstance(ref_op, ops.InMemoryTable):
             result = self._format_in_memory_table(op, ref_op, translator)
+        elif isinstance(ref_op, ops.DummyTable):
+            result = sa.select(
+                *(
+                    translator.translate(value).label(name)
+                    for name, value in zip(ref_op.schema.names, ref_op.values)
+                )
+            )
         else:
             # A subquery
             if ctx.is_extracted(ref_op):
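`ops.DummyTable` covers table-less expressions such as a bare literal computation: there is nothing to select FROM, only labeled values. A standalone SQLAlchemy sketch of the kind of statement this branch builds (the literal and label are invented for illustration):

    import sqlalchemy as sa

    # A projection with no FROM clause, as produced for a table-less
    # ("dummy") expression.
    stmt = sa.select((sa.literal(1) + sa.literal(2)).label("total"))
    print(stmt.compile(compile_kwargs={"literal_binds": True}))
    # roughly: SELECT 1 + 2 AS total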
3 changes: 0 additions & 3 deletions ibis/backends/base/sql/compiler/query_builder.py
@@ -189,7 +189,6 @@ def __init__(
         limit=None,
         distinct=False,
         indent=2,
-        result_handler=None,
         parent_op=None,
     ):
         self.translator_class = translator_class
@@ -214,8 +213,6 @@ def __init__(
 
         self.indent = indent
 
-        self.result_handler = result_handler
-
     def _translate(self, expr, named=False, permit_subquery=False):
         translator = self.translator_class(
             expr,
44 changes: 6 additions & 38 deletions ibis/backends/base/sql/compiler/select_builder.py
@@ -4,9 +4,6 @@
 from collections.abc import Mapping
 from typing import NamedTuple
 
-import toolz
-
-import ibis.common.exceptions as com
 import ibis.expr.analysis as an
 import ibis.expr.datatypes as dt
 import ibis.expr.operations as ops
@@ -56,7 +53,7 @@ def to_select(
         self.context = context
         self.translator_class = translator_class
 
-        self.op, self.result_handler = self._adapt_operation(node)
+        self.op = node.to_expr().as_table().op()
         assert isinstance(self.op, ops.Node), type(self.op)
 
         self.table_set = None
@@ -75,38 +72,6 @@ def to_select(
 
         return select_query
 
-    @staticmethod
-    def _adapt_operation(node):
-        # Non-table expressions need to be adapted to some well-formed table
-        # expression, along with a way to adapt the results to the desired
-        # arity (whether array-like or scalar, for example)
-        #
-        # Canonical case is scalar values or arrays produced by some reductions
-        # (simple reductions, or distinct, say)
-        if isinstance(node, ops.TableNode):
-            return node, toolz.identity
-
-        elif isinstance(node, ops.Value):
-            if node.output_shape.is_scalar():
-                if an.is_scalar_reduction(node):
-                    table_expr = an.reduction_to_aggregation(node)
-                    return table_expr.op(), _get_scalar(node.name)
-                else:
-                    return node, _get_scalar(node.name)
-            elif node.output_shape.is_columnar():
-                if isinstance(node, ops.TableColumn):
-                    table_expr = node.table.to_expr()[[node.name]]
-                    result_handler = _get_column(node.name)
-                else:
-                    table_expr = node.to_expr().as_table()
-                    result_handler = _get_column(node.name)
-
-                return table_expr.op(), result_handler
-            else:
-                raise com.TranslationError(f"Unexpected shape {node.output_shape}")
-        else:
-            raise com.TranslationError(f'Do not know how to execute: {type(node)}')
-
     def _build_result_query(self):
         self._collect_elements()
         self._analyze_subqueries()
@@ -125,7 +90,6 @@ def _build_result_query(self):
             limit=self.limit,
             order_by=self.order_by,
             distinct=self.distinct,
-            result_handler=self.result_handler,
             parent_op=self.op,
         )
 
@@ -165,7 +129,6 @@ def _collect_elements(self):
 
         if isinstance(self.op, ops.TableNode):
             self._collect(self.op, toplevel=True)
-            assert self.table_set is not None
         else:
             self.select_set = [self.op]
 
@@ -320,6 +283,11 @@ def _collect_PhysicalTable(self, op, toplevel=False):
             self.select_set = [op]
             self.table_set = op
 
+    def _collect_DummyTable(self, op, toplevel=False):
+        if toplevel:
+            self.select_set = list(op.values)
+            self.table_set = None
+
     def _collect_SelfReference(self, op, toplevel=False):
         if toplevel:
             self._collect(op.table, toplevel=toplevel)
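With `_adapt_operation` and its result handlers gone, the builder normalizes every expression to a table up front (`node.to_expr().as_table().op()`) and defers scalar/column unwrapping to the result methods. A small example of that normalization, assuming current ibis expression APIs (the table name and schema are invented):

    import ibis

    t = ibis.table({"a": "int64"}, name="t")

    # A bare column normalizes to a one-column projection of its table...
    print(t.a.as_table().columns)  # expected: ['a']

    # ...and a scalar reduction to a one-row aggregation table, whose
    # single value is unwrapped later by __pandas_result__ and friends.
    print(t.a.sum().as_table().columns)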
16 changes: 3 additions & 13 deletions ibis/backends/bigquery/__init__.py
@@ -328,13 +328,10 @@ def execute(self, expr, params=None, limit="default", **kwargs):
         sql = query_ast.compile()
         self._log(sql)
         cursor = self.raw_sql(sql, params=params, **kwargs)
-        schema = expr.as_table().schema()
-        result = self.fetch_from_cursor(cursor, schema)
-
-        if hasattr(getattr(query_ast, "dml", query_ast), "result_handler"):
-            result = query_ast.dml.result_handler(result)
+        result = self.fetch_from_cursor(cursor, expr.as_table().schema())
 
-        return result
+        return expr.__pandas_result__(result)
 
     def fetch_from_cursor(self, cursor, schema):
         arrow_t = self._cursor_to_arrow(cursor)
@@ -379,14 +376,7 @@ def to_pyarrow(
         sql = query_ast.compile()
         cursor = self.raw_sql(sql, params=params, **kwargs)
         table = self._cursor_to_arrow(cursor)
-        if isinstance(expr, ir.Scalar):
-            assert len(table.columns) == 1, "len(table.columns) != 1"
-            return table[0][0]
-        elif isinstance(expr, ir.Column):
-            assert len(table.columns) == 1, "len(table.columns) != 1"
-            return table[0]
-        else:
-            return table
+        return expr.__pyarrow_result__(table)
 
     def to_pyarrow_batches(
         self,
21 changes: 7 additions & 14 deletions ibis/backends/clickhouse/__init__.py
@@ -282,14 +282,9 @@ def to_pyarrow(
             external_tables=external_tables,
             **kwargs,
         ) as reader:
-            t = reader.read_all()
+            table = reader.read_all()
 
-        if isinstance(expr, ir.Scalar):
-            return t[0][0]
-        elif isinstance(expr, ir.Column):
-            return t[0]
-        else:
-            return t
+        return expr.__pyarrow_result__(table)
 
     def to_pyarrow_batches(
         self,
@@ -392,13 +387,11 @@ def execute(
         if df.empty:
             df = pd.DataFrame(columns=schema.names)
 
-        result = PandasData.convert_table(df, schema)
-        if isinstance(expr, ir.Scalar):
-            return result.iat[0, 0]
-        elif isinstance(expr, ir.Column):
-            return result.iloc[:, 0]
-        else:
-            return result
+        # TODO: remove the extra conversion
+        #
+        # the extra __pandas_result__ call is to work around slight differences
+        # in single column conversion and whole table conversion
+        return expr.__pandas_result__(table.__pandas_result__(df))
 
     def compile(self, expr: ir.Expr, limit: str | None = None, params=None, **_: Any):
         table_expr = expr.as_table()
42 changes: 5 additions & 37 deletions ibis/backends/datafusion/__init__.py
@@ -7,8 +7,6 @@
 
 import pyarrow as pa
 
-import ibis.common.exceptions as com
-import ibis.expr.analysis as an
 import ibis.expr.operations as ops
 import ibis.expr.schema as sch
 import ibis.expr.types as ir
@@ -21,9 +19,8 @@
 except ImportError:
     from datafusion import SessionContext
 
-import datafusion
-
 if TYPE_CHECKING:
+    import datafusion
     import pandas as pd
 
 
@@ -292,27 +289,7 @@ def _get_frame(
         limit: int | str | None = None,
         **kwargs: Any,
     ) -> datafusion.DataFrame:
-        if isinstance(expr, ir.Table):
-            return self.compile(expr, params, **kwargs)
-        elif isinstance(expr, ir.Column):
-            # expression must be named for the projection
-            expr = expr.as_table()
-            return self.compile(expr, params, **kwargs)
-        elif isinstance(expr, ir.Scalar):
-            if an.find_immediate_parent_tables(expr.op()):
-                # there are associated datafusion tables so convert the expr
-                # to a selection which we can directly convert to a datafusion
-                # plan
-                expr = expr.as_table()
-                frame = self.compile(expr, params, **kwargs)
-            else:
-                # doesn't have any tables associated so create a plan from a
-                # dummy datafusion table
-                compiled = self.compile(expr, params, **kwargs)
-                frame = self._context.empty_table().select(compiled)
-            return frame
-        else:
-            raise com.IbisError(f"Cannot execute expression of type: {type(expr)}")
+        return self.compile(expr.as_table(), params, **kwargs)
 
     def to_pyarrow_batches(
         self,
@@ -334,25 +311,16 @@ def execute(
         limit: int | str | None = "default",
         **kwargs: Any,
     ):
-        output = self.to_pyarrow(expr, params=params, limit=limit, **kwargs)
-        if isinstance(expr, ir.Table):
-            return output.to_pandas()
-        elif isinstance(expr, ir.Column):
-            series = output.to_pandas()
-            series.name = expr.get_name()
-            return series
-        elif isinstance(expr, ir.Scalar):
-            return output.as_py()
-        else:
-            raise com.IbisError(f"Cannot execute expression of type: {type(expr)}")
+        output = self.to_pyarrow(expr.as_table(), params=params, limit=limit, **kwargs)
+        return expr.__pandas_result__(output.to_pandas(timestamp_as_object=True))
 
     def compile(
         self,
         expr: ir.Expr,
         params: Mapping[ir.Expr, object] | None = None,
         **kwargs: Any,
     ):
-        return translate(expr.op())
+        return translate(expr.op(), ctx=self._context)
 
     @classmethod
     @lru_cache
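The added `timestamp_as_object=True` in the DataFusion `execute` sidesteps pandas' nanosecond-precision `datetime64` limits by materializing timestamps as Python `datetime` objects. A short pyarrow illustration of the flag:

    import datetime
    import pyarrow as pa

    t = pa.table({"ts": [datetime.datetime(9999, 1, 1)]})

    # Year 9999 can overflow datetime64[ns]; with timestamp_as_object=True
    # the column comes back as object dtype holding datetime.datetime.
    df = t.to_pandas(timestamp_as_object=True)
    print(df["ts"].dtype)  # object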
