
DataflowRunner: Experiment added to disable unbounded PCollection checks turning batch into streaming #16773

Conversation

@nbali (Contributor) commented Feb 8, 2022

There are IOs that the previous logic detected as unbounded and therefore forced into streaming behaviour, even when the pipeline itself should be considered batch. For example, a pipeline using KafkaIO.withStopReadTime always launches as streaming, even though it reads only a limited amount of data. Until proper detection is implemented, this is a workaround that lets developers who know a pipeline should actually run as batch override the built-in streaming enforcement.

See #15951 for further context and original "trigger" for this contribution.
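
A minimal sketch of how a pipeline might opt in to this experiment (the broker, topic, and timestamps are placeholders; the experiment name is the one proposed in this PR):

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Instant;

public class BoundedKafkaBatchJob {
  public static void main(String[] args) {
    // Run with --runner=DataflowRunner and the usual GCP options.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Opt out of the "unbounded PCollection => force streaming" behaviour.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class),
        "disable_unbounded_pcollection_checks_turning_batch_into_streaming");

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092") // placeholder
            .withTopic("events") // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Reading stops at this timestamp, so the data is finite even though the
            // source is still detected as unbounded.
            .withStopReadTime(Instant.parse("2022-02-08T00:00:00Z")));
    pipeline.run();
  }
}
```

The same experiment could also be passed on the command line via `--experiments=disable_unbounded_pcollection_checks_turning_batch_into_streaming`.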



@nbali (Contributor, Author) commented Feb 8, 2022

R: @lukecwik

@nbali (Contributor, Author) commented Feb 8, 2022

@fpeter8 fyi

@nbali (Contributor, Author) commented Feb 8, 2022

(At the moment I'm not sure whether I should open a JIRA ticket for this, or whether the PR in itself is enough.)

@nbali (Contributor, Author) commented Feb 15, 2022

R: @kennknowles

@kennknowles (Member) commented:

One problem here is that there is no batch implementation for UnboundedSource, and also no batch execution of an unbounded splittable DoFn (where each element may produce unbounded output).

This is an example where finite != bounded. It is up to the IO to produce a bounded PCollection, and the runner to figure out how to execute it. But batch Dataflow will never be able to evaluate an unbounded PCollection.

@kennknowles (Member) commented Feb 15, 2022

Disabling the autodetection would require a scan for unbounded PCollections that fails the job before submitting it.

@kennknowles self-requested a review February 15, 2022 19:13

@kennknowles (Member) commented:

Specifically: #15951 (comment)

@nbali (Contributor, Author) commented Feb 18, 2022

@kennknowles

> One problem here is that there is no batch implementation for UnboundedSource, and also no batch execution of an unbounded splittable DoFn (where each element may produce unbounded output).
>
> This is an example where finite != bounded. It is up to the IO to produce a bounded PCollection, and the runner to figure out how to execute it. But batch Dataflow will never be able to evaluate an unbounded PCollection.

Well, this might be a pleasant surprise for you then: with DataflowRunner, a pipeline containing an unbounded PCollection - I'm guessing due to ReadFromKafkaDoFn having @UnboundedPerElement - that actually holds a finite amount of data - thanks to KafkaIO.Read.withStopReadTime - stops perfectly in both batch and streaming mode. Apart from the obvious differences predetermined by batch/streaming mode, the biggest differences are execution time and cost: it takes longer and costs more to process the same data in streaming. This PR intends to fix that.

Actual data from the GCP console for two jobs launched with KafkaIO.Read.withStartReadTime and KafkaIO.Read.withStopReadTime at the same time on the same topic/data, with the same configuration/workers/etc., batch vs. streaming being the only difference:

Batch job:
Job type: Batch
Job status: Succeeded
Elapsed time: 5 min 46 sec
Total memory time: 0,15 GB hr
Total Shuffle data processed: 81,43 MB
Billable Shuffle data processed: 20,36 MB

Streaming job:
Job type: Streaming
Job status: Succeeded
Elapsed time: 6 min 42 sec
Total memory time: 0,226 GB hr
Total streaming data processed: 121,47 MB

The price difference seems significant, but I haven't tested with a big amount of data, so it might become more linear later... but the biggest pain point for us is the lack of batch processing with simple grouping, etc., and the need to handle the data as streaming with windows, triggers, etc., when business-wise it is absolutely not necessary.

> Disabling the autodetection would require a scan for unbounded PCollections that fails the job before submitting it.

So based on the actual behaviour I respectfully disagree with your statement, because it actually works. I introduced it as an experiment, so if someone knows what they are doing, they can turn the check off, but it's still there for everybody else "just to be sure".

> Specifically: #15951 (comment)

I agree that this should be the "proper" solution for this "specific" issue, but this workaround doesn't only handle that case; it handles every similar issue as well.

@kennknowles (Member) commented:

If the pipeline does succeed in batch, then we actually have a bug. The intent is that anything that batch can run, it does run.

@kennknowles (Member) commented:

Ah, I see now that it is in fact bounded data, not "already sitting there" but streamed in. This is an interesting case to consider.

@kennknowles (Member) commented:

Based on my reading of the code, use of withStopReadTime depends on the new SDF functionality and goes through a Watch style of transform that tends to target streaming. Semantically, it seems like this should result in bounded PCollections, even though the batch read of those PCollections may be inefficient and/or blocking. I don't know what pitfalls might occur with all this. Pinging @chamikaramj and @lukecwik who will have more detailed expertise about the implementations.

@nbali (Contributor, Author) commented Feb 28, 2022

> Semantically, it seems like this should result in bounded PCollections, even though the batch read of those PCollections may be inefficient and/or blocking.

I think that is one thing we all agree on: for KafkaIO the code should be separated into unbounded/bounded paths for these scenarios, and having completely different codepaths would be the proper way to solve that one.

This is strictly about giving developers the ability to circumvent cases where that implementation does not exist yet. I really can't see how having this option - disabled by default - could hurt. As far as I can tell, if the experiment hasn't been enabled, it behaves exactly the same.
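
For illustration, a rough sketch of what that guard amounts to conceptually (not the exact DataflowRunner code; containsUnboundedPCollection stands in for the runner's existing scan of the pipeline graph):

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;

class BoundednessGuardSketch {

  /** Hypothetical stand-in for the runner's existing unbounded-PCollection scan. */
  static boolean containsUnboundedPCollection(Pipeline pipeline) {
    return false; // the real runner walks the pipeline graph here
  }

  static void maybeForceStreaming(Pipeline pipeline, DataflowPipelineOptions options) {
    boolean optedOut =
        ExperimentalOptions.hasExperiment(
            options, "disable_unbounded_pcollection_checks_turning_batch_into_streaming");
    if (!options.isStreaming() && containsUnboundedPCollection(pipeline) && !optedOut) {
      // Without the experiment the behaviour is unchanged: an unbounded
      // PCollection still switches the job to streaming mode.
      options.setStreaming(true);
    }
  }
}
```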

@aaltay (Member) commented Mar 10, 2022

What is the next step on this PR?

@kennknowles (Member) commented:

I need to convince myself that an experiment to explicitly enable unsupported behavior is safe, short term and long term. I see why it works for this case. I don't really know the overall failure mode. If this experiment makes it into a StackOverflow answer, I worry we'll have lots of people using it and having unpredictable problems.

@nbali (Contributor, Author) commented Apr 6, 2022

Any ETA?

@kennknowles (Member) commented:

I looked into this, and I think the right fix is to have a separate ReadBoundedFromKafkaDoFn that indicates statically that it is bounded per element. Then the proper behavior of "multiplying" the input cardinality by the DoFn's per-element cardinality can work as intended.

I am less familiar with Kafka's details, but would it also be possible that the event timestamp / watermark on the topic does not advance? In that case we would want the above change (which is safe) along with this experiment, which should be renamed to something like unsafely_treat_pcollections_as_bounded.

@nbali (Contributor, Author) commented Apr 11, 2022

> I looked into this, and I think the right fix is to have a separate ReadBoundedFromKafkaDoFn that indicates statically that it is bounded per element.

As far as I understand:
ReadFromKafkaDoFn is the DoFn we are talking about; it has @UnboundedPerElement right now. It influences the bounded/unbounded nature by creating either an OffsetRangeTracker or a GrowableOffsetRangeTracker in ReadFromKafkaDoFn.restrictionTracker(...). That choice is driven by the OffsetRange input, which is created in ReadFromKafkaDoFn.initialRestriction(...) based on the properties of the input KafkaSourceDescriptor. KafkaSourceDescriptor is the @Element itself, so it's a runtime value. How would you determine during the Beam graph-building phase whether that dynamically generated runtime value will be unbounded or bounded? When the developer provides the stopReadTime through the KafkaIO methods it is doable, but we also have KafkaIO.readSourceDescriptors() - in that case KafkaIO has no idea whether the descriptors will be unbounded or bounded, so automatic detection can't have 100% coverage. Or did you mean a hybrid solution that tries to detect it but can be overridden manually?
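
To make that concrete, here is a simplified sketch of the tracker choice described above (not the actual KafkaIO code; the end-offset estimator is a trivial stand-in):

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class TrackerChoiceSketch {

  static RestrictionTracker<OffsetRange, Long> trackerFor(OffsetRange restriction) {
    if (restriction.getTo() < Long.MAX_VALUE) {
      // An end offset was resolved up front (e.g. from a stop read time),
      // so the per-element work is finite and a plain tracker is enough.
      return new OffsetRangeTracker(restriction);
    }
    // No end offset is known when the restriction is created: the range can keep
    // growing with the topic, so a growable tracker is used instead.
    return new GrowableOffsetRangeTracker(
        restriction.getFrom(),
        new GrowableOffsetRangeTracker.RangeEndEstimator() {
          @Override
          public long estimate() {
            // The real code estimates the partition's latest offset here.
            return Long.MAX_VALUE;
          }
        });
  }
}
```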

> we would want the above change (which is safe) along with this experiment

I don't get this. If the bounded/unbounded separation is implemented, why would having this experiment make any sense for KafkaIO? The bounded/unbounded nature would already be handled properly according to the annotation - so there would be no batch->streaming protection to prevent.

@kennknowles (Member) commented:

> I looked into this, and I think the right fix is to have a separate ReadBoundedFromKafkaDoFn that indicates statically that it is bounded per element.
>
> As far as I understand:
> ReadFromKafkaDoFn is the DoFn we are talking about; it has @UnboundedPerElement right now. It influences the bounded/unbounded nature by creating either an OffsetRangeTracker or a GrowableOffsetRangeTracker in ReadFromKafkaDoFn.restrictionTracker(...). That choice is driven by the OffsetRange input, which is created in ReadFromKafkaDoFn.initialRestriction(...) based on the properties of the input KafkaSourceDescriptor. KafkaSourceDescriptor is the @Element itself, so it's a runtime value. How would you determine during the Beam graph-building phase whether that dynamically generated runtime value will be unbounded or bounded? When the developer provides the stopReadTime through the KafkaIO methods it is doable, but we also have KafkaIO.readSourceDescriptors() - in that case KafkaIO has no idea whether the descriptors will be unbounded or bounded, so automatic detection can't have 100% coverage. Or did you mean a hybrid solution that tries to detect it but can be overridden manually?

Thanks for all the details. Just to make sure the premises are clear:

  • Bounded/unbounded is by definition a static property of a PCollection. Bounded means it is finite and you can know it statically. Unbounded means it could be finite or infinite and you can only find out dynamically.
  • Bounded/unbounded output per element is a static property of a DoFn.
  • A DoFn must declare whether it is bounded or unbounded per element, and is responsible for adhering to that contract, or it may crash, hang, or hit any number of other problems.

So two separate code paths - either separate entry points or a conditional based on stopReadTime - provide two static contexts for setting whether it will be bounded or unbounded.

I think if you start at the DoFn and work outwards you would find all the things that need to be forked in order to plumb the boundedness all the way through. For example readSourceDescriptors may need either a flag saying "all descriptors will be bounded" or a forked version that means the same thing.
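
A sketch of what those static declarations look like at the DoFn level (hypothetical illustration-only DoFns, not the actual KafkaIO classes):

```java
import org.apache.beam.sdk.transforms.DoFn;

class BoundednessContractSketch {

  // Declares that a single input element may produce an unbounded amount of output,
  // which is what currently makes the whole output PCollection unbounded.
  @DoFn.UnboundedPerElement
  static class UnboundedReadLikeDoFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String descriptor, OutputReceiver<String> out) {
      // may keep polling the source indefinitely
    }
  }

  // The proposed fork: statically bounded per element, so the output PCollection
  // stays bounded and batch Dataflow can run it without any experiment.
  @DoFn.BoundedPerElement
  static class BoundedReadLikeDoFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String descriptor, OutputReceiver<String> out) {
      // reads up to a known stop offset / stop read time, then returns
    }
  }
}
```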

> we would want the above change (which is safe) along with this experiment
>
> I don't get this. If the bounded/unbounded separation is implemented, why would having this experiment make any sense for KafkaIO? The bounded/unbounded nature would already be handled properly according to the annotation - so there would be no batch->streaming protection to prevent.

I agree. Not sure what I was thinking about. I thought there was still a case where you end up using this to force something. But ideally you would not need it.

@nbali (Contributor, Author) commented Apr 12, 2022

> I thought there was still a case where you end up using this to force something. But ideally you would not need it.

Ideally there would have been no need for this PR at all. The conceptual issue it works around will still exist; separating the Kafka-related DoFns only fixes one possible occurrence.

@kennknowles (Member) commented:

> I thought there was still a case where you end up using this to force something. But ideally you would not need it.
>
> Ideally there would have been no need for this PR at all. The conceptual issue it works around will still exist; separating the Kafka-related DoFns only fixes one possible occurrence.

I don't understand this comment. Once you have a bounded read code path that you can choose, what are the additional occurrences?

@nbali (Contributor, Author) commented Apr 12, 2022

I have no idea. That's the point. KafkaIO.Read isn't the only functionality in Beam. Anything might have something similar.

@kennknowles (Member) commented:

Ah, OK. Then I think it is fine. Does it make sense? Would you be up for forking the DoFn and building a code path that does a proper bounded read?

@nbali (Contributor, Author) commented Apr 13, 2022

Due to the expected time requirements of that task, that is not up to me anymore.

@nbali force-pushed the disable_unbounded_pcollection_checks_turning_batch_into_streaming branch from f0ed47c to 2f4f00e on April 16, 2022 01:12
@nbali (Contributor, Author) commented Apr 16, 2022

Hopefully this is enough.

@nbali (Contributor, Author) commented Apr 20, 2022

ping? :)

@@ -197,6 +197,10 @@
})
public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {

/** Experiment to "disable unbounded pcollection checks turning batch into streaming". */
public static final String DISABLE_UNBOUNDED_PCOLLECTION_CHECKS_TURNING_BATCH_INTO_STREAMING =
"disable_unbounded_pcollection_checks_turning_batch_into_streaming";

Reviewer (Member) commented on this diff:

"unsafely_attempt_to_process_unbounded_data_in_batch_mode"

@nbali (Contributor, Author) replied:

Done.

@nbali (Contributor, Author) replied:

(Btw my bad, totally missed this in the previous comment due to the Kafka separation.)

@nbali (Contributor, Author) replied:

Of course I couldn't do it from the GitHub UI. NOW done. (80a19be)

@nbali (Contributor, Author) commented May 2, 2022

ping?

@aaltay (Member) commented May 12, 2022

@kennknowles - could someone else help with this review?

@nbali requested a review from kennknowles May 24, 2022 17:07
@nbali (Contributor, Author) commented May 26, 2022

ping?

@kennknowles (Member) left a review:

Thanks for working through these changes. LGTM. There is a git conflict but I assume it will be immaterial. I also want to ping a few people so they know about this since it could come up in the future. @reuvenlax @chamikaramj @lukecwik. I am not 100% certain of the lifespan of this experiment, or if others on the team will have other opinions. It is a small change in code but a big UX thing.

…ng_batch_into_streaming

# Conflicts:
#	sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@nbali (Contributor, Author) commented May 31, 2022

The conflict was only about separate test cases that were added at the same location. Nothing relevant.

@nbali (Contributor, Author) commented May 31, 2022

org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImplTest.testInsertWithinRowCountLimits

Gotta be a flaky test. I haven't touched anything in that module.

@kennknowles (Member) commented:

Run Java PreCommit

@nbali (Contributor, Author) commented Jun 3, 2022

Flaky again. Not sure whether I can trigger it or not, so let's see.

@nbali (Contributor, Author) commented Jun 3, 2022

Run Java PreCommit

1 similar comment
@kennknowles (Member) commented:

Run Java PreCommit

@nbali (Contributor, Author) commented Jun 7, 2022

Finally all green :D

@kennknowles merged commit e95ef97 into apache:master Jun 7, 2022
@nbali deleted the disable_unbounded_pcollection_checks_turning_batch_into_streaming branch June 7, 2022 22:49
steveniemitz added a commit to twitter-forks/beam that referenced this pull request Jul 1, 2022
…ction checks, allowing batch execution over unbounded PCollections (apache#16773)"

This reverts commit e95ef97.
steveniemitz added a commit to twitter-forks/beam that referenced this pull request Jul 15, 2022
…ction checks, allowing batch execution over unbounded PCollections (apache#16773)"

This reverts commit e95ef97.
micaelal pushed a commit to twitter-forks/beam that referenced this pull request Apr 5, 2023
…ction checks, allowing batch execution over unbounded PCollections (apache#16773)"

This reverts commit e95ef97.

(cherry picked from commit 25a3e56)