246 changes: 210 additions & 36 deletions python/pyspark/sql/conversion.py
@@ -31,7 +31,14 @@
from pyspark.sql.types import (
ArrayType,
BinaryType,
BooleanType,
ByteType,
ShortType,
IntegerType,
LongType,
DataType,
FloatType,
DoubleType,
DecimalType,
GeographyType,
Geography,
@@ -43,8 +50,12 @@
StringType,
StructField,
StructType,
DateType,
TimeType,
TimestampNTZType,
TimestampType,
DayTimeIntervalType,
YearMonthIntervalType,
UserDefinedType,
VariantType,
VariantVal,
@@ -1012,6 +1023,62 @@ class ArrowArrayToPandasConversion:
where Arrow data needs to be converted to pandas for Python UDF processing.
"""

@classmethod
def convert(
cls,
arrow_column: Union["pa.Array", "pa.ChunkedArray"],
target_type: DataType,
*,
timezone: Optional[str] = None,
struct_in_pandas: str = "dict",
ndarray_as_list: bool = False,
df_for_struct: bool = False,
) -> Union["pd.Series", "pd.DataFrame"]:
"""
Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.

Parameters
----------
arrow_column : pa.Array or pa.ChunkedArray
The Arrow column to convert.
target_type : DataType
The target Spark type to convert the column to.
timezone : str, optional
Timezone for timestamp conversion. Required if the data contains timestamp types.
struct_in_pandas : str, optional
How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
Default is "dict".
ndarray_as_list : bool, optional
Whether to convert numpy ndarrays to Python lists. Default is False.
df_for_struct : bool, optional
If True, convert struct columns to a DataFrame with columns corresponding
to struct fields instead of a Series. Default is False.

Returns
-------
pd.Series or pd.DataFrame
Converted pandas Series. If df_for_struct is True and the type is StructType,
returns a DataFrame with columns corresponding to struct fields.
"""
if cls._prefer_convert_numpy(target_type, df_for_struct):
Contributor Author (review comment): our target is to replace convert_legacy with convert_numpy.

return cls.convert_numpy(
arrow_column,
target_type,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
ndarray_as_list=ndarray_as_list,
df_for_struct=df_for_struct,
)

return cls.convert_legacy(
arrow_column,
target_type,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
ndarray_as_list=ndarray_as_list,
df_for_struct=df_for_struct,
)
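
A rough usage sketch (not part of the diff; assumes pyarrow and pandas are installed and that this internal helper is invoked directly): an integer column containing a null takes the numpy fast path and comes back as a nullable pandas dtype.

import pyarrow as pa
from pyspark.sql.types import IntegerType
from pyspark.sql.conversion import ArrowArrayToPandasConversion

arr = pa.array([1, 2, None], type=pa.int32())
# IntegerType is in the supported-primitive set, so convert() dispatches to convert_numpy;
# since the array contains a null, the result uses pandas' nullable Int32 dtype.
ser = ArrowArrayToPandasConversion.convert(arr, IntegerType())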

@classmethod
def convert_legacy(
cls,
@@ -1112,47 +1179,154 @@ def convert_legacy(
return converter(ser)

@classmethod
def _prefer_convert_numpy(
cls,
spark_type: DataType,
df_for_struct: bool,
) -> bool:
supported_types = (
NullType,
BinaryType,
BooleanType,
FloatType,
DoubleType,
ByteType,
ShortType,
IntegerType,
LongType,
)
if df_for_struct and isinstance(spark_type, StructType):
return all(isinstance(f.dataType, supported_types) for f in spark_type.fields)
else:
return isinstance(spark_type, supported_types)
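
For illustration only (not from the diff), a minimal sketch of what the predicate above is expected to return, assuming ArrowArrayToPandasConversion is imported as in the earlier example: primitive numeric, boolean, and binary types take the numpy path, while structs qualify only when df_for_struct is requested and every field is primitive.

from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField

assert ArrowArrayToPandasConversion._prefer_convert_numpy(IntegerType(), False)
# StringType is not in the supported set yet, so it stays on convert_legacy.
assert not ArrowArrayToPandasConversion._prefer_convert_numpy(StringType(), False)

struct = StructType([StructField("a", IntegerType()), StructField("b", DoubleType())])
assert ArrowArrayToPandasConversion._prefer_convert_numpy(struct, True)  # all fields supported
assert not ArrowArrayToPandasConversion._prefer_convert_numpy(struct, False)  # struct itself is not primitive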

@classmethod
def convert_numpy(
cls,
arr: Union["pa.Array", "pa.ChunkedArray"],
spark_type: DataType,
*,
timezone: Optional[str] = None,
struct_in_pandas: Optional[str] = None,
ndarray_as_list: bool = False,
df_for_struct: bool = False,
) -> Union["pd.Series", "pd.DataFrame"]:
"""
Convert a PyArrow Array or ChunkedArray to a pandas Series or DataFrame.
import pyarrow as pa
import pandas as pd

Parameters
----------
arrow_column : pa.Array or pa.ChunkedArray
The Arrow column to convert.
target_type : DataType
The target Spark type for the column to be converted to.
timezone : str, optional
Timezone for timestamp conversion. Required if the data contains timestamp types.
struct_in_pandas : str, optional
How to represent struct types in pandas. Valid values are "dict", "row", or "legacy".
Default is "dict".
ndarray_as_list : bool, optional
Whether to convert numpy ndarrays to Python lists. Default is False.
df_for_struct : bool, optional
If True, convert struct columns to a DataFrame with columns corresponding
to struct fields instead of a Series. Default is False.
assert isinstance(arr, (pa.Array, pa.ChunkedArray))

Returns
-------
pd.Series or pd.DataFrame
Converted pandas Series. If df_for_struct is True and the type is StructType,
returns a DataFrame with columns corresponding to struct fields.
"""
return cls.convert_legacy(
arrow_column,
target_type,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
ndarray_as_list=ndarray_as_list,
df_for_struct=df_for_struct,
)
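# Struct fast path: with df_for_struct=True, flatten the struct column into its child
# arrays, convert each child recursively to a Series, and reassemble the results into a
# DataFrame whose columns are named after the struct fields.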
if df_for_struct and isinstance(spark_type, StructType):
import pyarrow.types as types

assert types.is_struct(arr.type)
assert len(spark_type.names) == len(arr.type.names), f"{spark_type} vs {arr.type}"

series = [
cls.convert_numpy(
field_arr,
spark_type=field.dataType,
timezone=timezone,
struct_in_pandas=struct_in_pandas,
ndarray_as_list=ndarray_as_list,
df_for_struct=False, # always False for child fields
)
for field_arr, field in zip(arr.flatten(), spark_type)
]
pdf = pd.concat(series, axis=1)
pdf.columns = spark_type.names # type: ignore[assignment]
return pdf

arr = ArrowTimestampConversion.localize_tz(arr)

# TODO(SPARK-55332): Create benchmark for pa.array -> pd.series integer conversion
# 1, benchmark a nullable integral array
# a = pa.array(list(range(10000000)) + [9223372036854775707, None], type=pa.int64())
# %timeit a.to_pandas(types_mapper=pd.ArrowDtype)
# 11.9 μs ± 407 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
# %timeit a.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
# 589 ms ± 9.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# %timeit pd.Series(a.to_pylist(), dtype=pd.Int64Dtype())
# 2.94 s ± 19.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# %timeit a.to_pandas(integer_object_nulls=True).astype(pd.Int64Dtype())
# 2.05 s ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# pd.Series(a, dtype=pd.Int64Dtype())
# fails due to internal np.float64 coercion
# OverflowError: Python int too large to convert to C long
#
# 2, benchmark an integral array with no nulls
# b = pa.array(list(range(10000000)) + [9223372036854775707, 1], type=pa.int64())
# %timeit b.to_pandas(types_mapper=pd.ArrowDtype).astype(np.int64)
# 30.2 μs ± 831 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
# %timeit pd.Series(b.to_pandas(types_mapper=pd.ArrowDtype), dtype=np.int64)
# 33.3 μs ± 928 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
# %timeit pd.Series(b, dtype=np.int64) <- loses the column name
# 11.9 μs ± 125 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
# %timeit b.to_pandas()
# 7.56 μs ± 96.5 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
# %timeit b.to_pandas().astype(np.int64) <- astype is non-trivial
# 19.1 μs ± 242 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
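# Takeaway from the (machine-dependent) numbers above: when nulls are present,
# to_pandas(types_mapper=pd.ArrowDtype) followed by astype to the nullable pandas dtype
# is the cheapest benchmarked route to a masked integer dtype; when there are no nulls,
# plain to_pandas() is fastest and already yields a numpy integer dtype.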
if isinstance(spark_type, ByteType):
if arr.null_count > 0:
return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int8Dtype())
else:
return arr.to_pandas()
elif isinstance(spark_type, ShortType):
if arr.null_count > 0:
return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int16Dtype())
else:
return arr.to_pandas()
elif isinstance(spark_type, IntegerType):
if arr.null_count > 0:
return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int32Dtype())
else:
return arr.to_pandas()
elif isinstance(spark_type, LongType):
if arr.null_count > 0:
return arr.to_pandas(types_mapper=pd.ArrowDtype).astype(pd.Int64Dtype())
else:
return arr.to_pandas()
elif isinstance(
spark_type,
(
NullType,
BinaryType,
BooleanType,
FloatType,
DoubleType,
DecimalType,
StringType,
DateType,
TimeType,
TimestampType,
TimestampNTZType,
DayTimeIntervalType,
YearMonthIntervalType,
),
):
# TODO(SPARK-55333): Revisit date_as_object in arrow->pandas conversion
# TODO(SPARK-55334): Implement coerce_temporal_nanoseconds
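# Note (background, not from the diff): coerce_temporal_nanoseconds is the pyarrow
# to_pandas() option that coerces timestamp/duration values to nanosecond resolution,
# matching the datetime64[ns] behavior that pandas versions before 2.0 always used.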
# For date columns, build a series of datetime.date objects directly (date_as_object=True)
# instead of going through an intermediate datetime64[ns] representation, which can overflow.
pandas_options = {
"date_as_object": True,
"coerce_temporal_nanoseconds": True,
}
return arr.to_pandas(**pandas_options)
# elif isinstance(
# spark_type,
# (
# ArrayType,
# MapType,
# StructType,
# UserDefinedType,
# VariantType,
# GeographyType,
# GeometryType,
# ),
# ):
# TODO(SPARK-55324): Support complex types
else: # pragma: no cover
assert False, f"Need converter for {spark_type} but failed to find one."
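
A rough end-to-end sketch of the struct fast path (illustrative only; assumes pyarrow and pandas are installed): with df_for_struct=True and all fields on the numpy path, convert() returns a pandas DataFrame with one column per struct field.

import pyarrow as pa
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.sql.conversion import ArrowArrayToPandasConversion

arr = pa.array(
    [{"a": 1, "b": 2.0}, {"a": None, "b": 3.5}],
    type=pa.struct([("a", pa.int32()), ("b", pa.float64())]),
)
schema = StructType([StructField("a", IntegerType()), StructField("b", DoubleType())])

# Column "a" contains a null, so it comes back with pandas' nullable Int32 dtype;
# column "b" has no nulls and stays a plain float64 Series.
pdf = ArrowArrayToPandasConversion.convert(arr, schema, df_for_struct=True)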