
feat: add to_pyarrow and to_pyarrow_batches #4454

Merged 5 commits into ibis-project:master on Oct 5, 2022

Conversation

@gforsyth (Member) commented Sep 2, 2022

Adds to_pyarrow and to_pyarrow_batches to the alchemy backends, datafusion, and pandas. More to come.

Some open questions / issues:

  • Where should the schema inference stuff live? The type mapping is already defined in the pyarrow backend, but it feels weird to import that backend into other backends.
  • chunk_size? chunksize? batch_size?
  • In this context, are chunks / batches properly defined in terms of rows, or is that a conflation?
  • Datafusion has some slightly wonky behavior when it comes to keeping a consistent schema across record batches.
  • DuckDB does not seem to respect its own chunksize argument.

xref #4443
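
For context, the resulting methods are used roughly like this (a hedged sketch: con stands in for any supported backend connection, my_table and the x column are hypothetical, and the batch-size keyword name is one of the open questions above):

t = con.table("my_table")
expr = t[t.x > 0]

tbl = expr.to_pyarrow()  # the full result as a pyarrow.Table

for batch in expr.to_pyarrow_batches(chunk_size=10_000):
    ...  # each batch is a pyarrow.RecordBatch, produced incrementally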

github-actions bot (Contributor) commented Sep 2, 2022

Test Results

35 files, 35 suites, 1h 15m 40s ⏱️
10 051 tests: 7 873 passed ✔️, 2 178 skipped 💤, 0 failed
36 686 runs: 28 308 passed ✔️, 8 378 skipped 💤, 0 failed

Results for commit 68298f1.

♻️ This comment has been updated with latest results.

@kszucs kszucs self-requested a review September 5, 2022 10:20
# TODO should we return a RecordBatchReader from this iterator?
yield pa.RecordBatch.from_struct_array(struct_array)

def to_pyarrow(
Member:

For clarity we could call this to_pyarrow_table

Member Author (@gforsyth):

I'm torn on this -- I like things to be explicit AND I don't want to make it too verbose

Member:

If we consider pa.Table as the default pyarrow dataframe-like object, then I'm fine with leaving it as is.

Member:

Both cudf and polars call this to_arrow, so there's precedent for a simple name. I have a slight preference for to_pyarrow over to_arrow, but could go either way here. I don't think we need to specify _table though.

@@ -237,6 +238,59 @@ def _find_backend(self) -> BaseBackend:

return backends[0]

def to_pyarrow_batches(
Member:

Could we generalize this API to make the output formats pluggable? I also like the verb execute because it indicates that we are actually executing the ibis expression.

I have something like the following in mind:

expr.execute(format=pd.DataFrame, **options)
expr.execute(pa.Table, **options)
expr.execute(pa.RecordBatch, **options)
expr.execute(polars.DataFrame, **options)

We could have string aliases for different formats too.

@kszucs (Member) commented Sep 5, 2022:

Perhaps we could have an intermediate abstraction, like a ResultFormatter or Executor which can be overloaded:

@executor.register(PandasBackend, pa.Table)
def pandas_to_pyarrow_table(backend, typ):
    ...


@executor.register(PandasBackend, pa.RecordBatch)
def pandas_to_pyarrow_recordbatch(backend, typ):
    ...


@executor.register(PandasBackend, list)
def pandas_to_plain_python_list(backend, typ):
    ...

@executor.register(PyarrowBackend, pd.DataFrame)
def pyarrow_to_pandas_dataframe(backend, typ):
    ...

This way new backends could plug their own optimized execution/conversion paths without changing the other backends' code.
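
A minimal sketch of what such a registry could look like (hypothetical; not an existing ibis API, and here the registered functions receive the expression rather than the type):

from typing import Any, Callable, Dict, Tuple


class Executor:
    """Dispatch conversion functions on (backend class, output type)."""

    def __init__(self) -> None:
        self._registry: Dict[Tuple[type, type], Callable] = {}

    def register(self, backend_cls: type, output_type: type):
        def decorator(func: Callable) -> Callable:
            self._registry[backend_cls, output_type] = func
            return func

        return decorator

    def execute(self, backend: Any, output_type: type, expr: Any):
        # walk the MRO so a subclassed backend can reuse a parent's path
        for cls in type(backend).__mro__:
            func = self._registry.get((cls, output_type))
            if func is not None:
                return func(backend, expr)
        raise NotImplementedError(
            f"no conversion from {type(backend).__name__} "
            f"to {output_type.__name__}"
        )


executor = Executor()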

@jcrist (Member) commented Sep 6, 2022:

I'm not a huge fan of overloading execute.

  • It makes the method harder to understand, especially if these methods take additional kwargs (e.g. chunksize, ...) that aren't available for all output types
  • It makes writing and understanding type annotations for execute much trickier
  • It makes the functionality available harder to discover (users are more likely to notice a method on the Table type than an optional kwarg to the execute method).

Since ibis table expressions don't generally have side effects, any execution here is also effectively a conversion - execute could just as easily be called to_pandas. We also plan on adding some file-writing functionality with to_csv/to_parquet/... - users should be able to make the assumption that any to_* method results in computation.

> This way new backends could plug their own optimized execution/conversion paths without changing the other backends' code.

I do agree that backends should be able to specialize these conversions, but AFAICT that's already provided here with methods on the existing backend class - what would a new abstraction get us?

@kszucs (Member) commented Sep 6, 2022:

I'd also prefer more explicit method definitions for the reasons you mentioned above, but I don't know how we could support the following case (I may not have mentioned this concern before).

ds = ibis.dataset("s3://bucket/...", schema=...)  # this could be an unbound table too
expr = ds.filter(...).to_arrow()

Say I would like to execute it using datafusion rather than pyarrow; how could I specify that? Or what if I'd like to try out clickhouse and compare the performance of the two?

As far as I can see, we need a way to specify both the backend and the output format so the user can easily switch engines under the hood.

Member:

I'm not sure I understand - how would overloading execute solve that? The to_* methods specify an output format, not a backend to use to get there. There'd still be multiple possible conversion paths from an unbound table expression to a given memory type.

I see this as an unrelated issue. Perhaps it could be handled by making it easy to set a default backend, maybe contextually? Something like:

with ibis.options(default_backend="clickhouse"):
    expr.to_arrow()

with ibis.options(default_backend="datafusion"):
    expr.to_arrow()
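
One way such a contextual default could work (a sketch using contextvars, not ibis's actual options machinery):

from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

_default_backend: ContextVar[Optional[str]] = ContextVar(
    "default_backend", default=None
)


@contextmanager
def options(default_backend: str) -> Iterator[None]:
    # install the default for the duration of the block, then restore it
    token = _default_backend.set(default_backend)
    try:
        yield
    finally:
        _default_backend.reset(token)

Methods like to_arrow() on an unbound expression would then consult _default_backend.get() to pick an engine.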

Member:

> That's fine, but it is also exposed on Expr.

I don't follow? So is .execute(), which also doesn't work for unbound tables. All our docs point users to using bound tables; a method version for these conversions is more ergonomic and is what I would expect as a user. For expert users making use of unbound tables, the method on the Backend is still available. The existence of one doesn't prevent the use of the other.

@kszucs (Member) commented Sep 6, 2022:

I'm not sure whether expr.to_pyarrow() -> pa.Table or expr.to_pyarrow().execute() -> pa.Table is the right way to implement multiple output format support, I could also imagine a third API.

If we implement to_pyarrow() methods only on the backends then we can use the ibis.<backend>.to_pyarrow(expr) API without tying our hands with expr.to_pyarrow(). I'm fine with that.

Member:

Unbound tables are an important use case. When using ibis as a library, you often are just merrily constructing expressions without a backend. Eventually you apply that to the backend. To belabor the point: this scenario is most common when ibis is being used as a library not as a tool for interactive data analysis.

We should take unbound tables into account when designing APIs (I am definitely guilty of not doing this all the time!) and not treat them as uncommon or outside the norm.

That's the library case. On the other hand when you're in interactive mode, you want to type less and have things "just run" or "just work" or "Just Do What I Think This Should".

There are a few things to disentangle here, but I think before we do any more work on this we should come up with a few scenarios and design the API from them.

I think the dependencies issue can be resolved :)

The most important thing in my mind is to get those scenarios (let's say 2 or 3).

I'll start a github discussion so we can map those out.

Member:

Sounds good; until then, we can merge this marked as an experimental API.

Contributor:

umm how about we have this discussion first before merging

# TODO should we return a RecordBatchReader from this iterator?
yield pa.RecordBatch.from_struct_array(struct_array)

def to_pyarrow(
Member:

Having a default implementation would make it obsolete.

@kszucs (Member) commented Sep 6, 2022:

How about factoring out these methods to a utility class where we can have sane defaults? Something like:

import abc

import toolz


# should have a better name
class ResultHandler:

    @staticmethod
    def _import_pandas():
        try:
            import pandas
            import ibis.backends.pandas
        except ImportError:
            raise ModuleNotFoundError(
                "Exporting to pandas formats requires `pandas` but it is not installed"  # noqa: ignore
            )
        else:
            return pandas

    @staticmethod
    def _import_pyarrow():
        try:
            import pyarrow
            import ibis.backends.pyarrow
        except ImportError:
            raise ModuleNotFoundError(
                "Exporting to arrow formats requires `pyarrow` but it is not installed"  # noqa: ignore
            )
        else:
            return pyarrow

    def to_pylist(self, expr, **kwargs):
        return list(toolz.concat(self.to_pylist_batches(expr, **kwargs)))

    def to_pylist_batches(self, expr, **kwargs):
        ...

    def to_pandas(self, expr, **kwargs):
        pd = self._import_pandas()
        return pd.DataFrame(self.to_pylist(expr, **kwargs))

    def to_pyarrow(self, expr, **kwargs):
        pa = self._import_pyarrow()
        return pa.Table.from_batches(self.to_pyarrow_batches(expr, **kwargs))

    def to_pyarrow_batches(self, expr, **kwargs):
        pa = self._import_pyarrow()
        for batch in self.to_pylist_batches(expr, **kwargs):
            struct_array = pa.array(
                map(tuple, batch), type=expr.schema().to_pyarrow_struct()
            )
            yield pa.RecordBatch.from_struct_array(struct_array)


class BaseBackend(abc.ABC, ResultHandler):
    ...

    def execute(self, expr, **kwargs):
        return self.to_pandas(expr, **kwargs)
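
Under this scheme a new backend would only need to implement to_pylist_batches to inherit all of the conversions, e.g. (a toy sketch):

class MyBackend(BaseBackend):
    def to_pylist_batches(self, expr, **kwargs):
        # a real backend would stream row tuples from its cursor;
        # yield one list of rows per batch
        yield [(1, "a"), (2, "b")]
        yield [(3, "c")]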

Member Author (@gforsyth):

Tried this out as you suggested and I think I like it. Not yet re-routing calls to execute.

@gforsyth gforsyth force-pushed the gil/pyarrow branch 9 times, most recently from 4aba82b to 460a637 on September 15, 2022 17:03
@cpcloud cpcloud added this to the 4.0.0 milestone Sep 15, 2022
@gforsyth gforsyth force-pushed the gil/pyarrow branch 5 times, most recently from e8a6ee3 to ef7e120 on September 19, 2022 15:56
@gforsyth gforsyth marked this pull request as ready for review September 19, 2022 17:42
@gforsyth gforsyth force-pushed the gil/pyarrow branch 3 times, most recently from a907cfd to 7a62634 on September 29, 2022 18:05
@gforsyth (Member Author) commented:

Adds to_pyarrow and to_pyarrow_batches to the BaseBackend.

to_pyarrow returns pyarrow objects consistent with the dimension of
the output:

  • a table -> pa.Table
  • a column -> pa.Array
  • a scalar -> pa.Scalar

to_pyarrow_batches returns a RecordBatchReader that yields pyarrow record batches. It does not have the same dimension handling because that is not available in RecordBatchReaders.

to_pyarrow_batches is implemented for AlchemyBackend, datafusion,
and duckdb.

The pandas backend has to_pyarrow implemented using
pandas.DataFrame.to_pyarrow().

Backends that do not already require pyarrow will only require it when using the to_pyarrow* methods.

There are warnings on these methods to indicate that they are
experimental and that they may break in the future irrespective of
semantic versioning.

The DuckDB to_pyarrow_batches makes use of a proxy object to escape
garbage collection so that the underlying record batches are still
available even after the cursor used to generate them would have
been garbage collected (but isn't, because it is embedded in the proxy
object).
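
The idea is roughly the following (a minimal sketch; the class name is hypothetical, not the exact ibis implementation):

import pyarrow as pa


class CursorProxyReader:
    """Wrap a RecordBatchReader and pin the cursor that produced it,
    so the cursor is not garbage collected while batches are still
    being consumed."""

    def __init__(self, reader: pa.RecordBatchReader, cursor) -> None:
        self._reader = reader
        self._cursor = cursor  # keep-alive reference

    def __getattr__(self, name):
        # delegate schema, read_next_batch, read_all, ... to the reader
        return getattr(self._reader, name)

    def __iter__(self):
        return iter(self._reader)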

to_pyarrow_batches is implemented for scalars even though it's ambiguous, because to_pyarrow makes use of the RecordBatchReader from to_pyarrow_batches to output tables (and arrays, and scalars). If it is called on a Scalar, it returns a RecordBatchReader that has a single batch that resolves to a pyarrow table with one row and one column.
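
Roughly, the dimension handling then looks like this (a hedged sketch; the ir.* names are assumptions, not the exact code in this PR):

import pyarrow as pa

import ibis.expr.types as ir


def collect(expr, reader: pa.RecordBatchReader):
    table = reader.read_all()
    if isinstance(expr, ir.Scalar):
        return table.column(0)[0]  # one row, one column -> pa.Scalar
    elif isinstance(expr, ir.Column):
        # single column -> pa.Array (flatten the chunks)
        return pa.concat_arrays(table.column(0).chunks)
    else:
        return table  # table expression -> pa.Table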

@gforsyth gforsyth force-pushed the gil/pyarrow branch 4 times, most recently from c91e2d6 to f086f8f on September 29, 2022 18:56
else:
return pyarrow

def to_pyarrow(
Member:

I wonder, could we move this to ibis.backends.pyarrow somehow (fine in a follow-up too)?

Also, how could another backend, say polars, register its own result types?

Member Author (@gforsyth):

We could move it, but I was planning (in a follow-up) to add a to_pandas method to the mixin. However, it might be useful (to your point about e.g. polars registering a result type) to have a means for backends to register their own to_* methods. Then the to_pyarrow would be defined in the pyarrow backend and to_pandas in the pandas backend.

Seems doable for the methods called via backend.to_* -- I'm not sure yet how we might also then add the relevant methods to expression objects.

Member:

Doing it in a follow-up sounds good to me.

@kszucs (Member) commented Sep 29, 2022

Thanks for the updates and the summary, going to have a more thorough look tomorrow.


from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct

if hasattr(expr, "schema"):
Member:

I think to_pyarrow_batches() should raise if the expression is not table-like. I understand that this is used by the other methods, so it's fine for now. We can probably flesh it out when adding support for backends to register their own result types.
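
Something like this guard, presumably (a sketch; the exact exception type is an assumption):

import ibis.expr.types as ir


def to_pyarrow_batches(self, expr, **kwargs):
    if not isinstance(expr, ir.Table):
        raise NotImplementedError(
            "to_pyarrow_batches is only defined for table expressions"
        )
    ...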

@cpcloud cpcloud added the feature (Features or general enhancements) and backends (Issues related to all backends) labels Oct 4, 2022
@cpcloud (Member) commented Oct 5, 2022

@kszucs If you're happy with this can you merge it?

@kszucs (Member) left a comment:

Thanks @gforsyth! Merging.

@kszucs kszucs merged commit 791335f into ibis-project:master Oct 5, 2022
@gforsyth gforsyth deleted the gil/pyarrow branch October 5, 2022 13:19