[SPARK-24565][SS] Add API in Structured Streaming for exposing output rows of each microbatch as a DataFrame #21571
Conversation
dev/sparktestsupport/modules.py
Outdated
# "pyspark.sql.readwriter", | ||
# "pyspark.sql.streaming", | ||
# "pyspark.sql.udf", | ||
# "pyspark.sql.window", |
temporary change for faster local testing. will remove before finalizing.
python/pyspark/sql/streaming.py
Outdated
@@ -854,6 +856,20 @@ def trigger(self, processingTime=None, once=None, continuous=None):
        self._jwrite = self._jwrite.trigger(jTrigger)
        return self

    def foreachBatch(self, func):
add docs
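For reference, a minimal usage sketch of the kind the requested docs might include; the rate source and output path below are placeholders, not part of this diff:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").load()

def write_batch(batch_df, batch_id):
    # Each micro-batch arrives as an ordinary (non-streaming) DataFrame,
    # so any existing batch writer can be reused here.
    batch_df.write.mode("append").format("parquet").save("/tmp/foreach_batch_out")

query = stream_df.writeStream.foreachBatch(write_batch).start()
```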
python/pyspark/sql/tests.py
Outdated
@@ -269,6 +269,7 @@ def test_struct_field_type_name(self):
        struct_field = StructField("a", IntegerType())
        self.assertRaises(TypeError, struct_field.typeName)

    '''
temporary change for faster local testing. will remove before finalizing.
Test build #91873 has finished for PR 21571 at commit
Test build #91875 has finished for PR 21571 at commit
Test build #91897 has finished for PR 21571 at commit
@@ -145,3 +145,26 @@ def do_server_auth(conn, auth_secret):
    if reply != "ok":
        conn.close()
        raise Exception("Unexpected reply from iterator server.")


def ensure_callback_server_started(gw):
This was copied verbatim from python streaming/context.py
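For context, the helper referenced here follows the usual py4j callback-server pattern; a sketch of roughly what it does, mirroring the streaming context helper (not necessarily line-for-line identical to this PR):

```python
from py4j.java_gateway import JavaObject

def ensure_callback_server_started(gw):
    # Start the py4j callback server on demand so the JVM can call back
    # into Python; a no-op if it is already running.
    if "_callback_server" not in gw.__dict__ or not gw._callback_server:
        gw.callback_server_parameters.eager_load = True
        gw.callback_server_parameters.daemonize = True
        gw.callback_server_parameters.daemonize_connections = True
        gw.callback_server_parameters.port = 0  # let the OS pick a free port
        gw.start_callback_server(gw.callback_server_parameters)
        # record the port the OS actually assigned
        cbport = gw._callback_server.server_socket.getsockname()[1]
        gw._callback_server.port = cbport
        gw._python_proxy_port = cbport
        # point the JVM's CallbackClient at the real port
        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
        jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(),
                                 gw._python_proxy_port)
```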
dev/sparktestsupport/modules.py
Outdated
# "pyspark.sql.readwriter", | ||
# "pyspark.sql.streaming", | ||
# "pyspark.sql.udf", | ||
# "pyspark.sql.window", |
temp changes only to speed up local testing. will revert after first round of review.
Overall looks good. Left some minor comments.
   * @since 2.4.0
   */
  @InterfaceStability.Evolving
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = {
It's unclear that only one of foreachBatch and foreach can be set. Reading the doc, the user may think they can set both of them. Maybe we should disallow this case?
Good point.
Well... that is an existing problem, because one can write the following confusing code:
df.writeStream.format("kafka").foreach(...).start()
This will execute the foreach but it looks confusing nonetheless. In fact you can also do
df.writeStream.format("kafka").format("bla").format("random")....
This is a general existing problem that should be addressed in a different PR.
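One way to disallow the ambiguous combination could look like the sketch below; the attribute names are hypothetical, for illustration only, not the writer's real internal state:

```python
def _assert_single_sink(writer):
    # Hypothetical attributes; the actual DataStreamWriter tracks its
    # sink configuration differently.
    if getattr(writer, "_foreach_writer", None) is not None and \
            getattr(writer, "_foreach_batch_func", None) is not None:
        raise ValueError(
            "Only one of foreach() or foreachBatch() can be set on a "
            "DataStreamWriter")
```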
        wrapped_func = ForeachBatchFunction(self._spark, func)
        gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
        ensure_callback_server_started(gw)
This should be above; otherwise there is a race where the streaming query calls this Python function before the callback server is started.
This is not possible because the callback from JVM ForeachBatch sink to Python is made ONLY after the query is started. And the query cannot be started until this foreach() method finishes and start() is called.
Sorry if I'm mistaken, but can't we still put this above? It looks weird to ensure the callback server at the end.
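Concretely, the suggested reordering would just move the call up; a fragment using the names from the diff above, not standalone code:

```python
# Start the callback server before handing the Python function to the JVM,
# per the suggestion above.
ensure_callback_server_started(gw)
wrapped_func = ForeachBatchFunction(self._spark, func)
gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
```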
python/pyspark/sql/utils.py
Outdated
@@ -62,6 +62,7 @@ def deco(*a, **kw):
    try:
        return f(*a, **kw)
    except py4j.protocol.Py4JJavaError as e:
        print(str(e))
nit: remove this
Test build #91940 has finished for PR 21571 at commit
Test build #91938 has finished for PR 21571 at commit
Test build #91982 has started for PR 21571 at commit
Jenkins retest this please
Test build #91994 has finished for PR 21571 at commit
jenkins retest this please
Test build #92005 has finished for PR 21571 at commit
Test build #92018 has finished for PR 21571 at commit
LGTM. Merging to master.
Seems fine to me. Left one question / a few nits, but not a big deal.
        q = None
        collected = dict()

        def collectBatch(batch_df, batch_id):
collectBatch -> collect_batch per PEP 8.
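A standalone sketch of the test pattern under discussion, with the PEP 8 name applied; the rate source and the final assertion are illustrative, not the exact test in this PR:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.readStream.format("rate").load()

collected = dict()

def collect_batch(batch_df, batch_id):
    # batch_df is a plain DataFrame here, so collect() is legal
    collected[batch_id] = batch_df.collect()

q = df.writeStream.foreachBatch(collect_batch).start()
try:
    q.processAllAvailable()
    assert 0 in collected  # the first micro-batch (id 0) was observed
finally:
    q.stop()
```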
        # gateway with real port
        gw._python_proxy_port = gw._callback_server.port
        # get the GatewayServer object in JVM by ID
        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
Nit: we could remove this import in this file though.
@tdas @SparkQA @zsxwing @HyukjinKwon, I have a few questions related to batchId.
[SPARK-24565][SS] Add API in Structured Streaming for exposing output rows of each microbatch as a DataFrame

Currently, the micro-batches in the MicroBatchExecution are not exposed to the user through any public API. This was because we did not want to expose the micro-batches, so that all the APIs we expose can eventually be supported in the Continuous engine. But now that we have a better sense of building a ContinuousExecution, I am considering adding APIs which will run only in the MicroBatchExecution. I have quite a few use cases where exposing the microbatch output as a DataFrame is useful.
- Pass the output rows of each batch to a library that is designed only for batch jobs (for example, many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exist (e.g. the Redshift data source).
- Write the output rows to multiple places by writing twice for each batch. This is not the most elegant thing to do for multiple-output streaming queries, but is likely to be better than running two streaming queries processing the same data twice.

The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to the Scala/Java/Python `DataStreamWriter`.

Tested with new unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#21571 from tdas/foreachBatch.
What changes were proposed in this pull request?

Currently, the micro-batches in the MicroBatchExecution are not exposed to the user through any public API. This was because we did not want to expose the micro-batches, so that all the APIs we expose can eventually be supported in the Continuous engine. But now that we have a better sense of building a ContinuousExecution, I am considering adding APIs which will run only in the MicroBatchExecution. I have quite a few use cases where exposing the microbatch output as a DataFrame is useful.

The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to the Scala/Java/Python `DataStreamWriter`.

How was this patch tested?

New unit tests.
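For illustration, the multiple-outputs use case mentioned in the description could look roughly like this; the formats and paths are placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").load()

def write_twice(batch_df, batch_id):
    # Cache so the two writes below do not recompute the batch.
    batch_df.persist()
    batch_df.write.mode("append").parquet("/tmp/sink_a")
    batch_df.write.mode("append").json("/tmp/sink_b")
    batch_df.unpersist()

query = stream_df.writeStream.foreachBatch(write_twice).start()
```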