[SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader #46306
@@ -136,6 +137,33 @@ def streamWriter(self, schema, overwrite):

        return TestDataSource

    def _get_simple_data_source(self):
Maybe just define this class and directly pass `SimpleDataSource` to `register`?
Sounds good.
        while len(q.recentProgress) < 10:
            time.sleep(0.2)
        q.stop()
        q.awaitTermination
`awaitTermination()`?
Nice catch!
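For readers following the thread, here is a minimal sketch of why the missing parentheses matter. In Python, `q.awaitTermination` without `()` only looks up the bound method and discards it, so nothing waits. `FakeQuery` and its counter are hypothetical stand-ins for illustration, not the PySpark `StreamingQuery` class.

```python
class FakeQuery:
    """Hypothetical stand-in for a streaming query; not the PySpark class."""

    def __init__(self):
        self.await_calls = 0

    def awaitTermination(self):
        # Count invocations so the difference is observable.
        self.await_calls += 1


q = FakeQuery()

# Attribute lookup only: this evaluates to a bound method and calls nothing.
q.awaitTermination
calls_after_reference = q.await_calls

# The parentheses are what actually perform the call.
q.awaitTermination()
calls_after_call = q.await_calls
```

Because the bare reference is a valid expression statement, Python raises no error, which is likely why the slip went unnoticed in the test.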
        q.stop()
        q.awaitTermination
        self.assertIsNone(q.exception(), "No exception has to be propagated.")

    def test_stream_writer(self):
        input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
Let's move the cleanup calls `input_dir.cleanup()`, `output_dir.cleanup()`, and `checkpoint_dir.cleanup()` into a `finally` block. Feel free to do it in a separate PR.
Fixed.
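A rough sketch of the suggested pattern, using only the standard library. The test body is replaced by a placeholder file write, and the `test_data_stream_write_checkpoint` prefix is an assumption made for illustration (the thread names `checkpoint_dir` but does not show its prefix).

```python
import os
import tempfile

input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")
# Assumed prefix; the actual one is not shown in the review thread.
checkpoint_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_checkpoint")
paths = [input_dir.name, output_dir.name, checkpoint_dir.name]

try:
    # Placeholder for the real test body (starting the query, asserting on output).
    with open(os.path.join(input_dir.name, "part-0.txt"), "w") as f:
        f.write("hello")
    existed_during_test = all(os.path.isdir(p) for p in paths)
finally:
    # Runs whether the body above succeeds or raises, so no directory is leaked.
    input_dir.cleanup()
    output_dir.cleanup()
    checkpoint_dir.cleanup()

removed_after_test = not any(os.path.exists(p) for p in paths)
```

Putting the calls in `finally` means a failing assertion in the test body no longer skips the cleanup.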
        q.stop()
        q.awaitTermination
        self.assertIsNone(q.exception(), "No exception has to be propagated.")

    def test_stream_writer(self):
        input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
        output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")
class PythonStreamingDataSourceTests(BasePythonStreamingDataSourceTestsMixin, ReusedSQLTestCase):
    ...
has to be
class PythonStreamingDataSourceTests(BasePythonStreamingDataSourceTestsMixin, ReusedSQLTestCase):
    pass
because `...` basically means the body is omitted (to be defined/implemented later).
Fixed.
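As a small illustration of the point above: both spellings are valid Python and behave the same at runtime, so the distinction is one of convention, with `pass` signalling an intentionally empty body and `...` usually read as a placeholder. The class names below are hypothetical and unrelated to the Spark test classes.

```python
class Base:
    def greet(self):
        return "hello"


class WithEllipsis(Base):
    # Reads as "body left out, to be filled in later".
    ...


class WithPass(Base):
    # Reads as "body intentionally empty; everything comes from the base class".
    pass


same_behavior = WithEllipsis().greet() == WithPass().greet()
```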
I am assuming we also run this with Spark Connect enabled, right?
Merged to master.
…eStreamingReader

### What changes were proposed in this pull request?
Add pyspark test for SimpleDataSourceStreamingReader.

### Why are the changes needed?
To make sure SimpleDataSourceStreamingReader works end to end.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Test change. Also trigger python_streaming_datasource and python_parity_streaming_datasource locally.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46306 from chaoqin-li1123/test_simple_reader.

Authored-by: Chaoqin Li <chaoqin.li@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@@ -150,50 +151,90 @@ def check_batch(df, batch_id):
        q.awaitTermination
@chaoqin-li1123 can we fix all those instances?
Could you elaborate? Is this test flaky?
q.awaitTermination()
Thanks for reminding!
What changes were proposed in this pull request?
Add pyspark test for SimpleDataSourceStreamingReader.
Why are the changes needed?
To make sure SimpleDataSourceStreamingReader works end to end.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Test change. Also triggered python_streaming_datasource and python_parity_streaming_datasource locally.
Was this patch authored or co-authored using generative AI tooling?
No.