New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-17623][Connectors/Elasticsearch] Support user resource cleanup in Elasticsearch sink #12619
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit a73bb36 (Fri Jun 12 00:42:29 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @yun-wang . I left two minor comments.
/** | ||
* Tear-down method for the function. It is called when the sink closes. | ||
*/ | ||
default void close () throws Exception {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit weird that the open
function does not allow to throw an exception. I admit it is out of the scope of this PR though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can update open
to default void open() throws Exception {}
real quick. It's a backward compatible change and should have no user impact.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me.
ElasticsearchSinkFunction<String> sinkFunction = (ElasticsearchSinkFunction<String>) mock(ElasticsearchSinkFunction.class); | ||
final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( | ||
new HashMap<>(), sinkFunction, new DummyRetryFailureHandler()); | ||
|
||
sink.open(mock(Configuration.class)); | ||
sink.close(); | ||
|
||
verify(sinkFunction, times(1)).open(); | ||
verify(sinkFunction, times(1)).close(); | ||
verifyNoMoreInteractions(sinkFunction); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The community tends to avoid mock
if possible. In this case, I think we could add TestingElasticsearchSinkFunction
or just an anonymous class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I can update the test.
Out of curiosity, any particular reason to avoid mock
whenever possible? In this case adding a simple implementation class would serve the exact same purpose as the mock, only lengthier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this reflects how well our design adheres to the principles of dependency injection. If our design is good enough, we could directly use the constructor in the test.
… in Elasticsearch sink
…en to throw an exception
final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( | ||
new HashMap<>(), sinkFunction, new DummyRetryFailureHandler()); | ||
|
||
sink.open(mock(Configuration.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sink.open(mock(Configuration.class)); | |
sink.open(new Configuration())); |
@@ -586,6 +600,27 @@ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) | |||
} | |||
} | |||
|
|||
private static class SimpleClosableSinkFunction<String> implements ElasticsearchSinkFunction<String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe TestingClosableSinkFunction
? Not quite sure about this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing my comments, @yun-wang . It now LGTM. Just two minor comments you could take a look.
@wuchong Could you help to review and merge it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…searchSinkFunction This closes apache#12619
What is the purpose of the change
This pull request supports user resource cleanup in Elasticsearch sink.
Brief change log
ElasticsearchSinkFunction
.Verifying this change
This change added tests and can be verified as follows:
open()
andclose()
methods are invoked in the Elasticsearch sink as expected.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes)Documentation