[SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
### What changes were proposed in this pull request?

This PR proposes to support pandas 2.1.0 for PySpark. See [What's new in 2.1.0](https://pandas.pydata.org/docs/dev/whatsnew/v2.1.0.html) for more detail.

### Why are the changes needed?

We should follow the latest version of pandas.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The existing CI should pass with Pandas 2.1.0.
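
As a quick local sanity check before trusting CI, a minimal sketch (assuming a dev environment that mirrors the Dockerfile pin below, with pandas 2.1.0 installed):

```python
import pandas as pd
import pyspark.pandas as ps

# The infra image now allows 'pandas<=2.1.0'; confirm which version the
# pandas-on-Spark tests would actually run against.
print(pd.__version__)  # expected: 2.1.0 on the updated image

# Smoke-test one of the code paths this PR touches (the new DataFrame.map).
print(ps.DataFrame([[1, 2.12], [3.356, 4.567]]).map(lambda x: x ** 2))
```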

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42793 from itholic/pandas_2.1.0.

Lead-authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Co-authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2 people authored and dongjoon-hyun committed Sep 18, 2023
1 parent 9813122 commit f83e5ec
Showing 23 changed files with 415 additions and 67 deletions.
4 changes: 2 additions & 2 deletions dev/infra/Dockerfile
@@ -84,8 +84,8 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht
# See more in SPARK-39735
ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"

RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib
RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
RUN pypy3 -m pip install numpy 'pandas<=2.1.0' scipy coverage matplotlib
RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.1.0' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'

# Add Python deps for Spark Connect.
RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3' 'googleapis-common-protos==1.56.4'
3 changes: 3 additions & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -22,6 +22,7 @@ Upgrading PySpark
Upgrading from PySpark 3.5 to 4.0
---------------------------------

* In Spark 4.0, it is recommended to use Pandas version 2.0.0 or above with PySpark for optimal compatibility.
* In Spark 4.0, the minimum supported version for Pandas has been raised from 1.0.5 to 1.4.4 in PySpark.
* In Spark 4.0, the minimum supported version for Numpy has been raised from 1.15 to 1.21 in PySpark.
* In Spark 4.0, ``Int64Index`` and ``Float64Index`` have been removed from pandas API on Spark, ``Index`` should be used directly.
@@ -44,6 +45,8 @@ Upgrading from PySpark 3.5 to 4.0
* In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and ``ps.read_excel`` has been removed from pandas API on Spark.
* In Spark 4.0, ``null_counts`` parameter from ``DataFrame.info`` has been removed from pandas API on Spark, use ``show_counts`` instead.
* In Spark 4.0, the result of ``MultiIndex.append`` does not keep the index names from pandas API on Spark.
* In Spark 4.0, ``DataFrameGroupBy.agg`` with a list of functions now respects ``as_index=False`` in pandas API on Spark (see the sketch after this list).
* In Spark 4.0, ``DataFrame.stack`` preserves the order of the existing columns instead of sorting them lexicographically in pandas API on Spark.
* In Spark 4.0, ``True`` or ``False`` to ``inclusive`` parameter from ``Series.between`` has been removed from pandas API on Spark, use ``both`` or ``neither`` instead respectively.
* In Spark 4.0, ``Index.asi8`` has been removed from pandas API on Spark, use ``Index.astype`` instead.
* In Spark 4.0, ``Index.is_type_compatible`` has been removed from pandas API on Spark, use ``Index.isin`` instead.
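
A minimal sketch of the two behavior changes called out above (``DataFrameGroupBy.agg`` with ``as_index=False`` and the ``DataFrame.stack`` column order); the column names and data here are made up for illustration:

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"A": ["x", "x", "y"], "B": [1, 2, 3]})

# as_index=False is now respected when aggregating with a list of functions,
# so the grouping column "A" comes back as a regular column, not the index.
psdf.groupby("A", as_index=False).agg(["min", "max"])

# stack() now keeps the existing column order ("weight" before "height")
# instead of sorting the labels lexicographically before stacking.
psdf2 = ps.DataFrame([[1.0, 2.0]], columns=["weight", "height"])
psdf2.stack()
```
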
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.pandas/frame.rst
@@ -127,6 +127,7 @@ Function application, GroupBy & Window

DataFrame.apply
DataFrame.applymap
DataFrame.map
DataFrame.pipe
DataFrame.agg
DataFrame.aggregate
2 changes: 0 additions & 2 deletions python/pyspark/pandas/base.py
@@ -978,7 +978,6 @@ def isnull(self: IndexOpsLike) -> IndexOpsLike:
NA values, such as None or numpy.NaN, get mapped to True values.
Everything else gets mapped to False values. Characters such as empty strings '' or
numpy.inf are not considered NA values
(unless you set pandas.options.mode.use_inf_as_na = True).
Returns
-------
@@ -1012,7 +1011,6 @@ def notnull(self: IndexOpsLike) -> IndexOpsLike:
Return a boolean same-sized object indicating if the values are not NA.
Non-missing values get mapped to True.
Characters such as empty strings '' or numpy.inf are not considered NA values
(unless you set pandas.options.mode.use_inf_as_na = True).
NA values, such as None or numpy.NaN, get mapped to False values.
Returns
143 changes: 135 additions & 8 deletions python/pyspark/pandas/frame.py
@@ -1271,6 +1271,8 @@ def applymap(self, func: Callable[[Any], Any]) -> "DataFrame":
This method applies a function that accepts and returns a scalar
to every element of a DataFrame.
.. deprecated:: 4.0.0
.. note:: this API executes the function once to infer the type which is
potentially expensive, for instance, when the dataset is created after
aggregations or sorting.
@@ -1321,7 +1323,74 @@ def applymap(self, func: Callable[[Any], Any]) -> "DataFrame":
0 1.000000 4.494400
1 11.262736 20.857489
"""
warnings.warn(
"DataFrame.applymap has been deprecated. Use DataFrame.map instead", FutureWarning
)

# TODO: We can implement shortcut theoretically since it creates new DataFrame
# anyway and we don't have to worry about operations on different DataFrames.
return self.map(func=func)

def map(self, func: Callable[[Any], Any]) -> "DataFrame":
"""
Apply a function to a Dataframe elementwise.
This method applies a function that accepts and returns a scalar
to every element of a DataFrame.
.. versionadded:: 4.0.0
DataFrame.applymap was deprecated and renamed to DataFrame.map.
.. note:: this API executes the function once to infer the type which is
potentially expensive, for instance, when the dataset is created after
aggregations or sorting.
To avoid this, specify return type in ``func``, for instance, as below:
>>> def square(x) -> np.int32:
... return x ** 2
pandas-on-Spark uses return type hints and does not try to infer the type.
Parameters
----------
func : callable
Python function returns a single value from a single value.
Returns
-------
DataFrame
Transformed DataFrame.
Examples
--------
>>> df = ps.DataFrame([[1, 2.12], [3.356, 4.567]])
>>> df
0 1
0 1.000 2.120
1 3.356 4.567
>>> def str_len(x) -> int:
... return len(str(x))
>>> df.map(str_len)
0 1
0 3 4
1 5 5
>>> def power(x) -> float:
... return x ** 2
>>> df.map(power)
0 1
0 1.000000 4.494400
1 11.262736 20.857489
You can omit type hints and let pandas-on-Spark infer its type.
>>> df.map(lambda x: x ** 2)
0 1
0 1.000000 4.494400
1 11.262736 20.857489
"""
# TODO: We can implement shortcut theoretically since it creates new DataFrame
# anyway and we don't have to worry about operations on different DataFrames.
return self._apply_series_op(lambda psser: psser.apply(func))
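
A short migration sketch for callers of the old spelling (the data is illustrative only):

```python
import pyspark.pandas as ps

psdf = ps.DataFrame([[1, 2.12], [3.356, 4.567]])

psdf.applymap(lambda x: x ** 2)  # still works, but now emits a FutureWarning
psdf.map(lambda x: x ** 2)       # preferred spelling from 4.0.0 on
```
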
@@ -5556,6 +5625,10 @@ def from_records(
Parameters
----------
data : ndarray (structured dtype), list of tuples, dict, or DataFrame
.. deprecated:: 4.0.0
Passing a DataFrame is deprecated.
index : string, list of fields, array-like
Field of array to use as the index, alternately a specific set of input labels to use
exclude : sequence, default None
@@ -5952,6 +6025,9 @@ def fillna(
Method to use for filling holes in reindexed Series pad / ffill: propagate last valid
observation forward to next valid backfill / bfill:
use NEXT valid observation to fill gap
.. deprecated:: 4.0.0
axis : {0 or `index`}
1 and `columns` are not supported.
inplace : boolean, default False
@@ -5963,6 +6039,8 @@
this is the maximum number of entries along the entire axis where NaNs will be filled.
Must be greater than 0 if not None
.. deprecated:: 4.0.0
Returns
-------
DataFrame
@@ -6046,6 +6124,11 @@ def op(psser: ps.Series) -> ps.Series:
return psser._fillna(value=value, method=method, axis=axis, limit=limit)

elif method is not None:
warnings.warn(
"DataFrame.fillna with 'method' is deprecated and will raise in a future version. "
"Use DataFrame.ffill() or DataFrame.bfill() instead.",
FutureWarning,
)

def op(psser: ps.Series) -> ps.Series:
return psser._fillna(value=value, method=method, axis=axis, limit=limit)
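
A hedged sketch of the replacement the new warning points to; ``psdf`` is a hypothetical frame with gaps:

```python
import numpy as np
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1.0, np.nan, 3.0]})

psdf.fillna(method="ffill")  # deprecated: triggers the FutureWarning above
psdf.ffill()                 # preferred forward fill
psdf.bfill()                 # preferred backward fill
```
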
@@ -6121,6 +6204,21 @@ def replace(
If value is a list or tuple, value should be of the same length with to_replace.
inplace : boolean, default False
Fill in place (do not create a new object)
limit : int, default None
Maximum size gap to forward or backward fill.
.. deprecated:: 4.0.0
regex : bool or str, default False
Whether to interpret to_replace and/or value as regular expressions.
If this is True then to_replace must be a string.
Alternatively, this could be a regular expression in which case to_replace must be None.
method : 'pad', default None
The method to use for replacement, when to_replace is a scalar,
list or tuple and value is None.
.. deprecated:: 4.0.0
Returns
-------
@@ -6189,8 +6287,18 @@ def replace(
3 Hulk Smash
"""
if method != "pad":
warnings.warn(
"The 'method' keyword in DataFrame.replace is deprecated "
"and will be removed in a future version.",
FutureWarning,
)
raise NotImplementedError("replace currently works only for method='pad")
if limit is not None:
warnings.warn(
"The 'limit' keyword in DataFrame.replace is deprecated "
"and will be removed in a future version.",
FutureWarning,
)
raise NotImplementedError("replace currently works only when limit=None")
if regex is not False:
raise NotImplementedError("replace currently doesn't supports regex")
@@ -6221,6 +6329,13 @@ def op(psser: ps.Series) -> ps.Series:
return psser

else:
if value is None:
warnings.warn(
"DataFrame.replace without 'value' and with non-dict-like 'to_replace' "
"is deprecated and will raise in a future version. "
"Explicitly specify the new values instead.",
FutureWarning,
)

def op(psser: ps.Series) -> ps.Series:
return psser.replace(to_replace=to_replace, value=value, regex=regex)
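
A minimal sketch of the explicit forms the warning asks for (illustrative data):

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [0, 1, 2]})

# Deprecated pattern: a scalar to_replace with no explicit value
# (this is what the FutureWarning above targets).
# psdf.replace(1)

# Explicitly specify the new values instead.
psdf.replace(1, 99)
psdf.replace({1: 99})
```
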
@@ -6344,6 +6459,8 @@ def last(self, offset: Union[str, DateOffset]) -> "DataFrame":
When having a DataFrame with dates as index, this function can
select the last few rows based on a date offset.
.. deprecated:: 4.0.0
Parameters
----------
offset : str or DateOffset
@@ -6383,6 +6500,11 @@ def last(self, offset: Union[str, DateOffset]) -> "DataFrame":
3 observed days in the dataset, and therefore data for 2018-04-11 was
not returned.
"""
warnings.warn(
"last is deprecated and will be removed in a future version. "
"Please create a mask and filter using `.loc` instead",
FutureWarning,
)
# Check index type should be format DateTime
if not isinstance(self.index, ps.DatetimeIndex):
raise TypeError("'last' only supports a DatetimeIndex")
@@ -6401,6 +6523,8 @@ def first(self, offset: Union[str, DateOffset]) -> "DataFrame":
When having a DataFrame with dates as index, this function can
select the first few rows based on a date offset.
.. deprecated:: 4.0.0
Parameters
----------
offset : str or DateOffset
@@ -6440,6 +6564,11 @@ def first(self, offset: Union[str, DateOffset]) -> "DataFrame":
3 observed days in the dataset, and therefore data for 2018-04-13 was
not returned.
"""
warnings.warn(
"first is deprecated and will be removed in a future version. "
"Please create a mask and filter using `.loc` instead",
FutureWarning,
)
# Check index type should be format DatetimeIndex
if not isinstance(self.index, ps.DatetimeIndex):
raise TypeError("'first' only supports a DatetimeIndex")
@@ -10527,12 +10656,12 @@ def stack(self) -> DataFrameOrSeries:
kg m
cat 1.0 2.0
dog 3.0 4.0
>>> df_multi_level_cols2.stack().sort_index() # doctest: +SKIP
height weight
cat kg NaN 1.0
m 2.0 NaN
dog kg NaN 3.0
m 4.0 NaN
>>> df_multi_level_cols2.stack().sort_index()
weight height
cat kg 1.0 NaN
m NaN 2.0
dog kg 3.0 NaN
m NaN 4.0
"""
from pyspark.pandas.series import first_series

@@ -10558,8 +10687,6 @@ def stack(self) -> DataFrameOrSeries:

index_values.add(value)

column_labels = dict(sorted(column_labels.items(), key=lambda x: x[0]))

index_name = self._internal.column_label_names[-1]
column_label_names = self._internal.column_label_names[:-1]
if len(column_label_names) == 0:
42 changes: 42 additions & 0 deletions python/pyspark/pandas/generic.py
@@ -1317,6 +1317,13 @@ def sum(
>>> df['b'].sum(min_count=3)
nan
"""
if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.sum with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
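
The same guard is added to ``product``, ``std``, ``var``, and ``sem`` below; a minimal sketch of what it covers (illustrative data):

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2], "b": [3, 4]})

psdf.sum(axis=0)     # column-wise reduction, the behavior that is kept
psdf.sum(axis=None)  # emits the FutureWarning added above; in a future
                     # version this is expected to return a single scalar
```
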
@@ -1418,6 +1425,13 @@ def product(
>>> ps.Series([]).prod(min_count=1) # doctest: +SKIP
nan
"""
if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.product with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -1870,6 +1884,13 @@ def std(
if not isinstance(ddof, int):
raise TypeError("ddof must be integer")

if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.std with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -1962,6 +1983,13 @@ def var(
if not isinstance(ddof, int):
raise TypeError("ddof must be integer")

if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.var with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -2191,6 +2219,13 @@ def sem(
if not isinstance(ddof, int):
raise TypeError("ddof must be integer")

if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.sem with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -2448,6 +2483,8 @@ def bool(self) -> bool:
This must be a boolean scalar value, either True or False. Raise a ValueError if
the object does not have exactly 1 element, or that element is not boolean
.. deprecated:: 4.0.0
Returns
-------
bool
@@ -2479,6 +2516,11 @@ def bool(self) -> bool:
...
ValueError: bool cannot act on a non-boolean single element DataFrame
"""
warnings.warn(
f"{self.__class__.__name__}.bool is now deprecated "
"and will be removed in future version.",
FutureWarning,
)
if isinstance(self, ps.DataFrame):
df = self
elif isinstance(self, ps.Series):
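
A hedged sketch of one possible replacement for a single-element boolean Series (``Series.item`` is assumed to be available in pandas-on-Spark):

```python
import pyspark.pandas as ps

psser = ps.Series([True])

# Deprecated: psser.bool()
flag = bool(psser.item())  # one replacement for a length-1 boolean Series
```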