Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions python/pyspark/pandas/tests/computation/test_apply_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ def f2(_) -> ps.Series[int]:
)

def test_apply_batch_with_type(self):
using_pandas3 = LooseVersion(pd.__version__) >= "3.0.0"

def normalize_array_values(pdf: pd.DataFrame) -> pd.DataFrame:
Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method only usable here (for now and in the future), @ueshin? It's just a question, because this looks like a general and useful function.

if not using_pandas3:
return pdf

pdf = pdf.copy()
for column in pdf.columns:
pdf[column] = pdf[column].map(
lambda value: list(value) if isinstance(value, np.ndarray) else value
)
return pdf

pdf = self.pdf
psdf = ps.from_pandas(pdf)

Expand Down Expand Up @@ -247,7 +260,7 @@ def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:

actual = psdf.pandas_on_spark.apply_batch(identify3)
actual.columns = ["a", "b"]
self.assert_eq(actual, pdf)
self.assert_eq(normalize_array_values(actual._to_pandas()), normalize_array_values(pdf))

# For NumPy typing, NumPy version should be 1.21+
if LooseVersion(np.__version__) >= LooseVersion("1.21"):
Expand All @@ -262,7 +275,7 @@ def identify4(

actual = psdf.pandas_on_spark.apply_batch(identify4)
actual.columns = ["a", "b"]
self.assert_eq(actual, pdf)
self.assert_eq(normalize_array_values(actual._to_pandas()), normalize_array_values(pdf))

arrays = [[1, 2, 3, 4, 5, 6, 7, 8, 9], ["a", "b", "c", "d", "e", "f", "g", "h", "i"]]
idx = pd.MultiIndex.from_arrays(arrays, names=("number", "color"))
Expand All @@ -278,7 +291,7 @@ def identify4(x) -> ps.DataFrame[[int, str], [int, List[int]]]:
actual = psdf.pandas_on_spark.apply_batch(identify4)
actual.index.names = ["number", "color"]
actual.columns = ["a", "b"]
self.assert_eq(actual, pdf)
self.assert_eq(normalize_array_values(actual._to_pandas()), normalize_array_values(pdf))

def identify5(
x,
Expand All @@ -288,7 +301,7 @@ def identify5(
return x

actual = psdf.pandas_on_spark.apply_batch(identify5)
self.assert_eq(actual, pdf)
self.assert_eq(normalize_array_values(actual._to_pandas()), normalize_array_values(pdf))

def test_transform(self):
pdf = pd.DataFrame(
Expand Down