[BEAM-14104] Support shard aware aggregation in Kinesis writer. #17113
@aromanenko-dev Not sure if feasible in terms of reviewing, though would be great to get this into the 2.38 release as well :)
Run Java PostCommit
Same BigQuery IT tests failing
Thanks, sorry for the delay.
LGTM, just several minor notes.
Run Java PostCommit
LGTM, just a very minor note on the non-obvious constant.
All significant tests passed, failures are unrelated ...
This PR introduces shard aware aggregation to achieve better aggregation results (to better max out Kinesis API limits).
If random partitioning is sufficient, this can already be achieved using the explicit random partitioner. The benefit of that approach is that it doesn't require pulling shard details through the API.
However, the random partitioner uses a static configuration that doesn't handle any resharding that may occur.
Shard aware aggregation is implemented as follows:
Periodically the writer pulls hash key ranges assigned to shards. These are statically shared using an ObjectPool to minimize the necessary API calls.
The configured partitioner is used to generate the `partitionKey` and the optional `explicitHashKey` for each record. If `explicitHashKey` is set, it is used as is with no change in behavior. Otherwise, the lower bound of the hash key range of the target shard (based on the hashed `partitionKey`) is chosen for aggregation and set as `explicitHashKey`.
This makes sure aggregated records always contain only records that match the shard they are routed to even if the stream is in the process of resharding.
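The key resolution described above could be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class `ShardAwareHashKeys` and its methods are hypothetical names, and it assumes the shard hash key ranges have already been fetched. It relies on the documented Kinesis behavior of mapping a partition key to a 128-bit integer via MD5.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not the Beam implementation. */
final class ShardAwareHashKeys {
  // Starting hash key of each shard's range -> shard id, sorted by starting key.
  private final TreeMap<BigInteger, String> shardsByStart = new TreeMap<>();

  void addShard(String shardId, BigInteger startingHashKey) {
    shardsByStart.put(startingHashKey, shardId);
  }

  /** Kinesis maps a partition key to an unsigned 128-bit integer using MD5. */
  static BigInteger hash(String partitionKey) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
      return new BigInteger(1, digest); // signum 1: interpret bytes as unsigned
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  /**
   * Returns the explicit hash key to aggregate on: the user-provided one if set,
   * otherwise the lower bound of the range of the shard the partition key hashes into.
   */
  String resolveExplicitHashKey(String partitionKey, String explicitHashKey) {
    if (explicitHashKey != null) {
      return explicitHashKey; // used as is, no change in behavior
    }
    Map.Entry<BigInteger, String> shard = shardsByStart.floorEntry(hash(partitionKey));
    if (shard == null) {
      throw new IllegalStateException("No shard covers the hashed partition key");
    }
    return shard.getKey().toString();
  }
}
```

Records that resolve to the same explicit hash key can then be aggregated together, since they are all routed to the same shard.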
To simplify statically sharing the shard state, `ClientPool` was generalized to `ObjectPool`. This should be fine as the code wasn't released yet.
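As a rough idea of what a statically shareable pool can look like, here is a minimal sketch. It assumes a reference-counting design, which is an assumption of this example and may differ from the actual `ObjectPool` in this PR; `SharedPool`, `retain`, and `release` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical reference-counted pool for illustration; not Beam's ObjectPool. */
final class SharedPool<K, V> {
  private static final class Ref<T> {
    final T value;
    int count;

    Ref(T value) {
      this.value = value;
    }
  }

  private final Map<K, Ref<V>> refs = new HashMap<>();
  private final Function<K, V> factory;

  SharedPool(Function<K, V> factory) {
    this.factory = factory;
  }

  /** Returns the shared object for the key, creating it on first use. */
  synchronized V retain(K key) {
    Ref<V> ref = refs.computeIfAbsent(key, k -> new Ref<>(factory.apply(k)));
    ref.count++;
    return ref.value;
  }

  /** Drops one reference; the object is evicted once nobody holds it anymore. */
  synchronized void release(K key) {
    Ref<V> ref = refs.get(key);
    if (ref != null && --ref.count == 0) {
      refs.remove(key);
    }
  }

  synchronized int size() {
    return refs.size();
  }
}
```

With such a pool, several writer instances using the same key (for example, the same stream) would share one shard state object instead of each fetching it separately.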