[SPARK-23886][SS] Update query status for ContinuousExecution #21063

Closed

efim-poberezkin commented Apr 13, 2018

What changes were proposed in this pull request?

Added query status updates to ContinuousExecution

How was this patch tested?

Existing unit tests, added a test to StreamingQueryStatusAndProgressSuite

efim-poberezkin commented Apr 13, 2018

@jose-torres Hi Jose, I've added a couple of status update messages in the places you suggested. Query stop does indeed seem to be reported in StreamExecution.
I have a question about updating the isDataAvailable field of the query status. In MicroBatchExecution it is updated by comparing the committed and available offsets, and the latter are changed in constructNextBatch(). Would the same approach be valid for ContinuousExecution, and if so, how should the available offsets be updated?
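
As a rough illustration of the microbatch check described above, the sketch below models offsets as plain maps from a source name to a Long position. The names `OffsetSketch`, `Offsets`, and `isDataAvailable` are hypothetical stand-ins, not Spark's actual classes, and the real logic in MicroBatchExecution may differ.

```scala
object OffsetSketch {
  // Hypothetical stand-in for per-source offsets: source name -> position.
  type Offsets = Map[String, Long]

  // Data counts as available if any source has an available offset that is
  // missing from, or ahead of, the committed offsets.
  def isDataAvailable(committed: Offsets, available: Offsets): Boolean =
    available.exists { case (source, avail) =>
      committed.get(source).forall(_ < avail)
    }
}
```

A check like this presupposes discrete batch boundaries at which the available offsets are advanced, which is the part that has no obvious counterpart in continuous processing.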

jose-torres commented Apr 13, 2018

I'm not sure isDataAvailable makes sense in the context of continuous processing; it seems fundamentally tied to the microbatch execution model. I think the best option is to just leave it and isTriggerActive always false, and take a TODO to restructure StreamingQueryStatus to eliminate the assumption they're meaningful. (The latter would be an API change, so we'd definitely want a separate PR for it - fortunately StreamingQueryStatus isn't stable yet.)

jose-torres commented Apr 13, 2018

I guess we might not even need to make an API change, just document that these flags only mean anything for microbatch execution. In any case that's a separate discussion.

@efim-poberezkin efim-poberezkin changed the title [SPARK-23886][Structured Streaming][WIP] Update query status for ContinuousExecution [SPARK-23886][Structured Streaming] Update query status for ContinuousExecution Apr 17, 2018

efim-poberezkin commented Apr 17, 2018

Hi @jose-torres, I made some changes to this PR according to your comment, could you review it please?

jose-torres commented Apr 17, 2018

The approach looks good to me, but we probably want to add some tests to StreamingQueryStatusAndProgressSuite. (See test("basic") in ContinuousSuite for how to set up a continuous processing memory stream, and note that processAllAvailable() won't work properly for continuous execution - you'll want to use CheckAnswer to await the added data and Execute to do the test-specific progress checks)

@efim-poberezkin efim-poberezkin force-pushed the efim-poberezkin:pr/update-query-status branch to 9dcb746 Apr 23, 2018

efim-poberezkin commented Apr 23, 2018

@jose-torres Hi Jose, could you review and tell me if we need more tests or a single one will do? Also is it fine to assert status after every stream action?

currentStatus = currentStatus.copy(isTriggerActive = true)
// isTriggerActive field is kept false for ContinuousExecution
// since it is tied to MicroBatchExecution
this match {

HeartSaVioR Apr 26, 2018

Contributor

nit: someone may be concerned that a trait needs to be aware of a concrete implementation.

There appear to be two options:

  1. Extract a method that only updates currentStatus when a trigger starts, defaulting to isTriggerActive = true, and let ContinuousExecution override it.
  2. Just override startTrigger() in ContinuousExecution, call super.startTrigger(), and then update currentStatus once more. This might open a very small window in which other threads read invalid status information (isTriggerActive = true), but it requires fewer changes if that is acceptable.
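
The two options above can be sketched roughly as follows. This is a simplified model, not the actual Spark code: `TriggerSketch`, `Status`, `Reporter`, `markTriggerStarted`, and the three classes are hypothetical names chosen for illustration, and the real ProgressReporter does more work in startTrigger().

```scala
object TriggerSketch {
  // Simplified, hypothetical stand-in for the query status.
  final case class Status(message: String, isTriggerActive: Boolean)

  trait Reporter {
    protected var currentStatus: Status =
      Status("Initializing", isTriggerActive = false)

    def status: Status = currentStatus

    // Hook used by option 1: the status change lives in its own method.
    protected def markTriggerStarted(): Unit =
      currentStatus = currentStatus.copy(isTriggerActive = true)

    def startTrigger(): Unit = {
      // Other per-trigger bookkeeping would go here.
      markTriggerStarted()
    }
  }

  // Keeps the default behavior: the flag becomes true.
  class MicroBatch extends Reporter

  // Option 1: override only the hook, so the flag is never set to true.
  class ContinuousViaHook extends Reporter {
    override protected def markTriggerStarted(): Unit =
      currentStatus = currentStatus.copy(isTriggerActive = false)
  }

  // Option 2: call super, then reset. The flag is briefly true in between.
  class ContinuousViaSuper extends Reporter {
    override def startTrigger(): Unit = {
      super.startTrigger()
      currentStatus = currentStatus.copy(isTriggerActive = false)
    }
  }
}
```

In this sketch both continuous variants end with isTriggerActive = false; they differ only in whether a concurrent reader could observe the intermediate true value.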

efim-poberezkin Apr 26, 2018

Author Contributor

Yep, an oversight on my part. Fixed it.

attilapiros Oct 25, 2018

Contributor

It seems a bit strange to me that ProgressReporter, as the base for MicroBatchExecution and ContinuousExecution, contains an implementation that is only right for MicroBatchExecution and is overridden only in ContinuousExecution. Why provide an implementation here that suits only one of the inheriting classes, rather than making it an abstract method?
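
For comparison, the abstract-method alternative suggested here might look like the sketch below. All names (`AbstractTriggerSketch`, `Status`, `Reporter`, `MicroBatch`, `Continuous`) are hypothetical, and the model is deliberately much smaller than the real trait.

```scala
object AbstractTriggerSketch {
  // Simplified, hypothetical stand-in for the query status.
  final case class Status(isTriggerActive: Boolean)

  trait Reporter {
    protected var currentStatus: Status = Status(isTriggerActive = false)

    def status: Status = currentStatus

    // No default body: each execution mode must state its own behavior.
    def startTrigger(): Unit
  }

  class MicroBatch extends Reporter {
    def startTrigger(): Unit = {
      currentStatus = currentStatus.copy(isTriggerActive = true)
    }
  }

  class Continuous extends Reporter {
    def startTrigger(): Unit = {
      currentStatus = currentStatus.copy(isTriggerActive = false)
    }
  }
}
```

The trade-off in this sketch is that any bookkeeping shared by both modes would have to be duplicated or moved into a separate helper, which may be why a default implementation was kept.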

@efim-poberezkin efim-poberezkin changed the title [SPARK-23886][Structured Streaming] Update query status for ContinuousExecution [SPARK-23886][SS] Update query status for ContinuousExecution Apr 26, 2018

AmplabJenkins commented Jun 9, 2018

Can one of the admins verify this patch?
