-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[BEAM-7589] Use only one KinesisProducer instance per JVM #8955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I also ran |
94ec996 to
d5f3317
Compare
iemejia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost LGTM pretty minor clean ups to do.
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
Show resolved
Hide resolved
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
Show resolved
Hide resolved
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
Outdated
Show resolved
Hide resolved
| // we remove from failures, so this code is safe. | ||
| private void checkForFailures(String message) | ||
| throws IOException, InterruptedException, ExecutionException { | ||
| if (failures.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe simpler to inverse the condition and put all the logic inside of the if block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be clearer to move this validation into the invoker method and do the checkForFailures method just produce the error string and use it only to throw the exception, but it is ok if you prefer to let the logic as it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One advantage of the refactor is that you can provide a complete LOG.error if you prefer to.
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java
Outdated
Show resolved
Hide resolved
| "Put record was not successful.", new UserRecordFailedException(result))); | ||
| } | ||
| } | ||
| throw new IOException(errorMessage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe inline the errorMessage (or just use the return type if you do the suggested refactor above).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is a reason for that? Not clear for me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the refactor to return String is just to have a more composable (and testable) signature, for the return time it is just because errorMessage is never used afterwards.
| int n = rand.nextInt(numberOfShards) + 1; | ||
| return String.valueOf(n); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this PR but I am curious about what is this getExplicitHashKey for? Haven't noticed it is not documented in the interface either.
iemejia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Merged manually to fix a typo in the commit message and squash the extra review commit. |
Instead of creating new instance of
KinesisProducerin every thread, we have to instantiate only one per JVM as recommended in its Javadoc. Also, it's supposed to be thread-safe by design.R: @iemejia
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username).[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.