diff --git a/.github/workflows/test_spark.yml b/.github/workflows/test_spark.yml
new file mode 100644
index 00000000..ebd64829
--- /dev/null
+++ b/.github/workflows/test_spark.yml
@@ -0,0 +1,64 @@
+# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
+# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
+
+name: Spark Tests
+
+on:
+  push:
+    branches: [ master ]
+    paths-ignore:
+      - 'docs/**'
+      - '**.md'
+  pull_request:
+    branches: [ master ]
+    paths-ignore:
+      - 'docs/**'
+      - '**.md'
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  test_combinations:
+    name: Spark ${{ matrix.spark-version }} Pandas ${{ matrix.pandas-version }}
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        spark-version: ["3.1.1","3.4.0"]
+        pandas-version: ["1.5.3","2.0.1"]
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python 3.9
+      uses: actions/setup-python@v1
+      with:
+        python-version: 3.9
+    - name: Install dependencies
+      run: make devenv
+    - name: Install Spark ${{ matrix.spark-version }}
+      run: pip install "pyspark==${{ matrix.spark-version }}"
+    - name: Install Pandas ${{ matrix.pandas-version }}
+      run: pip install "pandas==${{ matrix.pandas-version }}"
+    - name: Downgrade Ibis
+      if: matrix.spark-version < '3.4.0'
+      run: pip install "ibis-framework<5"
+    - name: Test
+      run: make testspark
+
+  test_connect:
+    name: Spark Connect
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python 3.9
+      uses: actions/setup-python@v1
+      with:
+        python-version: 3.9
+    - name: Install dependencies
+      run: make devenv
+    - name: Setup Spark
+      run: make sparkconnect
+    - name: Test
+      run: make testsparkconnect
diff --git a/.github/workflows/test_spark_connect.yml b/.github/workflows/test_spark_connect.yml
deleted file mode 100644
index 11cc112f..00000000
--- a/.github/workflows/test_spark_connect.yml
+++ /dev/null
@@ -1,41 +0,0 @@
-# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
-# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
-
-name: Test Spark Connect
-
-on:
-  push:
-    branches: [ master ]
-    paths-ignore:
-      - 'docs/**'
-      - '**.md'
-  pull_request:
-    branches: [ master ]
-    paths-ignore:
-      - 'docs/**'
-      - '**.md'
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
-  cancel-in-progress: true
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-
-    strategy:
-      matrix:
-        python-version: [3.9]
-
-    steps:
-    - uses: actions/checkout@v2
-    - name: Set up Python ${{ matrix.python-version }}
-      uses: actions/setup-python@v1
-      with:
-        python-version: ${{ matrix.python-version }}
-    - name: Install dependencies
-      run: make devenv
-    - name: Setup Spark
-      run: make sparkconnect
-    - name: Test
-      run: make testsparkconnect
diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py
index b1d89899..356fc3d4 100644
--- a/fugue_spark/_utils/convert.py
+++ b/fugue_spark/_utils/convert.py
@@ -1,6 +1,6 @@
+import pickle
 from typing import Any, Iterable, List, Tuple
 
-import cloudpickle
 import pandas as pd
 import pyarrow as pa
 import pyspark.sql as ps
@@ -11,8 +11,18 @@
 from triad.utils.assertion import assert_arg_not_none, assert_or_throw
 from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
 from triad.utils.schema import quote_name
+
+import fugue.api as fa
+from fugue import DataFrame
+
 from .misc import is_spark_dataframe
 
+try:
+    from pyspark.sql.types import TimestampNTZType  # pylint: disable-all
+except ImportError:  # pragma: no cover
+    # pyspark < 3.2
+    from pyspark.sql.types import TimestampType as TimestampNTZType
+
 
 def to_spark_schema(obj: Any) -> pt.StructType:
     assert_arg_not_none(obj, "schema")
@@ -108,20 +118,44 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[
             yield r
 
 
+def to_spark_df(session: ps.SparkSession, df: Any, schema: Any = None) -> ps.DataFrame:
+    if schema is not None and not isinstance(schema, pt.StructType):
+        schema = to_spark_schema(schema)
+    if isinstance(df, pd.DataFrame):
+        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
+            # pyspark < 3.4 does not support pandas 2 when doing
+            # createDataFrame, see this issue:
+            # https://stackoverflow.com/a/75926954/12309438
+            # this is a workaround with the cost of memory and speed.
+            if schema is None:
+                schema = to_spark_schema(fa.get_schema(df))
+            df = fa.as_fugue_df(df).as_array(type_safe=True)
+        return session.createDataFrame(df, schema=schema)
+    if isinstance(df, DataFrame):
+        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
+            if schema is None:
+                schema = to_spark_schema(df.schema)
+            return session.createDataFrame(df.as_array(type_safe=True), schema=schema)
+        return session.createDataFrame(df.as_pandas(), schema=schema)
+    else:
+        return session.createDataFrame(df, schema=schema)
+
+
 def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
     if pd.__version__ < "2" or not any(
-        isinstance(x.dataType, (pt.TimestampType, pt.TimestampNTZType))
+        isinstance(x.dataType, (pt.TimestampType, TimestampNTZType))
         for x in df.schema.fields
     ):
         return df.toPandas()
+    else:
 
-    def serialize(dfs):  # pragma: no cover
-        for df in dfs:
-            data = cloudpickle.dumps(df)
-            yield pd.DataFrame([[data]], columns=["data"])
+        def serialize(dfs):  # pragma: no cover
+            for df in dfs:
+                data = pickle.dumps(df)
+                yield pd.DataFrame([[data]], columns=["data"])
 
-    sdf = df.mapInPandas(serialize, schema="data binary")
-    return pd.concat(cloudpickle.loads(x.data) for x in sdf.collect())
+        sdf = df.mapInPandas(serialize, schema="data binary")
+        return pd.concat(pickle.loads(x.data) for x in sdf.collect())
 
 
 # TODO: the following function always set nullable to true,
diff --git a/fugue_spark/_utils/io.py b/fugue_spark/_utils/io.py
index e424a832..fd4e8bf9 100644
--- a/fugue_spark/_utils/io.py
+++ b/fugue_spark/_utils/io.py
@@ -1,16 +1,18 @@
 from typing import Any, Callable, Dict, List, Optional, Union
 
 import pyspark.sql as ps
-from fugue.collections.partition import PartitionSpec
-from fugue.dataframe import DataFrame
-from fugue._utils.io import FileParser, save_df
-from fugue_spark.dataframe import SparkDataFrame
-from fugue_spark._utils.convert import to_schema, to_spark_schema
 from pyspark.sql import SparkSession
 from triad.collections import Schema
+from triad.collections.dict import ParamDict
 from triad.collections.fs import FileSystem
 from triad.utils.assertion import assert_or_throw
-from triad.collections.dict import ParamDict
+
+from fugue._utils.io import FileParser, save_df
+from fugue.collections.partition import PartitionSpec
+from fugue.dataframe import DataFrame
+from fugue_spark.dataframe import SparkDataFrame
+
+from .convert import to_schema, to_spark_schema
 
 
 class SparkIO(object):
diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py
index c5da7687..a29e4c75 100644
--- a/fugue_spark/execution_engine.py
+++ b/fugue_spark/execution_engine.py
@@ -43,7 +43,7 @@
 from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine
 
 from ._constants import FUGUE_SPARK_CONF_USE_PANDAS_UDF, FUGUE_SPARK_DEFAULT_CONF
-from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input
+from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input, to_spark_df
 from ._utils.io import SparkIO
 from ._utils.misc import is_spark_connect as _is_spark_connect, is_spark_dataframe
 from ._utils.partition import even_repartition, hash_repartition, rand_repartition
@@ -735,40 +735,29 @@ def _to_df() -> SparkDataFrame:
             )
             if isinstance(df, SparkDataFrame):
                 return df
-            if isinstance(df, ArrowDataFrame):
-                raw_df: Any = df.as_pandas()
-                sdf = self.spark_session.createDataFrame(
-                    raw_df, to_spark_schema(df.schema)
-                )
-                return SparkDataFrame(sdf, df.schema)
             if isinstance(df, (ArrayDataFrame, IterableDataFrame)):
                 adf = ArrowDataFrame(df.as_array(type_safe=False), df.schema)
-                raw_df = adf.as_pandas()
-                sdf = self.spark_session.createDataFrame(
-                    raw_df, to_spark_schema(df.schema)
-                )
+                sdf = to_spark_df(self.spark_session, adf, df.schema)
                 return SparkDataFrame(sdf, df.schema)
             if any(pa.types.is_struct(t) for t in df.schema.types):
-                sdf = self.spark_session.createDataFrame(
-                    df.as_array(type_safe=True), to_spark_schema(df.schema)
+                sdf = to_spark_df(
+                    self.spark_session, df.as_array(type_safe=True), df.schema
                 )
             else:
-                sdf = self.spark_session.createDataFrame(
-                    df.as_pandas(), to_spark_schema(df.schema)
-                )
+                sdf = to_spark_df(self.spark_session, df, df.schema)
             return SparkDataFrame(sdf, df.schema)
         if is_spark_dataframe(df):
             return SparkDataFrame(df, None if schema is None else to_schema(schema))
         if isinstance(df, RDD):
             assert_arg_not_none(schema, "schema")
-            sdf = self.spark_session.createDataFrame(df, to_spark_schema(schema))
+            sdf = to_spark_df(self.spark_session, df, schema)
             return SparkDataFrame(sdf, to_schema(schema))
         if isinstance(df, pd.DataFrame):
            if PD_UTILS.empty(df):
                temp_schema = to_spark_schema(PD_UTILS.to_schema(df))
-                sdf = self.spark_session.createDataFrame([], temp_schema)
+                sdf = to_spark_df(self.spark_session, [], temp_schema)
            else:
-                sdf = self.spark_session.createDataFrame(df)
+                sdf = to_spark_df(self.spark_session, df)
            return SparkDataFrame(sdf, schema)
 
         # use arrow dataframe here to handle nulls in int cols
@@ -778,9 +767,7 @@ def _to_df() -> SparkDataFrame:
         adf = ArrowDataFrame(df, to_schema(schema))
         map_pos = [i for i, t in enumerate(adf.schema.types) if pa.types.is_map(t)]
         if len(map_pos) == 0:
-            sdf = self.spark_session.createDataFrame(
-                adf.as_array(), to_spark_schema(adf.schema)
-            )
+            sdf = to_spark_df(self.spark_session, adf.as_array(), adf.schema)
         else:
 
             def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
@@ -789,8 +776,8 @@ def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
                         row[p] = dict(row[p])
                     yield row
 
-            sdf = self.spark_session.createDataFrame(
-                to_dict(adf.as_array_iterable()), to_spark_schema(adf.schema)
+            sdf = to_spark_df(
+                self.spark_session, to_dict(adf.as_array_iterable()), adf.schema
             )
         return SparkDataFrame(sdf, adf.schema)
 
diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py
index fa3ddd8c..af46754d 100644
--- a/fugue_version/__init__.py
+++ b/fugue_version/__init__.py
@@ -1 +1 @@
-__version__ = "0.8.4"
+__version__ = "0.8.5"
diff --git a/setup.py b/setup.py
index 097a6fe8..2342f024 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ def get_version() -> str:
 setup(
     name="fugue",
     version=get_version(),
-    packages=find_packages(),
+    packages=find_packages(include=["fugue*"]),
     description="An abstraction layer for distributed computation",
     long_description=LONG_DESCRIPTION,
     long_description_content_type="text/markdown",
@@ -42,7 +42,7 @@ def get_version() -> str:
     ],
     extras_require={
         "cpp_sql_parser": ["fugue-sql-antlr[cpp]>=0.1.6"],
-        "spark": ["pyspark"],
+        "spark": ["pyspark>=3.1.1"],
         "dask": [
             "dask[distributed,dataframe]; python_version < '3.8'",
             "dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
@@ -62,7 +62,7 @@ def get_version() -> str:
         "notebook": ["notebook", "jupyterlab", "ipython>=7.10.0"],
         "all": [
             "fugue-sql-antlr[cpp]>=0.1.6",
-            "pyspark",
+            "pyspark>=3.1.1",
             "dask[distributed,dataframe]; python_version < '3.8'",
             "dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
             "ray[data]>=2.0.0",
diff --git a/tests/fugue_spark/test_dataframe.py b/tests/fugue_spark/test_dataframe.py
index 5063413a..d4e19dc5 100644
--- a/tests/fugue_spark/test_dataframe.py
+++ b/tests/fugue_spark/test_dataframe.py
@@ -12,7 +12,7 @@
 from fugue.dataframe.pandas_dataframe import PandasDataFrame
 from fugue.plugins import get_column_names, rename
 from fugue_spark import SparkExecutionEngine
-from fugue_spark._utils.convert import to_schema, to_spark_schema
+from fugue_spark._utils.convert import to_schema, to_spark_schema, to_spark_df
 from fugue_spark.dataframe import SparkDataFrame
 from fugue_test.dataframe_suite import DataFrameTests
 
@@ -42,7 +42,7 @@ def df(self, data: Any = None, schema: Any = None):
         return engine.to_df(data, schema=schema).native
 
     def to_native_df(self, pdf: pd.DataFrame) -> Any:
-        return self.spark_session.createDataFrame(pdf)
+        return to_spark_df(self.spark_session, pdf)
 
     def test_not_local(self):
         assert not fi.is_local(self.df([], "a:int,b:str"))
@@ -131,30 +131,24 @@ def _df(data, schema=None):
     session = SparkSession.builder.getOrCreate()
     if schema is not None:
         pdf = PandasDataFrame(data, to_schema(schema))
-        df = session.createDataFrame(pdf.native, to_spark_schema(schema))
+        df = to_spark_df(session, pdf.native, schema)
     else:
-        df = session.createDataFrame(data)
+        df = to_spark_df(session, data)
     return SparkDataFrame(df, schema)
 
 
 def _test_get_column_names(spark_session):
-    df = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])
-    )
+    df = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]))
     assert get_column_names(df) == ["0", "1", "2"]
 
 
 def _test_rename(spark_session):
-    pdf = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"])
-    )
+    pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"]))
     df = rename(pdf, {})
     assert isinstance(df, ps.DataFrame)
     assert get_column_names(df) == ["a", "b", "c"]
 
-    pdf = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])
-    )
+    pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]))
     df = rename(pdf, {"0": "_0", "1": "_1", "2": "_2"})
     assert isinstance(df, ps.DataFrame)
     assert get_column_names(df) == ["_0", "_1", "_2"]
diff --git a/tests/fugue_spark/test_execution_engine.py b/tests/fugue_spark/test_execution_engine.py
index 055bcd66..1d5a1168 100644
--- a/tests/fugue_spark/test_execution_engine.py
+++ b/tests/fugue_spark/test_execution_engine.py
@@ -2,12 +2,10 @@
 
 import numpy as np
 import pandas as pd
-import pyspark
 import pyspark.rdd as pr
 import pyspark.sql as ps
 import pytest
 from pyspark import SparkContext, StorageLevel
-from pyspark.sql import DataFrame as SDataFrame
 from pyspark.sql import SparkSession
 from pytest import raises
 from triad import Schema
@@ -26,7 +24,7 @@
 from fugue.extensions.transformer import Transformer, transformer
 from fugue.plugins import infer_execution_engine
 from fugue.workflow.workflow import FugueWorkflow
-from fugue_spark._utils.convert import to_pandas
+from fugue_spark._utils.convert import to_pandas, to_spark_df
 from fugue_spark._utils.misc import is_spark_dataframe, is_spark_session
 from fugue_spark.dataframe import SparkDataFrame
 from fugue_spark.execution_engine import SparkExecutionEngine
@@ -121,7 +119,7 @@ def test_sample_n(self):
         assert abs(len(b.as_array()) - 90) < 2
 
     def test_infer_engine(self):
-        df = self.spark_session.createDataFrame(pd.DataFrame([[0]], columns=["a"]))
+        df = to_spark_df(self.spark_session, pd.DataFrame([[0]], columns=["a"]))
         assert is_spark_session(infer_execution_engine([df]))
 
         fdf = SparkDataFrame(df)
@@ -216,6 +214,10 @@ def test_df_init(self):
     def test_yield_table(self):
         pass
 
+    def test_any_column_name(self):
+        if self.spark_session.version >= "3.3.1":
+            return super().test_any_column_name()
+
     def test_default_session(self):
         a = FugueWorkflow().df([[0]], "a:int")
         df_eq(a.compute(SparkExecutionEngine), [[0]], "a:int")