From 887c920126f2e313d17cc8d92b07e4ecab7e28d1 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 31 Oct 2025 16:07:48 +0800 Subject: [PATCH 1/6] test --- .../pyspark/sql/pandas/_typing/__init__.pyi | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/python/pyspark/sql/pandas/_typing/__init__.pyi b/python/pyspark/sql/pandas/_typing/__init__.pyi index 4841b50544fd..5f07f71a08bd 100644 --- a/python/pyspark/sql/pandas/_typing/__init__.pyi +++ b/python/pyspark/sql/pandas/_typing/__init__.pyi @@ -19,11 +19,9 @@ from typing import ( Any, Callable, - Iterable, Iterator, NewType, Tuple, - Type, TypeVar, Union, ) @@ -142,8 +140,8 @@ ArrowScalarToScalarFunction = Union[ ] ArrowScalarIterFunction = Union[ - Callable[[Iterable[pyarrow.Array]], Iterable[pyarrow.Array]], - Callable[[Tuple[pyarrow.Array, ...]], Iterable[pyarrow.Array]], + Callable[[Iterator[pyarrow.Array]], Iterator[pyarrow.Array]], + Callable[[Tuple[pyarrow.Array, ...]], Iterator[pyarrow.Array]], ] class PandasVariadicScalarToScalarFunction(Protocol): @@ -341,8 +339,8 @@ PandasScalarToStructFunction = Union[ ] PandasScalarIterFunction = Union[ - Callable[[Iterable[DataFrameOrSeriesLike_]], Iterable[SeriesLike]], - Callable[[Tuple[DataFrameOrSeriesLike_, ...]], Iterable[SeriesLike]], + Callable[[Iterator[DataFrameOrSeriesLike_]], Iterator[SeriesLike]], + Callable[[Tuple[DataFrameOrSeriesLike_, ...]], Iterator[SeriesLike]], ] PandasGroupedMapFunction = Union[ @@ -353,7 +351,7 @@ PandasGroupedMapFunction = Union[ ] PandasGroupedMapFunctionWithState = Callable[ - [Any, Iterable[DataFrameLike], GroupState], Iterable[DataFrameLike] + [Any, Iterator[DataFrameLike], GroupState], Iterator[DataFrameLike] ] class PandasVariadicGroupedAggFunction(Protocol): @@ -426,26 +424,23 @@ PandasGroupedAggFunction = Union[ PandasVariadicGroupedAggFunction, ] -PandasMapIterFunction = Callable[[Iterable[DataFrameLike]], Iterable[DataFrameLike]] +PandasMapIterFunction = Callable[[Iterator[DataFrameLike]], Iterator[DataFrameLike]] -ArrowMapIterFunction = Callable[[Iterable[pyarrow.RecordBatch]], Iterable[pyarrow.RecordBatch]] +ArrowMapIterFunction = Callable[[Iterator[pyarrow.RecordBatch]], Iterator[pyarrow.RecordBatch]] PandasCogroupedMapFunction = Union[ Callable[[DataFrameLike, DataFrameLike], DataFrameLike], Callable[[Any, DataFrameLike, DataFrameLike], DataFrameLike], ] -ArrowGroupedMapTableFunction = Union[ +ArrowGroupedMapFunction = Union[ Callable[[pyarrow.Table], pyarrow.Table], Callable[[Tuple[pyarrow.Scalar, ...], pyarrow.Table], pyarrow.Table], -] -ArrowGroupedMapIterFunction = Union[ Callable[[Iterator[pyarrow.RecordBatch]], Iterator[pyarrow.RecordBatch]], Callable[ [Tuple[pyarrow.Scalar, ...], Iterator[pyarrow.RecordBatch]], Iterator[pyarrow.RecordBatch] ], ] -ArrowGroupedMapFunction = Union[ArrowGroupedMapTableFunction, ArrowGroupedMapIterFunction] ArrowCogroupedMapFunction = Union[ Callable[[pyarrow.Table, pyarrow.Table], pyarrow.Table], From a9e980bbcabb4c6d23b03d9ba27f0d9485a3dd13 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Mon, 3 Nov 2025 09:49:49 +0800 Subject: [PATCH 2/6] test --- python/pyspark/ml/connect/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/connect/util.py b/python/pyspark/ml/connect/util.py index 8bf4b3480e32..17b072ff57ee 100644 --- a/python/pyspark/ml/connect/util.py +++ b/python/pyspark/ml/connect/util.py @@ -15,7 +15,7 @@ # limitations under the License. # -from typing import Any, TypeVar, Callable, List, Tuple, Union, Iterable, TYPE_CHECKING +from typing import Any, TypeVar, Callable, List, Tuple, Union, Iterator, TYPE_CHECKING import pandas as pd @@ -74,7 +74,7 @@ def aggregate_dataframe( dataframe = dataframe.select(*input_col_names) - def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]: + def compute_state(iterator: Iterator["pd.DataFrame"]) -> Iterator["pd.DataFrame"]: state = None for batch_pandas_df in iterator: From 32ce78c282de49b255021fe87beab056c89ccbe3 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 4 Nov 2025 09:02:31 +0800 Subject: [PATCH 3/6] fix example --- examples/src/main/python/sql/arrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index e30584002fcb..62d8385849ca 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -24,7 +24,7 @@ # NOTE that this file is imported in tutorials in PySpark documentation. # The codes are referred via line numbers. See also `literalinclude` directive in Sphinx. import pandas as pd -from typing import Iterable +from typing import Iterator from pyspark.sql import SparkSession from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version @@ -256,7 +256,7 @@ def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame: def map_in_pandas_example(spark: SparkSession) -> None: df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")) - def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]: + def filter_func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]: for pdf in iterator: yield pdf[pdf.id == 1] From 14559296ec466f96a7263ad13e93479ce20eeef7 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 4 Nov 2025 12:54:21 +0800 Subject: [PATCH 4/6] fix example --- .../structured_network_wordcount_session_window.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py b/examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py index fbab71f05ab7..4fe61c94980e 100644 --- a/examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py @@ -31,7 +31,7 @@ localhost 9999` """ import sys -from typing import Iterable, Any +from typing import Iterator, Any import pandas as pd @@ -95,8 +95,8 @@ ) def func( - key: Any, pdfs: Iterable[pd.DataFrame], state: GroupState - ) -> Iterable[pd.DataFrame]: + key: Any, pdfs: Iterator[pd.DataFrame], state: GroupState + ) -> Iterator[pd.DataFrame]: if state.hasTimedOut: count, start, end = state.get state.remove() From 7c0f1e1efb1ff63e3cc544039b25349d5914352f Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 4 Nov 2025 13:46:10 +0800 Subject: [PATCH 5/6] fix test_udf --- python/pyspark/sql/tests/typing/test_udf.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/typing/test_udf.yml b/python/pyspark/sql/tests/typing/test_udf.yml index ef74b311d0bb..deefbe266fef 100644 --- a/python/pyspark/sql/tests/typing/test_udf.yml +++ b/python/pyspark/sql/tests/typing/test_udf.yml @@ -47,10 +47,10 @@ from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType from pyspark.sql.types import IntegerType import pandas.core.series - from typing import Iterable + from typing import Iterator @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER) - def f(xs: Iterable[pandas.core.series.Series]) -> Iterable[pandas.core.series.Series]: + def f(xs: Iterator[pandas.core.series.Series]) -> Iterator[pandas.core.series.Series]: for x in xs: yield x + 1 From 69a008b9dfd720139445314274145bd4a1825140 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 4 Nov 2025 13:46:44 +0800 Subject: [PATCH 6/6] fix test_udf --- python/pyspark/sql/tests/typing/test_udf.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/typing/test_udf.yml b/python/pyspark/sql/tests/typing/test_udf.yml index deefbe266fef..ab4843f8473b 100644 --- a/python/pyspark/sql/tests/typing/test_udf.yml +++ b/python/pyspark/sql/tests/typing/test_udf.yml @@ -114,12 +114,12 @@ - case: mapIterUdf main: | from pyspark.sql.session import SparkSession - from typing import Iterable + from typing import Iterator import pandas.core.frame spark = SparkSession.builder.getOrCreate() - def f(batch_iter: Iterable[pandas.core.frame.DataFrame]) -> Iterable[pandas.core.frame.DataFrame]: + def f(batch_iter: Iterator[pandas.core.frame.DataFrame]) -> Iterator[pandas.core.frame.DataFrame]: for pdf in batch_iter: yield pdf[pdf.id == 1]