[SPARK-46858][PYTHON][PS][BUILD] Upgrade Pandas to 2.2.0
### What changes were proposed in this pull request?

This PR proposes to upgrade Pandas to 2.2.0.

See [What's new in 2.2.0 (January 19, 2024)](https://pandas.pydata.org/docs/whatsnew/v2.2.0.html)

### Why are the changes needed?

Pandas 2.2.0 has been released, and we should support the latest Pandas release.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44881 from itholic/pandas_2.2.0.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
itholic authored and dongjoon-hyun committed Feb 20, 2024
1 parent e6a3385 commit 8e82887
Showing 25 changed files with 253 additions and 105 deletions.
4 changes: 2 additions & 2 deletions dev/infra/Dockerfile
@@ -91,10 +91,10 @@ RUN mkdir -p /usr/local/pypy/pypy3.8 && \
ln -sf /usr/local/pypy/pypy3.8/bin/pypy /usr/local/bin/pypy3.8 && \
ln -sf /usr/local/pypy/pypy3.8/bin/pypy /usr/local/bin/pypy3
RUN curl -sS https://bootstrap.pypa.io/get-pip.py | pypy3
RUN pypy3 -m pip install numpy 'six==1.16.0' 'pandas<=2.1.4' scipy coverage matplotlib lxml
RUN pypy3 -m pip install numpy 'six==1.16.0' 'pandas<=2.2.0' scipy coverage matplotlib lxml


ARG BASIC_PIP_PKGS="numpy pyarrow>=15.0.0 six==1.16.0 pandas<=2.1.4 scipy plotly>=4.8 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
ARG BASIC_PIP_PKGS="numpy pyarrow>=15.0.0 six==1.16.0 pandas<=2.2.0 scipy plotly>=4.8 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
# Python deps for Spark Connect
ARG CONNECT_PIP_PKGS="grpcio==1.59.3 grpcio-status==1.59.3 protobuf==4.25.1 googleapis-common-protos==1.56.4"

1 change: 1 addition & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -69,6 +69,7 @@ Upgrading from PySpark 3.5 to 4.0
* In Spark 4.0, ``Series.dt.week`` and ``Series.dt.weekofyear`` have been removed from Pandas API on Spark, use ``Series.dt.isocalendar().week`` instead.
* In Spark 4.0, when applying ``astype`` to a decimal type object, the existing missing value is changed to ``True`` instead of ``False`` from Pandas API on Spark.
* In Spark 4.0, ``pyspark.testing.assertPandasOnSparkEqual`` has been removed from Pandas API on Spark, use ``pyspark.pandas.testing.assert_frame_equal`` instead.
* In Spark 4.0, the frequency aliases ``Y``, ``M``, ``H``, ``T``, and ``S`` have been deprecated from Pandas API on Spark; use ``YE``, ``ME``, ``h``, ``min``, and ``s`` instead, respectively.



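The new migration-guide entry tracks pandas 2.2 renaming its frequency aliases. A minimal sketch of the migration (not part of this commit), assuming a local environment where `pyspark.pandas` can start a Spark session:

```python
import pandas as pd
import pyspark.pandas as ps

# A small time-indexed Series to resample.
psser = ps.Series(
    [0, 1, 2, 3, 4, 5],
    index=pd.date_range("2024-01-01", periods=6, freq="30min"),
)

# Pre-2.2 spellings such as "1H" (hour) or "M" (month end) are the ones
# being deprecated; the calls below use the preferred replacements.
hourly = psser.resample("1h").sum()   # "h" replaces "H"
monthly = psser.resample("ME").sum()  # "ME" replaces "M"

print(hourly.to_pandas())
print(monthly.to_pandas())
```

The deprecated spellings should still be accepted by pandas 2.2 (with a deprecation warning), so existing code keeps working while it is migrated.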
6 changes: 4 additions & 2 deletions python/pyspark/pandas/frame.py
@@ -10609,8 +10609,10 @@ def melt(
name_like_string(name) if name is not None else "variable_{}".format(i)
for i, name in enumerate(self._internal.column_label_names)
]
elif isinstance(var_name, str):
var_name = [var_name]
elif is_list_like(var_name):
raise ValueError(f"{var_name=} must be a scalar.")
else:
var_name = [var_name] # type: ignore[list-item]

pairs = F.explode(
F.array(
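The `melt` change above mirrors pandas 2.2, which requires `var_name` to be a scalar. A short sketch of the user-visible effect (not part of this commit), assuming a running Spark session:

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"A": [1, 2], "B": [3, 4], "C": [5, 6]})

# A scalar var_name is still wrapped into a one-element list internally.
long_form = psdf.melt(id_vars="A", value_vars=["B", "C"], var_name="my_var")
print(long_form.sort_values(["my_var", "value"]).to_pandas())

# A list-like var_name now raises instead of being passed through.
try:
    psdf.melt(id_vars="A", value_vars=["B", "C"], var_name=["v0", "v1"])
except ValueError as exc:
    print(exc)  # var_name=['v0', 'v1'] must be a scalar.
```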
5 changes: 4 additions & 1 deletion python/pyspark/pandas/namespace.py
@@ -2554,7 +2554,10 @@ def resolve_func(psdf, this_column_labels, that_column_labels):
if isinstance(obj, Series):
num_series += 1
series_names.add(obj.name)
new_objs.append(obj.to_frame(DEFAULT_SERIES_NAME))
if not ignore_index and not should_return_series:
new_objs.append(obj.to_frame())
else:
new_objs.append(obj.to_frame(DEFAULT_SERIES_NAME))
else:
assert isinstance(obj, DataFrame)
new_objs.append(obj)
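The `namespace.py` tweak keeps a Series' own name when `ps.concat` builds a DataFrame with the default `ignore_index=False`, matching pandas. A rough sketch of the behaviour this branch preserves (an illustration inferred from the diff, not an exhaustive description of `concat`):

```python
import pyspark.pandas as ps

psser = ps.Series([1, 2], name="x")
psdf = ps.DataFrame({"y": [3, 4]})

# With the default ignore_index=False, the named Series contributes a
# column called "x" rather than an internal placeholder name.
result = ps.concat([psser, psdf])
print(result.to_pandas())  # columns "x" and "y", null where a side had no value
```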
99 changes: 81 additions & 18 deletions python/pyspark/pandas/plot/matplotlib.py
@@ -15,11 +15,14 @@
# limitations under the License.
#

from typing import final

from pyspark.loose_version import LooseVersion

import matplotlib as mat
import numpy as np
from matplotlib.axes._base import _process_plot_format # type: ignore[attr-defined]
from matplotlib.figure import Figure
from pandas.core.dtypes.inference import is_list_like
from pandas.io.formats.printing import pprint_thing
from pandas.plotting._matplotlib import ( # type: ignore[attr-defined]
@@ -44,10 +47,29 @@
unsupported_function,
KdePlotBase,
)
from pyspark.pandas.series import Series, first_series

_all_kinds = PlotAccessor._all_kinds # type: ignore[attr-defined]


def _set_ticklabels(ax, labels, is_vertical, **kwargs) -> None:
"""Set the tick labels of a given axis.
Due to https://github.com/matplotlib/matplotlib/pull/17266, we need to handle the
case of repeated ticks (due to `FixedLocator`) and thus we duplicate the number of
labels.
"""
ticks = ax.get_xticks() if is_vertical else ax.get_yticks()
if len(ticks) != len(labels):
i, remainder = divmod(len(ticks), len(labels))
assert remainder == 0, remainder
labels *= i
if is_vertical:
ax.set_xticklabels(labels, **kwargs)
else:
ax.set_yticklabels(labels, **kwargs)


class PandasOnSparkBarPlot(PandasBarPlot, TopNPlotBase):
_kind = "bar"

@@ -231,10 +253,23 @@ def _plot(self, ax, bxpstats, column_num=None, return_type="axes", **kwds):
else:
return ax, bp

@final
def _ensure_frame(self, data):
if isinstance(data, Series):
label = self.label
if label is None and data.name is None:
label = ""
if label is None:
data = data.to_frame()
else:
data = data.to_frame(name=label)
return data

def _compute_plot_data(self):
colname = self.data.name
spark_column_name = self.data._internal.spark_column_name_for(self.data._column_label)
data = self.data
data = first_series(data) if not isinstance(data, Series) else data
colname = data.name
spark_column_name = data._internal.spark_column_name_for(data._column_label)

# Updates all props with the rc defaults from matplotlib
self.kwds.update(PandasOnSparkBoxPlot.rc_defaults(**self.kwds))
@@ -277,7 +312,7 @@ def _compute_plot_data(self):

self.data = {labels[0]: stats}

def _make_plot(self):
def _make_plot(self, fig: Figure):
bxpstats = list(self.data.values())[0]
ax = self._get_ax(0)
kwds = self.kwds.copy()
@@ -303,7 +338,7 @@ def _make_plot(self):
labels = [pprint_thing(lbl) for lbl in labels]
if not self.use_index:
labels = [pprint_thing(key) for key in range(len(labels))]
self._set_ticklabels(ax, labels)
_set_ticklabels(ax, labels, self.orientation == "vertical")

@staticmethod
def rc_defaults(
@@ -363,10 +398,32 @@ def _args_adjust(self):
if is_list_like(self.bottom):
self.bottom = np.array(self.bottom)

@final
def _ensure_frame(self, data):
if isinstance(data, Series):
label = self.label
if label is None and data.name is None:
label = ""
if label is None:
data = data.to_frame()
else:
data = data.to_frame(name=label)
return data

def _calculate_bins(self, data, bins):
return bins

def _compute_plot_data(self):
self.data, self.bins = HistogramPlotBase.prepare_hist_data(self.data, self.bins)

def _make_plot(self):
def _make_plot_keywords(self, kwds, y):
"""merge BoxPlot/KdePlot properties to passed kwds"""
# y is required for KdePlot
kwds["bottom"] = self.bottom
kwds["bins"] = self.bins
return kwds

def _make_plot(self, fig: Figure):
# TODO: this logic is similar to KdePlot. Might have to deduplicate it.
# 'num_colors' requires to calculate `shape` which has to count all.
# Use 1 for now to save the computation.
@@ -423,9 +480,9 @@ class PandasOnSparkPiePlot(PandasPiePlot, TopNPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_top_n(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkAreaPlot(PandasAreaPlot, SampledPlotBase):
@@ -434,9 +491,9 @@ class PandasOnSparkAreaPlot(PandasAreaPlot, SampledPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_sampled(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkLinePlot(PandasLinePlot, SampledPlotBase):
@@ -445,9 +502,9 @@ class PandasOnSparkLinePlot(PandasLinePlot, SampledPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_sampled(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkBarhPlot(PandasBarhPlot, TopNPlotBase):
@@ -456,9 +513,9 @@ class PandasOnSparkBarhPlot(PandasBarhPlot, TopNPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_top_n(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkScatterPlot(PandasScatterPlot, TopNPlotBase):
@@ -467,9 +524,9 @@ class PandasOnSparkScatterPlot(PandasScatterPlot, TopNPlotBase):
def __init__(self, data, x, y, **kwargs):
super().__init__(self.get_top_n(data), x, y, **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkKdePlot(PandasKdePlot, KdePlotBase):
@@ -478,7 +535,12 @@ class PandasOnSparkKdePlot(PandasKdePlot, KdePlotBase):
def _compute_plot_data(self):
self.data = KdePlotBase.prepare_kde_data(self.data)

def _make_plot(self):
def _make_plot_keywords(self, kwds, y):
kwds["bw_method"] = self.bw_method
kwds["ind"] = type(self)._get_ind(y, ind=self.ind)
return kwds

def _make_plot(self, fig: Figure):
# 'num_colors' requires to calculate `shape` which has to count all.
# Use 1 for now to save the computation.
colors = self._get_colors(num_colors=1)
@@ -515,8 +577,9 @@ def _make_plot(self):
self, "_append_legend_handles_labels"
) else self._add_legend_handle(artists[0], label, index=i)

def _get_ind(self, y):
return KdePlotBase.get_ind(y, self.ind)
@staticmethod
def _get_ind(y, ind):
return KdePlotBase.get_ind(y, ind)

@classmethod
def _plot(
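Most of the churn in this file tracks signature changes in pandas' private Matplotlib backend: in pandas 2.2, `MPLPlot._make_plot` receives the `Figure`, per-kind keyword assembly goes through `_make_plot_keywords`, and `_get_ind` is a static method that takes `ind` explicitly. A minimal sketch of an override written against the 2.2-style hook (not part of this commit); these are private, version-dependent internals, so the signature is an assumption tied to pandas ~2.2:

```python
from matplotlib.figure import Figure
from pandas.plotting._matplotlib import LinePlot  # private pandas internals


class VerboseLinePlot(LinePlot):
    """Hypothetical subclass used only to illustrate the new hook signature."""

    def _make_plot(self, fig: Figure) -> None:
        # pandas 2.2 passes the Figure explicitly; before 2.2 the hook was
        # called as _make_plot() with no arguments, which is why every
        # override in this file gained a `fig` parameter.
        print(f"drawing a {self._kind} plot on a figure with {len(fig.axes)} axes")
        super()._make_plot(fig)
```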
24 changes: 16 additions & 8 deletions python/pyspark/pandas/resample.py
@@ -91,20 +91,21 @@ def __init__(
self._resamplekey = resamplekey

self._offset = to_offset(rule)
if self._offset.rule_code not in ["A-DEC", "M", "D", "H", "T", "S"]:

if self._offset.rule_code not in ["A-DEC", "M", "ME", "D", "H", "h", "T", "min", "S", "s"]:
raise ValueError("rule code {} is not supported".format(self._offset.rule_code))
if not getattr(self._offset, "n") > 0:
raise ValueError("rule offset must be positive")

if closed is None:
self._closed = "right" if self._offset.rule_code in ["A-DEC", "M"] else "left"
self._closed = "right" if self._offset.rule_code in ["A-DEC", "M", "ME"] else "left"
elif closed in ["left", "right"]:
self._closed = closed
else:
raise ValueError("invalid closed: '{}'".format(closed))

if label is None:
self._label = "right" if self._offset.rule_code in ["A-DEC", "M"] else "left"
self._label = "right" if self._offset.rule_code in ["A-DEC", "M", "ME"] else "left"
elif label in ["left", "right"]:
self._label = label
else:
@@ -184,7 +185,7 @@ def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
)
)

elif rule_code == "M":
elif rule_code in ["ME", "M"]:
assert (
origin.is_month_end
and origin.hour == 0
@@ -264,8 +265,15 @@ def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:

ret = F.when(edge_cond, edge_label).otherwise(non_edge_label)

elif rule_code in ["H", "T", "S"]:
unit_mapping = {"H": "HOUR", "T": "MINUTE", "S": "SECOND"}
elif rule_code in ["h", "min", "s", "H", "T", "S"]:
unit_mapping = {
"h": "HOUR",
"min": "MINUTE",
"s": "SECOND",
"H": "HOUR",
"T": "MINUTE",
"S": "SECOND",
}
unit_str = unit_mapping[rule_code]

truncated_ts_scol = F.date_trunc(unit_str, ts_scol)
@@ -274,10 +282,10 @@ def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
diff = timestampdiff(unit_str, origin_scol, truncated_ts_scol)
mod = F.lit(0) if n == 1 else (diff % F.lit(n))

if rule_code == "H":
if rule_code in ["h", "H"]:
assert origin.minute == 0 and origin.second == 0
edge_cond = (mod == 0) & (F.minute(ts_scol) == 0) & (F.second(ts_scol) == 0)
elif rule_code == "T":
elif rule_code in ["min", "T"]:
assert origin.second == 0
edge_cond = (mod == 0) & (F.second(ts_scol) == 0)
else:
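The widened `rule_code` checks above exist because the offset objects themselves report different codes depending on the installed pandas version. A small probe (not part of the commit), assuming pandas 2.2 is installed:

```python
from pandas.tseries.frequencies import to_offset

for rule in ["3h", "15min", "30s", "ME"]:
    offset = to_offset(rule)
    # pandas 2.2 reports the codes "h", "min", "s", and "ME"; earlier 2.x
    # releases report "H", "T", "S", and "M" for the equivalent offsets,
    # which is why the resampler accepts both spellings.
    print(rule, "->", offset.rule_code, offset.n)
```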
8 changes: 4 additions & 4 deletions python/pyspark/pandas/series.py
@@ -7092,15 +7092,15 @@ def resample(
----------
rule : str
The offset string or object representing target conversion.
Currently, supported units are {'Y', 'A', 'M', 'D', 'H',
'T', 'MIN', 'S'}.
Currently, supported units are {'YE', 'A', 'ME', 'D', 'h',
'min', 'MIN', 's'}.
closed : {{'right', 'left'}}, default None
Which side of bin interval is closed. The default is 'left'
for all frequency offsets except for 'A', 'Y' and 'M' which all
for all frequency offsets except for 'A', 'YE' and 'ME' which all
have a default of 'right'.
label : {{'right', 'left'}}, default None
Which bin edge label to label bucket with. The default is 'left'
for all frequency offsets except for 'A', 'Y' and 'M' which all
for all frequency offsets except for 'A', 'YE' and 'ME' which all
have a default of 'right'.
on : Series, optional
For a DataFrame, column to use instead of index for resampling.
2 changes: 1 addition & 1 deletion python/pyspark/pandas/supported_api_gen.py
@@ -38,7 +38,7 @@
MAX_MISSING_PARAMS_SIZE = 5
COMMON_PARAMETER_SET = {"kwargs", "args", "cls"}
MODULE_GROUP_MATCH = [(pd, ps), (pdw, psw), (pdg, psg)]
PANDAS_LATEST_VERSION = "2.1.4"
PANDAS_LATEST_VERSION = "2.2.0"

RST_HEADER = """
=====================
13 changes: 3 additions & 10 deletions python/pyspark/pandas/tests/computation/test_melt.py
@@ -100,23 +100,16 @@ def test_melt(self):
.sort_values(["variable_0", "variable_1", "value"])
.rename(columns=name_like_string),
)
self.assert_eq(
psdf.melt(
self.assertRaises(
ValueError,
lambda: psdf.melt(
id_vars=[(TEN, "A")],
value_vars=[(TEN, "B")],
var_name=["myV1", "myV2"],
value_name="myValname",
)
.sort_values(["myV1", "myV2", "myValname"])
.reset_index(drop=True),
pdf.melt(
id_vars=[(TEN, "A")],
value_vars=[(TEN, "B")],
var_name=["myV1", "myV2"],
value_name="myValname",
)
.sort_values(["myV1", "myV2", "myValname"])
.rename(columns=name_like_string),
)

columns.names = ["v0", "v1"]