-
Notifications
You must be signed in to change notification settings - Fork 431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CC-1385: Enhance connector to use text type with ES 5+ #169
Conversation
…r to use text type with ES 5+ The ES connector now queries for the version. The version is stored in an instance of ElasticsearchClient, which is passed around so that it can be queried at the appropriate times (such as when inferring schema mappings). As part of this change, the old Jest client is now wrapped by a higher-level client object. Also, all the Jest dependencies have been isolated to the io.confluent.connect.elasticsearch.jest package. This makes the third-party dependencies more clear and will facilitate moving to a different ES client library in the future should we choose to do so. These changes have been tested with ES 2.x, 5.x, and 6.x.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initial review with some suggestions/comments/questions. I'll want to do another round after those are addressed and we sort out the build issue.
private final Version version; | ||
|
||
@VisibleForTesting | ||
public JestElasticsearchClient(String address) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be package-level scope?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to prefer protected
unless I have a good reason to leave package-private. Several refactorings in hdfs and storage-common showed that something might need to be used that way for testing but not reside exactly in the same package (for various reasons). Not feeling too strong about this though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently it needs to be public as it is used in a different package.
return inferPrimitive(esType, schema.defaultValue()); | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
public static String getElasticsearchType(ElasticsearchClient client, Schema.Type schemaType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be package-level scope?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll change it
|
||
private Version getServerVersion() throws IOException { | ||
// Default to newest version for forward compatibility | ||
Version defaultVersion = Version.SIX; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, ES 6 removes the old string type that was deprecated in 5. If that's true, then how is defaulting to 6 forward compatible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's forward compatible for ES 7.x+, meaning that the ES 6 functionality will be retained (using text instead of string, which only works for ES 2 and 5).
return defaultVersion; | ||
} | ||
|
||
String esVersion = nodeRoot.get("version").getAsString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this should work for all of the ES versions and the above logic is really handling exceptional cases? If so, then perhaps this method should have doc that explains that. And, the log messages should maybe be a bit more descriptive so that we can more easily tell which path the logic takes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it should work for all versions. I'll make the errors more descriptive.
/** | ||
* Gets the Elasticsearch version. | ||
* | ||
* @return the version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll add a comment
* @param query the search query | ||
* @param index the index to search | ||
* @param type the type to search | ||
* @return the search result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing @throws IOException if the client cannot execute the request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add it.
/** | ||
* Shuts down the client. | ||
*/ | ||
void shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered extending AutoCloseable
and then overriding the close()
method here to throw no exceptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I'll do this.
import java.util.Map; | ||
import java.util.Set; | ||
|
||
public class JestElasticsearchClient implements ElasticsearchClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add unit tests for this? Might have to mock the JestClient, in which case we'd want a constructor that takes the JestClient so we can pass in the mock. This class' logic is not trivial, and it seems like it'd be better to test this logic in isolation. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll look into adding some tests for this.
if (!result.isSucceeded()) { | ||
String msg = result.getErrorMessage() != null ? ": " + result.getErrorMessage() : ""; | ||
throw new ConnectException("Could not create index '" + index + "'" + msg); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that two tasks attempt to look for the same index, neither finds it, and so both attempt to create the same index? If so, would it make sense if the create request fails to first check and see if it exists (perhaps again) and to fail only if the index does not already exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the current behavior, but yes, I can add an additional check before throwing the exception.
try { | ||
client.close(); | ||
} catch (IOException e) { | ||
LOG.warn("Could not close client"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about including at least the exception message in the log message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Thanks @rayokota
Gave it a first pass, noticing mainly packaging and stylistic things.
pom.xml
Outdated
@@ -39,11 +39,16 @@ | |||
</scm> | |||
|
|||
<properties> | |||
<es.version>6.0.0</es.version> | |||
<lucene.version>7.0.1</lucene.version> | |||
<!-- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I assume commenting out this section is temporary?
pom.xml
Outdated
<scope>test</scope> | ||
</dependency> | ||
<!-- For ES 2.x --> | ||
<!-- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: don't forget to remove commented out section.
pom.xml
Outdated
@@ -101,19 +100,46 @@ | |||
<version>${lucene.version}</version> | |||
<scope>test</scope> | |||
</dependency> | |||
<dependency> | |||
<groupId>org.apache.logging.log4j</groupId> | |||
<artifactId>log4j-api</artifactId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally we use slf4j-log4j12
(which also carries slf4j-api
) and they are available by inheriting from the common
repo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is test scope and was necessary for working with the new ES test libs. However, now that I've downgraded the ES test lib I will remove this.
try { | ||
log.info("Starting ElasticsearchSinkTask."); | ||
|
||
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props); | ||
String type = config.getString(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG); | ||
boolean ignoreKey = | ||
final String type = config.getString(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were all these final
s required by checkstyle? Otherwise it's a lot of noise that doesn't correspond to actual changes.
Additionally to that, the connectors style does not require to use final
keyword for local variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These finals were recommended by checkstyle at one point but I think they are not needed any longer so I can remove.
@@ -56,43 +56,27 @@ | |||
* @param schema The schema used to infer mapping. | |||
* @throws IOException from underlying JestClient | |||
*/ | |||
public static void createMapping(JestClient client, String index, String type, Schema schema) | |||
public static void createMapping(ElasticsearchClient client, | |||
String index, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation style is not the one we use in connectors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, reformatting code with new coding style left this as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Frequently, to apply the actual style, you need to put it all in one line, select the whole line, and press Alt+Cmd+L
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I do that, it just leaves it all on one line. What formatting should it have?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In IntelliJ, checking the settings under: Preferences -> Code Style -> Java
tab Wrapping and Braces
boxes for Method alignment parameters and Method call arguments
and make sure New line after '(' is ticked, Place ')' on new line is ticked (when that's the case I believe Align when multiline does not matter, but better let it unticked too).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thanks
|
||
package io.confluent.connect.elasticsearch.bulk; | ||
|
||
public interface BulkRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about a javadoc that this is a marker interface, and what is it for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will add JavaDoc for this.
|
||
public class JestBulkRequest implements BulkRequest { | ||
|
||
private Bulk bulk; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be final
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will make final.
private final Version version; | ||
|
||
@VisibleForTesting | ||
public JestElasticsearchClient(String address) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to prefer protected
unless I have a good reason to leave package-private. Several refactorings in hdfs and storage-common showed that something might need to be used that way for testing but not reside exactly in the same package (for various reasons). Not feeling too strong about this though.
|
||
private Version getServerVersion() throws IOException { | ||
// Default to newest version for forward compatibility | ||
Version defaultVersion = Version.SIX; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another example where I'm not 100% immediately we are referring to ES version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will change to ES_V6.
Version defaultVersion = Version.SIX; | ||
|
||
NodesInfo info = new NodesInfo.Builder().addCleanApiParameter("version").build(); | ||
JsonObject result = this.client.execute(info).getJsonObject(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again stylistic: Not sure if this was there already, but we tend to use this
mainly within constructors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for looking at this, @rayokota! Some minor comment.
pom.xml
Outdated
<lucene.version>7.0.1</lucene.version> | ||
<es.version>5.0.0</es.version> | ||
<lucene.version>6.2.0</lucene.version> | ||
--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we delete this section instead of leaving it commented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove.
pom.xml
Outdated
<dependency> | ||
<groupId>org.elasticsearch</groupId> | ||
<artifactId>elasticsearch</artifactId> | ||
<version>${es.version}</version> | ||
<scope>test</scope> | ||
<type>test-jar</type> | ||
</dependency> | ||
<!-- For ES 5.x (requires Java 8) --> | ||
<!-- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we will have to change a bunch of things to move to Java 8, so it might be fine to delete the commented sections for now. And then tackle the Java 8 independently when the time comes.
import java.util.List; | ||
import java.util.Set; | ||
|
||
public interface ElasticsearchClient extends AutoCloseable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
/** | ||
* Shuts down the client. | ||
*/ | ||
void close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't close
already defined in AutoCloseable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I overrode it to not throw Exception
@@ -34,19 +29,6 @@ | |||
public static final String FLOAT_TYPE = "float"; | |||
public static final String DOUBLE_TYPE = "double"; | |||
public static final String STRING_TYPE = "string"; | |||
public static final String TEXT_TYPE = "text"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ES also has a keyword
type. Should we support it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add it now as a placeholder.
} | ||
} | ||
if (assignedTopics == null) { | ||
throw new NullPointerException(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be replaced with a call to java.util.Objects#requireNonNull(obj)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I'll change it.
* Implementations will typically hold state comprised of instances of classes that are | ||
* specific to the client library. | ||
*/ | ||
public interface BulkRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we call this ConnectBulkRequest
to make it easier to read whenever it is used in the same class with org.elasticsearch.action.bulk.BulkRequest
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to leave this for now as all the other classes in this package start with Bulk..(BulkClient, BulkProcessor, BulkResponse).
private final Version version; | ||
|
||
// visible for testing | ||
public JestElasticsearchClient(String address) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have a @VisibleForTesting
constructor which accepts a JestClient as an argument. That way the test can construct the JestClient
and can be mocked appropriately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll add this as part of some unit tests for the Jest client.
I've added unit tests for the Jest client. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job @rayokota ! My comments have been addressed. Given that others had lots of comments too, I'll defer the final decision to them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice job, @rayokota. LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot, @rayokota! This looks great!
Thanks for the review, @rhauch , @kkonstantine , and @wicknicks ! |
The ES connector now queries for the version. The version is stored in
an instance of ElasticsearchClient, which is passed around so that
it can be queried at the appropriate times (such as when inferring
schema mappings).
As part of this change, the old Jest client is now wrapped by a
higher-level client object. Also, all the Jest dependencies have been
isolated to the io.confluent.connect.elasticsearch.jest package. This
makes the third-party dependencies more clear and will facilitate moving
to a different ES client library in the future should we choose to do
so.
These changes have been tested with ES 2.x, 5.x, and 6.x.