Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions python/pyspark/sql/streaming/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,6 @@ def foreach(self, f: Union[Callable[[Row], None], "SupportsProcess"]) -> "DataSt
self._jwrite.foreach(jForeachWriter)
return self

# SPARK-54478: Reenable doctest
def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamWriter":
"""
Sets the output of the streaming query to be processed using the provided
Expand Down Expand Up @@ -1601,9 +1600,9 @@ def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamW
... my_value = 100
... batch_df.collect()
...
>>> q = df.writeStream.foreachBatch(func).start() # doctest: +SKIP
>>> time.sleep(3) # doctest: +SKIP
>>> q.stop() # doctest: +SKIP
>>> q = df.writeStream.foreachBatch(func).start()
>>> time.sleep(3)
>>> q.stop()
>>> # if in Spark Connect, my_value = -1, else my_value = 100
"""
from py4j.java_gateway import java_import
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
# limitations under the License.
#
import unittest
import os

from pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import (
GroupedApplyInPandasWithStateTestsMixin,
)
from pyspark.testing.connectutils import ReusedConnectTestCase


@unittest.skipIf(
os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54479: To be reenabled"
)
class GroupedApplyInPandasWithStateTests(
GroupedApplyInPandasWithStateTestsMixin, ReusedConnectTestCase
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

import unittest
import os

from pyspark.sql.tests.streaming.test_streaming_foreach_batch import StreamingTestsForeachBatchMixin
from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_connect
Expand All @@ -26,9 +25,6 @@
from pyspark.errors.exceptions.connect import StreamingPythonRunnerInitializationException


@unittest.skipIf(
os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54480: To be reenabled"
)
class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase):
def test_streaming_foreach_batch_propagates_python_errors(self):
super().test_streaming_foreach_batch_propagates_python_errors()
Expand Down
4 changes: 0 additions & 4 deletions python/pyspark/sql/tests/test_python_streaming_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,6 @@ def streamWriter(self, schema, overwrite):

return TestDataSource

@unittest.skipIf(
os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54484: To be reenabled"
)
def test_stream_reader(self):
self.spark.dataSource.register(self._get_test_data_source())
df = self.spark.readStream.format("TestDataSource").load()
Expand Down Expand Up @@ -216,7 +213,6 @@ def streamReader(self, schema):

assertDataFrameEqual(df, expected_data)

@unittest.skipIf(os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "To be reenabled")
def test_simple_stream_reader(self):
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
Expand Down