
[Bug]: KafkaIO could fail with BigQueryIO.Write.withAutoSharding() #22951

Closed
nbali opened this issue Aug 30, 2022 · 2 comments · Fixed by #24463

Comments


nbali commented Aug 30, 2022

What happened?

BigQueryIO.Write.FILE_LOADS.withAutoSharding() uses GroupIntoBatches.withShardedKey(), which uses 'workerUuid' and 'threadId' as the sharding key. My understanding is that the problem is this: the Kafka consumer read in KafkaIO.Read for a single partition most likely happens without any parallelism, on the same worker and the same thread, because records are read in FIFO order by offset. This essentially means that .withShardedKey() has no effect whatsoever.
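To illustrate why single-threaded partition reads defeat this kind of sharding, here is a minimal standalone sketch, not Beam's actual implementation: it builds a shard key from a per-worker UUID and the current thread id (the two components named above), and shows that a single-threaded reader produces exactly one distinct key.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch (ShardKeyDemo is not a Beam class): a shard key
// composed of a per-worker UUID and the current thread id, as described
// for GroupIntoBatches.withShardedKey().
public class ShardKeyDemo {
    private static final String WORKER_UUID = UUID.randomUUID().toString();

    static String shardKey() {
        return WORKER_UUID + "-" + Thread.currentThread().getId();
    }

    public static void main(String[] args) {
        // A single-threaded, in-order Kafka partition read produces the same
        // shard key for every element, so all elements land in one shard.
        Set<String> keys = new HashSet<>();
        for (int i = 0; i < 500_000; i++) {
            keys.add(shardKey());
        }
        System.out.println("distinct shard keys: " + keys.size()); // prints 1
    }
}
```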

Although there is a 'FILE_TRIGGERING_BATCHING_DURATION' of 1s and a 'FILE_TRIGGERING_RECORD_COUNT' of 500,000, and both trigger grouping, this still means that if we stay under 500k elements and under 1s, the transform will try to fire them all at once. It is entirely possible, with sufficiently high throughput or with 'outputWithTimestamp', to have 500k elements in a single second. This could result in an OutOfMemoryError (OOME).

We should also have a size limit, not only time and count.

Duration maxBufferingDuration =
    options.getMaxBufferingDurationMilliSec() > 0
        ? Duration.millis(options.getMaxBufferingDurationMilliSec())
        : FILE_TRIGGERING_BATCHING_DURATION;
// In contrast to fixed sharding with user trigger, here we use a global window with default
// trigger and rely on GroupIntoBatches transform to group, batch and at the same time
// parallelize properly. We also ensure that the files are written if a threshold number of
// records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
// GroupIntoBatches.
return input
    .apply(
        GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
            .withMaxBufferingDuration(maxBufferingDuration)
            .withShardedKey())
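The proposed byte-size limit can be sketched as a standalone batcher that flushes on element count or on accumulated bytes, whichever threshold is hit first. This is a hypothetical illustration, not the actual change that landed in GroupIntoBatches via #24463; the class name and thresholds are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposal: batch firing is bounded by element
// count OR accumulated byte size, not count (and time) alone.
public class SizeAwareBatcher {
    private final long maxCount;
    private final long maxBytes;
    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes = 0;

    public SizeAwareBatcher(long maxCount, long maxBytes) {
        this.maxCount = maxCount;
        this.maxBytes = maxBytes;
    }

    /** Adds an element; returns the flushed batch if a threshold was hit, otherwise null. */
    public List<byte[]> add(byte[] element) {
        buffer.add(element);
        bufferedBytes += element.length;
        if (buffer.size() >= maxCount || bufferedBytes >= maxBytes) {
            List<byte[]> batch = new ArrayList<>(buffer);
            buffer.clear();
            bufferedBytes = 0;
            return batch;
        }
        return null;
    }
}
```

With a byte bound in place, a burst of large elements flushes early instead of accumulating until the count or duration trigger fires, which is what makes the OOME scenario above avoidable.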

Issue Priority

Priority: 2

Issue Component

Component: io-java-gcp


nbali commented Aug 30, 2022

.take-issue

nbali added a commit to nbali/beam that referenced this issue Aug 30, 2022
@nbali nbali mentioned this issue Aug 30, 2022
nbali added a commit to nbali/beam that referenced this issue Sep 6, 2022
nbali added a commit to nbali/beam that referenced this issue Sep 6, 2022
nbali added a commit to nbali/beam that referenced this issue Sep 7, 2022
nbali added a commit to nbali/beam that referenced this issue Sep 7, 2022
… introduced test in GroupIntoBatchesTest

nbali commented Sep 12, 2022

nbali added a commit to nbali/beam that referenced this issue Nov 16, 2022
nbali added a commit to nbali/beam that referenced this issue Nov 16, 2022
lukecwik pushed a commit to lukecwik/incubator-beam that referenced this issue Dec 1, 2022
lukecwik added a commit that referenced this issue Dec 2, 2022
* Fix for #22951

* Fix Dataflow GroupIntoBatchesOverride to match updated GroupIntoBatches size limit implementation.

Co-authored-by: Balázs Németh <nbali@users.noreply.github.com>
@github-actions github-actions bot added this to the 2.45.0 Release milestone Dec 2, 2022
prodriguezdefino pushed a commit to prodriguezdefino/beam-pabs that referenced this issue Dec 6, 2022
lostluck pushed a commit to lostluck/beam that referenced this issue Dec 22, 2022