New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4194] support unbounded limit #5682
Conversation
run java precommit |
run java postcommit |
R: @kennknowles |
public void run() { | ||
while (!result.getState().isTerminal()) { | ||
try { | ||
Thread.sleep(DaemonThreadSleepIntervalMillis); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think result.waitUntilFinish(duration) is good here: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L58
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great suggestion!
NOT_REACHED | ||
} | ||
|
||
private static class LimitStateWrapper implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is there a wrapper? Is this to make it clearly something like an LVar
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename it to LimitStateVar
options | ||
.getRunner() | ||
.getCanonicalName() | ||
.equals("org.apache.beam.runners.direct.DirectRunner")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a message to this check? That way when it fails it will explain what is going on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added something to say only DirectRunner is supported and please check option setting.
} | ||
|
||
private static class LimitCounter extends DoFn<Row, Void> { | ||
private static final Map<Long, Integer> globalLimitArguments = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the DirectRunner
this is OK. I think you could pretty easily make it a bit more general by using stateful ParDo(DoFn) for counting. Then the only thing left to make it work on a distributed runner is a communication path back to the shell. If you leave it like this, I think the DoFn
also is not ready, because in a distributed run there will be lots of copies and separate JVMs on different workers, not gathered to one key, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use stateful ParDo to implement counter/cancelling now.
run java precommit |
1 similar comment
run java precommit |
run java postcommit |
1 similar comment
run java postcommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really cool. It can probably be simplified significantly without any loss of functionality though.
return collect(options, node); | ||
} finally { | ||
Thread.currentThread().setContextClassLoader(originalClassLoader); | ||
} | ||
} | ||
|
||
enum LimitState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Isn't this just a Boolean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Using enum to have descriptive state than true/false.
@@ -111,6 +147,55 @@ private static PipelineResult run( | |||
return result; | |||
} | |||
|
|||
private static PipelineResult limitRun( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The point of run
was to have exactly one for everything. Seeing as that isn't going to work, you should probably just inline this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to use this function to encapsulate related logic together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid a long limitCollect
function.
|
||
PipelineResult result = pipeline.run(); | ||
|
||
pool.execute( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The extra thread seems unnecessary. You can do this in the main thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
@@ -133,14 +219,108 @@ private static PipelineResult run( | |||
options | |||
.getRunner() | |||
.getCanonicalName() | |||
.equals("org.apache.beam.runners.direct.DirectRunner")); | |||
.equals("org.apache.beam.runners.direct.DirectRunner"), | |||
"Only DirectRunner is supported in SQL Shell. " + "Please check your Runner setting."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't exactly true. Should be something like "SELECT without INSERT is only supported in DirectRunner in SQL Shell."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
// If state is null, or state is not null and indicates pipeline is not terminated | ||
// yet, check continue checking the limit var. | ||
try { | ||
if (limitStateVar.isReached()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you did something like values.size() >= limitCount
here you could replace LimitCanceller
and LimitCollector
with Collector
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could replace LimitCollector
with Collector
with the cost of removing extra collected values after the pipeline finishes, or I modify Collector
and add something like values.size() >= limitCount
to make Collector
general. To make code more readable, I wrote a dedicated LimitCollector
.
Also, I'd like to separate count and cancel
and collect
steps to support unbounded limit. Right now Collector
seems like only work in DirectRunner but I can make count and cancel
more general to fit into other runners by using stateful ParDo (that's what LimitCanceller
does). So in the future if we start to support unbounded limit in other runners, only collect
step needs modification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have a very different model as to how this will work in other runners. The runners shouldn't need to be aware of the limit, as that would be applied client side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the cleanup. I still think there is a significant opportunity for simplification here around Collector. LimitCollector, and LimitCanceller, but I'm happy to do it as a followup. (I'm a big fan of minimalist programming.)
@ProcessElement | ||
public void processElement(ProcessContext context) { | ||
Object[] input = context.element().getValues().toArray(); | ||
if (values.size() < count) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that I think about it, you have a race between checking this count and adding to the queue. The directRunner has a minimum of 3 worker threads so it is possible to hit. I still think the simple, safe, and portable implementation is to use Collector
, check the size of the queue outside of the pipeline, and truncate after terminating the pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great catch! Yes I agree to use Collector
here and drop extra values after terminating the pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I still need to keep LimitCanceller
(as a general "count and cancel" solution for all runners), and LimitCanceller
needs KV as its input type(a requirement of stateful ParDo), I will have to keep this LimitCollector
to emit KV elements to LimitCanceller
.
I will take care of the race condition.
run java postcommit |
LGTM, just squash commits and I'll merge |
e88d8f5
to
60c8baa
Compare
run java postcommit |
ElasticSearchIO |
run java postcommit |
|
run java postcommit |
The tests actually part of this PR passed. We have some other quota issues but just for Dataflow. |
What?
Support unbounded limit in Beam SQL shell. In the past, due to default global window and default trigger, queries like "SELECT col_name FROM unbounded_table LIMIT 1" will not return in SQL Shell.
This PR tries to support unbounded limit by starting a daemon thread to monitoring return value collection in BeanEnumerableConverter, and stopping pipeline when collected values reach limit count. More detailed description can be found here: https://docs.google.com/document/d/13zeTewHH9nfwhSlcE4x77WQwr1U2Z4sTiNRjOXUj2aw/edit?usp=sharing.
Testing
Adding two unit tests to mock bounded and unbounded input tables to test LIMIT functionality.
Adding one e2e integration test, which utilizes Google Cloud Pub/Sub to test the unbounded limit on auto-generated Pub/Sub messages.
Manually tested unbounded limit in Beam SQL shell on a Google Cloud Pub/Sub table. See the screenshot:
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.