Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
# limitations under the License.
#

import unittest

import time
from pyspark.sql.tests.streaming.test_streaming_foreach_batch import StreamingTestsForeachBatchMixin
from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_connect
from pyspark.testing.utils import eventually, timeout
from pyspark.errors import PySparkPicklingError

if should_test_connect:
Expand All @@ -29,9 +29,17 @@ class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedCo
def test_streaming_foreach_batch_propagates_python_errors(self):
super().test_streaming_foreach_batch_propagates_python_errors()

@unittest.skip("This seems specific to py4j and pinned threads. The intention is unclear")
@eventually(timeout=180, catch_timeout=True)
@timeout(timeout=60)
def test_streaming_foreach_batch_graceful_stop(self):
super().test_streaming_foreach_batch_graceful_stop()
# SPARK-39218: Make foreachBatch streaming query stop gracefully
def func(batch_df, _):
time.sleep(10)

q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
time.sleep(3) # 'rowsPerSecond' defaults to 1. Waits 3 secs out for the input.
q.stop()
self.assertIsNone(q.exception(), "No exception has to be propagated.")

def test_nested_dataframes(self):
def curried_function(df):
Expand Down