From ce8f4b24f1ac1b9a474b77e353ad2a12f9491205 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Tue, 23 May 2023 06:36:39 +0000
Subject: [PATCH 01/10] Fix duckdb compatibility issues

---
 fugue_duckdb/_utils.py               |  9 +++++--
 fugue_duckdb/dask.py                 |  2 +-
 fugue_duckdb/dataframe.py            | 27 ++++++++++++-------
 fugue_duckdb/execution_engine.py     | 10 ++++---
 fugue_polars/_utils.py               | 39 ----------------------------
 fugue_polars/polars_dataframe.py     | 27 ++++++++++++++-----
 fugue_spark/execution_engine.py      |  2 ++
 setup.py                             |  2 +-
 tests/fugue_duckdb/test_dataframe.py |  4 +--
 tests/fugue_duckdb/test_utils.py     |  2 +-
 10 files changed, 58 insertions(+), 66 deletions(-)

diff --git a/fugue_duckdb/_utils.py b/fugue_duckdb/_utils.py
index bbd44d2e..f8dbf614 100644
--- a/fugue_duckdb/_utils.py
+++ b/fugue_duckdb/_utils.py
@@ -27,7 +27,11 @@
     "TIME": pa.time32("ms"),
 }
 
-_PA_TYPES_TO_DUCK: Dict[pa.DataType, str] = {v: k for k, v in _DUCK_TYPES_TO_PA.items()}
+_PA_TYPES_TO_DUCK: Dict[pa.DataType, str] = {
+    v: k
+    for k, v in list(_DUCK_TYPES_TO_PA.items())
+    + [("VARCHAR", pa.large_string()), ("BLOB", pa.large_binary())]
+}
 
 
 def encode_column_name(name: str) -> str:
@@ -94,8 +98,9 @@ def to_duck_type(tp: pa.DataType) -> str:
         raise ValueError(f"can't convert {tp} to DuckDB data type")
 
 
-def to_pa_type(duck_type: str) -> pa.DataType:
+def to_pa_type(duck_type_raw: Any) -> pa.DataType:
     try:
+        duck_type = str(duck_type_raw)  # for duckdb >= 0.8.0
         if duck_type.endswith("[]"):
             return pa.list_(to_pa_type(duck_type[:-2]))
         p = duck_type.find("(")
diff --git a/fugue_duckdb/dask.py b/fugue_duckdb/dask.py
index b612de7d..8b456766 100644
--- a/fugue_duckdb/dask.py
+++ b/fugue_duckdb/dask.py
@@ -50,7 +50,7 @@ def to_df(self, df: Any, schema: Any = None) -> DuckDataFrame:
             res = DuckDataFrame(self.connection.from_df(ddf.as_pandas()))
         else:
             res = DuckDataFrame(
-                duckdb.arrow(ddf.as_arrow(), connection=self.connection)
+                duckdb.from_arrow(ddf.as_arrow(), connection=self.connection)
             )
         if ddf.has_metadata:  # pragma: no cover
             res.reset_metadata(ddf.metadata)
diff --git a/fugue_duckdb/dataframe.py b/fugue_duckdb/dataframe.py
index 5bf68e57..19f8aadf 100644
--- a/fugue_duckdb/dataframe.py
+++ b/fugue_duckdb/dataframe.py
@@ -4,14 +4,17 @@
 import pyarrow as pa
 from duckdb import DuckDBPyRelation
 from triad import Schema
+from triad.utils.pyarrow import LARGE_TYPES_REPLACEMENT, replace_types_in_table
 
 from fugue import ArrayDataFrame, ArrowDataFrame, DataFrame, LocalBoundedDataFrame
 from fugue.exceptions import FugueDataFrameOperationError, FugueDatasetEmptyError
 from fugue.plugins import (
+    as_arrow,
     as_fugue_dataset,
     as_local_bounded,
     get_column_names,
     get_num_partitions,
+    get_schema,
     is_df,
 )
 
@@ -26,15 +29,7 @@ class DuckDataFrame(LocalBoundedDataFrame):
 
     def __init__(self, rel: DuckDBPyRelation):
         self._rel = rel
-        super().__init__(schema=self._get_schema)
-
-    def _get_schema(self) -> Schema:
-        return Schema(
-            [
-                pa.field(x, to_pa_type(y))
-                for x, y in zip(self._rel.columns, self._rel.types)
-            ]
-        )
+        super().__init__(schema=lambda: _duck_get_schema(self._rel))
 
     @property
     def alias(self) -> str:
@@ -98,7 +93,7 @@ def alter_columns(self, columns: Any) -> DataFrame:
         return DuckDataFrame(self._rel.project(", ".join(fields)))
 
     def as_arrow(self, type_safe: bool = False) -> pa.Table:
-        return self._rel.arrow()
+        return _duck_as_arrow(self._rel)
 
     def as_pandas(self) -> pd.DataFrame:
         if any(pa.types.is_nested(f.type) for f in self.schema.fields):
@@ -169,6 +164,18 @@ def _duck_as_local(df: DuckDBPyRelation) -> DuckDBPyRelation:
     return df
 
 
+@as_arrow.candidate(lambda df: isinstance(df, DuckDBPyRelation))
+def _duck_as_arrow(df: DuckDBPyRelation) -> pa.Table:
+    _df = df.arrow()
+    _df = replace_types_in_table(_df, LARGE_TYPES_REPLACEMENT, recursive=True)
+    return _df
+
+
+@get_schema.candidate(lambda df: isinstance(df, DuckDBPyRelation))
+def _duck_get_schema(df: DuckDBPyRelation) -> Schema:
+    return Schema([pa.field(x, to_pa_type(y)) for x, y in zip(df.columns, df.types)])
+
+
 @get_column_names.candidate(lambda df: isinstance(df, DuckDBPyRelation))
 def _get_duckdb_columns(df: DuckDBPyRelation) -> List[Any]:
     return list(df.columns)
diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py
index 1793fbf8..bc883fac 100644
--- a/fugue_duckdb/execution_engine.py
+++ b/fugue_duckdb/execution_engine.py
@@ -108,7 +108,7 @@ def _other_select(self, dfs: DataFrames, statement: str) -> DataFrame:
         conn = duckdb.connect()
         try:
             for k, v in dfs.items():
-                duckdb.arrow(v.as_arrow(), connection=conn).create_view(k)
+                duckdb.from_arrow(v.as_arrow(), connection=conn).create_view(k)
             return ArrowDataFrame(conn.execute(statement).arrow())
         finally:
             conn.close()
@@ -229,7 +229,7 @@ def persist(
         # TODO: we should create DuckDB table, but it has bugs, so can't use by 0.3.1
         if isinstance(df, DuckDataFrame):
             # materialize
-            res: DataFrame = ArrowDataFrame(df.native.arrow())
+            res: DataFrame = ArrowDataFrame(df.as_arrow())
         else:
             res = self.to_df(df)
         res.reset_metadata(df.metadata)
@@ -540,12 +540,14 @@ def _gen_duck() -> DuckDataFrame:
             if isinstance(df, DuckDataFrame):
                 return df
             rdf = DuckDataFrame(
-                duckdb.arrow(df.as_arrow(), connection=engine.connection)
+                duckdb.from_arrow(df.as_arrow(), connection=engine.connection)
             )
             rdf.reset_metadata(df.metadata if df.has_metadata else None)
             return rdf
         tdf = ArrowDataFrame(df, schema)
-        return DuckDataFrame(duckdb.arrow(tdf.native, connection=engine.connection))
+        return DuckDataFrame(
+            duckdb.from_arrow(tdf.native, connection=engine.connection)
+        )
 
     res = _gen_duck()
     if create_view:
diff --git a/fugue_polars/_utils.py b/fugue_polars/_utils.py
index cd885d00..037dd45b 100644
--- a/fugue_polars/_utils.py
+++ b/fugue_polars/_utils.py
@@ -1,47 +1,8 @@
 import polars as pl
-import pyarrow as pa
 from triad import Schema
-from triad.utils.pyarrow import get_alter_func
 
 from fugue.dataframe.arrow_dataframe import _build_empty_arrow
 
 
-def pl_as_arrow(df: pl.DataFrame) -> pa.Table:
-    adf = df.to_arrow()
-    schema = convert_schema(adf.schema)
-    func = get_alter_func(adf.schema, schema, safe=False)
-    return func(adf)
-
-
-def to_schema(df: pl.DataFrame) -> Schema:
-    return Schema(convert_schema(pl.DataFrame(schema=df.schema).to_arrow().schema))
-
-
 def build_empty_pl(schema: Schema) -> pl.DataFrame:
     return pl.from_arrow(_build_empty_arrow(schema))
-
-
-def convert_schema(schema: pa.Schema) -> pa.Schema:
-    fields = [convert_field(f) for f in schema]
-    return pa.schema(fields)
-
-
-def convert_field(field: pa.Field) -> pa.Field:
-    tp = convert_type(field.type)
-    if tp == field.type:
-        return field
-    return pa.field(field.name, tp)
-
-
-def convert_type(tp: pa.DataType) -> pa.DataType:
-    if pa.types.is_struct(tp):
-        return pa.struct([convert_field(f) for f in tp])
-    if pa.types.is_list(tp) or pa.types.is_large_list(tp):
-        return pa.list_(convert_type(tp.value_type))
-    if pa.types.is_map(tp):  # pragma: no cover
-        return pa.map_(convert_type(tp.key_type), convert_type(tp.value_type))
-    if pa.types.is_large_string(tp):
-        return pa.string()
-    if pa.types.is_large_binary(tp):
-        return pa.binary()
-    return tp
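Both the DuckDB and Polars paths in this commit normalize Arrow "large" types through triad's replace_types_in_table / replace_types_in_schema with LARGE_TYPES_REPLACEMENT, superseding the hand-rolled converters deleted from fugue_polars/_utils.py above. A minimal sketch of the same idea in plain pyarrow (illustrative only, not triad's actual implementation, and covering flat columns only):

import pyarrow as pa

def downcast_large_types(tbl: pa.Table) -> pa.Table:
    # map large_string -> string and large_binary -> binary; other types pass through
    def _convert(tp: pa.DataType) -> pa.DataType:
        if pa.types.is_large_string(tp):
            return pa.string()
        if pa.types.is_large_binary(tp):
            return pa.binary()
        return tp

    new_schema = pa.schema([pa.field(f.name, _convert(f.type)) for f in tbl.schema])
    return tbl.cast(new_schema)  # no-op when nothing needs replacing

tbl = pa.table({"a": pa.array(["x", "y"], type=pa.large_string())})
assert downcast_large_types(tbl).schema.field("a").type == pa.string()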
diff --git a/fugue_polars/polars_dataframe.py b/fugue_polars/polars_dataframe.py
index f062eb5c..3d0d5c0d 100644
--- a/fugue_polars/polars_dataframe.py
+++ b/fugue_polars/polars_dataframe.py
@@ -6,9 +6,15 @@
 from triad.collections.schema import Schema
 from triad.exceptions import InvalidOperationError
 from triad.utils.assertion import assert_or_throw
+from triad.utils.pyarrow import (
+    LARGE_TYPES_REPLACEMENT,
+    replace_types_in_schema,
+    replace_types_in_table,
+)
 
 from fugue import ArrowDataFrame
 from fugue.api import (
+    as_arrow,
     drop_columns,
     get_column_names,
     get_schema,
@@ -28,7 +34,7 @@
 )
 from fugue.exceptions import FugueDataFrameOperationError
 
-from ._utils import build_empty_pl, pl_as_arrow, to_schema
+from ._utils import build_empty_pl
 
 
 class PolarsDataFrame(LocalBoundedDataFrame):
@@ -55,7 +61,7 @@ def __init__(
             InvalidOperationError("can't reset schema for pl.DataFrame"),
         )
         self._native = df
-        super().__init__(to_schema(df))
+        super().__init__(_get_pl_schema(df))
 
     @property
     def native(self) -> pl.DataFrame:
@@ -107,7 +113,7 @@ def alter_columns(self, columns: Any) -> DataFrame:
         return PolarsDataFrame(pl.from_arrow(adf.native))
 
     def as_arrow(self, type_safe: bool = False) -> pa.Table:
-        return pl_as_arrow(self.native)
+        return _pl_as_arrow(self.native)
 
     def as_array(
         self, columns: Optional[List[str]] = None, type_safe: bool = False
@@ -121,7 +127,7 @@ def as_array_iterable(
         self, columns: Optional[List[str]] = None, type_safe: bool = False
     ) -> Iterable[Any]:
         if not self.empty:
-            yield from ArrowDataFrame(pl_as_arrow(self.native)).as_array_iterable(
+            yield from ArrowDataFrame(_pl_as_arrow(self.native)).as_array_iterable(
                 columns=columns
             )
 
@@ -129,7 +135,7 @@ def as_dict_iterable(
         self, columns: Optional[List[str]] = None
     ) -> Iterable[Dict[str, Any]]:
         if not self.empty:
-            yield from ArrowDataFrame(pl_as_arrow(self.native)).as_dict_iterable(
+            yield from ArrowDataFrame(_pl_as_arrow(self.native)).as_dict_iterable(
                 columns=columns
             )
 
@@ -144,6 +150,13 @@ def _pl_as_local_bounded(df: pl.DataFrame) -> pl.DataFrame:
     return df
 
 
+@as_arrow.candidate(lambda df: isinstance(df, pl.DataFrame))
+def _pl_as_arrow(df: pl.DataFrame) -> pa.Table:
+    adf = df.to_arrow()
+    adf = replace_types_in_table(adf, LARGE_TYPES_REPLACEMENT)
+    return adf
+
+
 @is_df.candidate(lambda df: isinstance(df, pl.DataFrame))
 def _pl_is_df(df: pl.DataFrame) -> bool:
     return True
@@ -181,7 +194,9 @@ def _get_pl_columns(df: pl.DataFrame) -> List[Any]:
 
 @get_schema.candidate(lambda df: isinstance(df, pl.DataFrame))
 def _get_pl_schema(df: pl.DataFrame) -> Schema:
-    return to_schema(df)
+    adf = df.to_arrow()
+    schema = replace_types_in_schema(adf.schema, LARGE_TYPES_REPLACEMENT)
+    return Schema(schema)
 
 
 @rename.candidate(lambda df, *args, **kwargs: isinstance(df, pl.DataFrame))
diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py
index 38a13690..c5da7687 100644
--- a/fugue_spark/execution_engine.py
+++ b/fugue_spark/execution_engine.py
@@ -246,6 +246,7 @@ def get_dfs() -> Iterable[LocalDataFrame]:
                 )
                 if not cursor_set:
                     cursor.set(lambda: pdf.peek_array(), 0, 0)
+                    cursor_set = True
                 yield pdf
 
         input_df = IterablePandasDataFrame(get_dfs(), input_schema)
@@ -280,6 +281,7 @@ def get_dfs() -> Iterable[LocalDataFrame]:
                 pdf = ArrowDataFrame(func(adf))
                 if not cursor_set:
                     cursor.set(lambda: pdf.peek_array(), 0, 0)
+                    cursor_set = True
                 yield pdf
 
         input_df = IterableArrowDataFrame(get_dfs(), input_schema)
diff --git a/setup.py b/setup.py
index b2ca0c7b..097a6fe8 100644
--- a/setup.py
+++ b/setup.py
@@ -31,7 +31,7 @@ def get_version() -> str:
     keywords="distributed spark dask sql dsl domain specific language",
     url="http://github.com/fugue-project/fugue",
     install_requires=[
-        "triad>=0.8.6",
+        "triad>=0.8.8",
         "adagio>=0.2.4",
         "qpd>=0.4.1",
         "fugue-sql-antlr>=0.1.6",
diff --git a/tests/fugue_duckdb/test_dataframe.py b/tests/fugue_duckdb/test_dataframe.py
index db25da60..bc6b351f 100644
--- a/tests/fugue_duckdb/test_dataframe.py
+++ b/tests/fugue_duckdb/test_dataframe.py
@@ -17,7 +17,7 @@ def setUpClass(cls):
 
     def df(self, data: Any = None, schema: Any = None) -> DuckDataFrame:
         df = ArrowDataFrame(data, schema)
-        return DuckDataFrame(duckdb.arrow(df.native, self._con))
+        return DuckDataFrame(duckdb.from_arrow(df.native, self._con))
 
     def test_as_array_special_values(self):
         for func in [
@@ -74,7 +74,7 @@ def setUpClass(cls):
 
     def df(self, data: Any = None, schema: Any = None) -> DuckDataFrame:
         df = ArrowDataFrame(data, schema)
-        return DuckDataFrame(duckdb.arrow(df.native, self._con)).native
+        return DuckDataFrame(duckdb.from_arrow(df.native, self._con)).native
 
     def to_native_df(self, pdf: pd.DataFrame) -> Any:
         return duckdb.from_df(pdf)
diff --git a/tests/fugue_duckdb/test_utils.py b/tests/fugue_duckdb/test_utils.py
index c515ece5..7c7aee25 100644
--- a/tests/fugue_duckdb/test_utils.py
+++ b/tests/fugue_duckdb/test_utils.py
@@ -42,7 +42,7 @@ def test_type_conversion():
     con = duckdb.connect()
 
     def assert_(tp):
-        dt = duckdb.arrow(pa.Table.from_pydict(dict(a=pa.nulls(2, tp))), con).types[0]
+        dt = duckdb.from_arrow(pa.Table.from_pydict(dict(a=pa.nulls(2, tp))), con).types[0]
         assert to_pa_type(dt) == tp
         dt = to_duck_type(tp)
         assert to_pa_type(dt) == tp
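The rename from duckdb.arrow to duckdb.from_arrow, together with to_pa_type now accepting DuckDBPyType objects, can be exercised with a small check; a hedged sketch assuming only that duckdb and pyarrow are installed:

import duckdb
import pyarrow as pa

con = duckdb.connect()
# register an Arrow table as a relation, the call pattern used throughout this patch
rel = duckdb.from_arrow(pa.table({"a": [1, 2]}), connection=con)
print(rel.types)                    # DuckDBPyType objects on duckdb >= 0.8, plain strings before
print([str(t) for t in rel.types])  # e.g. ['BIGINT']; str() is what to_pa_type now relies on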
From 9d9a5761762682b7a3ce013db04f92e06e92092a Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Tue, 23 May 2023 07:01:41 +0000
Subject: [PATCH 02/10] update

---
 RELEASE.md                       | 3 +++
 fugue_duckdb/execution_engine.py | 4 ++--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/RELEASE.md b/RELEASE.md
index 9a742672..46108c12 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -2,6 +2,9 @@
 
 ## 0.8.4
 
+- [471](https://github.com/fugue-project/fugue/issues/471) Fix compatibility issues for duckdb 0.8.0+
+- [464](https://github.com/fugue-project/fugue/issues/464) Support for spark/databricks connect
+- [459](https://github.com/fugue-project/fugue/issues/459) DEPRECATION: Avro support
 - [455](https://github.com/fugue-project/fugue/issues/455) Make Fugue pandas 2 compatible
 
 ## 0.8.3
diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py
index bc883fac..ca67b43f 100644
--- a/fugue_duckdb/execution_engine.py
+++ b/fugue_duckdb/execution_engine.py
@@ -28,7 +28,7 @@
     encode_schema_names,
     encode_value_to_expr,
 )
-from .dataframe import DuckDataFrame
+from .dataframe import DuckDataFrame, _duck_as_arrow
 
 _FUGUE_DUCKDB_PRAGMA_CONFIG_PREFIX = "fugue.duckdb.pragma."
 _FUGUE_DUCKDB_EXTENSIONS = "fugue.duckdb.extensions"
@@ -109,7 +109,7 @@ def _other_select(self, dfs: DataFrames, statement: str) -> DataFrame:
         try:
             for k, v in dfs.items():
                 duckdb.from_arrow(v.as_arrow(), connection=conn).create_view(k)
-            return ArrowDataFrame(conn.execute(statement).arrow())
+            return ArrowDataFrame(_duck_as_arrow(conn.execute(statement)))
         finally:
             conn.close()

From 172acec9463ee1976f73de814ae37ddc0b2c4bd8 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Tue, 23 May 2023 07:13:27 +0000
Subject: [PATCH 03/10] update

---
 RELEASE.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/RELEASE.md b/RELEASE.md
index 46108c12..daddd462 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -3,6 +3,7 @@
 ## 0.8.4
 
 - [471](https://github.com/fugue-project/fugue/issues/471) Fix compatibility issues for duckdb 0.8.0+
+- [466](https://github.com/fugue-project/fugue/issues/466) Fix Ray 2.4.0 compatibility issue
 - [464](https://github.com/fugue-project/fugue/issues/464) Support for spark/databricks connect
 - [459](https://github.com/fugue-project/fugue/issues/459) DEPRECATION: Avro support
 - [455](https://github.com/fugue-project/fugue/issues/455) Make Fugue pandas 2 compatible

From 17cae263fb68230df16c7e6508cdb6bb80815c7f Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Mon, 5 Jun 2023 07:13:35 +0000
Subject: [PATCH 04/10] Update spark

---
 fugue_spark/_utils/convert.py              | 33 +++++++++++++++++++-
 fugue_spark/_utils/io.py                   | 14 +++++----
 fugue_spark/execution_engine.py            | 35 +++++++---------------
 tests/fugue_spark/test_dataframe.py        | 20 +++++--------
 tests/fugue_spark/test_execution_engine.py | 10 ++++---
 5 files changed, 64 insertions(+), 48 deletions(-)

diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py
index b1d89899..aa7ba05e 100644
--- a/fugue_spark/_utils/convert.py
+++ b/fugue_spark/_utils/convert.py
@@ -12,6 +12,14 @@
 from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
 from triad.utils.schema import quote_name
 
 from .misc import is_spark_dataframe
+import fugue.api as fa
+from fugue import DataFrame
+
+try:
+    from pyspark.sql.types import TimestampNTZType  # pylint: disable-all
+except ImportError:  # pragma: no cover
+    # pyspark < 3.2
+    from pyspark.sql.types import TimestampType as TimestampNTZType
 
 
 def to_spark_schema(obj: Any) -> pt.StructType:
@@ -108,9 +116,32 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[
             yield r
 
 
+def to_spark_df(session: ps.SparkSession, df: Any, schema: Any = None) -> ps.DataFrame:
+    if schema is not None and not isinstance(schema, pt.StructType):
+        schema = to_spark_schema(schema)
+    if isinstance(df, pd.DataFrame):
+        if pd.__version__ >= "2" and session.version < "3.4":
+            # pyspark < 3.4 does not support pandas 2 when doing
+            # createDataFrame, see this issue:
+            # https://stackoverflow.com/a/75926954/12309438
+            # this is a workaround with the cost of memory and speed.
+            if schema is None:
+                schema = to_spark_schema(fa.get_schema(df))
+            df = fa.as_fugue_df(df).as_array(type_safe=True)
+        return session.createDataFrame(df, schema=schema)
+    if isinstance(df, DataFrame):
+        if schema is None:
+            schema = to_spark_schema(df.schema)
+        if pd.__version__ >= "2" and session.version < "3.4":
+            return session.createDataFrame(df.as_array(type_safe=True), schema=schema)
+        return session.createDataFrame(df.as_pandas(), schema=schema)
+    else:
+        return session.createDataFrame(df, schema=schema)
+
+
 def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
     if pd.__version__ < "2" or not any(
-        isinstance(x.dataType, (pt.TimestampType, pt.TimestampNTZType))
+        isinstance(x.dataType, (pt.TimestampType, TimestampNTZType))
         for x in df.schema.fields
     ):
         return df.toPandas()
diff --git a/fugue_spark/_utils/io.py b/fugue_spark/_utils/io.py
index e424a832..fd4e8bf9 100644
--- a/fugue_spark/_utils/io.py
+++ b/fugue_spark/_utils/io.py
@@ -1,16 +1,18 @@
 from typing import Any, Callable, Dict, List, Optional, Union
 
 import pyspark.sql as ps
-from fugue.collections.partition import PartitionSpec
-from fugue.dataframe import DataFrame
-from fugue._utils.io import FileParser, save_df
-from fugue_spark.dataframe import SparkDataFrame
-from fugue_spark._utils.convert import to_schema, to_spark_schema
 from pyspark.sql import SparkSession
 from triad.collections import Schema
+from triad.collections.dict import ParamDict
 from triad.collections.fs import FileSystem
 from triad.utils.assertion import assert_or_throw
-from triad.collections.dict import ParamDict
+
+from fugue._utils.io import FileParser, save_df
+from fugue.collections.partition import PartitionSpec
+from fugue.dataframe import DataFrame
+from fugue_spark.dataframe import SparkDataFrame
+
+from .convert import to_schema, to_spark_schema
 
 
 class SparkIO(object):
diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py
index c5da7687..a29e4c75 100644
--- a/fugue_spark/execution_engine.py
+++ b/fugue_spark/execution_engine.py
@@ -43,7 +43,7 @@
 from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine
 
 from ._constants import FUGUE_SPARK_CONF_USE_PANDAS_UDF, FUGUE_SPARK_DEFAULT_CONF
-from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input
+from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input, to_spark_df
 from ._utils.io import SparkIO
 from ._utils.misc import is_spark_connect as _is_spark_connect, is_spark_dataframe
 from ._utils.partition import even_repartition, hash_repartition, rand_repartition
@@ -735,40 +735,29 @@ def _to_df() -> SparkDataFrame:
             )
         if isinstance(df, SparkDataFrame):
             return df
-        if isinstance(df, ArrowDataFrame):
-            raw_df: Any = df.as_pandas()
-            sdf = self.spark_session.createDataFrame(
-                raw_df, to_spark_schema(df.schema)
-            )
-            return SparkDataFrame(sdf, df.schema)
         if isinstance(df, (ArrayDataFrame, IterableDataFrame)):
             adf = ArrowDataFrame(df.as_array(type_safe=False), df.schema)
-            raw_df = adf.as_pandas()
-            sdf = self.spark_session.createDataFrame(
-                raw_df, to_spark_schema(df.schema)
-            )
+            sdf = to_spark_df(self.spark_session, adf, df.schema)
             return SparkDataFrame(sdf, df.schema)
         if any(pa.types.is_struct(t) for t in df.schema.types):
-            sdf = self.spark_session.createDataFrame(
-                df.as_array(type_safe=True), to_spark_schema(df.schema)
+            sdf = to_spark_df(
+                self.spark_session, df.as_array(type_safe=True), df.schema
             )
         else:
-            sdf = self.spark_session.createDataFrame(
-                df.as_pandas(), to_spark_schema(df.schema)
-            )
+            sdf = to_spark_df(self.spark_session, df, df.schema)
         return SparkDataFrame(sdf, df.schema)
     if is_spark_dataframe(df):
         return SparkDataFrame(df, None if schema is None else to_schema(schema))
     if isinstance(df, RDD):
         assert_arg_not_none(schema, "schema")
-        sdf = self.spark_session.createDataFrame(df, to_spark_schema(schema))
+        sdf = to_spark_df(self.spark_session, df, schema)
         return SparkDataFrame(sdf, to_schema(schema))
     if isinstance(df, pd.DataFrame):
         if PD_UTILS.empty(df):
             temp_schema = to_spark_schema(PD_UTILS.to_schema(df))
-            sdf = self.spark_session.createDataFrame([], temp_schema)
+            sdf = to_spark_df(self.spark_session, [], temp_schema)
         else:
-            sdf = self.spark_session.createDataFrame(df)
+            sdf = to_spark_df(self.spark_session, df)
         return SparkDataFrame(sdf, schema)
 
     # use arrow dataframe here to handle nulls in int cols
@@ -778,9 +767,7 @@ def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
     adf = ArrowDataFrame(df, to_schema(schema))
     map_pos = [i for i, t in enumerate(adf.schema.types) if pa.types.is_map(t)]
     if len(map_pos) == 0:
-        sdf = self.spark_session.createDataFrame(
-            adf.as_array(), to_spark_schema(adf.schema)
-        )
+        sdf = to_spark_df(self.spark_session, adf.as_array(), adf.schema)
     else:
 
         def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
@@ -789,8 +776,8 @@ def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
                     row[p] = dict(row[p])
                 yield row
 
-        sdf = self.spark_session.createDataFrame(
-            to_dict(adf.as_array_iterable()), to_spark_schema(adf.schema)
+        sdf = to_spark_df(
+            self.spark_session, to_dict(adf.as_array_iterable()), adf.schema
         )
     return SparkDataFrame(sdf, adf.schema)
diff --git a/tests/fugue_spark/test_dataframe.py b/tests/fugue_spark/test_dataframe.py
index 5063413a..d4e19dc5 100644
--- a/tests/fugue_spark/test_dataframe.py
+++ b/tests/fugue_spark/test_dataframe.py
@@ -12,7 +12,7 @@
 from fugue.dataframe.pandas_dataframe import PandasDataFrame
 from fugue.plugins import get_column_names, rename
 from fugue_spark import SparkExecutionEngine
-from fugue_spark._utils.convert import to_schema, to_spark_schema
+from fugue_spark._utils.convert import to_schema, to_spark_schema, to_spark_df
 from fugue_spark.dataframe import SparkDataFrame
 from fugue_test.dataframe_suite import DataFrameTests
 
@@ -42,7 +42,7 @@ def df(self, data: Any = None, schema: Any = None):
         return engine.to_df(data, schema=schema).native
 
     def to_native_df(self, pdf: pd.DataFrame) -> Any:
-        return self.spark_session.createDataFrame(pdf)
+        return to_spark_df(self.spark_session, pdf)
 
     def test_not_local(self):
         assert not fi.is_local(self.df([], "a:int,b:str"))
@@ -131,30 +131,24 @@ def _df(data, schema=None):
     session = SparkSession.builder.getOrCreate()
     if schema is not None:
         pdf = PandasDataFrame(data, to_schema(schema))
-        df = session.createDataFrame(pdf.native, to_spark_schema(schema))
+        df = to_spark_df(session, pdf.native, schema)
     else:
-        df = session.createDataFrame(data)
+        df = to_spark_df(session, data)
     return SparkDataFrame(df, schema)
 
 
 def _test_get_column_names(spark_session):
-    df = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])
-    )
+    df = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]))
     assert get_column_names(df) == ["0", "1", "2"]
 
 
 def _test_rename(spark_session):
-    pdf = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"])
-    )
+    pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"]))
     df = rename(pdf, {})
     assert isinstance(df, ps.DataFrame)
     assert get_column_names(df) == ["a", "b", "c"]
"c"] - pdf = spark_session.createDataFrame( - pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]) - ) + pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])) df = rename(pdf, {"0": "_0", "1": "_1", "2": "_2"}) assert isinstance(df, ps.DataFrame) assert get_column_names(df) == ["_0", "_1", "_2"] diff --git a/tests/fugue_spark/test_execution_engine.py b/tests/fugue_spark/test_execution_engine.py index 055bcd66..1d5a1168 100644 --- a/tests/fugue_spark/test_execution_engine.py +++ b/tests/fugue_spark/test_execution_engine.py @@ -2,12 +2,10 @@ import numpy as np import pandas as pd -import pyspark import pyspark.rdd as pr import pyspark.sql as ps import pytest from pyspark import SparkContext, StorageLevel -from pyspark.sql import DataFrame as SDataFrame from pyspark.sql import SparkSession from pytest import raises from triad import Schema @@ -26,7 +24,7 @@ from fugue.extensions.transformer import Transformer, transformer from fugue.plugins import infer_execution_engine from fugue.workflow.workflow import FugueWorkflow -from fugue_spark._utils.convert import to_pandas +from fugue_spark._utils.convert import to_pandas, to_spark_df from fugue_spark._utils.misc import is_spark_dataframe, is_spark_session from fugue_spark.dataframe import SparkDataFrame from fugue_spark.execution_engine import SparkExecutionEngine @@ -121,7 +119,7 @@ def test_sample_n(self): assert abs(len(b.as_array()) - 90) < 2 def test_infer_engine(self): - df = self.spark_session.createDataFrame(pd.DataFrame([[0]], columns=["a"])) + df = to_spark_df(self.spark_session, pd.DataFrame([[0]], columns=["a"])) assert is_spark_session(infer_execution_engine([df])) fdf = SparkDataFrame(df) @@ -216,6 +214,10 @@ def test_df_init(self): def test_yield_table(self): pass + def test_any_column_name(self): + if self.spark_session.version >= "3.3.1": + return super().test_any_column_name() + def test_default_session(self): a = FugueWorkflow().df([[0]], "a:int") df_eq(a.compute(SparkExecutionEngine), [[0]], "a:int") From 61c41574a35a09fa577a2d24619336ef2d28e91b Mon Sep 17 00:00:00 2001 From: Han Wang Date: Mon, 5 Jun 2023 07:38:10 +0000 Subject: [PATCH 05/10] update --- .github/workflows/test_spark.yml | 44 ++++++++++++++++++++++++++++++++ fugue_spark/_utils/convert.py | 27 +++++++++++--------- fugue_version/__init__.py | 2 +- setup.py | 2 +- 4 files changed, 61 insertions(+), 14 deletions(-) create mode 100644 .github/workflows/test_spark.yml diff --git a/.github/workflows/test_spark.yml b/.github/workflows/test_spark.yml new file mode 100644 index 00000000..808925b6 --- /dev/null +++ b/.github/workflows/test_spark.yml @@ -0,0 +1,44 @@ +# This workflow will install Python dependencies, run tests and lint with a variety of Python versions +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Spark Tests + +on: + push: + branches: [ master ] + paths-ignore: + - 'docs/**' + - '**.md' + pull_request: + branches: [ master ] + paths-ignore: + - 'docs/**' + - '**.md' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.8] + spark-version: ["3.1.1","3.4.0"] + pandas-version: ["1.5.3","2.0.1"] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: make devenv 
+      - name: Install Spark ${{ matrix.spark-version }}
+        run: pip install "pyspark==${{ matrix.spark-version }}"
+      - name: Install Pandas ${{ matrix.pandas-version }}
+        run: pip install "pandas==${{ matrix.pandas-version }}"
+      - name: Test
+        run: make testspark
diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py
index aa7ba05e..fc7ab3e5 100644
--- a/fugue_spark/_utils/convert.py
+++ b/fugue_spark/_utils/convert.py
@@ -1,6 +1,5 @@
 from typing import Any, Iterable, List, Tuple
 
-import cloudpickle
 import pandas as pd
 import pyarrow as pa
 import pyspark.sql as ps
@@ -11,10 +10,12 @@
 from triad.utils.assertion import assert_arg_not_none, assert_or_throw
 from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
 from triad.utils.schema import quote_name
-from .misc import is_spark_dataframe
+
 import fugue.api as fa
 from fugue import DataFrame
 
+from .misc import is_spark_dataframe
+
 try:
     from pyspark.sql.types import TimestampNTZType  # pylint: disable-all
 except ImportError:  # pragma: no cover
     # pyspark < 3.2
@@ -120,7 +121,7 @@ def to_spark_df(session: ps.SparkSession, df: Any, schema: Any = None) -> ps.Dat
     if schema is not None and not isinstance(schema, pt.StructType):
         schema = to_spark_schema(schema)
     if isinstance(df, pd.DataFrame):
-        if pd.__version__ >= "2" and session.version < "3.4":
+        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
             # pyspark < 3.4 does not support pandas 2 when doing
             # createDataFrame, see this issue:
             # https://stackoverflow.com/a/75926954/12309438
@@ -130,9 +131,9 @@ def to_spark_df(session: ps.SparkSession, df: Any, schema: Any = None) -> ps.Dat
             df = fa.as_fugue_df(df).as_array(type_safe=True)
         return session.createDataFrame(df, schema=schema)
     if isinstance(df, DataFrame):
-        if schema is None:
-            schema = to_spark_schema(df.schema)
-        if pd.__version__ >= "2" and session.version < "3.4":
+        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
+            if schema is None:
+                schema = to_spark_schema(df.schema)
             return session.createDataFrame(df.as_array(type_safe=True), schema=schema)
         return session.createDataFrame(df.as_pandas(), schema=schema)
     else:
@@ -145,14 +146,16 @@ def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
         for x in df.schema.fields
     ):
         return df.toPandas()
+    else:
+        import cloudpickle
 
-    def serialize(dfs):  # pragma: no cover
-        for df in dfs:
-            data = cloudpickle.dumps(df)
-            yield pd.DataFrame([[data]], columns=["data"])
+        def serialize(dfs):  # pragma: no cover
+            for df in dfs:
+                data = cloudpickle.dumps(df)
+                yield pd.DataFrame([[data]], columns=["data"])
 
-    sdf = df.mapInPandas(serialize, schema="data binary")
-    return pd.concat(cloudpickle.loads(x.data) for x in sdf.collect())
+        sdf = df.mapInPandas(serialize, schema="data binary")
+        return pd.concat(cloudpickle.loads(x.data) for x in sdf.collect())
 
 
 # TODO: the following function always set nullable to true,
diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py
index fa3ddd8c..af46754d 100644
--- a/fugue_version/__init__.py
+++ b/fugue_version/__init__.py
@@ -1 +1 @@
-__version__ = "0.8.4"
+__version__ = "0.8.5"
diff --git a/setup.py b/setup.py
index 097a6fe8..9a176a76 100644
--- a/setup.py
+++ b/setup.py
@@ -42,7 +42,7 @@ def get_version() -> str:
     ],
     extras_require={
         "cpp_sql_parser": ["fugue-sql-antlr[cpp]>=0.1.6"],
-        "spark": ["pyspark"],
+        "spark": ["pyspark>=3.1.1"],
         "dask": [
             "dask[distributed,dataframe]; python_version < '3.8'",
             "dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
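The to_spark_df helper introduced in PATCH 04 exists to work around createDataFrame failing on pandas>=2 with pyspark<3.4 (see the stackoverflow link in the patch). A minimal usage sketch, assuming only a local Spark session and this patch series installed:

import pandas as pd
from pyspark.sql import SparkSession

from fugue_spark._utils.convert import to_spark_df  # helper added in this patch series

spark = SparkSession.builder.master("local[*]").getOrCreate()
pdf = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# spark.createDataFrame(pdf) can fail with pandas>=2 on pyspark<3.4;
# to_spark_df picks a compatible conversion path based on the installed versions.
sdf = to_spark_df(spark, pdf)
sdf.show()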
From bc10247eaa4576c459aef142181070e3d510e9a2 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Mon, 5 Jun 2023 07:54:10 +0000
Subject: [PATCH 06/10] update

---
 .github/workflows/test_spark.yml         | 24 +++++++++++---
 .github/workflows/test_spark_connect.yml | 41 ------------------------
 2 files changed, 20 insertions(+), 45 deletions(-)
 delete mode 100644 .github/workflows/test_spark_connect.yml

diff --git a/.github/workflows/test_spark.yml b/.github/workflows/test_spark.yml
index 808925b6..a1d98457 100644
--- a/.github/workflows/test_spark.yml
+++ b/.github/workflows/test_spark.yml
@@ -20,20 +20,20 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  build:
+  test_combinations:
+    name: Spark ${{ matrix.spark-version }} Pandas ${{ matrix.pandas-version }}
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8]
         spark-version: ["3.1.1","3.4.0"]
         pandas-version: ["1.5.3","2.0.1"]
 
     steps:
       - uses: actions/checkout@v2
-      - name: Set up Python ${{ matrix.python-version }}
+      - name: Set up Python 3.9
         uses: actions/setup-python@v1
         with:
-          python-version: ${{ matrix.python-version }}
+          python-version: 3.9
       - name: Install dependencies
         run: make devenv
       - name: Install Spark ${{ matrix.spark-version }}
         run: pip install "pyspark==${{ matrix.spark-version }}"
       - name: Install Pandas ${{ matrix.pandas-version }}
         run: pip install "pandas==${{ matrix.pandas-version }}"
       - name: Test
         run: make testspark
+
+  test_connect:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up Python 3.9
+        uses: actions/setup-python@v1
+        with:
+          python-version: 3.9
+      - name: Install dependencies
+        run: make devenv
+      - name: Setup Spark
+        run: make sparkconnect
+      - name: Test
+        run: make testsparkconnect
diff --git a/.github/workflows/test_spark_connect.yml b/.github/workflows/test_spark_connect.yml
deleted file mode 100644
index 11cc112f..00000000
--- a/.github/workflows/test_spark_connect.yml
+++ /dev/null
@@ -1,41 +0,0 @@
-# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
-# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
-
-name: Test Spark Connect
-
-on:
-  push:
-    branches: [ master ]
-    paths-ignore:
-      - 'docs/**'
-      - '**.md'
-  pull_request:
-    branches: [ master ]
-    paths-ignore:
-      - 'docs/**'
-      - '**.md'
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
-  cancel-in-progress: true
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-
-    strategy:
-      matrix:
-        python-version: [3.9]
-
-    steps:
-      - uses: actions/checkout@v2
-      - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v1
-        with:
-          python-version: ${{ matrix.python-version }}
-      - name: Install dependencies
-        run: make devenv
-      - name: Setup Spark
-        run: make sparkconnect
-      - name: Test
-        run: make testsparkconnect

From 68b45cbb83cb044f4c1e8d2736b7f8c69d3e9740 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Tue, 6 Jun 2023 06:02:06 +0000
Subject: [PATCH 07/10] update

---
 .github/workflows/test_spark.yml | 4 ++++
 setup.py                         | 4 ++--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/test_spark.yml b/.github/workflows/test_spark.yml
index a1d98457..dc5985a7 100644
--- a/.github/workflows/test_spark.yml
+++ b/.github/workflows/test_spark.yml
@@ -40,10 +40,14 @@ jobs:
         run: pip install "pyspark==${{ matrix.spark-version }}"
       - name: Install Pandas ${{ matrix.pandas-version }}
         run: pip install "pandas==${{ matrix.pandas-version }}"
+      - name: Downgrade Ibis
+        if: matrix.spark-version <= "3.2.1"
+        run: pip install "ibis-framework<5"
       - name: Test
         run: make testspark
 
   test_connect:
+    name: Spark Connect
     runs-on: ubuntu-latest
 
     steps:
       - uses: actions/checkout@v2
       - name: Set up Python 3.9
         uses: actions/setup-python@v1
         with:
           python-version: 3.9
       - name: Install dependencies
         run: make devenv
       - name: Setup Spark
         run: make sparkconnect
       - name: Test
         run: make testsparkconnect
diff --git a/setup.py b/setup.py
index 9a176a76..2342f024 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ def get_version() -> str:
 setup(
     name="fugue",
     version=get_version(),
-    packages=find_packages(),
+    packages=find_packages(include=["fugue*"]),
     description="An abstraction layer for distributed computation",
     long_description=LONG_DESCRIPTION,
     long_description_content_type="text/markdown",
@@ -62,7 +62,7 @@ def get_version() -> str:
         "notebook": ["notebook", "jupyterlab", "ipython>=7.10.0"],
         "all": [
             "fugue-sql-antlr[cpp]>=0.1.6",
-            "pyspark",
+            "pyspark>=3.1.1",
             "dask[distributed,dataframe]; python_version < '3.8'",
             "dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
             "ray[data]>=2.0.0",

From 3549664f8693e62d81f821c449035865be3d32c7 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Tue, 6 Jun 2023 06:09:30 +0000
Subject: [PATCH 08/10] update

---
 .github/workflows/test_spark.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test_spark.yml b/.github/workflows/test_spark.yml
index dc5985a7..baebc56e 100644
--- a/.github/workflows/test_spark.yml
+++ b/.github/workflows/test_spark.yml
@@ -41,7 +41,7 @@ jobs:
       - name: Install Pandas ${{ matrix.pandas-version }}
         run: pip install "pandas==${{ matrix.pandas-version }}"
       - name: Downgrade Ibis
-        if: matrix.spark-version <= "3.2.1"
+        if: matrix.spark-version == "3.1.1"
         run: pip install "ibis-framework<5"
       - name: Test
         run: make testspark

From 307dabf396d0b6704ff4199cc873f00bf3038638 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Tue, 6 Jun 2023 06:11:10 +0000
Subject: [PATCH 09/10] update

---
 .github/workflows/test_spark.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test_spark.yml b/.github/workflows/test_spark.yml
index baebc56e..ebd64829 100644
--- a/.github/workflows/test_spark.yml
+++ b/.github/workflows/test_spark.yml
@@ -41,7 +41,7 @@ jobs:
       - name: Install Pandas ${{ matrix.pandas-version }}
         run: pip install "pandas==${{ matrix.pandas-version }}"
       - name: Downgrade Ibis
-        if: matrix.spark-version == "3.1.1"
+        if: matrix.spark-version < '3.4.0'
         run: pip install "ibis-framework<5"
       - name: Test
         run: make testspark

From 605fd32fdba80ae2cb5c99160f9b3a94d8f327c8 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Tue, 6 Jun 2023 06:34:42 +0000
Subject: [PATCH 10/10] replace cloudpickle with pickle in fugue_spark

---
 fugue_spark/_utils/convert.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py
index fc7ab3e5..356fc3d4 100644
--- a/fugue_spark/_utils/convert.py
+++ b/fugue_spark/_utils/convert.py
@@ -1,3 +1,4 @@
+import pickle
 from typing import Any, Iterable, List, Tuple
 
 import pandas as pd
@@ -147,15 +148,14 @@ def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
     ):
         return df.toPandas()
     else:
-        import cloudpickle
 
         def serialize(dfs):  # pragma: no cover
            for df in dfs:
-                data = cloudpickle.dumps(df)
+                data = pickle.dumps(df)
                 yield pd.DataFrame([[data]], columns=["data"])
 
         sdf = df.mapInPandas(serialize, schema="data binary")
-        return pd.concat(cloudpickle.loads(x.data) for x in sdf.collect())
+        return pd.concat(pickle.loads(x.data) for x in sdf.collect())
 
 
 # TODO: the following function always set nullable to true,