From f3bf69b43be2f606b440ab74af61fa130df2dfb6 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 1 Dec 2025 10:18:12 +0900 Subject: [PATCH 1/2] [SPARK-54478][SPARK-54479][SPARK-54480][SPARK-54484] Re-enable streaming tests for connect compat test CI --- python/pyspark/sql/streaming/readwriter.py | 7 +++---- .../streaming/test_parity_pandas_grouped_map_with_state.py | 3 --- .../tests/connect/streaming/test_parity_foreach_batch.py | 3 --- .../pyspark/sql/tests/test_python_streaming_datasource.py | 4 ---- 4 files changed, 3 insertions(+), 14 deletions(-) diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py index 4d36846b74b5..8121dd609950 100644 --- a/python/pyspark/sql/streaming/readwriter.py +++ b/python/pyspark/sql/streaming/readwriter.py @@ -1568,7 +1568,6 @@ def foreach(self, f: Union[Callable[[Row], None], "SupportsProcess"]) -> "DataSt self._jwrite.foreach(jForeachWriter) return self - # SPARK-54478: Reenable doctest def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamWriter": """ Sets the output of the streaming query to be processed using the provided @@ -1601,9 +1600,9 @@ def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamW ... my_value = 100 ... batch_df.collect() ... - >>> q = df.writeStream.foreachBatch(func).start() # doctest: +SKIP - >>> time.sleep(3) # doctest: +SKIP - >>> q.stop() # doctest: +SKIP + >>> q = df.writeStream.foreachBatch(func).start() + >>> time.sleep(3) + >>> q.stop() >>> # if in Spark Connect, my_value = -1, else my_value = 100 """ from py4j.java_gateway import java_import diff --git a/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py b/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py index 18f9d4e9dcf4..3373903fbc3b 100644 --- a/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py +++ b/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py @@ -23,9 +23,6 @@ from pyspark.testing.connectutils import ReusedConnectTestCase -@unittest.skipIf( - os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54479: To be reenabled" -) class GroupedApplyInPandasWithStateTests( GroupedApplyInPandasWithStateTestsMixin, ReusedConnectTestCase ): diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py index c2a747dd57f3..4a8270367be9 100644 --- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py +++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py @@ -26,9 +26,6 @@ from pyspark.errors.exceptions.connect import StreamingPythonRunnerInitializationException -@unittest.skipIf( - os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54480: To be reenabled" -) class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase): def test_streaming_foreach_batch_propagates_python_errors(self): super().test_streaming_foreach_batch_propagates_python_errors() diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py index 4130b66fce65..9879231540f1 100644 --- a/python/pyspark/sql/tests/test_python_streaming_datasource.py +++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py @@ -139,9 +139,6 @@ def streamWriter(self, schema, overwrite): return TestDataSource - @unittest.skipIf( - os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54484: To be reenabled" - ) def test_stream_reader(self): self.spark.dataSource.register(self._get_test_data_source()) df = self.spark.readStream.format("TestDataSource").load() @@ -216,7 +213,6 @@ def streamReader(self, schema): assertDataFrameEqual(df, expected_data) - @unittest.skipIf(os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "To be reenabled") def test_simple_stream_reader(self): class SimpleStreamReader(SimpleDataSourceStreamReader): def initialOffset(self): From 0883798af9f7da8fd1e4eb90edb2c4a4c46a23f6 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 1 Dec 2025 12:49:25 +0900 Subject: [PATCH 2/2] linter fix --- .../streaming/test_parity_pandas_grouped_map_with_state.py | 1 - .../sql/tests/connect/streaming/test_parity_foreach_batch.py | 1 - 2 files changed, 2 deletions(-) diff --git a/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py b/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py index 3373903fbc3b..8e60f8a168df 100644 --- a/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py +++ b/python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py @@ -15,7 +15,6 @@ # limitations under the License. # import unittest -import os from pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import ( GroupedApplyInPandasWithStateTestsMixin, diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py index 4a8270367be9..632fa4628d1b 100644 --- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py +++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py @@ -16,7 +16,6 @@ # import unittest -import os from pyspark.sql.tests.streaming.test_streaming_foreach_batch import StreamingTestsForeachBatchMixin from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_connect