Fix Pandas 2+ and Spark < 3.4 incompatibility issues #474

Merged (11 commits) on Jun 6, 2023

Changes from all commits
64 changes: 64 additions & 0 deletions .github/workflows/test_spark.yml
@@ -0,0 +1,64 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Spark Tests

on:
  push:
    branches: [ master ]
    paths-ignore:
      - 'docs/**'
      - '**.md'
  pull_request:
    branches: [ master ]
    paths-ignore:
      - 'docs/**'
      - '**.md'

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  test_combinations:
    name: Spark ${{ matrix.spark-version }} Pandas ${{ matrix.pandas-version }}
    runs-on: ubuntu-latest
    strategy:
      matrix:
        spark-version: ["3.1.1","3.4.0"]
        pandas-version: ["1.5.3","2.0.1"]

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.9
        uses: actions/setup-python@v1
        with:
          python-version: 3.9
      - name: Install dependencies
        run: make devenv
      - name: Install Spark ${{ matrix.spark-version }}
        run: pip install "pyspark==${{ matrix.spark-version }}"
      - name: Install Pandas ${{ matrix.pandas-version }}
        run: pip install "pandas==${{ matrix.pandas-version }}"
      - name: Downgrade Ibis
        if: matrix.spark-version < '3.4.0'
        run: pip install "ibis-framework<5"
      - name: Test
        run: make testspark

  test_connect:
    name: Spark Connect
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.9
        uses: actions/setup-python@v1
        with:
          python-version: 3.9
      - name: Install dependencies
        run: make devenv
      - name: Setup Spark
        run: make sparkconnect
      - name: Test
        run: make testsparkconnect
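
Note: the matrix above runs four Spark/Pandas combinations (Spark 3.1.1 and 3.4.0 against pandas 1.5.3 and 2.0.1), and the "Downgrade Ibis" step keeps the older Spark rows on a compatible ibis-framework. As a rough illustration (not part of this PR; needs_array_workaround is a hypothetical name and the packaging library is assumed to be installed), the compatibility gate this matrix is designed to exercise looks like:

# Hedged sketch of the version gate exercised by the CI matrix above.
# `needs_array_workaround` is a hypothetical helper, not an API in this PR.
import pandas as pd
from packaging.version import Version


def needs_array_workaround(spark_version: str) -> bool:
    """True when pandas>=2 meets Spark<3.4, where createDataFrame(pandas_df) breaks."""
    return Version(pd.__version__) >= Version("2") and Version(spark_version) < Version("3.4")


# With pandas 2.0.1 installed: True for Spark 3.1.1, False for Spark 3.4.0.
print(needs_array_workaround("3.1.1"), needs_array_workaround("3.4.0"))
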
41 changes: 0 additions & 41 deletions .github/workflows/test_spark_connect.yml

This file was deleted.

50 changes: 42 additions & 8 deletions fugue_spark/_utils/convert.py
@@ -1,6 +1,6 @@
+import pickle
from typing import Any, Iterable, List, Tuple

-import cloudpickle
import pandas as pd
import pyarrow as pa
import pyspark.sql as ps
@@ -11,8 +11,18 @@
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
from triad.utils.schema import quote_name

+import fugue.api as fa
+from fugue import DataFrame

from .misc import is_spark_dataframe

+try:
+    from pyspark.sql.types import TimestampNTZType  # pylint: disable-all
+except ImportError:  # pragma: no cover
+    # pyspark < 3.2
+    from pyspark.sql.types import TimestampType as TimestampNTZType


def to_spark_schema(obj: Any) -> pt.StructType:
    assert_arg_not_none(obj, "schema")
@@ -108,20 +118,44 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[
        yield r


+def to_spark_df(session: ps.SparkSession, df: Any, schema: Any = None) -> ps.DataFrame:
+    if schema is not None and not isinstance(schema, pt.StructType):
+        schema = to_spark_schema(schema)
+    if isinstance(df, pd.DataFrame):
+        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
+            # pyspark < 3.4 does not support pandas 2 when doing
+            # createDataFrame, see this issue:
+            # https://stackoverflow.com/a/75926954/12309438
+            # this is a workaround with the cost of memory and speed.
+            if schema is None:
+                schema = to_spark_schema(fa.get_schema(df))
+            df = fa.as_fugue_df(df).as_array(type_safe=True)
+        return session.createDataFrame(df, schema=schema)
+    if isinstance(df, DataFrame):
+        if pd.__version__ >= "2" and session.version < "3.4":  # pragma: no cover
+            if schema is None:
+                schema = to_spark_schema(df.schema)
+            return session.createDataFrame(df.as_array(type_safe=True), schema=schema)
+        return session.createDataFrame(df.as_pandas(), schema=schema)
+    else:
+        return session.createDataFrame(df, schema=schema)


def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
    if pd.__version__ < "2" or not any(
-        isinstance(x.dataType, (pt.TimestampType, pt.TimestampNTZType))
+        isinstance(x.dataType, (pt.TimestampType, TimestampNTZType))
        for x in df.schema.fields
    ):
        return df.toPandas()
    else:

-        def serialize(dfs):  # pragma: no cover
-            for df in dfs:
-                data = cloudpickle.dumps(df)
-                yield pd.DataFrame([[data]], columns=["data"])
+        def serialize(dfs):  # pragma: no cover
+            for df in dfs:
+                data = pickle.dumps(df)
+                yield pd.DataFrame([[data]], columns=["data"])

-        sdf = df.mapInPandas(serialize, schema="data binary")
-        return pd.concat(cloudpickle.loads(x.data) for x in sdf.collect())
+        sdf = df.mapInPandas(serialize, schema="data binary")
+        return pd.concat(pickle.loads(x.data) for x in sdf.collect())


# TODO: the following function always set nullable to true,
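
Note: the two helpers above carry the actual fix. to_spark_df falls back to a type-safe array conversion before calling createDataFrame when pandas>=2 runs against Spark<3.4, and to_pandas pickles each partition through mapInPandas instead of calling toPandas() directly when timestamp columns are present. A hedged round-trip sketch (assuming a local SparkSession and this branch of fugue_spark installed):

# Usage sketch only; assumes pyspark and this PR's fugue_spark are importable.
import pandas as pd
from pyspark.sql import SparkSession

from fugue_spark._utils.convert import to_pandas, to_spark_df

spark = SparkSession.builder.master("local[1]").getOrCreate()
pdf = pd.DataFrame({"a": [1, 2], "ts": pd.to_datetime(["2023-01-01", "2023-01-02"])})

# On pandas>=2 with Spark<3.4 this goes through the type-safe array path
# instead of handing the pandas DataFrame to createDataFrame directly.
sdf = to_spark_df(spark, pdf)

# With a timestamp column on pandas>=2, each partition is pickled via
# mapInPandas, avoiding the toPandas() conversion issue.
back = to_pandas(sdf)
print(back.dtypes)
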
14 changes: 8 additions & 6 deletions fugue_spark/_utils/io.py
@@ -1,16 +1,18 @@
from typing import Any, Callable, Dict, List, Optional, Union

import pyspark.sql as ps
-from fugue.collections.partition import PartitionSpec
-from fugue.dataframe import DataFrame
-from fugue._utils.io import FileParser, save_df
-from fugue_spark.dataframe import SparkDataFrame
-from fugue_spark._utils.convert import to_schema, to_spark_schema
from pyspark.sql import SparkSession
from triad.collections import Schema
+from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw
-from triad.collections.dict import ParamDict

+from fugue._utils.io import FileParser, save_df
+from fugue.collections.partition import PartitionSpec
+from fugue.dataframe import DataFrame
+from fugue_spark.dataframe import SparkDataFrame

+from .convert import to_schema, to_spark_schema


class SparkIO(object):
35 changes: 11 additions & 24 deletions fugue_spark/execution_engine.py
@@ -43,7 +43,7 @@
from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine

from ._constants import FUGUE_SPARK_CONF_USE_PANDAS_UDF, FUGUE_SPARK_DEFAULT_CONF
-from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input
+from ._utils.convert import to_schema, to_spark_schema, to_type_safe_input, to_spark_df
from ._utils.io import SparkIO
from ._utils.misc import is_spark_connect as _is_spark_connect, is_spark_dataframe
from ._utils.partition import even_repartition, hash_repartition, rand_repartition
@@ -735,40 +735,29 @@ def _to_df() -> SparkDataFrame:
                )
            if isinstance(df, SparkDataFrame):
                return df
-            if isinstance(df, ArrowDataFrame):
-                raw_df: Any = df.as_pandas()
-                sdf = self.spark_session.createDataFrame(
-                    raw_df, to_spark_schema(df.schema)
-                )
-                return SparkDataFrame(sdf, df.schema)
            if isinstance(df, (ArrayDataFrame, IterableDataFrame)):
                adf = ArrowDataFrame(df.as_array(type_safe=False), df.schema)
-                raw_df = adf.as_pandas()
-                sdf = self.spark_session.createDataFrame(
-                    raw_df, to_spark_schema(df.schema)
-                )
+                sdf = to_spark_df(self.spark_session, adf, df.schema)
                return SparkDataFrame(sdf, df.schema)
            if any(pa.types.is_struct(t) for t in df.schema.types):
-                sdf = self.spark_session.createDataFrame(
-                    df.as_array(type_safe=True), to_spark_schema(df.schema)
+                sdf = to_spark_df(
+                    self.spark_session, df.as_array(type_safe=True), df.schema
                )
            else:
-                sdf = self.spark_session.createDataFrame(
-                    df.as_pandas(), to_spark_schema(df.schema)
-                )
+                sdf = to_spark_df(self.spark_session, df, df.schema)
            return SparkDataFrame(sdf, df.schema)
        if is_spark_dataframe(df):
            return SparkDataFrame(df, None if schema is None else to_schema(schema))
        if isinstance(df, RDD):
            assert_arg_not_none(schema, "schema")
-            sdf = self.spark_session.createDataFrame(df, to_spark_schema(schema))
+            sdf = to_spark_df(self.spark_session, df, schema)
            return SparkDataFrame(sdf, to_schema(schema))
        if isinstance(df, pd.DataFrame):
            if PD_UTILS.empty(df):
                temp_schema = to_spark_schema(PD_UTILS.to_schema(df))
-                sdf = self.spark_session.createDataFrame([], temp_schema)
+                sdf = to_spark_df(self.spark_session, [], temp_schema)
            else:
-                sdf = self.spark_session.createDataFrame(df)
+                sdf = to_spark_df(self.spark_session, df)
            return SparkDataFrame(sdf, schema)

        # use arrow dataframe here to handle nulls in int cols
@@ -778,9 +767,7 @@ def _to_df() -> SparkDataFrame:
        adf = ArrowDataFrame(df, to_schema(schema))
        map_pos = [i for i, t in enumerate(adf.schema.types) if pa.types.is_map(t)]
        if len(map_pos) == 0:
-            sdf = self.spark_session.createDataFrame(
-                adf.as_array(), to_spark_schema(adf.schema)
-            )
+            sdf = to_spark_df(self.spark_session, adf.as_array(), adf.schema)
        else:

            def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
@@ -789,8 +776,8 @@ def to_dict(rows: Iterable[List[Any]]) -> Iterable[List[Any]]:
                        row[p] = dict(row[p])
                    yield row

-            sdf = self.spark_session.createDataFrame(
-                to_dict(adf.as_array_iterable()), to_spark_schema(adf.schema)
+            sdf = to_spark_df(
+                self.spark_session, to_dict(adf.as_array_iterable()), adf.schema
            )
        return SparkDataFrame(sdf, adf.schema)

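Note: after this change every branch of _to_df funnels through to_spark_df, so the pandas 2 workaround is applied uniformly whether the input is a fugue DataFrame, an RDD, an array, or a raw pandas DataFrame. A minimal sketch of the engine-level entry point (assuming a local Spark session; constructing SparkExecutionEngine from an existing SparkSession is the common usage):

# Sketch only; assumes a local Spark and this branch of fugue are installed.
from pyspark.sql import SparkSession

from fugue_spark import SparkExecutionEngine

engine = SparkExecutionEngine(SparkSession.builder.master("local[1]").getOrCreate())

# engine.to_df delegates to to_spark_df internally, so array input with an
# explicit schema takes the same conversion path shown above.
sdf = engine.to_df([[0, "x"], [1, "y"]], schema="a:int,b:str")
print(sdf.schema)  # expected: a:int,b:str
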
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
@@ -1 +1 @@
__version__ = "0.8.4"
__version__ = "0.8.5"
6 changes: 3 additions & 3 deletions setup.py
@@ -21,7 +21,7 @@ def get_version() -> str:
setup(
    name="fugue",
    version=get_version(),
-    packages=find_packages(),
+    packages=find_packages(include=["fugue*"]),
    description="An abstraction layer for distributed computation",
    long_description=LONG_DESCRIPTION,
    long_description_content_type="text/markdown",
@@ -42,7 +42,7 @@ def get_version() -> str:
    ],
    extras_require={
        "cpp_sql_parser": ["fugue-sql-antlr[cpp]>=0.1.6"],
-        "spark": ["pyspark"],
+        "spark": ["pyspark>=3.1.1"],
        "dask": [
            "dask[distributed,dataframe]; python_version < '3.8'",
            "dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
@@ -62,7 +62,7 @@ def get_version() -> str:
        "notebook": ["notebook", "jupyterlab", "ipython>=7.10.0"],
        "all": [
            "fugue-sql-antlr[cpp]>=0.1.6",
-            "pyspark",
+            "pyspark>=3.1.1",
            "dask[distributed,dataframe]; python_version < '3.8'",
            "dask[distributed,dataframe]>=2022.9.0; python_version >= '3.8'",
            "ray[data]>=2.0.0",
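Note: the packaging change restricts find_packages to the fugue* namespaces so unrelated top-level directories cannot slip into the wheel, and the spark extra now pins pyspark>=3.1.1, the oldest version covered by the new CI matrix. A quick way to see the effect of the include filter (run from the repo root; the exact output depends on the checkout):

# Sketch: compare unrestricted vs. restricted package discovery.
from setuptools import find_packages

print(find_packages())                    # may pick up non-fugue top-level folders
print(find_packages(include=["fugue*"]))  # only fugue, fugue_spark, fugue_version, ...
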
20 changes: 7 additions & 13 deletions tests/fugue_spark/test_dataframe.py
@@ -12,7 +12,7 @@
from fugue.dataframe.pandas_dataframe import PandasDataFrame
from fugue.plugins import get_column_names, rename
from fugue_spark import SparkExecutionEngine
-from fugue_spark._utils.convert import to_schema, to_spark_schema
+from fugue_spark._utils.convert import to_schema, to_spark_schema, to_spark_df
from fugue_spark.dataframe import SparkDataFrame
from fugue_test.dataframe_suite import DataFrameTests

@@ -42,7 +42,7 @@ def df(self, data: Any = None, schema: Any = None):
        return engine.to_df(data, schema=schema).native

    def to_native_df(self, pdf: pd.DataFrame) -> Any:
-        return self.spark_session.createDataFrame(pdf)
+        return to_spark_df(self.spark_session, pdf)

    def test_not_local(self):
        assert not fi.is_local(self.df([], "a:int,b:str"))
@@ -131,30 +131,24 @@ def _df(data, schema=None):
    session = SparkSession.builder.getOrCreate()
    if schema is not None:
        pdf = PandasDataFrame(data, to_schema(schema))
-        df = session.createDataFrame(pdf.native, to_spark_schema(schema))
+        df = to_spark_df(session, pdf.native, schema)
    else:
-        df = session.createDataFrame(data)
+        df = to_spark_df(session, data)
    return SparkDataFrame(df, schema)


def _test_get_column_names(spark_session):
-    df = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])
-    )
+    df = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]))
    assert get_column_names(df) == ["0", "1", "2"]


def _test_rename(spark_session):
-    pdf = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"])
-    )
+    pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["a", "b", "c"]))
    df = rename(pdf, {})
    assert isinstance(df, ps.DataFrame)
    assert get_column_names(df) == ["a", "b", "c"]

-    pdf = spark_session.createDataFrame(
-        pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"])
-    )
+    pdf = to_spark_df(spark_session, pd.DataFrame([[0, 1, 2]], columns=["0", "1", "2"]))
    df = rename(pdf, {"0": "_0", "1": "_1", "2": "_2"})
    assert isinstance(df, ps.DataFrame)
    assert get_column_names(df) == ["_0", "_1", "_2"]