New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CC-12445: Replace the JestClient with the ElasticsearchRestClient #468
Conversation
10f2413
to
a57b0da
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @levzem
This is going to be a tremendous upgrade!
Took a first quick pass over half the files and left a few comments. Will have to return soon for a more careful read.
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
Outdated
Show resolved
Hide resolved
| url -> | ||
| credentialsProvider.setCredentials( | ||
| new AuthScope(new HttpHost(url)), | ||
| new UsernamePasswordCredentials(config.username(), config.password().value()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can password be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java
Show resolved
Hide resolved
| try { | ||
| client.indices().create(request, RequestOptions.DEFAULT); | ||
| } catch (ElasticsearchStatusException | IOException e) { | ||
| if (!e.getMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it still possible to get this exception if we already checked whether the index exists earlier?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have seen it before in escalations and this maintains the previous behavior of the JestClient so I kept it just to be safe
unfortunately we have not been able to understand why this happens
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
| * @param id the document id | ||
| * @param record the record | ||
| */ | ||
| private void addToRecordMap(String id, SinkRecord record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that it's called once, do we need this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftover from BulkProcessor implementation, while yes it is only called once, given it is wrapped in a null check it is probably better to abstract it away in case of future use
|
|
||
| @Override | ||
| public void afterBulk(long executionId, BulkRequest request, Throwable failure) { | ||
| error.compareAndSet(null, new ConnectException("Bulk request failed.", failure)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we keeping the document in the recordMap?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will fail the task so it wont matter if we empty the map or not, but I added it regardless
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
| response.getIndex() | ||
| ); | ||
|
|
||
| reportBadRecord(response); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method's Javadoc and the warn message says we are ignoring version conflict,s but it seems like we are still reporting this record. Is that intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reporting the record is the same as ignoring it (reporting a bad record does not fail anything), but I figured the user should be know which records had version conflicts in case they want to remediate manually somehow
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
| return Version.getVersion(); | ||
| } | ||
|
|
||
| private void checkMapping(SinkRecord record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method name is a bit misleading IMO, maybe consider renaming it to maybeCreateMapping?
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Show resolved
Hide resolved
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java
Outdated
Show resolved
Hide resolved
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java
Show resolved
Hide resolved
| } | ||
|
|
||
| @Test | ||
| public void testCallWithRetriesSomeRetries() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add a unit-test that ensures wait time is always lesser than MAX_RETRY_TIME_MS?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already tested by https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java#L57
and
https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java#L23-L46
src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few non-critical suggestions, LGTM otherwise, thanks @levzem !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another round of comments. Mostly done. I have a few more files to check. Mostly tests.
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
Show resolved
Hide resolved
src/main/java/io/confluent/connect/elasticsearch/Validator.java
Outdated
Show resolved
Hide resolved
|
@kkonstantine, addressed all review comments, ready for another pass! |
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice work.
Left a few more minor comments.
Also please make sure to run mvn javadoc:javadoc and fix the javadoc issues because they'll break during the release.
But this LGTM! 👏
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
Outdated
Show resolved
Hide resolved
| client.close(); | ||
| } | ||
|
|
||
| @Test(expected = ConnectException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be nice to start using assertThrows and available hamcrest matchers for maps etc. But we can consider on a follow up.
src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorBaseIT.java
Show resolved
Hide resolved
src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorIT.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
Signed-off-by: Lev Zemlyanov lev@confluent.io
Problem
the jest client is outdated and needs to be replaced by the supported and maintained ES high level rest client
Solution
replace the client
major points
BulkProcessoris no longer required as ES provides its ownElasticsearchWriterwas absorbed intoSinkTasktype.nameas it is deprecated as of ES 7.xDoes this solution apply anywhere else?
If yes, where?
Test Strategy
an additional manual upgrade test was performed. write records to ES cloud with a
v10.0.2, then upgrade to the new client and try writing to the same ES cloud indexTesting done:
Release Plan
masterbecause this is a major change