From 238c69d6f2b022b8dd75959997979045cc9758b7 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 4 Mar 2024 07:55:26 -0700
Subject: [PATCH] remove SQL on polars/pandas/cudf experiment

---
 README.md                 |  31 +++------
 datafusion/context.py     | 142 --------------------------------------
 datafusion/cudf.py        |  97 --------------------------
 datafusion/pandas.py      |  93 -------------------------
 datafusion/polars.py      | 100 ---------------------------
 examples/sql-on-cudf.py   |  24 -------
 examples/sql-on-pandas.py |  24 -------
 examples/sql-on-polars.py |  24 -------
 8 files changed, 10 insertions(+), 525 deletions(-)
 delete mode 100644 datafusion/context.py
 delete mode 100644 datafusion/cudf.py
 delete mode 100644 datafusion/pandas.py
 delete mode 100644 datafusion/polars.py
 delete mode 100644 examples/sql-on-cudf.py
 delete mode 100644 examples/sql-on-pandas.py
 delete mode 100644 examples/sql-on-polars.py

diff --git a/README.md b/README.md
index a2e3efd4e..4345b52ed 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,16 @@ This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/)
 in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).
 
-DataFusion's Python bindings can be used as an end-user tool as well as providing a foundation for building new systems.
+DataFusion's Python bindings can be used as a foundation for building new data systems in Python. Here are some examples:
+
+- [Dask SQL](https://github.com/dask-contrib/dask-sql) uses DataFusion's Python bindings for SQL parsing, query
+  planning, and logical plan optimizations, and then transpiles the logical plan to Dask operations for execution.
+- [DataFusion Ballista](https://github.com/apache/arrow-ballista) is a distributed SQL query engine that extends
+  DataFusion's Python bindings for distributed use cases.
+
+It is also possible to use these Python bindings directly for DataFrame and SQL operations, but you may find that
+[Polars](http://pola.rs/) and [DuckDB](http://www.duckdb.org/) are more suitable for this use case, since they have
+more of an end-user focus and are more actively maintained than these Python bindings.
 
 ## Features
 
@@ -35,20 +44,6 @@ DataFusion's Python bindings can be used as an end-user tool as well as providin
 - Serialize and deserialize query plans in Substrait format.
 - Experimental support for transpiling SQL queries to DataFrame calls with Polars, Pandas, and cuDF.
 
-## Comparison with other projects
-
-Here is a comparison with similar projects that may help understand when DataFusion might be suitable and unsuitable
-for your needs:
-
-- [DuckDB](http://www.duckdb.org/) is an open source, in-process analytic database. Like DataFusion, it supports
-  very fast execution, both from its custom file format and directly from Parquet files. Unlike DataFusion, it is
-  written in C/C++ and it is primarily used directly by users as a serverless database and query system rather than
-  as a library for building such database systems.
-
-- [Polars](http://pola.rs/) is one of the fastest DataFrame libraries at the time of writing. Like DataFusion, it
-  is also written in Rust and uses the Apache Arrow memory model, but unlike DataFusion it does not provide full SQL
-  support, nor as many extension points.
-
 ## Example Usage
 
 The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results
@@ -143,12 +138,6 @@ See [examples](examples/README.md) for more information.
 
 - [Serialize query plans using Substrait](./examples/substrait.py)
 
-### Executing SQL against DataFrame Libraries (Experimental)
-
-- [Executing SQL on Polars](./examples/sql-on-polars.py)
-- [Executing SQL on Pandas](./examples/sql-on-pandas.py)
-- [Executing SQL on cuDF](./examples/sql-on-cudf.py)
-
 ## How to install (from pip)
 
 ### Pip
diff --git a/datafusion/context.py b/datafusion/context.py
deleted file mode 100644
index 30602402c..000000000
--- a/datafusion/context.py
+++ /dev/null
@@ -1,142 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from abc import ABC, abstractmethod
-from typing import Any, Dict, List
-
-from datafusion.common import SqlSchema, SqlTable
-
-
-class BaseSessionContext(ABC):
-    """
-    Abstraction defining all methods, properties, and common functionality
-    shared amongst implementations using DataFusion as their SQL Parser/Engine
-    """
-
-    DEFAULT_CATALOG_NAME = "root"
-    DEFAULT_SCHEMA_NAME = "datafusion"
-
-    @abstractmethod
-    def create_schema(
-        self,
-        schema_name: str,
-        **kwargs,
-    ):
-        """
-        Creates/Registers a logical container that holds database
-        objects such as tables, views, indexes, and other
-        related objects. It provides a way to group related database
-        objects together. A schema can be owned by a database
-        user and can be used to separate objects in different
-        logical groups for easy management.
-        """
-        pass
-
-    @abstractmethod
-    def update_schema(
-        self,
-        schema_name: str,
-        new_schema: SqlSchema,
-        **kwargs,
-    ):
-        """
-        Updates an existing schema in the SessionContext
-        """
-        pass
-
-    @abstractmethod
-    def drop_schema(
-        self,
-        schema_name: str,
-        **kwargs,
-    ):
-        """
-        Drops the specified Schema, based on name, from the current context
-        """
-        pass
-
-    @abstractmethod
-    def show_schemas(self, **kwargs) -> Dict[str, SqlSchema]:
-        """
-        Return all schemas in the current SessionContext impl.
-        """
-        pass
-
-    @abstractmethod
-    def create_table(
-        self,
-        schema_name: str,
-        table_name: str,
-        input_source: Any,
-        **kwargs,
-    ):
-        """
-        Creates/Registers a table in the specied schema instance
-        """
-        pass
-
-    @abstractmethod
-    def update_table(
-        self,
-        schema_name: str,
-        table_name: str,
-        new_table: SqlTable,
-        **kwargs,
-    ):
-        """
-        Updates an existing table in the SessionContext
-        """
-        pass
-
-    @abstractmethod
-    def drop_table(
-        self,
-        schema_name: str,
-        table_name: str,
-        **kwargs,
-    ):
-        """
-        Drops the specified table, based on name, from the current context
-        """
-        pass
-
-    @abstractmethod
-    def show_tables(self, **kwargs) -> List[SqlTable]:
-        """
-        Return all tables in the current SessionContext impl.
-        """
-        pass
-
-    @abstractmethod
-    def register_table(
-        self,
-        table_name: str,
-        path: str,
-        **kwargs,
-    ):
-        pass
-
-    # TODO: Remove abstraction, this functionality can be shared
-    # between all implementing classes since it just prints the
-    # logical plan from DataFusion
-    @abstractmethod
-    def explain(self, sql):
-        pass
-
-    @abstractmethod
-    def sql(self, sql):
-        pass
diff --git a/datafusion/cudf.py b/datafusion/cudf.py
deleted file mode 100644
index d8bc8e6d0..000000000
--- a/datafusion/cudf.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import logging
-import cudf
-from datafusion.context import BaseSessionContext
-from datafusion.expr import Projection, TableScan, Column
-
-from datafusion.common import SqlSchema
-
-logger = logging.getLogger(__name__)
-
-
-class SessionContext(BaseSessionContext):
-    def __init__(self, context, logging_level=logging.INFO):
-        """
-        Create a new Session.
-        """
-        # Cudf requires a provided context
-        self.context = context
-
-        # Set the logging level for this SQL context
-        logging.basicConfig(level=logging_level)
-
-        # Name of the root catalog
-        self.catalog_name = self.DEFAULT_CATALOG_NAME
-        # Name of the root schema
-        self.schema_name = self.DEFAULT_SCHEMA_NAME
-        # Add the schema to the context
-        sch = SqlSchema(self.schema_name)
-        self.schemas = {}
-        self.schemas[self.schema_name] = sch
-        self.context.register_schema(self.schema_name, sch)
-
-    def to_cudf_expr(self, expr):
-        # get Python wrapper for logical expression
-        expr = expr.to_variant()
-
-        if isinstance(expr, Column):
-            return expr.name()
-        else:
-            raise Exception("unsupported expression: {}".format(expr))
-
-    def to_cudf_df(self, plan):
-        # recurse down first to translate inputs into pandas data frames
-        inputs = [self.to_cudf_df(x) for x in plan.inputs()]
-
-        # get Python wrapper for logical operator node
-        node = plan.to_variant()
-
-        if isinstance(node, Projection):
-            args = [self.to_cudf_expr(expr) for expr in node.projections()]
-            return inputs[0][args]
-        elif isinstance(node, TableScan):
-            return cudf.read_parquet(self.parquet_tables[node.table_name()])
-        else:
-            raise Exception("unsupported logical operator: {}".format(type(node)))
-
-    def create_schema(self, schema_name: str, **kwargs):
-        logger.debug(f"Creating schema: {schema_name}")
-        self.schemas[schema_name] = SqlSchema(schema_name)
-        self.context.register_schema(schema_name, SqlSchema(schema_name))
-
-    def update_schema(self, schema_name: str, new_schema: SqlSchema, **kwargs):
-        self.schemas[schema_name] = new_schema
-
-    def drop_schema(self, schema_name, **kwargs):
-        del self.schemas[schema_name]
-
-    def show_schemas(self, **kwargs):
-        return self.schemas
-
-    def register_table(self, name, path, **kwargs):
-        self.parquet_tables[name] = path
-        self.datafusion_ctx.register_parquet(name, path)
-
-    def explain(self, sql):
-        super.explain()
-
-    def sql(self, sql):
-        datafusion_df = self.datafusion_ctx.sql(sql)
-        plan = datafusion_df.logical_plan()
-        return self.to_cudf_df(plan)
diff --git a/datafusion/pandas.py b/datafusion/pandas.py
deleted file mode 100644
index cf1fa6f7b..000000000
--- a/datafusion/pandas.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import logging
-import pandas as pd
-import datafusion
-from datafusion.common import SqlSchema
-from datafusion.context import BaseSessionContext
-from datafusion.expr import Projection, TableScan, Column
-
-logger = logging.getLogger(__name__)
-
-
-class SessionContext(BaseSessionContext):
-    def __init__(self, logging_level=logging.INFO):
-        self.datafusion_ctx = datafusion.SessionContext()
-        self.parquet_tables = {}
-
-        # Set the logging level for this SQL context
-        logging.basicConfig(level=logging_level)
-
-        # Name of the root catalog
-        self.catalog_name = self.DEFAULT_CATALOG_NAME
-        # Name of the root schema
-        self.schema_name = self.DEFAULT_SCHEMA_NAME
-        # Add the schema to the context
-        sch = SqlSchema(self.schema_name)
-        self.schemas[self.schema_name] = sch
-        self.context.register_schema(self.schema_name, sch)
-
-    def to_pandas_expr(self, expr):
-        # get Python wrapper for logical expression
-        expr = expr.to_variant()
-
-        if isinstance(expr, Column):
-            return expr.name()
-        else:
-            raise Exception("unsupported expression: {}".format(expr))
-
-    def to_pandas_df(self, plan):
-        # recurse down first to translate inputs into pandas data frames
-        inputs = [self.to_pandas_df(x) for x in plan.inputs()]
-
-        # get Python wrapper for logical operator node
-        node = plan.to_variant()
-
-        if isinstance(node, Projection):
-            args = [self.to_pandas_expr(expr) for expr in node.projections()]
-            return inputs[0][args]
-        elif isinstance(node, TableScan):
-            return pd.read_parquet(self.parquet_tables[node.table_name()])
-        else:
-            raise Exception("unsupported logical operator: {}".format(type(node)))
-
-    def create_schema(self, schema_name: str, **kwargs):
-        logger.debug(f"Creating schema: {schema_name}")
-        self.schemas[schema_name] = SqlSchema(schema_name)
-        self.context.register_schema(schema_name, SqlSchema(schema_name))
-
-    def update_schema(self, schema_name: str, new_schema: SqlSchema, **kwargs):
-        self.schemas[schema_name] = new_schema
-
-    def drop_schema(self, schema_name, **kwargs):
-        del self.schemas[schema_name]
-
-    def show_schemas(self, **kwargs):
-        return self.schemas
-
-    def register_table(self, name, path, **kwargs):
-        self.parquet_tables[name] = path
-        self.datafusion_ctx.register_parquet(name, path)
-
-    def explain(self, sql):
-        super.explain()
-
-    def sql(self, sql):
-        datafusion_df = self.datafusion_ctx.sql(sql)
-        plan = datafusion_df.logical_plan()
-        return self.to_pandas_df(plan)
diff --git a/datafusion/polars.py b/datafusion/polars.py
deleted file mode 100644
index ac5e26e3e..000000000
--- a/datafusion/polars.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import logging
-import polars
-import datafusion
-from datafusion.context import BaseSessionContext
-from datafusion.expr import Projection, TableScan, Aggregate
-from datafusion.expr import Column, AggregateFunction
-
-from datafusion.common import SqlSchema
-
-logger = logging.getLogger(__name__)
-
-
-class SessionContext(BaseSessionContext):
-    def __init__(self, logging_level=logging.INFO):
-        self.datafusion_ctx = datafusion.SessionContext()
-        self.parquet_tables = {}
-
-    def to_polars_expr(self, expr):
-        # get Python wrapper for logical expression
-        expr = expr.to_variant()
-
-        if isinstance(expr, Column):
-            return polars.col(expr.name())
-        else:
-            raise Exception("unsupported expression: {}".format(expr))
-
-    def to_polars_df(self, plan):
-        # recurse down first to translate inputs into Polars data frames
-        inputs = [self.to_polars_df(x) for x in plan.inputs()]
-
-        # get Python wrapper for logical operator node
-        node = plan.to_variant()
-
-        if isinstance(node, Projection):
-            args = [self.to_polars_expr(expr) for expr in node.projections()]
-            return inputs[0].select(*args)
-        elif isinstance(node, Aggregate):
-            groupby_expr = [self.to_polars_expr(expr) for expr in node.group_by_exprs()]
-            aggs = []
-            for expr in node.aggregate_exprs():
-                expr = expr.to_variant()
-                if isinstance(expr, AggregateFunction):
-                    if expr.aggregate_type() == "COUNT":
-                        aggs.append(polars.count().alias("{}".format(expr)))
-                    else:
-                        raise Exception(
-                            "Unsupported aggregate function {}".format(
-                                expr.aggregate_type()
-                            )
-                        )
-                else:
-                    raise Exception("Unsupported aggregate function {}".format(expr))
-            df = inputs[0].groupby(groupby_expr).agg(aggs)
-            return df
-        elif isinstance(node, TableScan):
-            return polars.read_parquet(self.parquet_tables[node.table_name()])
-        else:
-            raise Exception("unsupported logical operator: {}".format(type(node)))
-
-    def create_schema(self, schema_name: str, **kwargs):
-        logger.debug(f"Creating schema: {schema_name}")
-        self.schemas[schema_name] = SqlSchema(schema_name)
-        self.context.register_schema(schema_name, SqlSchema(schema_name))
-
-    def update_schema(self, schema_name: str, new_schema: SqlSchema, **kwargs):
-        self.schemas[schema_name] = new_schema
-
-    def drop_schema(self, schema_name, **kwargs):
-        del self.schemas[schema_name]
-
-    def show_schemas(self, **kwargs):
-        return self.schemas
-
-    def register_table(self, name, path, **kwargs):
-        self.parquet_tables[name] = path
-        self.datafusion_ctx.register_parquet(name, path)
-
-    def explain(self, sql):
-        super.explain()
-
-    def sql(self, sql):
-        datafusion_df = self.datafusion_ctx.sql(sql)
-        plan = datafusion_df.logical_plan()
-        return self.to_polars_df(plan)
diff --git a/examples/sql-on-cudf.py b/examples/sql-on-cudf.py
deleted file mode 100644
index b64d8f046..000000000
--- a/examples/sql-on-cudf.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from datafusion.cudf import SessionContext
-
-
-ctx = SessionContext()
-ctx.register_table("taxi", "yellow_tripdata_2021-01.parquet")
-df = ctx.sql("select passenger_count from taxi")
-print(df)
diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py
deleted file mode 100644
index e3312a201..000000000
--- a/examples/sql-on-pandas.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from datafusion.pandas import SessionContext
-
-
-ctx = SessionContext()
-ctx.register_table("taxi", "yellow_tripdata_2021-01.parquet")
-df = ctx.sql("select passenger_count from taxi")
-print(df)
diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py
deleted file mode 100644
index ffcb12b70..000000000
--- a/examples/sql-on-polars.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from datafusion.polars import SessionContext
-
-
-ctx = SessionContext()
-ctx.register_table("taxi", "yellow_tripdata_2021-01.parquet")
-df = ctx.sql("select passenger_count, count(*) from taxi group by passenger_count")
-print(df)
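For anyone who was using the removed sql-on-* examples, the direct usage that the updated README points to looks roughly like the sketch below. It is a minimal illustration assembled only from calls that appear in the removed modules (`datafusion.SessionContext`, `register_parquet`, `sql`, and the `logical_plan` accessor on the resulting DataFrame); the Parquet file name is the same placeholder the old examples used.

```python
# Minimal sketch: run the same aggregation the removed Polars example ran,
# but natively in DataFusion rather than by transpiling to another engine.
import datafusion

ctx = datafusion.SessionContext()

# Register a Parquet file as a table (placeholder file name from the old examples).
ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")

df = ctx.sql("select passenger_count, count(*) from taxi group by passenger_count")

# This logical plan is what the removed experiment walked and translated into
# Polars/Pandas/cuDF DataFrame calls.
print(df.logical_plan())
print(df)
```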