Skip to content

Commit

Permalink
refactor(formats): move the TableProxy object to formats from the operations
Browse files Browse the repository at this point in the history
  • Loading branch information
kszucs authored and cpcloud committed Dec 15, 2023
1 parent 3575858 commit 05964b1
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 80 deletions.
4 changes: 2 additions & 2 deletions ibis/expr/api.py
Expand Up @@ -445,7 +445,7 @@ def _memtable_from_pyarrow_table(
schema: SupportsSchema | None = None,
columns: Iterable[str] | None = None,
):
from ibis.expr.operations.relations import PyArrowTableProxy
from ibis.formats.pyarrow import PyArrowTableProxy

if columns is not None:
assert schema is None, "if `columns` is not `None` then `schema` must be `None`"
Expand All @@ -467,7 +467,7 @@ def _memtable_from_dataframe(
) -> Table:
import pandas as pd

from ibis.expr.operations.relations import PandasDataFrameProxy
from ibis.formats.pandas import PandasDataFrameProxy

if not isinstance(data, pd.DataFrame):
df = pd.DataFrame(data, columns=columns)
Expand Down
74 changes: 2 additions & 72 deletions ibis/expr/operations/relations.py
@@ -1,6 +1,5 @@
from __future__ import annotations

import abc
import itertools
from abc import abstractmethod
from typing import TYPE_CHECKING, Annotated, Any, Literal, Optional
Expand All @@ -15,17 +14,15 @@
from ibis.common.annotations import annotated, attribute
from ibis.common.collections import FrozenDict # noqa: TCH001
from ibis.common.deferred import Deferred
from ibis.common.grounds import Concrete, Immutable
from ibis.common.grounds import Concrete
from ibis.common.patterns import Between, Coercible, Eq
from ibis.common.typing import VarTuple # noqa: TCH001
from ibis.expr.operations.core import Column, Named, Node, Scalar, Value
from ibis.expr.operations.sortkeys import SortKey # noqa: TCH001
from ibis.expr.schema import Schema
from ibis.formats import TableProxy # noqa: TCH001

if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa

import ibis.expr.types as ir


Expand Down Expand Up @@ -112,73 +109,6 @@ class SQLQueryResult(TableNode):
source: Any


# TODO(kszucs): Add a pseudohashable wrapper and use that from InMemoryTable
# subclasses PandasTable, PyArrowTable


class TableProxy(Immutable):
    """Abstract, hashable wrapper around an in-memory table object.

    Wraps a concrete table (e.g. a pandas DataFrame or a pyarrow Table)
    and precomputes a hash from the wrapped object's type and identity so
    instances can be used where hashability is required.
    """

    __slots__ = ("_data", "_hash")
    # _data holds the wrapped table object; _hash is its precomputed hash.
    _data: Any
    _hash: int

    def __init__(self, data) -> None:
        # Use object.__setattr__ to bypass attribute assignment —
        # presumably Immutable blocks normal __setattr__ (TODO confirm).
        object.__setattr__(self, "_data", data)
        # Identity-based hash: cheap, and stable for the lifetime of the
        # wrapped object, which may itself be unhashable.
        object.__setattr__(self, "_hash", hash((type(data), id(data))))

    def __hash__(self) -> int:
        return self._hash

    def __repr__(self) -> str:
        # Indent the wrapped object's repr under the proxy class name.
        data_repr = util.indent(repr(self._data), spaces=2)
        return f"{self.__class__.__name__}:\n{data_repr}"

    @abc.abstractmethod
    def to_frame(self) -> pd.DataFrame:  # pragma: no cover
        """Convert this input to a pandas DataFrame."""

    @abc.abstractmethod
    def to_pyarrow(self, schema: Schema) -> pa.Table:  # pragma: no cover
        """Convert this input to a PyArrow Table."""

    def to_pyarrow_bytes(self, schema: Schema) -> bytes:
        """Serialize the table to Arrow IPC file-format bytes."""
        import pyarrow as pa
        import pyarrow_hotfix  # noqa: F401

        data = self.to_pyarrow(schema=schema)
        out = pa.BufferOutputStream()
        with pa.RecordBatchFileWriter(out, data.schema) as writer:
            writer.write(data)
        return out.getvalue()

    def __len__(self) -> int:
        # Length of the wrapped table (delegates to the underlying object).
        return len(self._data)


class PyArrowTableProxy(TableProxy):
    """TableProxy backed directly by a pyarrow Table."""

    __slots__ = ()

    def to_frame(self):
        # Let pyarrow perform the pandas conversion.
        frame = self._data.to_pandas()
        return frame

    def to_pyarrow(self, schema: Schema) -> pa.Table:
        # The wrapped object is already a pyarrow Table; no conversion.
        return self._data


class PandasDataFrameProxy(TableProxy):
    """TableProxy backed by a pandas DataFrame."""

    __slots__ = ()

    def to_frame(self) -> pd.DataFrame:
        # The wrapped object is already a DataFrame.
        return self._data

    def to_pyarrow(self, schema: Schema) -> pa.Table:
        import pyarrow as pa
        import pyarrow_hotfix  # noqa: F401

        from ibis.formats.pyarrow import PyArrowSchema

        # Translate the ibis schema to arrow, then convert the frame.
        target_schema = PyArrowSchema.from_ibis(schema)
        return pa.Table.from_pandas(self._data, schema=target_schema)


@public
class InMemoryTable(PhysicalTable):
name: str
Expand Down
33 changes: 33 additions & 0 deletions ibis/formats/__init__.py
@@ -1,8 +1,14 @@
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING, Generic, TypeVar

from ibis.util import PseudoHashable, indent

if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa

from ibis.expr.datatypes import DataType
from ibis.expr.schema import Schema

Expand Down Expand Up @@ -214,3 +220,30 @@ def infer_table(cls, obj: T) -> Schema:
Ibis schema corresponding to the given format-specific table.
"""
raise NotImplementedError


class TableProxy(PseudoHashable[T]):
    """Hashable wrapper around an in-memory table of some native format."""

    def __repr__(self) -> str:
        # Indent the wrapped object's repr under the proxy class name.
        wrapped = indent(repr(self.obj), spaces=2)
        return f"{self.__class__.__name__}:\n{wrapped}"

    def __len__(self) -> int:
        return len(self.obj)

    @abstractmethod
    def to_frame(self) -> pd.DataFrame:  # pragma: no cover
        """Convert this input to a pandas DataFrame."""

    @abstractmethod
    def to_pyarrow(self, schema: Schema) -> pa.Table:  # pragma: no cover
        """Convert this input to a PyArrow Table."""

    def to_pyarrow_bytes(self, schema: Schema) -> bytes:
        """Serialize the wrapped table as Arrow IPC file-format bytes."""
        import pyarrow as pa
        import pyarrow_hotfix  # noqa: F401

        table = self.to_pyarrow(schema=schema)
        sink = pa.BufferOutputStream()
        with pa.RecordBatchFileWriter(sink, table.schema) as ipc_writer:
            ipc_writer.write(table)
        return sink.getvalue()
14 changes: 12 additions & 2 deletions ibis/formats/pandas.py
Expand Up @@ -6,12 +6,13 @@
import numpy as np
import pandas as pd
import pandas.api.types as pdt
import pyarrow as pa

import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from ibis.formats import DataMapper, SchemaMapper
from ibis.formats import DataMapper, SchemaMapper, TableProxy
from ibis.formats.numpy import NumpyType
from ibis.formats.pyarrow import PyArrowData, PyArrowType
from ibis.formats.pyarrow import PyArrowData, PyArrowSchema, PyArrowType

_has_arrow_dtype = hasattr(pd, "ArrowDtype")

Expand Down Expand Up @@ -284,3 +285,12 @@ class DaskData(PandasData):
@classmethod
def infer_column(cls, s):
    # Materialize `s` (presumably a dask series, given .compute() —
    # TODO confirm) and reuse the pyarrow-based column inference.
    return PyArrowData.infer_column(s.compute())


class PandasDataFrameProxy(TableProxy[pd.DataFrame]):
    """TableProxy whose wrapped object is a pandas DataFrame."""

    def to_frame(self) -> pd.DataFrame:
        # The wrapped object is already a DataFrame.
        return self.obj

    def to_pyarrow(self, schema: sch.Schema) -> pa.Table:
        # Translate the ibis schema to arrow, then convert the frame.
        return pa.Table.from_pandas(
            self.obj, schema=PyArrowSchema.from_ibis(schema)
        )
10 changes: 9 additions & 1 deletion ibis/formats/pyarrow.py
Expand Up @@ -8,7 +8,7 @@

import ibis.expr.datatypes as dt
from ibis.expr.schema import Schema
from ibis.formats import DataMapper, SchemaMapper, TypeMapper
from ibis.formats import DataMapper, SchemaMapper, TableProxy, TypeMapper

if TYPE_CHECKING:
from collections.abc import Sequence
Expand Down Expand Up @@ -293,5 +293,13 @@ def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
return table


class PyArrowTableProxy(TableProxy[pa.Table]):
    """TableProxy whose wrapped object is a pyarrow Table."""

    def to_frame(self):
        # Let pyarrow perform the pandas conversion.
        frame = self.obj.to_pandas()
        return frame

    def to_pyarrow(self, schema: Schema) -> pa.Table:
        # The wrapped object is already a pyarrow Table; no conversion.
        return self.obj


PYARROW_JSON_TYPE = JSONType()
pa.register_extension_type(PYARROW_JSON_TYPE)
15 changes: 12 additions & 3 deletions ibis/util.py
Expand Up @@ -20,12 +20,15 @@
TYPE_CHECKING,
Any,
Callable,
Generic,
TypeVar,
)
from uuid import uuid4

import toolz

from ibis.common.typing import Coercible

if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from numbers import Real
Expand Down Expand Up @@ -652,13 +655,13 @@ def __getattr__(self, name: str):
return self._factory(obj)


# TODO(kszucs): use this for the TableProxy objects
class PseudoHashable:
class PseudoHashable(Coercible, Generic[V]):
"""A wrapper that provides a best effort precomputed hash."""

__slots__ = ("obj", "hash")
obj: V

def __init__(self, obj):
def __init__(self, obj: V):
if isinstance(obj, collections.abc.Hashable):
raise TypeError(f"Cannot wrap a hashable object: {obj!r}")
elif isinstance(obj, collections.abc.Sequence):
Expand All @@ -673,6 +676,12 @@ def __init__(self, obj):
self.obj = obj
self.hash = hash((type(obj), hashable_obj))

@classmethod
def __coerce__(cls, value: V) -> PseudoHashable[V]:
    """Coerce *value* into a PseudoHashable; existing instances pass through."""
    if isinstance(value, cls):
        return value
    return cls(value)

def __hash__(self):
return self.hash

Expand Down

0 comments on commit 05964b1

Please sign in to comment.