[BEAM-8374] Enable returning missing PublishResult fields in SnsIO.Write #9758
Note that this is a breaking change to any Dataflow streaming pipelines using the PublishResult.
...o/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMockSuccess.java
@lukecwik Thanks for the feedback. I did not realize that; I'm very new to Beam. What do you suggest would be the best way to deal with that?
Last week we merged SnsIO based on the Amazon SDK for Java v2, and I was wondering if these fixes could apply there too. We may consider this optional for this PR, but for consistency it may be worth checking whether we can have a similar approach there too. CC @cmachgodaddy
.../io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoder.java
.../io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoder.java
One extra issue that I have not really thought about is the size difference that we will have by adding the full headers/SDK metadata to all answers. Hmm... now I am hesitating about maybe having a
@iemejia For our use case we only need the HTTP status. We have written an SNS sink that uses this to check for error responses. I just included HTTP headers for completeness, but I can remove them if it's a concern. In that case I wonder if a separate coder for SdkHttpMetadata makes sense though.
Sure, I work with Cam and we discussed it today. I can apply the changes to v2 in this PR, no problem.
I wonder if we should add a unit test for this coder? I know we did not have one, because we had just one property to encode and decode. But now we have quite a few.
...o/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultBuilder.java
I would suggest keeping the "existing" coder and allowing users to choose a coder that encodes all the additional metadata and HTTP response code. This way users who need the additional details could use a builder-style method within the IO class, like "withMetadata()" or "withFullMessage()", which ensures that the full message is encoded. The class comment could say that by default we only provide fields X and Y, but if you need everything then specify "with...".
By making this change you won't break existing users, it won't impact their performance, and it will enable your use case. Finally, for the V2 version we may want to use the "full" coder by default and use the stripped-down coder when told via a "withMessageIdOnly()" or something along those lines.
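For illustration only, the opt-in builder method suggested above might look roughly like the following sketch. It is self-contained and does not use the real Beam or AWS SDK classes: WriteSketch, ResultEncoding, and toTopic() are hypothetical names, and withFullMessage() is just one of the candidate method names floated in the comment.

```java
import java.util.Objects;

// Hypothetical stand-in for an IO "Write" transform; not the real SnsIO.Write.
final class WriteSketch {
  /** Which fields of the publish response get encoded (hypothetical enum). */
  enum ResultEncoding { MESSAGE_ID_ONLY, FULL }

  private final String topicName;
  private final ResultEncoding encoding;

  private WriteSketch(String topicName, ResultEncoding encoding) {
    this.topicName = topicName;
    this.encoding = encoding;
  }

  /** Default keeps the existing stripped-down encoding, so current users see no change. */
  static WriteSketch toTopic(String topicName) {
    return new WriteSketch(Objects.requireNonNull(topicName), ResultEncoding.MESSAGE_ID_ONLY);
  }

  /** Opt-in: returns a new immutable copy that encodes the full result. */
  WriteSketch withFullMessage() {
    return new WriteSketch(topicName, ResultEncoding.FULL);
  }

  String topicName() {
    return topicName;
  }

  ResultEncoding encoding() {
    return encoding;
  }
}
```

Because the default stays on the message-ID-only encoding, existing pipelines would keep their current behavior and only callers that ask for the full message would pay for the larger encoding.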
...o/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultBuilder.java
.../io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoder.java
The direction looks good, will let you follow up on the comments and fill out the tests.
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
...eb-services/src/main/java/org/apache/beam/sdk/io/aws/coders/MinimalSdkHttpMetadataCoder.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
...amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/FullPublishResultCoder.java
Hi @lukecwik thank you for the feedback. I believe all of the requested changes have been addressed now.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
Sorry for the delay, taking a look now.
This looks great, mostly minor feedback.
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
...io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoders.java
...io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoders.java
3f3f64f to c94b046
Hi @lukecwik and @iemejia, I believe I have incorporated all of your review feedback. I also made a few changes to the interface that I hope will improve clarity:
I hope this PR is ready to merge now. Please let me know if it needs any additional changes.
Thanks for returning to this. Taking a look now.
I added my suggested name changes inline as responses to your comments.
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
...io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
...io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
Run Java PreCommit
Thanks for the feedback @lukecwik. I've addressed your comments and also added one more unit test in SnsIOTest. Please let me know what you think now.
retest this please
sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/SnsIOTest.java
...io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/coders/AwsCoders.java
@lukecwik thanks for the suggestions!
One last question @lukecwik, would you prefer to apply these changes in amazon-web-services2 in this PR? Or should we take that to a separate PR? That might be a good opportunity to discuss making some improvements to how errors are handled in this IO. In my opinion it's somewhat suboptimal that
Thanks for the changes!
   * @return the SdkHttpMetadata coder
   */
  public static Coder<SdkHttpMetadata> sdkHttpMetadata() {
    return new SdkHttpMetadataCoder(true);
Just curious, why is this coder not following the pattern coder.of(xxx)?
@cmachgodaddy That's because this class is designed as a factory for coders, with the coder implementation being an internal detail, as @lukecwik suggested in an earlier comment thread:
"Style note, you can create one class called AwsCoders which has three static methods minimalSdkHttpMetadata() and fullSdkHttpMetadata() and responseMetadata() that return the coders. This way you can make all the coder implementations private inner static classes and they all become implementation details without needing to expose them to users. Users can still get an instance if they need one but it gives us flexibility from a maintenance point of view how the coders are organized."
I could create a static factory method of() on SdkHttpMetadata but it wouldn't serve any purpose; it would only be called by sdkHttpMetadata(), which is already a static factory method. SdkHttpMetadata is not meant to be instantiated directly by the caller.
of() prevents moving the class, while a top-level static method allows things to still be moved around.
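As a rough, self-contained sketch of the factory style discussed in this thread: callers see only static factory methods, while the coder implementations are private nested classes that can be renamed or reorganized later. SimpleCoder and ExampleCoders are hypothetical stand-ins invented for this illustration; the real Beam Coder API is stream-based and richer than this.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical minimal coder interface, used only for this illustration.
interface SimpleCoder<T> {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

// Factory class: the static methods are the only public surface.
final class ExampleCoders {
  private ExampleCoders() {}

  /** Returns a coder for UTF-8 text. */
  static SimpleCoder<String> text() {
    return new TextCoder();
  }

  /** Returns a coder for an integer status code (4 bytes, big-endian). */
  static SimpleCoder<Integer> statusCode() {
    return new StatusCodeCoder();
  }

  // Private nested classes: implementation details hidden from callers.
  private static final class TextCoder implements SimpleCoder<String> {
    @Override
    public byte[] encode(String value) {
      return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  private static final class StatusCodeCoder implements SimpleCoder<Integer> {
    @Override
    public byte[] encode(Integer value) {
      return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    @Override
    public Integer decode(byte[] bytes) {
      return ByteBuffer.wrap(bytes).getInt();
    }
  }
}
```

Since no caller can name TextCoder or StatusCodeCoder directly, those classes could be merged, split, or moved without changing any calling code, which is the maintenance flexibility the thread refers to.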
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
And one more curiosity: why do we have to implement verifyDeterministic for this coder?
By default, coders are considered non-deterministic, which prevents them from being used as the key coder for many things (GBK, map/multimap side input keys, map state keys, ...).
Implementing it allows us to be deterministic if the coders we use are deterministic. So if the internally used coders become deterministic then we get that for free.
CustomCoder's implementation always throws NonDeterministicException so this seemed more correct.
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
Lines 47 to 53 in 33ec0bb
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(
        this,
        "CustomCoder implementations must override verifyDeterministic,"
            + " or they are presumed nondeterministic.");
  }
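The delegation idea from this thread can be sketched without any Beam dependency, as below. A composite coder declares itself deterministic only if every coder it uses internally is, so it would pick up determinism "for free" if a component later becomes deterministic. All type names here are hypothetical stand-ins for illustration, not the real Beam classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Beam's NonDeterministicException.
class NonDeterministicSketchException extends Exception {
  NonDeterministicSketchException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for the determinism part of a coder's contract.
interface CheckedCoder {
  void verifyDeterministic() throws NonDeterministicSketchException;
}

// A component whose encoding is stable, so verification passes silently.
final class DeterministicLeafCoder implements CheckedCoder {
  @Override
  public void verifyDeterministic() {}
}

// A component whose encoding is not stable, so verification always fails.
final class UnorderedLeafCoder implements CheckedCoder {
  @Override
  public void verifyDeterministic() throws NonDeterministicSketchException {
    throw new NonDeterministicSketchException("iteration order is not stable");
  }
}

// The composite simply asks each component; the first failure propagates.
final class CompositeCoder implements CheckedCoder {
  private final List<CheckedCoder> components;

  CompositeCoder(CheckedCoder... components) {
    this.components = Arrays.asList(components);
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicSketchException {
    for (CheckedCoder component : components) {
      component.verifyDeterministic();
    }
  }

  /** Convenience helper for callers that want a boolean instead of an exception. */
  static boolean isDeterministic(CheckedCoder coder) {
    try {
      coder.verifyDeterministic();
      return true;
    } catch (NonDeterministicSketchException e) {
      return false;
    }
  }
}
```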
@@ -32,6 +32,7 @@
   @Override
   public List<CoderProvider> getCoderProviders() {
     return ImmutableList.of(
-        CoderProviders.forCoder(TypeDescriptor.of(PublishResult.class), PublishResultCoder.of()));
+        CoderProviders.forCoder(
+            TypeDescriptor.of(PublishResult.class), PublishResultCoders.defaultPublishResult()));
Same here, any reason this does not follow the XXXcoder.of(...) pattern?
It's sort of the same reason as above: PublishResultCoders is a factory for creating several different coder implementations, so the factory methods have different names to indicate which one. I could rename this method to of(), but since we are introducing new behavior I wanted to make it very clear that this is the default, or existing, behavior. Also, I'm not sure if it makes sense to have a method called of() on the factory rather than on the coder itself.
I would prefer a separate PR so that people can start using this sooner rather than later. Also, smaller PRs are easier to review.
@iemejia Any final comments? Otherwise I intend to merge when the tests are green.
Run Java PreCommit
This PR adds new coders for returning the entire PublishResult object from SnsIO, or the entire PublishResult without HTTP headers. This is necessary in order to check the HTTP status for errors since the current implementation only encodes the messageId field. These new coders can be selected by calling the new withFullPublishResult() and withFullPublishResultWithoutHeaders() methods on SnsIO.Write. The withCoder() method also allows setting a custom coder. This PR also adds coders for common AWS SDK objects ResponseMetadata and SdkHttpMetadata.
R: @lukecwik
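To make the motivation concrete, here is a small self-contained sketch of why a coder that encodes only the message ID cannot support an HTTP status check downstream. ResultSketch and ResultEncodings are hypothetical, simplified stand-ins; the byte layout is invented for this illustration and is not the encoding used by the coders in this PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical simplified result; the real PublishResult carries more fields.
final class ResultSketch {
  static final int STATUS_UNAVAILABLE = -1;

  final String messageId;
  final int httpStatus;

  ResultSketch(String messageId, int httpStatus) {
    this.messageId = messageId;
    this.httpStatus = httpStatus;
  }
}

final class ResultEncodings {
  private ResultEncodings() {}

  /** Message-ID-only encoding: the HTTP status is dropped. */
  static byte[] encodeMessageIdOnly(ResultSketch result) {
    return result.messageId.getBytes(StandardCharsets.UTF_8);
  }

  static ResultSketch decodeMessageIdOnly(byte[] data) {
    String id = new String(data, StandardCharsets.UTF_8);
    return new ResultSketch(id, ResultSketch.STATUS_UNAVAILABLE);
  }

  /** Fuller encoding: a 4-byte status code followed by the message ID bytes. */
  static byte[] encodeWithStatus(ResultSketch result) {
    byte[] id = result.messageId.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Integer.BYTES + id.length)
        .putInt(result.httpStatus)
        .put(id)
        .array();
  }

  static ResultSketch decodeWithStatus(byte[] data) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
    int status = buffer.getInt();
    byte[] id = new byte[buffer.remaining()];
    buffer.get(id);
    return new ResultSketch(new String(id, StandardCharsets.UTF_8), status);
  }
}
```

After a round trip through the message-ID-only encoding, the status is no longer available to a downstream step, while the fuller encoding preserves it at the cost of a few extra bytes per element. That trade-off is the size concern raised earlier in the review.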